This is an automated email from the ASF dual-hosted git repository.

shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 29ac7c07269 Fix Cloud Run deferrable trigger handling of transient 503 (#67219)
29ac7c07269 is described below

commit 29ac7c07269cc32ca74c11604f645b441ebeba2e
Author: Akshet Pandey <[email protected]>
AuthorDate: Sat May 30 01:57:12 2026 -0400

    Fix Cloud Run deferrable trigger handling of transient 503 (#67219)
    
    The CloudRunJobFinishedTrigger polls the long-running operation via
    CloudRunAsyncHook.get_operation in its loop. When that gRPC call fails
    with a transient 503 ServiceUnavailable — typical of a regional Cloud
    Run API blip while the underlying job is still progressing — the
    exception propagates out of the trigger, the triggerer logs the
    failure, and the deferred task is failed with TaskDeferralError. The
    worker's task-level retry then re-runs the operator from scratch,
    which re-submits a brand new Cloud Run execution rather than waiting
    on the in-flight one.
    
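    Catch transient gRPC errors (ServiceUnavailable, InternalServerError,
    DeadlineExceeded, GatewayTimeout, ResourceExhausted, Aborted) inside
    the polling loop, log a warning, charge polling_period_seconds against
    the trigger's timeout budget, sleep, and continue — mirroring the
    equivalent fix in DataflowJobStatusTrigger (#66293). Other exceptions
    (NotFound, PermissionDenied, ...) still propagate so Airflow's
    task-level retry can take over for genuinely terminal errors.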
    
    Tests cover the new retry behavior (each retryable error class followed
    by a successful poll yields the SUCCESS TriggerEvent), lock in that
    unexpected exceptions are not silently swallowed, and check that a
    persistent transient outage still ends in a TIMEOUT TriggerEvent.
    
    Signed-off-by: Akshet Pandey <[email protected]>
---
 .../providers/google/cloud/triggers/cloud_run.py   |  49 +++++++++-
 .../unit/google/cloud/triggers/test_cloud_run.py   | 108 +++++++++++++++++++++
 2 files changed, 152 insertions(+), 5 deletions(-)

diff --git a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
index d5cebf5ca7f..254b68c85eb 100644
--- a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
+++ b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
@@ -21,6 +21,14 @@ from collections.abc import AsyncIterator, Sequence
 from enum import Enum
 from typing import Any, Literal
 
+from google.api_core.exceptions import (
+    Aborted,
+    DeadlineExceeded,
+    GatewayTimeout,
+    InternalServerError,
+    ResourceExhausted,
+    ServiceUnavailable,
+)
 from google.cloud.run_v2 import Execution
 
 from airflow.providers.common.compat.sdk import AirflowException
@@ -29,6 +37,21 @@ from airflow.triggers.base import BaseTrigger, TriggerEvent
 
 DEFAULT_BATCH_LOCATION = "us-central1"
 
+# gRPC errors that indicate a transient failure of the Cloud Run API rather
+# than a terminal state of the underlying job. Re-polling is the right move:
+# any of these surfacing from get_operation while the Cloud Run execution is
+# still progressing would otherwise crash the trigger and have Airflow's
+# task-level retry re-submit the whole job. Anything outside this tuple
+# (NotFound, PermissionDenied, auth failures, ...) still propagates.
+_RETRYABLE_GRPC_EXCEPTIONS = (
+    ServiceUnavailable,  # 503 / UNAVAILABLE
+    InternalServerError,  # 500 / INTERNAL
+    DeadlineExceeded,  # 504 / DEADLINE_EXCEEDED
+    GatewayTimeout,  # 504 / gateway timeout
+    ResourceExhausted,  # 429 / RESOURCE_EXHAUSTED
+    Aborted,  # ABORTED — usually retryable concurrency conflict
+)
+
 
 class RunJobStatus(Enum):
     """Enum to represent the status of a job run."""
@@ -112,11 +135,27 @@ class CloudRunJobFinishedTrigger(BaseTrigger):
         timeout = self.timeout
         self.hook = self._get_async_hook()
         while timeout is None or timeout > 0:
-            operation = await self.hook.get_operation(
-                operation_name=self.operation_name,
-                location=self.location,
-                use_regional_endpoint=self.use_regional_endpoint,
-            )
+            try:
+                operation = await self.hook.get_operation(
+                    operation_name=self.operation_name,
+                    location=self.location,
+                    use_regional_endpoint=self.use_regional_endpoint,
+                )
+            except _RETRYABLE_GRPC_EXCEPTIONS as e:
+                self.log.warning(
+                    "Transient error from Cloud Run get_operation (%s). Retrying... (%s)",
+                    type(e).__name__,
+                    e,
+                )
+
+                if timeout is not None:
+                    timeout -= self.polling_period_seconds
+
+                if timeout is None or timeout > 0:
+                    await asyncio.sleep(self.polling_period_seconds)
+
+                continue
+
             if operation.done:
                 # An operation can only have one of those two combinations: if it is failed, then
                 # the error field will be populated, else, then the response field will be.
diff --git a/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py b/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
index 0988444cfb5..18d4f9e0d8e 100644
--- a/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
+++ b/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
@@ -20,6 +20,15 @@ from __future__ import annotations
 from unittest import mock
 
 import pytest
+from google.api_core.exceptions import (
+    Aborted,
+    DeadlineExceeded,
+    GatewayTimeout,
+    InternalServerError,
+    PermissionDenied,
+    ResourceExhausted,
+    ServiceUnavailable,
+)
 from google.cloud.run_v2 import Execution
 from google.protobuf.any_pb2 import Any
 from google.rpc.status_pb2 import Status
@@ -202,6 +211,105 @@ class TestCloudBatchJobFinishedTrigger:
             == actual
         )
 
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize(
+        "transient_exception",
+        [
+            pytest.param(ServiceUnavailable("Service is currently unavailable."), id="ServiceUnavailable"),
+            pytest.param(InternalServerError("Internal server error."), id="InternalServerError"),
+            pytest.param(DeadlineExceeded("Deadline exceeded."), id="DeadlineExceeded"),
+            pytest.param(GatewayTimeout("Gateway timeout."), id="GatewayTimeout"),
+            pytest.param(ResourceExhausted("Quota exceeded."), id="ResourceExhausted"),
+            pytest.param(Aborted("Aborted."), id="Aborted"),
+        ],
+    )
+    @mock.patch(
+        "airflow.providers.google.cloud.triggers.cloud_run.asyncio.sleep", new_callable=mock.AsyncMock
+    )
+    @mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+    async def test_trigger_continues_polling_after_retryable_grpc_error(
+        self, mock_hook, mock_sleep, transient_exception, trigger: CloudRunJobFinishedTrigger
+    ):
+        """
+        Transient gRPC errors from ``get_operation`` should be absorbed at the polling boundary
+        so the triggerer keeps re-polling instead of failing the deferred task (which would
+        otherwise cascade into Airflow re-submitting the whole Cloud Run job on task-level
+        retry). Covers every error class in the retryable tuple.
+        """
+        done_operation = mock.MagicMock()
+        done_operation.done = True
+        done_operation.error = Any()
+        done_operation.error.ParseFromString(b"")
+        done_operation.response = _packed_execution_response(task_count=1, succeeded_count=1, failed_count=0)
+
+        mock_hook.return_value.get_operation = mock.AsyncMock(
+            side_effect=[transient_exception, done_operation]
+        )
+
+        generator = trigger.run()
+        actual = await generator.asend(None)  # type:ignore[attr-defined]
+
+        assert (
+            TriggerEvent(
+                {
+                    "status": RunJobStatus.SUCCESS.value,
+                    "job_name": JOB_NAME,
+                }
+            )
+            == actual
+        )
+        assert mock_hook.return_value.get_operation.await_count == 2
+        mock_sleep.assert_awaited_once_with(POLL_SLEEP)
+
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize(
+        "fatal_exception",
+        [
+            pytest.param(PermissionDenied("Permission denied."), id="PermissionDenied"),
+            pytest.param(RuntimeError("boom"), id="RuntimeError"),
+        ],
+    )
+    @mock.patch(
+        "airflow.providers.google.cloud.triggers.cloud_run.asyncio.sleep", new_callable=mock.AsyncMock
+    )
+    @mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+    async def test_trigger_propagates_unexpected_polling_exception(
+        self, mock_hook, mock_sleep, fatal_exception, trigger: CloudRunJobFinishedTrigger
+    ):
+        """
+        Only the retryable gRPC error set should be absorbed. Anything outside that tuple
+        (auth failures, permission denied, unexpected runtime errors, ...) must propagate so
+        Airflow's task-level retry can take over — and the trigger must NOT have backed off
+        before re-raising (otherwise we'd be swallowing the exception class boundary).
+        """
+        mock_hook.return_value.get_operation = mock.AsyncMock(side_effect=fatal_exception)
+
+        generator = trigger.run()
+        with pytest.raises(type(fatal_exception)):
+            await generator.asend(None)  # type:ignore[attr-defined]
+
+        mock_sleep.assert_not_awaited()
+
+    @pytest.mark.asyncio
+    @mock.patch(
+        "airflow.providers.google.cloud.triggers.cloud_run.asyncio.sleep", new_callable=mock.AsyncMock
+    )
+    @mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+    async def test_trigger_yields_timeout_when_retries_exhaust_timeout(
+        self, mock_hook, mock_sleep, trigger: CloudRunJobFinishedTrigger
+    ):
+        """
+        Retries during a perpetual transient outage must charge against the trigger's
+        timeout budget the same way a normal poll does, so the trigger eventually yields
+        TIMEOUT instead of extending the deferral indefinitely.
+        """
+        mock_hook.return_value.get_operation = mock.AsyncMock(side_effect=ServiceUnavailable("Always 503"))
+
+        generator = trigger.run()
+        actual = await generator.asend(None)  # type:ignore[attr-defined]
+
+        assert TriggerEvent({"status": RunJobStatus.TIMEOUT.value, "job_name": JOB_NAME}) == actual
+
     @pytest.mark.asyncio
     @mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
     async def test_trigger_timeout(self, mock_hook, trigger: CloudRunJobFinishedTrigger):

Reply via email to