This is an automated email from the ASF dual-hosted git repository.

shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c006c184df6 Forward Google Cloud Run container logs to Airflow log via 
verbose flag (#67140)
c006c184df6 is described below

commit c006c184df6e59722120018f28ca616e1580190b
Author: Aaron Chen <[email protected]>
AuthorDate: Wed Jun 17 13:51:10 2026 +0800

    Forward Google Cloud Run container logs to Airflow log via verbose flag 
(#67140)
---
 .../google/docs/operators/cloud/cloud_run.rst      |  32 ++
 .../providers/google/cloud/operators/cloud_run.py  | 103 +++++-
 .../providers/google/cloud/triggers/cloud_run.py   |  15 +
 .../unit/google/cloud/operators/test_cloud_run.py  | 363 ++++++++++++++++++++-
 .../unit/google/cloud/triggers/test_cloud_run.py   |  26 +-
 5 files changed, 523 insertions(+), 16 deletions(-)

diff --git a/providers/google/docs/operators/cloud/cloud_run.rst 
b/providers/google/docs/operators/cloud/cloud_run.rst
index 10dfcb5339e..d63a37e9580 100644
--- a/providers/google/docs/operators/cloud/cloud_run.rst
+++ b/providers/google/docs/operators/cloud/cloud_run.rst
@@ -165,6 +165,38 @@ When using deferrable mode, the operator defers to an 
async trigger that polls t
 REST can be used with deferrable mode, but it may be less efficient than gRPC 
and is generally best reserved for cases where gRPC
 cannot be used.
 
+Capturing container logs in the Airflow task log
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+By default the operator only reports its own status messages; the container's 
``stdout`` /
+``stderr`` is only available in `Cloud Logging 
<https://console.cloud.google.com/logs>`__.
+Pass ``verbose=True`` to have the operator fetch the container log entries 
from Cloud
+Logging once the execution finishes and forward each line into the Airflow 
task log. This
+works in both eager and deferrable mode.
+
+.. code-block:: python
+
+    CloudRunExecuteJobOperator(
+        task_id="execute_cloud_run_job",
+        project_id=PROJECT_ID,
+        region=region,
+        job_name=job_name,
+        verbose=True,
+    )
+
+When enabling ``verbose``:
+
+* The service account used by ``gcp_conn_id`` (or by ``impersonation_chain``) 
must have the
+  ``roles/logging.viewer`` role on the project that runs the job.
+* Each task instance issues at least one Cloud Logging Read API request, with 
additional
+  pages fetched automatically when the execution produces enough log entries 
to span
+  multiple pages. Failed executions wait briefly before fetching logs to catch 
entries
+  that arrive just after the Cloud Run operation reports failure. Plan around 
the
+  project-wide quota of 60 read requests per minute documented at
+  https://cloud.google.com/logging/quotas#api-limits.
+* If the log fetch itself fails (for example missing IAM permission or quota 
exhausted),
+  the operator emits a warning and the task result is unaffected.
+
 You can also specify overrides that allow you to give a new entrypoint command 
to the job and more:
 
 
:class:`~airflow.providers.google.cloud.operators.cloud_run.CloudRunExecuteJobOperator`
diff --git 
a/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py 
b/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py
index 3692457038e..f1c307f55f4 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py
@@ -17,11 +17,14 @@
 # under the License.
 from __future__ import annotations
 
+import time
 from collections.abc import Sequence
+from functools import cached_property
 from typing import TYPE_CHECKING, Any, Literal
 
 import google.cloud.exceptions
-from google.api_core.exceptions import AlreadyExists
+from google.api_core.exceptions import AlreadyExists, GoogleAPICallError
+from google.cloud import logging as gcp_logging
 from google.cloud.run_v2 import Job, Service
 
 from airflow.providers.common.compat.sdk import AirflowException, conf
@@ -36,6 +39,8 @@ if TYPE_CHECKING:
 
     from airflow.providers.common.compat.sdk import Context
 
+FAILED_EXECUTION_LOG_FETCH_DELAY_SECONDS = 1
+
 
 class CloudRunCreateJobOperator(GoogleCloudBaseOperator):
     """
@@ -302,6 +307,16 @@ class CloudRunExecuteJobOperator(GoogleCloudBaseOperator):
     :param transport: Optional. The transport to use for API requests. Can be 
'rest' or 'grpc'.
         If set to None, a transport is chosen automatically. Use 'rest' if 
gRPC is not available
         or fails in your environment (e.g., Docker containers with certain 
network configurations).
+    :param verbose: If True, container ``stdout``/``stderr`` from the Cloud 
Run job execution
+        is fetched from Cloud Logging once the execution finishes and 
forwarded into the
+        Airflow task log. Logs are also forwarded when the execution fails so 
that the
+        container output remains visible for debugging. Requires the service 
account used by
+        ``gcp_conn_id`` to have ``roles/logging.viewer`` on the project. Each 
task instance
+        issues at least one Cloud Logging Read API request (more if the 
execution produces
+        enough log entries to paginate). Failed executions wait briefly before 
fetching logs
+        to catch late-arriving log entries. Requests count against the 
project-wide quota of
+        60 read requests per minute (see 
https://cloud.google.com/logging/quotas#api-limits).
+        Default: ``False``.
     """
 
     operator_extra_links = (CloudRunJobLoggingLink(),)
@@ -330,6 +345,7 @@ class CloudRunExecuteJobOperator(GoogleCloudBaseOperator):
         impersonation_chain: str | Sequence[str] | None = None,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         transport: Literal["rest", "grpc"] | None = None,
+        verbose: bool = False,
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -344,15 +360,11 @@ class CloudRunExecuteJobOperator(GoogleCloudBaseOperator):
         self.deferrable = deferrable
         self.use_regional_endpoint = use_regional_endpoint
         self.transport = transport
+        self.verbose = verbose
         self.operation: operation.Operation | None = None
 
     def execute(self, context: Context):
-        hook: CloudRunHook = CloudRunHook(
-            gcp_conn_id=self.gcp_conn_id,
-            impersonation_chain=self.impersonation_chain,
-            transport=self.transport,
-        )
-        self.operation = hook.execute_job(
+        self.operation = self.hook.execute_job(
             region=self.region,
             project_id=self.project_id,
             job_name=self.job_name,
@@ -371,8 +383,12 @@ class CloudRunExecuteJobOperator(GoogleCloudBaseOperator):
 
         if not self.deferrable:
             result: Execution = self._wait_for_operation(self.operation)
+            if self.verbose and result.name:
+                if self._has_execution_failed(result):
+                    time.sleep(FAILED_EXECUTION_LOG_FETCH_DELAY_SECONDS)
+                self._log_container_output(result.name.rsplit("/", 1)[-1])
             self._fail_if_execution_failed(result)
-            job = hook.get_job(
+            job = self.hook.get_job(
                 job_name=result.job,
                 region=self.region,
                 project_id=self.project_id,
@@ -401,19 +417,19 @@ class CloudRunExecuteJobOperator(GoogleCloudBaseOperator):
             raise AirflowException("Operation timed out")
 
         if status == RunJobStatus.FAIL.value:
+            if self.verbose and event.get("execution_name"):
+                time.sleep(FAILED_EXECUTION_LOG_FETCH_DELAY_SECONDS)
+                self._log_container_output(event["execution_name"])
             error_code = event["operation_error_code"]
             error_message = event["operation_error_message"]
             raise AirflowException(
                 f"Operation failed with error code [{error_code}] and error 
message [{error_message}]"
             )
 
-        hook: CloudRunHook = CloudRunHook(
-            gcp_conn_id=self.gcp_conn_id,
-            impersonation_chain=self.impersonation_chain,
-            transport=self.transport,
-        )
+        if self.verbose and event.get("execution_name"):
+            self._log_container_output(event["execution_name"])
 
-        job = hook.get_job(
+        job = self.hook.get_job(
             job_name=event["job_name"],
             region=self.region,
             project_id=self.project_id,
@@ -421,6 +437,56 @@ class CloudRunExecuteJobOperator(GoogleCloudBaseOperator):
         )
         return Job.to_dict(job)
 
+    @cached_property
+    def hook(self) -> CloudRunHook:
+        return CloudRunHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+            transport=self.transport,
+        )
+
+    def _log_container_output(self, execution_name: str) -> None:
+        """Forward Cloud Run container logs for ``execution_name`` into the 
Airflow task log."""
+        log_filter = (
+            'resource.type="cloud_run_job" '
+            f'resource.labels.job_name="{self.job_name}" '
+            f'resource.labels.location="{self.region}" '
+            f'labels."run.googleapis.com/execution_name"="{execution_name}" '
+            'NOT logName:"cloudaudit.googleapis.com"'
+        )
+        try:
+            client = gcp_logging.Client(
+                project=self.project_id,
+                credentials=self.hook.get_credentials(),
+            )
+            for entry in client.list_entries(filter_=log_filter, 
order_by=gcp_logging.ASCENDING):
+                payload = entry.payload
+                if isinstance(payload, str):
+                    self.log.info(payload)
+                else:
+                    self.log.info("%s", payload)
+        except GoogleAPICallError as exc:
+            self.log.warning(
+                "Could not fetch container logs from Cloud Logging 
(execution=%s): %s. "
+                "Task result is unaffected.",
+                execution_name,
+                exc,
+            )
+
+    def _log_container_output_for_operation(self, operation: 
operation.Operation) -> None:
+        """Forward container logs using the execution metadata on a Cloud Run 
operation."""
+        if execution_name := 
self._get_execution_name_from_operation(operation):
+            time.sleep(FAILED_EXECUTION_LOG_FETCH_DELAY_SECONDS)
+            self._log_container_output(execution_name)
+
+    @staticmethod
+    def _get_execution_name_from_operation(operation: operation.Operation) -> 
str | None:
+        metadata = getattr(operation, "metadata", None)
+        execution_full_name = getattr(metadata, "name", None)
+        if isinstance(execution_full_name, str) and execution_full_name:
+            return execution_full_name.rsplit("/", 1)[-1]
+        return None
+
     def _fail_if_execution_failed(self, execution: Execution):
         task_count = execution.task_count
         succeeded_count = execution.succeeded_count
@@ -432,10 +498,19 @@ class CloudRunExecuteJobOperator(GoogleCloudBaseOperator):
         if failed_count > 0:
             raise AirflowException("Some tasks failed execution")
 
+    @staticmethod
+    def _has_execution_failed(execution: Execution) -> bool:
+        return (
+            execution.failed_count > 0
+            or execution.succeeded_count + execution.failed_count != 
execution.task_count
+        )
+
     def _wait_for_operation(self, operation: operation.Operation):
         try:
             return operation.result(timeout=self.timeout_seconds)
         except Exception:
+            if self.verbose:
+                self._log_container_output_for_operation(operation)
             error = operation.exception(timeout=self.timeout_seconds)
             raise AirflowException(error)
 
diff --git 
a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py 
b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
index 254b68c85eb..3cf323641ca 100644
--- a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
+++ b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
@@ -159,6 +159,9 @@ class CloudRunJobFinishedTrigger(BaseTrigger):
             if operation.done:
                 # An operation can only have one of those two combinations: if 
it is failed, then
                 # the error field will be populated, else, then the response 
field will be.
+                # In both cases the metadata Any may still carry the Execution 
proto, which holds
+                # the short execution name needed for fetching container logs 
from Cloud Logging.
+                execution_name = self._extract_execution_name(operation)
                 if operation.error.SerializeToString():
                     yield TriggerEvent(
                         {
@@ -166,6 +169,7 @@ class CloudRunJobFinishedTrigger(BaseTrigger):
                             "operation_error_code": operation.error.code,
                             "operation_error_message": operation.error.message,
                             "job_name": self.job_name,
+                            "execution_name": execution_name,
                         }
                     )
                     return
@@ -190,6 +194,7 @@ class CloudRunJobFinishedTrigger(BaseTrigger):
                                 f"{execution.cancelled_count}."
                             ),
                             "job_name": self.job_name,
+                            "execution_name": execution_name,
                         }
                     )
                     return
@@ -203,6 +208,7 @@ class CloudRunJobFinishedTrigger(BaseTrigger):
                                 f"{execution.failed_count} of 
task_count={execution.task_count}."
                             ),
                             "job_name": self.job_name,
+                            "execution_name": execution_name,
                         }
                     )
                     return
