This is an automated email from the ASF dual-hosted git repository.
shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c692e85e34c Fix CloudRunExecuteJobOperator deferrable mode silently passing on cancel (#67050)
c692e85e34c is described below
commit c692e85e34c0f163390fa1506c13cf9513a12b23
Author: deepinsight coder <[email protected]>
AuthorDate: Sun May 17 12:42:58 2026 -0700
Fix CloudRunExecuteJobOperator deferrable mode silently passing on cancel (#67050)
---
.../providers/google/cloud/triggers/cloud_run.py | 43 +++++++++++++-
.../unit/google/cloud/operators/test_cloud_run.py | 26 +++++++++
.../unit/google/cloud/triggers/test_cloud_run.py | 68 ++++++++++++++++++++++
3 files changed, 135 insertions(+), 2 deletions(-)
diff --git a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
index 87f1d5f0d89..d5cebf5ca7f 100644
--- a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
+++ b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
@@ -21,6 +21,8 @@ from collections.abc import AsyncIterator, Sequence
from enum import Enum
from typing import Any, Literal
+from google.cloud.run_v2 import Execution
+
from airflow.providers.common.compat.sdk import AirflowException
from airflow.providers.google.cloud.hooks.cloud_run import CloudRunAsyncHook
from airflow.triggers.base import BaseTrigger, TriggerEvent
@@ -127,13 +129,50 @@ class CloudRunJobFinishedTrigger(BaseTrigger):
"job_name": self.job_name,
}
)
- else:
+ return
+
+ # The LRO can complete without populating ``operation.error`` even when the
+ # underlying Cloud Run Execution did not succeed — for example when the job is
+ # cancelled from the Google Cloud UI or API, every remaining task ends up in
+ # ``cancelled_count`` rather than ``failed_count``. Mirror the sync path's
+ # ``_fail_if_execution_failed`` check on the Execution payload so deferrable mode
+ # surfaces the same failure semantics.
+ execution = Execution.deserialize(operation.response.value)
+ if execution.succeeded_count + execution.failed_count !=
execution.task_count:
+ yield TriggerEvent(
+ {
+ "status": RunJobStatus.FAIL.value,
+ "operation_error_code": None,
+ "operation_error_message": (
+ f"Cloud Run Job did not finish all tasks:
task_count="
+ f"{execution.task_count}, succeeded_count="
+ f"{execution.succeeded_count}, failed_count="
+ f"{execution.failed_count}, cancelled_count="
+ f"{execution.cancelled_count}."
+ ),
+ "job_name": self.job_name,
+ }
+ )
+ return
+ if execution.failed_count > 0:
yield TriggerEvent(
{
- "status": RunJobStatus.SUCCESS.value,
+ "status": RunJobStatus.FAIL.value,
+ "operation_error_code": None,
+ "operation_error_message": (
+ f"Some Cloud Run Job tasks failed:
failed_count="
+ f"{execution.failed_count} of
task_count={execution.task_count}."
+ ),
"job_name": self.job_name,
}
)
+ return
+ yield TriggerEvent(
+ {
+ "status": RunJobStatus.SUCCESS.value,
+ "job_name": self.job_name,
+ }
+ )
return
elif operation.error.message:
raise AirflowException(f"Cloud Run Job error:
{operation.error.message}")
diff --git a/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py b/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py
index b77d4199d6f..c29af5f9987 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py
@@ -252,6 +252,32 @@ class TestCloudRunExecuteJobOperator:
e.value
)
+ @mock.patch(CLOUD_RUN_HOOK_PATH)
+ def
test_execute_deferrable_execute_complete_method_fail_on_cancellation(self,
hook_mock):
+ """
+ Pin the contract that a FAIL event emitted by the trigger when a Cloud Run Job is
+ cancelled (no ``operation.error`` but ``cancelled_count > 0``) propagates as an
+ AirflowException — see #57791.
+ """
+ operator = CloudRunExecuteJobOperator(
+ task_id=TASK_ID, project_id=PROJECT_ID, region=REGION,
job_name=JOB_NAME, deferrable=True
+ )
+
+ event = {
+ "status": RunJobStatus.FAIL.value,
+ "operation_error_code": None,
+ "operation_error_message": (
+ "Cloud Run Job did not finish all tasks: task_count=3,
succeeded_count=1, "
+ "failed_count=0, cancelled_count=2."
+ ),
+ "job_name": JOB_NAME,
+ }
+
+ with pytest.raises(AirflowException) as e:
+ operator.execute_complete(mock.MagicMock(), event)
+
+ assert "cancelled_count=2" in str(e.value)
+
@mock.patch(CLOUD_RUN_HOOK_PATH)
def test_execute_deferrable_execute_complete_method_success(self,
hook_mock):
hook_mock.return_value.get_job.return_value = JOB
diff --git a/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py b/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
index a906a7d0332..0988444cfb5 100644
--- a/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
+++ b/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
@@ -20,12 +20,26 @@ from __future__ import annotations
from unittest import mock
import pytest
+from google.cloud.run_v2 import Execution
from google.protobuf.any_pb2 import Any
from google.rpc.status_pb2 import Status
from airflow.providers.google.cloud.triggers.cloud_run import
CloudRunJobFinishedTrigger, RunJobStatus
from airflow.triggers.base import TriggerEvent
+
+def _packed_execution_response(task_count, succeeded_count, failed_count,
cancelled_count=0):
+ """Build a ``google.protobuf.Any`` packed with an ``Execution`` proto for trigger tests."""
+ execution = Execution()
+ execution.task_count = task_count
+ execution.succeeded_count = succeeded_count
+ execution.failed_count = failed_count
+ execution.cancelled_count = cancelled_count
+ response = Any()
+ response.Pack(Execution.pb(execution))
+ return response
+
+
OPERATION_NAME = "operation"
JOB_NAME = "jobName"
ERROR_CODE = 13
@@ -87,6 +101,7 @@ class TestCloudBatchJobFinishedTrigger:
operation.name = "name"
operation.error = Any()
operation.error.ParseFromString(b"")
+ operation.response = _packed_execution_response(task_count=3,
succeeded_count=3, failed_count=0)
return operation
mock_hook.return_value.get_operation = _mock_operation
@@ -102,6 +117,59 @@ class TestCloudBatchJobFinishedTrigger:
== actual
)
+ @pytest.mark.asyncio
+
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+ async def test_trigger_yields_fail_when_job_cancelled(
+ self, mock_hook, trigger: CloudRunJobFinishedTrigger
+ ):
+ """
+ When the Cloud Run Job is cancelled via the Google Cloud UI/API the LRO completes with
+ no ``operation.error`` set but the Execution reports a non-zero ``cancelled_count``. The
+ trigger must surface this as a failure to mirror the sync path's semantics — see #57791.
+ """
+
+ async def _mock_operation(operation_name, location,
use_regional_endpoint):
+ operation = mock.MagicMock()
+ operation.done = True
+ operation.error = Any()
+ operation.error.ParseFromString(b"")
+ operation.response = _packed_execution_response(
+ task_count=3, succeeded_count=1, failed_count=0,
cancelled_count=2
+ )
+ return operation
+
+ mock_hook.return_value.get_operation = _mock_operation
+ generator = trigger.run()
+ actual = await generator.asend(None) # type:ignore[attr-defined]
+ assert actual.payload["status"] == RunJobStatus.FAIL.value
+ assert actual.payload["job_name"] == JOB_NAME
+ assert "cancelled_count=2" in actual.payload["operation_error_message"]
+ assert "did not finish all tasks" in
actual.payload["operation_error_message"]
+
+ @pytest.mark.asyncio
+
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+ async def test_trigger_yields_fail_when_some_tasks_failed(
+ self, mock_hook, trigger: CloudRunJobFinishedTrigger
+ ):
+ """
+ Regression-guard symmetry with the sync path: when ``failed_count > 0`` and the counts
+ sum to ``task_count`` the deferrable trigger must still report failure.
+ """
+
+ async def _mock_operation(operation_name, location,
use_regional_endpoint):
+ operation = mock.MagicMock()
+ operation.done = True
+ operation.error = Any()
+ operation.error.ParseFromString(b"")
+ operation.response = _packed_execution_response(task_count=3,
succeeded_count=1, failed_count=2)
+ return operation
+
+ mock_hook.return_value.get_operation = _mock_operation
+ generator = trigger.run()
+ actual = await generator.asend(None) # type:ignore[attr-defined]
+ assert actual.payload["status"] == RunJobStatus.FAIL.value
+ assert "Some Cloud Run Job tasks failed" in
actual.payload["operation_error_message"]
+
@pytest.mark.asyncio
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
async def test_trigger_on_operation_failed_yield_error(