This is an automated email from the ASF dual-hosted git repository.
shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c006c184df6 Forward Google Cloud Run container logs to Airflow log via verbose flag (#67140)
c006c184df6 is described below
commit c006c184df6e59722120018f28ca616e1580190b
Author: Aaron Chen <[email protected]>
AuthorDate: Wed Jun 17 13:51:10 2026 +0800
Forward Google Cloud Run container logs to Airflow log via verbose flag (#67140)
---
.../google/docs/operators/cloud/cloud_run.rst | 32 ++
.../providers/google/cloud/operators/cloud_run.py | 103 +++++-
.../providers/google/cloud/triggers/cloud_run.py | 15 +
.../unit/google/cloud/operators/test_cloud_run.py | 363 ++++++++++++++++++++-
.../unit/google/cloud/triggers/test_cloud_run.py | 26 +-
5 files changed, 523 insertions(+), 16 deletions(-)
diff --git a/providers/google/docs/operators/cloud/cloud_run.rst b/providers/google/docs/operators/cloud/cloud_run.rst
index 10dfcb5339e..d63a37e9580 100644
--- a/providers/google/docs/operators/cloud/cloud_run.rst
+++ b/providers/google/docs/operators/cloud/cloud_run.rst
@@ -165,6 +165,38 @@ When using deferrable mode, the operator defers to an async trigger that polls t
REST can be used with deferrable mode, but it may be less efficient than gRPC
and is generally best reserved for cases where gRPC
cannot be used.
+Capturing container logs in the Airflow task log
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+By default the operator only reports its own status messages; the container's ``stdout`` /
+``stderr`` is only available in `Cloud Logging <https://console.cloud.google.com/logs>`__.
+Pass ``verbose=True`` to have the operator fetch the container log entries from Cloud
+Logging once the execution finishes and forward each line into the Airflow task log. This
+works in both eager and deferrable mode.
+
+.. code-block:: python
+
+    CloudRunExecuteJobOperator(
+        task_id="execute_cloud_run_job",
+        project_id=PROJECT_ID,
+        region=region,
+        job_name=job_name,
+        verbose=True,
+    )
+
+When enabling ``verbose``:
+
+* The service account used by ``gcp_conn_id`` (or by ``impersonation_chain``) must have the
+  ``roles/logging.viewer`` role on the project that runs the job.
+* Each task instance issues at least one Cloud Logging Read API request, with additional
+  pages fetched automatically when the execution produces enough log entries to span
+  multiple pages. Failed executions wait briefly before fetching logs to catch entries
+  that arrive just after the Cloud Run operation reports failure. Plan around the
+  project-wide quota of 60 read requests per minute documented at
+  https://cloud.google.com/logging/quotas#api-limits.
+* If the log fetch itself fails (for example missing IAM permission or quota exhausted),
+  the operator emits a warning and the task result is unaffected.
+
You can also specify overrides that allow you to give a new entrypoint command
to the job and more:
:class:`~airflow.providers.google.cloud.operators.cloud_run.CloudRunExecuteJobOperator`
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py b/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py
index 3692457038e..f1c307f55f4 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/cloud_run.py
@@ -17,11 +17,14 @@
# under the License.
from __future__ import annotations
+import time
from collections.abc import Sequence
+from functools import cached_property
from typing import TYPE_CHECKING, Any, Literal
import google.cloud.exceptions
-from google.api_core.exceptions import AlreadyExists
+from google.api_core.exceptions import AlreadyExists, GoogleAPICallError
+from google.cloud import logging as gcp_logging
from google.cloud.run_v2 import Job, Service
from airflow.providers.common.compat.sdk import AirflowException, conf
@@ -36,6 +39,8 @@ if TYPE_CHECKING:
from airflow.providers.common.compat.sdk import Context
+FAILED_EXECUTION_LOG_FETCH_DELAY_SECONDS = 1
+
class CloudRunCreateJobOperator(GoogleCloudBaseOperator):
"""
@@ -302,6 +307,16 @@ class CloudRunExecuteJobOperator(GoogleCloudBaseOperator):
:param transport: Optional. The transport to use for API requests. Can be
'rest' or 'grpc'.
If set to None, a transport is chosen automatically. Use 'rest' if
gRPC is not available
or fails in your environment (e.g., Docker containers with certain
network configurations).
+ :param verbose: If True, container ``stdout``/``stderr`` from the Cloud
Run job execution
+ is fetched from Cloud Logging once the execution finishes and
forwarded into the
+ Airflow task log. Logs are also forwarded when the execution fails so
that the
+ container output remains visible for debugging. Requires the service
account used by
+ ``gcp_conn_id`` to have ``roles/logging.viewer`` on the project. Each
task instance
+ issues at least one Cloud Logging Read API request (more if the
execution produces
+ enough log entries to paginate). Failed executions wait briefly before
fetching logs
+ to catch late-arriving log entries. Requests count against the
project-wide quota of
+ 60 read requests per minute (see
https://cloud.google.com/logging/quotas#api-limits).
+ Default: ``False``.
"""
operator_extra_links = (CloudRunJobLoggingLink(),)
@@ -330,6 +345,7 @@ class CloudRunExecuteJobOperator(GoogleCloudBaseOperator):
impersonation_chain: str | Sequence[str] | None = None,
deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
transport: Literal["rest", "grpc"] | None = None,
+ verbose: bool = False,
**kwargs,
):
super().__init__(**kwargs)
@@ -344,15 +360,11 @@ class CloudRunExecuteJobOperator(GoogleCloudBaseOperator):
self.deferrable = deferrable
self.use_regional_endpoint = use_regional_endpoint
self.transport = transport
+ self.verbose = verbose
self.operation: operation.Operation | None = None
def execute(self, context: Context):
- hook: CloudRunHook = CloudRunHook(
- gcp_conn_id=self.gcp_conn_id,
- impersonation_chain=self.impersonation_chain,
- transport=self.transport,
- )
- self.operation = hook.execute_job(
+ self.operation = self.hook.execute_job(
region=self.region,
project_id=self.project_id,
job_name=self.job_name,
@@ -371,8 +383,12 @@ class CloudRunExecuteJobOperator(GoogleCloudBaseOperator):
if not self.deferrable:
result: Execution = self._wait_for_operation(self.operation)
+ if self.verbose and result.name:
+ if self._has_execution_failed(result):
+ time.sleep(FAILED_EXECUTION_LOG_FETCH_DELAY_SECONDS)
+ self._log_container_output(result.name.rsplit("/", 1)[-1])
self._fail_if_execution_failed(result)
- job = hook.get_job(
+ job = self.hook.get_job(
job_name=result.job,
region=self.region,
project_id=self.project_id,
@@ -401,19 +417,19 @@ class CloudRunExecuteJobOperator(GoogleCloudBaseOperator):
raise AirflowException("Operation timed out")
if status == RunJobStatus.FAIL.value:
+ if self.verbose and event.get("execution_name"):
+ time.sleep(FAILED_EXECUTION_LOG_FETCH_DELAY_SECONDS)
+ self._log_container_output(event["execution_name"])
error_code = event["operation_error_code"]
error_message = event["operation_error_message"]
raise AirflowException(
f"Operation failed with error code [{error_code}] and error
message [{error_message}]"
)
- hook: CloudRunHook = CloudRunHook(
- gcp_conn_id=self.gcp_conn_id,
- impersonation_chain=self.impersonation_chain,
- transport=self.transport,
- )
+ if self.verbose and event.get("execution_name"):
+ self._log_container_output(event["execution_name"])
- job = hook.get_job(
+ job = self.hook.get_job(
job_name=event["job_name"],
region=self.region,
project_id=self.project_id,
@@ -421,6 +437,56 @@ class CloudRunExecuteJobOperator(GoogleCloudBaseOperator):
)
return Job.to_dict(job)
+ @cached_property
+ def hook(self) -> CloudRunHook:
+ return CloudRunHook(
+ gcp_conn_id=self.gcp_conn_id,
+ impersonation_chain=self.impersonation_chain,
+ transport=self.transport,
+ )
+
+ def _log_container_output(self, execution_name: str) -> None:
+ """Forward Cloud Run container logs for ``execution_name`` into the
Airflow task log."""
+ log_filter = (
+ 'resource.type="cloud_run_job" '
+ f'resource.labels.job_name="{self.job_name}" '
+ f'resource.labels.location="{self.region}" '
+ f'labels."run.googleapis.com/execution_name"="{execution_name}" '
+ 'NOT logName:"cloudaudit.googleapis.com"'
+ )
+ try:
+ client = gcp_logging.Client(
+ project=self.project_id,
+ credentials=self.hook.get_credentials(),
+ )
+ for entry in client.list_entries(filter_=log_filter,
order_by=gcp_logging.ASCENDING):
+ payload = entry.payload
+ if isinstance(payload, str):
+ self.log.info(payload)
+ else:
+ self.log.info("%s", payload)
+ except GoogleAPICallError as exc:
+ self.log.warning(
+ "Could not fetch container logs from Cloud Logging
(execution=%s): %s. "
+ "Task result is unaffected.",
+ execution_name,
+ exc,
+ )
+
+ def _log_container_output_for_operation(self, operation:
operation.Operation) -> None:
+ """Forward container logs using the execution metadata on a Cloud Run
operation."""
+ if execution_name :=
self._get_execution_name_from_operation(operation):
+ time.sleep(FAILED_EXECUTION_LOG_FETCH_DELAY_SECONDS)
+ self._log_container_output(execution_name)
+
+ @staticmethod
+ def _get_execution_name_from_operation(operation: operation.Operation) ->
str | None:
+ metadata = getattr(operation, "metadata", None)
+ execution_full_name = getattr(metadata, "name", None)
+ if isinstance(execution_full_name, str) and execution_full_name:
+ return execution_full_name.rsplit("/", 1)[-1]
+ return None
+
def _fail_if_execution_failed(self, execution: Execution):
task_count = execution.task_count
succeeded_count = execution.succeeded_count
@@ -432,10 +498,19 @@ class CloudRunExecuteJobOperator(GoogleCloudBaseOperator):
if failed_count > 0:
raise AirflowException("Some tasks failed execution")
+ @staticmethod
+ def _has_execution_failed(execution: Execution) -> bool:
+ return (
+ execution.failed_count > 0
+ or execution.succeeded_count + execution.failed_count !=
execution.task_count
+ )
+
def _wait_for_operation(self, operation: operation.Operation):
try:
return operation.result(timeout=self.timeout_seconds)
except Exception:
+ if self.verbose:
+ self._log_container_output_for_operation(operation)
error = operation.exception(timeout=self.timeout_seconds)
raise AirflowException(error)
diff --git a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
index 254b68c85eb..3cf323641ca 100644
--- a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
+++ b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
@@ -159,6 +159,9 @@ class CloudRunJobFinishedTrigger(BaseTrigger):
if operation.done:
# An operation can only have one of those two combinations: if
it is failed, then
# the error field will be populated, else, then the response
field will be.
+ # In both cases the metadata Any may still carry the Execution
proto, which holds
+ # the short execution name needed for fetching container logs
from Cloud Logging.
+ execution_name = self._extract_execution_name(operation)
if operation.error.SerializeToString():
yield TriggerEvent(
{
@@ -166,6 +169,7 @@ class CloudRunJobFinishedTrigger(BaseTrigger):
"operation_error_code": operation.error.code,
"operation_error_message": operation.error.message,
"job_name": self.job_name,
+ "execution_name": execution_name,
}
)
return
@@ -190,6 +194,7 @@ class CloudRunJobFinishedTrigger(BaseTrigger):
f"{execution.cancelled_count}."
),
"job_name": self.job_name,
+ "execution_name": execution_name,
}
)
return
@@ -203,6 +208,7 @@ class CloudRunJobFinishedTrigger(BaseTrigger):
f"{execution.failed_count} of
task_count={execution.task_count}."
),
"job_name": self.job_name,
+ "execution_name": execution_name,
}
)
return
@@ -210,6 +216,7 @@ class CloudRunJobFinishedTrigger(BaseTrigger):
{
"status": RunJobStatus.SUCCESS.value,
"job_name": self.job_name,
+ "execution_name": execution_name,
}
)
return
@@ -235,3 +242,11 @@ class CloudRunJobFinishedTrigger(BaseTrigger):
impersonation_chain=self.impersonation_chain,
transport=self.transport,
)
+
+ @staticmethod
+ def _extract_execution_name(operation: Any) -> str | None:
+ """Return the short execution name from a Cloud Run job LRO operation,
or ``None``."""
+ execution_pb = Execution.pb(Execution())
+ if operation.metadata.Unpack(execution_pb) and execution_pb.name:
+ return execution_pb.name.rsplit("/", 1)[-1]
+ return None
diff --git a/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py b/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py
index c29af5f9987..00f339e9493 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py
@@ -24,7 +24,7 @@ from __future__ import annotations
from unittest import mock
import pytest
-from google.api_core.exceptions import AlreadyExists
+from google.api_core.exceptions import Aborted, AlreadyExists, PermissionDenied
from google.cloud.exceptions import GoogleCloudError
from google.cloud.run_v2 import Job, Service
@@ -42,10 +42,13 @@ from airflow.providers.google.cloud.triggers.cloud_run
import RunJobStatus
CLOUD_RUN_HOOK_PATH =
"airflow.providers.google.cloud.operators.cloud_run.CloudRunHook"
CLOUD_RUN_SERVICE_HOOK_PATH =
"airflow.providers.google.cloud.operators.cloud_run.CloudRunServiceHook"
+GCP_LOGGING_PATH =
"airflow.providers.google.cloud.operators.cloud_run.gcp_logging"
TASK_ID = "test"
PROJECT_ID = "testproject"
REGION = "us-central1"
JOB_NAME = "jobname"
+EXECUTION_NAME = "jobname-abc12"
+EXECUTION_FULL_NAME =
f"projects/{PROJECT_ID}/locations/{REGION}/jobs/{JOB_NAME}/executions/{EXECUTION_NAME}"
SERVICE_NAME = "servicename"
OVERRIDES = {
"container_overrides": [{"args": ["python", "main.py"]}],
@@ -375,6 +378,364 @@ class TestCloudRunExecuteJobOperator:
with pytest.raises(AirflowException):
operator.execute(context=mock.MagicMock())
+ @mock.patch(GCP_LOGGING_PATH)
+ @mock.patch(CLOUD_RUN_HOOK_PATH)
+ def test_execute_verbose_false_does_not_call_logging_client(self,
hook_mock, gcp_logging_mock):
+ """Default behaviour (``verbose=False``) must not touch Cloud
Logging."""
+ hook_mock.return_value.get_job.return_value = JOB
+ hook_mock.return_value.execute_job.return_value =
self._mock_operation(3, 3, 0)
+
+ operator = CloudRunExecuteJobOperator(
+ task_id=TASK_ID, project_id=PROJECT_ID, region=REGION,
job_name=JOB_NAME
+ )
+ operator.execute(context=mock.MagicMock())
+
+ gcp_logging_mock.Client.assert_not_called()
+
+ @mock.patch(GCP_LOGGING_PATH)
+ @mock.patch(CLOUD_RUN_HOOK_PATH)
+ def test_execute_verbose_true_forwards_container_logs(self, hook_mock,
gcp_logging_mock):
+ """``verbose=True`` fetches Cloud Logging entries for the execution
and logs each one."""
+ execution = self._mock_execution(3, 3, 0)
+ execution.name = EXECUTION_FULL_NAME
+ execution.job = JOB_NAME
+ operation = mock.MagicMock()
+ operation.result.return_value = execution
+ hook_mock.return_value.execute_job.return_value = operation
+ hook_mock.return_value.get_job.return_value = JOB
+
+ gcp_logging_mock.Client.return_value.list_entries.return_value = [
+ mock.MagicMock(payload="Starting Task #0, Attempt #0 ..."),
+ mock.MagicMock(payload="Completed Task #0, Attempt #0"),
+ mock.MagicMock(payload="Container called exit(0)."),
+ ]
+
+ operator = CloudRunExecuteJobOperator(
+ task_id=TASK_ID,
+ project_id=PROJECT_ID,
+ region=REGION,
+ job_name=JOB_NAME,
+ verbose=True,
+ )
+ operator._cached_logger = operator._log = mock.MagicMock()
+ operator.execute(context=mock.MagicMock())
+
+ hook_mock.assert_called_once()
+ gcp_logging_mock.Client.assert_called_once_with(
+ project=PROJECT_ID,
+ credentials=hook_mock.return_value.get_credentials.return_value,
+ )
+ log_filter =
gcp_logging_mock.Client.return_value.list_entries.call_args.kwargs["filter_"]
+ assert f'resource.labels.job_name="{JOB_NAME}"' in log_filter
+ assert f'"run.googleapis.com/execution_name"="{EXECUTION_NAME}"' in
log_filter
+ # Audit logs (logName starting with cloudaudit.googleapis.com/...)
live under the same
+ # cloud_run_job resource and otherwise pollute the Airflow task log
with structured
+ # AuditLog dumps. The filter must exclude them at the API level.
+ assert 'NOT logName:"cloudaudit.googleapis.com"' in log_filter
+ operator._cached_logger.info.assert_has_calls(
+ [
+ mock.call("Starting Task #0, Attempt #0 ..."),
+ mock.call("Completed Task #0, Attempt #0"),
+ mock.call("Container called exit(0)."),
+ ]
+ )
+
+ @mock.patch(GCP_LOGGING_PATH)
+ @mock.patch(CLOUD_RUN_HOOK_PATH)
+ def test_execute_verbose_true_forwards_structured_log_payload(self,
hook_mock, gcp_logging_mock):
+ """Cloud Logging entries can carry a non-string (JSON / proto)
payload. Those must be
+ forwarded via ``log.info("%s", payload)`` so the structured value
still reaches the
+ Airflow task log instead of being passed as a non-string ``msg`` to
the stdlib logger.
+ """
+ execution = self._mock_execution(3, 3, 0)
+ execution.name = EXECUTION_FULL_NAME
+ execution.job = JOB_NAME
+ operation = mock.MagicMock()
+ operation.result.return_value = execution
+ hook_mock.return_value.execute_job.return_value = operation
+ hook_mock.return_value.get_job.return_value = JOB
+
+ structured_payload = {"severity": "ERROR", "message": "boom", "code":
42}
+ gcp_logging_mock.Client.return_value.list_entries.return_value = [
+ mock.MagicMock(payload="Starting Task #0, Attempt #0 ..."),
+ mock.MagicMock(payload=structured_payload),
+ ]
+
+ operator = CloudRunExecuteJobOperator(
+ task_id=TASK_ID,
+ project_id=PROJECT_ID,
+ region=REGION,
+ job_name=JOB_NAME,
+ verbose=True,
+ )
+ operator._cached_logger = operator._log = mock.MagicMock()
+ operator.execute(context=mock.MagicMock())
+
+ operator._cached_logger.info.assert_has_calls(
+ [
+ mock.call("Starting Task #0, Attempt #0 ..."),
+ mock.call("%s", structured_payload),
+ ]
+ )
+
+ @mock.patch(GCP_LOGGING_PATH)
+ @mock.patch(CLOUD_RUN_HOOK_PATH)
+ def test_execute_verbose_true_logging_api_failure_warns_and_succeeds(self,
hook_mock, gcp_logging_mock):
+ """A Cloud Logging API failure during log forwarding must not fail the
task."""
+ execution = self._mock_execution(3, 3, 0)
+ execution.name = EXECUTION_FULL_NAME
+ execution.job = JOB_NAME
+ operation = mock.MagicMock()
+ operation.result.return_value = execution
+ hook_mock.return_value.execute_job.return_value = operation
+ hook_mock.return_value.get_job.return_value = JOB
+
+ gcp_logging_mock.Client.return_value.list_entries.side_effect =
PermissionDenied(
+ "Missing roles/logging.viewer"
+ )
+
+ operator = CloudRunExecuteJobOperator(
+ task_id=TASK_ID,
+ project_id=PROJECT_ID,
+ region=REGION,
+ job_name=JOB_NAME,
+ verbose=True,
+ )
+ operator._cached_logger = operator._log = mock.MagicMock()
+ result = operator.execute(context=mock.MagicMock())
+
+ assert result["name"] == JOB.name
+ hook_mock.assert_called_once()
+ operator._cached_logger.warning.assert_called_once()
+
+ @mock.patch(GCP_LOGGING_PATH)
+ @mock.patch(CLOUD_RUN_HOOK_PATH)
+ def test_execute_complete_verbose_true_forwards_container_logs(self,
hook_mock, gcp_logging_mock):
+ """Deferrable path: ``execute_complete`` fetches and forwards
container logs too."""
+ hook_mock.return_value.get_job.return_value = JOB
+ gcp_logging_mock.Client.return_value.list_entries.return_value = [
+ mock.MagicMock(payload="Starting Task #0, Attempt #0 ..."),
+ mock.MagicMock(payload="Completed Task #0, Attempt #0"),
+ ]
+
+ operator = CloudRunExecuteJobOperator(
+ task_id=TASK_ID,
+ project_id=PROJECT_ID,
+ region=REGION,
+ job_name=JOB_NAME,
+ deferrable=True,
+ verbose=True,
+ )
+ operator._cached_logger = operator._log = mock.MagicMock()
+
+ event = {
+ "status": RunJobStatus.SUCCESS.value,
+ "job_name": JOB_NAME,
+ "execution_name": EXECUTION_NAME,
+ }
+ operator.execute_complete(mock.MagicMock(), event)
+
+ hook_mock.assert_called_once()
+ log_filter =
gcp_logging_mock.Client.return_value.list_entries.call_args.kwargs["filter_"]
+ assert f'"run.googleapis.com/execution_name"="{EXECUTION_NAME}"' in
log_filter
+ assert 'NOT logName:"cloudaudit.googleapis.com"' in log_filter
+ operator._cached_logger.info.assert_has_calls(
+ [
+ mock.call("Starting Task #0, Attempt #0 ..."),
+ mock.call("Completed Task #0, Attempt #0"),
+ ]
+ )
+
+ @mock.patch(GCP_LOGGING_PATH)
+ @mock.patch(CLOUD_RUN_HOOK_PATH)
+ def test_execute_verbose_true_with_failed_execution_still_forwards_logs(
+ self, hook_mock, gcp_logging_mock
+ ):
+ """
+ When the execution has failed tasks (``failed_count > 0``),
``verbose=True`` MUST
+ still forward the container logs into the Airflow task log BEFORE the
operator
+ raises the failure. Debugging a failed job is the case the user needs
the logs the
+ most — see issue #36963.
+ """
+ execution = self._mock_execution(task_count=1, succeeded_count=0,
failed_count=1)
+ execution.name = EXECUTION_FULL_NAME
+ execution.job = JOB_NAME
+ operation = mock.MagicMock()
+ operation.result.return_value = execution
+ hook_mock.return_value.execute_job.return_value = operation
+
+ gcp_logging_mock.Client.return_value.list_entries.return_value = [
+ mock.MagicMock(payload="2026/05/18 00:00:00 Starting Task #0,
Attempt #0 ..."),
+ mock.MagicMock(payload="Task failed with exit code 1"),
+ ]
+
+ operator = CloudRunExecuteJobOperator(
+ task_id=TASK_ID,
+ project_id=PROJECT_ID,
+ region=REGION,
+ job_name=JOB_NAME,
+ verbose=True,
+ )
+ operator._cached_logger = operator._log = mock.MagicMock()
+
+ with (
+
mock.patch("airflow.providers.google.cloud.operators.cloud_run.time.sleep"),
+ pytest.raises(AirflowException, match="Some tasks failed
execution"),
+ ):
+ operator.execute(context=mock.MagicMock())
+
+ hook_mock.assert_called_once()
+ operator._cached_logger.info.assert_has_calls(
+ [
+ mock.call("2026/05/18 00:00:00 Starting Task #0, Attempt #0
..."),
+ mock.call("Task failed with exit code 1"),
+ ]
+ )
+
+ @mock.patch(GCP_LOGGING_PATH)
+ @mock.patch(CLOUD_RUN_HOOK_PATH)
+ def test_execute_verbose_true_with_operation_exception_still_forwards_logs(
+ self, hook_mock, gcp_logging_mock
+ ):
+ """
+ Some Cloud Run job failures make ``operation.result()`` raise before
returning
+ an Execution. ``verbose=True`` must still use the operation metadata
to forward
+ container logs before raising the AirflowException.
+ """
+ operation = mock.MagicMock()
+ operation.metadata.name = EXECUTION_FULL_NAME
+ operation.metadata.log_uri = None
+ operation.result.side_effect = Aborted("The container exited with an
error.")
+ operation.exception.return_value = Aborted("The container exited with
an error.")
+ hook_mock.return_value.execute_job.return_value = operation
+
+ gcp_logging_mock.Client.return_value.list_entries.return_value = [
+ mock.MagicMock(payload="Starting failing task"),
+ mock.MagicMock(payload="Task failed badly"),
+ mock.MagicMock(payload="Container called exit(1)."),
+ ]
+
+ operator = CloudRunExecuteJobOperator(
+ task_id=TASK_ID,
+ project_id=PROJECT_ID,
+ region=REGION,
+ job_name=JOB_NAME,
+ verbose=True,
+ )
+ operator._cached_logger = operator._log = mock.MagicMock()
+
+ with (
+
mock.patch("airflow.providers.google.cloud.operators.cloud_run.time.sleep"),
+ pytest.raises(AirflowException, match="The container exited with
an error"),
+ ):
+ operator.execute(context=mock.MagicMock())
+
+ hook_mock.assert_called_once()
+ log_filter =
gcp_logging_mock.Client.return_value.list_entries.call_args.kwargs["filter_"]
+ assert f'"run.googleapis.com/execution_name"="{EXECUTION_NAME}"' in
log_filter
+ operator._cached_logger.info.assert_has_calls(
+ [
+ mock.call("Starting failing task"),
+ mock.call("Task failed badly"),
+ mock.call("Container called exit(1)."),
+ ]
+ )
+
+ @mock.patch(GCP_LOGGING_PATH)
+ @mock.patch(CLOUD_RUN_HOOK_PATH)
+ def test_execute_verbose_true_with_operation_exception_waits_for_late_logs(
+ self, hook_mock, gcp_logging_mock
+ ):
+ """
+ Cloud Logging can expose entries shortly after the Cloud Run operation
has
+ already failed. ``verbose=True`` waits briefly before one fetch so
stdout/stderr
+ that arrive late are less likely to be missed.
+ """
+ operation = mock.MagicMock()
+ operation.metadata.name = EXECUTION_FULL_NAME
+ operation.metadata.log_uri = None
+ operation.result.side_effect = Aborted("The container exited with an
error.")
+ operation.exception.return_value = Aborted("The container exited with
an error.")
+ hook_mock.return_value.execute_job.return_value = operation
+
+ gcp_logging_mock.Client.return_value.list_entries.return_value = [
+ mock.MagicMock(payload="Starting failing task"),
+ mock.MagicMock(payload="Task failed badly"),
+ mock.MagicMock(payload="Container called exit(1)."),
+ ]
+
+ operator = CloudRunExecuteJobOperator(
+ task_id=TASK_ID,
+ project_id=PROJECT_ID,
+ region=REGION,
+ job_name=JOB_NAME,
+ verbose=True,
+ )
+ operator._cached_logger = operator._log = mock.MagicMock()
+
+ with (
+
mock.patch("airflow.providers.google.cloud.operators.cloud_run.time.sleep") as
sleep_mock,
+ pytest.raises(AirflowException, match="The container exited with
an error"),
+ ):
+ operator.execute(context=mock.MagicMock())
+
+ sleep_mock.assert_called_once_with(1)
+ hook_mock.assert_called_once()
+ gcp_logging_mock.Client.return_value.list_entries.assert_called_once()
+ operator._cached_logger.info.assert_has_calls(
+ [
+ mock.call("Starting failing task"),
+ mock.call("Task failed badly"),
+ mock.call("Container called exit(1)."),
+ ]
+ )
+ assert operator._cached_logger.info.call_count == 3
+
+ @mock.patch(GCP_LOGGING_PATH)
+ @mock.patch(CLOUD_RUN_HOOK_PATH)
+ def
test_execute_complete_verbose_true_fail_event_still_forwards_logs(self,
hook_mock, gcp_logging_mock):
+ """
+ Deferrable path: when the trigger reports a FAIL event with an
``execution_name``,
+ ``verbose=True`` MUST still forward the container logs before
re-raising as
+ AirflowException. See issue #36963.
+ """
+ gcp_logging_mock.Client.return_value.list_entries.return_value = [
+ mock.MagicMock(payload="2026/05/18 00:00:00 Starting Task #0,
Attempt #0 ..."),
+ mock.MagicMock(payload="Task failed with exit code 1"),
+ ]
+
+ operator = CloudRunExecuteJobOperator(
+ task_id=TASK_ID,
+ project_id=PROJECT_ID,
+ region=REGION,
+ job_name=JOB_NAME,
+ deferrable=True,
+ verbose=True,
+ )
+ operator._cached_logger = operator._log = mock.MagicMock()
+
+ event = {
+ "status": RunJobStatus.FAIL.value,
+ "operation_error_code": 13,
+ "operation_error_message": "operation error",
+ "job_name": JOB_NAME,
+ "execution_name": EXECUTION_NAME,
+ }
+
+ with (
+
mock.patch("airflow.providers.google.cloud.operators.cloud_run.time.sleep"),
+ pytest.raises(AirflowException, match="Operation failed"),
+ ):
+ operator.execute_complete(mock.MagicMock(), event)
+
+ hook_mock.assert_called_once()
+ operator._cached_logger.info.assert_has_calls(
+ [
+ mock.call("2026/05/18 00:00:00 Starting Task #0, Attempt #0
..."),
+ mock.call("Task failed with exit code 1"),
+ ]
+ )
+
def _mock_operation(self, task_count, succeeded_count, failed_count):
operation = mock.MagicMock()
operation.result.return_value = self._mock_execution(task_count,
succeeded_count, failed_count)
diff --git a/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py b/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
index 18d4f9e0d8e..21904306980 100644
--- a/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
+++ b/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
@@ -60,6 +60,10 @@ POLL_SLEEP = 0.01
TIMEOUT = 0.02
IMPERSONATION_CHAIN = "impersonation_chain"
USE_REGIONAL_ENDPOINT = True
+EXECUTION_NAME = "jobname-abc12"
+EXECUTION_FULL_NAME = (
+
f"projects/{PROJECT_ID}/locations/{LOCATION}/jobs/{JOB_NAME}/executions/{EXECUTION_NAME}"
+)
@pytest.fixture
@@ -102,8 +106,15 @@ class TestCloudBatchJobFinishedTrigger:
):
"""
Tests the CloudRunJobFinishedTrigger fires once the job execution
reaches a successful state.
+
+ The success event must carry the short ``execution_name`` parsed from
the operation's
+ Execution metadata so that ``execute_complete`` can fetch the
container logs from
+ Cloud Logging (see issue #36963).
"""
+ metadata_any = Any()
+ metadata_any.Pack(Execution.pb(Execution(name=EXECUTION_FULL_NAME)))
+
async def _mock_operation(operation_name, location,
use_regional_endpoint):
operation = mock.MagicMock()
operation.done = True
@@ -111,6 +122,7 @@ class TestCloudBatchJobFinishedTrigger:
operation.error = Any()
operation.error.ParseFromString(b"")
operation.response = _packed_execution_response(task_count=3,
succeeded_count=3, failed_count=0)
+ operation.metadata = metadata_any
return operation
mock_hook.return_value.get_operation = _mock_operation
@@ -121,6 +133,7 @@ class TestCloudBatchJobFinishedTrigger:
{
"status": RunJobStatus.SUCCESS.value,
"job_name": JOB_NAME,
+ "execution_name": EXECUTION_NAME,
}
)
== actual
@@ -185,14 +198,23 @@ class TestCloudBatchJobFinishedTrigger:
self, mock_hook, trigger: CloudRunJobFinishedTrigger
):
"""
- Tests the CloudRunJobFinishedTrigger raises an exception once the job
execution fails.
+ Tests the CloudRunJobFinishedTrigger fires a FAIL event when the
operation fails.
+
+ The FAIL event must also carry ``execution_name`` (parsed from
+ ``operation.metadata`` whenever it is populated) so that
+ ``execute_complete`` can still pull container logs from Cloud Logging
+ before raising — see issue #36963.
"""
+ metadata_any = Any()
+ metadata_any.Pack(Execution.pb(Execution(name=EXECUTION_FULL_NAME)))
+
async def _mock_operation(operation_name, location,
use_regional_endpoint):
operation = mock.MagicMock()
operation.done = True
operation.name = "name"
operation.error = Status(code=13, message="Some message")
+ operation.metadata = metadata_any
return operation
mock_hook.return_value.get_operation = _mock_operation
@@ -206,6 +228,7 @@ class TestCloudBatchJobFinishedTrigger:
"operation_error_code": ERROR_CODE,
"operation_error_message": ERROR_MESSAGE,
"job_name": JOB_NAME,
+ "execution_name": EXECUTION_NAME,
}
)
== actual
@@ -254,6 +277,7 @@ class TestCloudBatchJobFinishedTrigger:
{
"status": RunJobStatus.SUCCESS.value,
"job_name": JOB_NAME,
+ "execution_name": None,
}
)
== actual