@@ -210,6 +216,7 @@ class CloudRunJobFinishedTrigger(BaseTrigger):
                     {
                         "status": RunJobStatus.SUCCESS.value,
                         "job_name": self.job_name,
+                        "execution_name": execution_name,
                     }
                 )
                 return
@@ -235,3 +242,11 @@ class CloudRunJobFinishedTrigger(BaseTrigger):
             impersonation_chain=self.impersonation_chain,
             transport=self.transport,
         )
+
+    @staticmethod
+    def _extract_execution_name(operation: Any) -> str | None:
+        """Return the short execution name from a Cloud Run job LRO operation, 
or ``None``."""
+        execution_pb = Execution.pb(Execution())
+        if operation.metadata.Unpack(execution_pb) and execution_pb.name:
+            return execution_pb.name.rsplit("/", 1)[-1]
+        return None
diff --git 
a/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py 
b/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py
index c29af5f9987..00f339e9493 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py
@@ -24,7 +24,7 @@ from __future__ import annotations
 from unittest import mock
 
 import pytest
-from google.api_core.exceptions import AlreadyExists
+from google.api_core.exceptions import Aborted, AlreadyExists, PermissionDenied
 from google.cloud.exceptions import GoogleCloudError
 from google.cloud.run_v2 import Job, Service
 
@@ -42,10 +42,13 @@ from airflow.providers.google.cloud.triggers.cloud_run 
import RunJobStatus
 
 CLOUD_RUN_HOOK_PATH = 
"airflow.providers.google.cloud.operators.cloud_run.CloudRunHook"
 CLOUD_RUN_SERVICE_HOOK_PATH = 
"airflow.providers.google.cloud.operators.cloud_run.CloudRunServiceHook"
+GCP_LOGGING_PATH = 
"airflow.providers.google.cloud.operators.cloud_run.gcp_logging"
 TASK_ID = "test"
 PROJECT_ID = "testproject"
 REGION = "us-central1"
 JOB_NAME = "jobname"
+EXECUTION_NAME = "jobname-abc12"
+EXECUTION_FULL_NAME = 
f"projects/{PROJECT_ID}/locations/{REGION}/jobs/{JOB_NAME}/executions/{EXECUTION_NAME}"
 SERVICE_NAME = "servicename"
 OVERRIDES = {
     "container_overrides": [{"args": ["python", "main.py"]}],
@@ -375,6 +378,364 @@ class TestCloudRunExecuteJobOperator:
         with pytest.raises(AirflowException):
             operator.execute(context=mock.MagicMock())
 
+    @mock.patch(GCP_LOGGING_PATH)
+    @mock.patch(CLOUD_RUN_HOOK_PATH)
+    def test_execute_verbose_false_does_not_call_logging_client(self, 
hook_mock, gcp_logging_mock):
+        """Default behaviour (``verbose=False``) must not touch Cloud 
Logging."""
+        hook_mock.return_value.get_job.return_value = JOB
+        hook_mock.return_value.execute_job.return_value = 
self._mock_operation(3, 3, 0)
+
+        operator = CloudRunExecuteJobOperator(
+            task_id=TASK_ID, project_id=PROJECT_ID, region=REGION, 
job_name=JOB_NAME
+        )
+        operator.execute(context=mock.MagicMock())
+
+        gcp_logging_mock.Client.assert_not_called()
+
+    @mock.patch(GCP_LOGGING_PATH)
+    @mock.patch(CLOUD_RUN_HOOK_PATH)
+    def test_execute_verbose_true_forwards_container_logs(self, hook_mock, 
gcp_logging_mock):
+        """``verbose=True`` fetches Cloud Logging entries for the execution 
and logs each one."""
+        execution = self._mock_execution(3, 3, 0)
+        execution.name = EXECUTION_FULL_NAME
+        execution.job = JOB_NAME
+        operation = mock.MagicMock()
+        operation.result.return_value = execution
+        hook_mock.return_value.execute_job.return_value = operation
+        hook_mock.return_value.get_job.return_value = JOB
+
+        gcp_logging_mock.Client.return_value.list_entries.return_value = [
+            mock.MagicMock(payload="Starting Task #0, Attempt #0 ..."),
+            mock.MagicMock(payload="Completed Task #0, Attempt #0"),
+            mock.MagicMock(payload="Container called exit(0)."),
+        ]
+
+        operator = CloudRunExecuteJobOperator(
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            region=REGION,
+            job_name=JOB_NAME,
+            verbose=True,
+        )
+        operator._cached_logger = operator._log = mock.MagicMock()
+        operator.execute(context=mock.MagicMock())
+
+        hook_mock.assert_called_once()
+        gcp_logging_mock.Client.assert_called_once_with(
+            project=PROJECT_ID,
+            credentials=hook_mock.return_value.get_credentials.return_value,
+        )
+        log_filter = 
gcp_logging_mock.Client.return_value.list_entries.call_args.kwargs["filter_"]
+        assert f'resource.labels.job_name="{JOB_NAME}"' in log_filter
+        assert f'"run.googleapis.com/execution_name"="{EXECUTION_NAME}"' in 
log_filter
+        # Audit logs (logName starting with cloudaudit.googleapis.com/...) 
live under the same
+        # cloud_run_job resource and otherwise pollute the Airflow task log 
with structured
+        # AuditLog dumps. The filter must exclude them at the API level.
+        assert 'NOT logName:"cloudaudit.googleapis.com"' in log_filter
+        operator._cached_logger.info.assert_has_calls(
+            [
+                mock.call("Starting Task #0, Attempt #0 ..."),
+                mock.call("Completed Task #0, Attempt #0"),
+                mock.call("Container called exit(0)."),
+            ]
+        )
+
+    @mock.patch(GCP_LOGGING_PATH)
+    @mock.patch(CLOUD_RUN_HOOK_PATH)
+    def test_execute_verbose_true_forwards_structured_log_payload(self, 
hook_mock, gcp_logging_mock):
+        """Cloud Logging entries can carry a non-string (JSON / proto) 
payload. Those must be
+        forwarded via ``log.info("%s", payload)`` so the structured value 
still reaches the
+        Airflow task log instead of being passed as a non-string ``msg`` to 
the stdlib logger.
+        """
+        execution = self._mock_execution(3, 3, 0)
+        execution.name = EXECUTION_FULL_NAME
+        execution.job = JOB_NAME
+        operation = mock.MagicMock()
+        operation.result.return_value = execution
+        hook_mock.return_value.execute_job.return_value = operation
+        hook_mock.return_value.get_job.return_value = JOB
+
+        structured_payload = {"severity": "ERROR", "message": "boom", "code": 
42}
+        gcp_logging_mock.Client.return_value.list_entries.return_value = [
+            mock.MagicMock(payload="Starting Task #0, Attempt #0 ..."),
+            mock.MagicMock(payload=structured_payload),
+        ]
+
+        operator = CloudRunExecuteJobOperator(
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            region=REGION,
+            job_name=JOB_NAME,
+            verbose=True,
+        )
+        operator._cached_logger = operator._log = mock.MagicMock()
+        operator.execute(context=mock.MagicMock())
+
+        operator._cached_logger.info.assert_has_calls(
+            [
+                mock.call("Starting Task #0, Attempt #0 ..."),
+                mock.call("%s", structured_payload),
+            ]
+        )
+
+    @mock.patch(GCP_LOGGING_PATH)
+    @mock.patch(CLOUD_RUN_HOOK_PATH)
+    def test_execute_verbose_true_logging_api_failure_warns_and_succeeds(self, 
hook_mock, gcp_logging_mock):
+        """A Cloud Logging API failure during log forwarding must not fail the 
task."""
+        execution = self._mock_execution(3, 3, 0)
+        execution.name = EXECUTION_FULL_NAME
+        execution.job = JOB_NAME
+        operation = mock.MagicMock()
+        operation.result.return_value = execution
+        hook_mock.return_value.execute_job.return_value = operation
+        hook_mock.return_value.get_job.return_value = JOB
+
+        gcp_logging_mock.Client.return_value.list_entries.side_effect = 
PermissionDenied(
+            "Missing roles/logging.viewer"
+        )
+
+        operator = CloudRunExecuteJobOperator(
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            region=REGION,
+            job_name=JOB_NAME,
+            verbose=True,
+        )
+        operator._cached_logger = operator._log = mock.MagicMock()
+        result = operator.execute(context=mock.MagicMock())
+
+        assert result["name"] == JOB.name
+        hook_mock.assert_called_once()
+        operator._cached_logger.warning.assert_called_once()
+
+    @mock.patch(GCP_LOGGING_PATH)
+    @mock.patch(CLOUD_RUN_HOOK_PATH)
+    def test_execute_complete_verbose_true_forwards_container_logs(self, 
hook_mock, gcp_logging_mock):
+        """Deferrable path: ``execute_complete`` fetches and forwards 
container logs too."""
+        hook_mock.return_value.get_job.return_value = JOB
+        gcp_logging_mock.Client.return_value.list_entries.return_value = [
+            mock.MagicMock(payload="Starting Task #0, Attempt #0 ..."),
+            mock.MagicMock(payload="Completed Task #0, Attempt #0"),
+        ]
+
+        operator = CloudRunExecuteJobOperator(
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            region=REGION,
+            job_name=JOB_NAME,
+            deferrable=True,
+            verbose=True,
+        )
+        operator._cached_logger = operator._log = mock.MagicMock()
+
+        event = {
+            "status": RunJobStatus.SUCCESS.value,
+            "job_name": JOB_NAME,
+            "execution_name": EXECUTION_NAME,
+        }
+        operator.execute_complete(mock.MagicMock(), event)
+
+        hook_mock.assert_called_once()
+        log_filter = 
gcp_logging_mock.Client.return_value.list_entries.call_args.kwargs["filter_"]
+        assert f'"run.googleapis.com/execution_name"="{EXECUTION_NAME}"' in 
log_filter
+        assert 'NOT logName:"cloudaudit.googleapis.com"' in log_filter
+        operator._cached_logger.info.assert_has_calls(
+            [
+                mock.call("Starting Task #0, Attempt #0 ..."),
+                mock.call("Completed Task #0, Attempt #0"),
+            ]
+        )
+
+    @mock.patch(GCP_LOGGING_PATH)
+    @mock.patch(CLOUD_RUN_HOOK_PATH)
+    def test_execute_verbose_true_with_failed_execution_still_forwards_logs(
+        self, hook_mock, gcp_logging_mock
+    ):
+        """
+        When the execution has failed tasks (``failed_count > 0``), 
``verbose=True`` MUST
+        still forward the container logs into the Airflow task log BEFORE the 
operator
+        raises the failure. Debugging a failed job is the case the user needs 
the logs the
+        most — see issue #36963.
+        """
+        execution = self._mock_execution(task_count=1, succeeded_count=0, 
failed_count=1)
+        execution.name = EXECUTION_FULL_NAME
+        execution.job = JOB_NAME
+        operation = mock.MagicMock()
+        operation.result.return_value = execution
+        hook_mock.return_value.execute_job.return_value = operation
+
+        gcp_logging_mock.Client.return_value.list_entries.return_value = [
+            mock.MagicMock(payload="2026/05/18 00:00:00 Starting Task #0, 
Attempt #0 ..."),
+            mock.MagicMock(payload="Task failed with exit code 1"),
+        ]
+
+        operator = CloudRunExecuteJobOperator(
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            region=REGION,
+            job_name=JOB_NAME,
+            verbose=True,
+        )
+        operator._cached_logger = operator._log = mock.MagicMock()
+
+        with (
+            
mock.patch("airflow.providers.google.cloud.operators.cloud_run.time.sleep"),
+            pytest.raises(AirflowException, match="Some tasks failed 
execution"),
+        ):
+            operator.execute(context=mock.MagicMock())
+
+        hook_mock.assert_called_once()
+        operator._cached_logger.info.assert_has_calls(
+            [
+                mock.call("2026/05/18 00:00:00 Starting Task #0, Attempt #0 
..."),
+                mock.call("Task failed with exit code 1"),
+            ]
+        )
+
+    @mock.patch(GCP_LOGGING_PATH)
+    @mock.patch(CLOUD_RUN_HOOK_PATH)
+    def test_execute_verbose_true_with_operation_exception_still_forwards_logs(
+        self, hook_mock, gcp_logging_mock
+    ):
+        """
+        Some Cloud Run job failures make ``operation.result()`` raise before 
returning
+        an Execution. ``verbose=True`` must still use the operation metadata 
to forward
+        container logs before raising the AirflowException.
+        """
+        operation = mock.MagicMock()
+        operation.metadata.name = EXECUTION_FULL_NAME
+        operation.metadata.log_uri = None
+        operation.result.side_effect = Aborted("The container exited with an 
error.")
+        operation.exception.return_value = Aborted("The container exited with 
an error.")
+        hook_mock.return_value.execute_job.return_value = operation
+
+        gcp_logging_mock.Client.return_value.list_entries.return_value = [
+            mock.MagicMock(payload="Starting failing task"),
+            mock.MagicMock(payload="Task failed badly"),
+            mock.MagicMock(payload="Container called exit(1)."),
+        ]
+
+        operator = CloudRunExecuteJobOperator(
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            region=REGION,
+            job_name=JOB_NAME,
+            verbose=True,
+        )
+        operator._cached_logger = operator._log = mock.MagicMock()
+
+        with (
+            
mock.patch("airflow.providers.google.cloud.operators.cloud_run.time.sleep"),
+            pytest.raises(AirflowException, match="The container exited with 
an error"),
+        ):
+            operator.execute(context=mock.MagicMock())
+
+        hook_mock.assert_called_once()
+        log_filter = 
gcp_logging_mock.Client.return_value.list_entries.call_args.kwargs["filter_"]
+        assert f'"run.googleapis.com/execution_name"="{EXECUTION_NAME}"' in 
log_filter
+        operator._cached_logger.info.assert_has_calls(
+            [
+                mock.call("Starting failing task"),
+                mock.call("Task failed badly"),
+                mock.call("Container called exit(1)."),
+            ]
+        )
+
+    @mock.patch(GCP_LOGGING_PATH)
+    @mock.patch(CLOUD_RUN_HOOK_PATH)
+    def test_execute_verbose_true_with_operation_exception_waits_for_late_logs(
+        self, hook_mock, gcp_logging_mock
+    ):
+        """
+        Cloud Logging can expose entries shortly after the Cloud Run operation 
has
+        already failed. ``verbose=True`` waits briefly before one fetch so 
stdout/stderr
+        that arrive late are less likely to be missed.
+        """
+        operation = mock.MagicMock()
+        operation.metadata.name = EXECUTION_FULL_NAME
+        operation.metadata.log_uri = None
+        operation.result.side_effect = Aborted("The container exited with an 
error.")
+        operation.exception.return_value = Aborted("The container exited with 
an error.")
+        hook_mock.return_value.execute_job.return_value = operation
+
+        gcp_logging_mock.Client.return_value.list_entries.return_value = [
+            mock.MagicMock(payload="Starting failing task"),
+            mock.MagicMock(payload="Task failed badly"),
+            mock.MagicMock(payload="Container called exit(1)."),
+        ]
+
+        operator = CloudRunExecuteJobOperator(
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            region=REGION,
+            job_name=JOB_NAME,
+            verbose=True,
+        )
+        operator._cached_logger = operator._log = mock.MagicMock()
+
+        with (
+            
mock.patch("airflow.providers.google.cloud.operators.cloud_run.time.sleep") as 
sleep_mock,
+            pytest.raises(AirflowException, match="The container exited with 
an error"),
+        ):
+            operator.execute(context=mock.MagicMock())
+
+        sleep_mock.assert_called_once_with(1)
+        hook_mock.assert_called_once()
+        gcp_logging_mock.Client.return_value.list_entries.assert_called_once()
+        operator._cached_logger.info.assert_has_calls(
+            [
+                mock.call("Starting failing task"),
+                mock.call("Task failed badly"),
+                mock.call("Container called exit(1)."),
+            ]
+        )
+        assert operator._cached_logger.info.call_count == 3
+
+    @mock.patch(GCP_LOGGING_PATH)
+    @mock.patch(CLOUD_RUN_HOOK_PATH)
+    def 
test_execute_complete_verbose_true_fail_event_still_forwards_logs(self, 
hook_mock, gcp_logging_mock):
+        """
+        Deferrable path: when the trigger reports a FAIL event with an 
``execution_name``,
+        ``verbose=True`` MUST still forward the container logs before 
re-raising as
+        AirflowException. See issue #36963.
+        """
+        gcp_logging_mock.Client.return_value.list_entries.return_value = [
+            mock.MagicMock(payload="2026/05/18 00:00:00 Starting Task #0, 
Attempt #0 ..."),
+            mock.MagicMock(payload="Task failed with exit code 1"),
+        ]
+
+        operator = CloudRunExecuteJobOperator(
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            region=REGION,
+            job_name=JOB_NAME,
+            deferrable=True,
+            verbose=True,
+        )
+        operator._cached_logger = operator._log = mock.MagicMock()
+
+        event = {
+            "status": RunJobStatus.FAIL.value,
+            "operation_error_code": 13,
+            "operation_error_message": "operation error",
+            "job_name": JOB_NAME,
+            "execution_name": EXECUTION_NAME,
+        }
+
+        with (
+            
mock.patch("airflow.providers.google.cloud.operators.cloud_run.time.sleep"),
+            pytest.raises(AirflowException, match="Operation failed"),
+        ):
+            operator.execute_complete(mock.MagicMock(), event)
+
+        hook_mock.assert_called_once()
+        operator._cached_logger.info.assert_has_calls(
+            [
+                mock.call("2026/05/18 00:00:00 Starting Task #0, Attempt #0 
..."),
+                mock.call("Task failed with exit code 1"),
+            ]
+        )
+
     def _mock_operation(self, task_count, succeeded_count, failed_count):
         operation = mock.MagicMock()
         operation.result.return_value = self._mock_execution(task_count, 
succeeded_count, failed_count)
diff --git 
a/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py 
b/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
index 18d4f9e0d8e..21904306980 100644
--- a/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
+++ b/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
@@ -60,6 +60,10 @@ POLL_SLEEP = 0.01
 TIMEOUT = 0.02
 IMPERSONATION_CHAIN = "impersonation_chain"
 USE_REGIONAL_ENDPOINT = True
+EXECUTION_NAME = "jobname-abc12"
+EXECUTION_FULL_NAME = (
+    
f"projects/{PROJECT_ID}/locations/{LOCATION}/jobs/{JOB_NAME}/executions/{EXECUTION_NAME}"
+)
 
 
 @pytest.fixture
@@ -102,8 +106,15 @@ class TestCloudBatchJobFinishedTrigger:
     ):
         """
         Tests the CloudRunJobFinishedTrigger fires once the job execution 
reaches a successful state.
+
+        The success event must carry the short ``execution_name`` parsed from 
the operation's
+        Execution metadata so that ``execute_complete`` can fetch the 
container logs from
+        Cloud Logging (see issue #36963).
         """
 
+        metadata_any = Any()
+        metadata_any.Pack(Execution.pb(Execution(name=EXECUTION_FULL_NAME)))
+
         async def _mock_operation(operation_name, location, 
use_regional_endpoint):
             operation = mock.MagicMock()
             operation.done = True
@@ -111,6 +122,7 @@ class TestCloudBatchJobFinishedTrigger:
             operation.error = Any()
             operation.error.ParseFromString(b"")
             operation.response = _packed_execution_response(task_count=3, 
succeeded_count=3, failed_count=0)
+            operation.metadata = metadata_any
             return operation
 
         mock_hook.return_value.get_operation = _mock_operation
@@ -121,6 +133,7 @@ class TestCloudBatchJobFinishedTrigger:
                 {
                     "status": RunJobStatus.SUCCESS.value,
                     "job_name": JOB_NAME,
+                    "execution_name": EXECUTION_NAME,
                 }
             )
             == actual
@@ -185,14 +198,23 @@ class TestCloudBatchJobFinishedTrigger:
         self, mock_hook, trigger: CloudRunJobFinishedTrigger
     ):
         """
-        Tests the CloudRunJobFinishedTrigger raises an exception once the job 
execution fails.
+        Tests the CloudRunJobFinishedTrigger fires a FAIL event when the 
operation fails.
+
+        The FAIL event must also carry ``execution_name`` (parsed from
+        ``operation.metadata`` whenever it is populated) so that
+        ``execute_complete`` can still pull container logs from Cloud Logging
+        before raising — see issue #36963.
         """
 
+        metadata_any = Any()
+        metadata_any.Pack(Execution.pb(Execution(name=EXECUTION_FULL_NAME)))
+
         async def _mock_operation(operation_name, location, 
use_regional_endpoint):
             operation = mock.MagicMock()
             operation.done = True
             operation.name = "name"
             operation.error = Status(code=13, message="Some message")
+            operation.metadata = metadata_any
             return operation
 
         mock_hook.return_value.get_operation = _mock_operation
@@ -206,6 +228,7 @@ class TestCloudBatchJobFinishedTrigger:
                     "operation_error_code": ERROR_CODE,
                     "operation_error_message": ERROR_MESSAGE,
                     "job_name": JOB_NAME,
+                    "execution_name": EXECUTION_NAME,
                 }
             )
             == actual
@@ -254,6 +277,7 @@ class TestCloudBatchJobFinishedTrigger:
                 {
                     "status": RunJobStatus.SUCCESS.value,
                     "job_name": JOB_NAME,
+                    "execution_name": None,
                 }
             )
             == actual


Reply via email to