This is an automated email from the ASF dual-hosted git repository.
shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 29ac7c07269 Fix Cloud Run deferrable trigger handling of transient 503
(#67219)
29ac7c07269 is described below
commit 29ac7c07269cc32ca74c11604f645b441ebeba2e
Author: Akshet Pandey <[email protected]>
AuthorDate: Sat May 30 01:57:12 2026 -0400
Fix Cloud Run deferrable trigger handling of transient 503 (#67219)
The CloudRunJobFinishedTrigger polls the long-running operation via
CloudRunAsyncHook.get_operation in its loop. When that gRPC call fails
with a transient 503 ServiceUnavailable — typical of a regional Cloud
Run API blip while the underlying job is still progressing — the
exception propagates out of the trigger, the triggerer logs the
failure, and the deferred task is failed with TaskDeferralError. The
worker's task-level retry then re-runs the operator from scratch,
which re-submits a brand new Cloud Run execution rather than waiting
on the in-flight one.
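Catch ServiceUnavailable — together with the other transient gRPC
errors listed in _RETRYABLE_GRPC_EXCEPTIONS (InternalServerError,
DeadlineExceeded, GatewayTimeout, ResourceExhausted, Aborted) — inside
the polling loop, log a warning, deduct polling_period_seconds from the
remaining timeout, sleep polling_period_seconds while budget remains,
and continue — mirroring the equivalent fix in DataflowJobStatusTrigger
(#66293). Other exceptions still propagate so Airflow's task-level
retry can take over for genuinely terminal errors.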
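Tests cover the new retry behavior (each retryable error followed by a
successful poll yields the SUCCESS TriggerEvent), lock in that
unexpected exceptions are not silently swallowed, and verify that a
persistent transient outage still ends in a TIMEOUT TriggerEvent once
the timeout budget is exhausted.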
Signed-off-by: Akshet Pandey <[email protected]>
---
.../providers/google/cloud/triggers/cloud_run.py | 49 +++++++++-
.../unit/google/cloud/triggers/test_cloud_run.py | 108 +++++++++++++++++++++
2 files changed, 152 insertions(+), 5 deletions(-)
diff --git
a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
index d5cebf5ca7f..254b68c85eb 100644
--- a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
+++ b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
@@ -21,6 +21,14 @@ from collections.abc import AsyncIterator, Sequence
from enum import Enum
from typing import Any, Literal
+from google.api_core.exceptions import (
+ Aborted,
+ DeadlineExceeded,
+ GatewayTimeout,
+ InternalServerError,
+ ResourceExhausted,
+ ServiceUnavailable,
+)
from google.cloud.run_v2 import Execution
from airflow.providers.common.compat.sdk import AirflowException
@@ -29,6 +37,21 @@ from airflow.triggers.base import BaseTrigger, TriggerEvent
DEFAULT_BATCH_LOCATION = "us-central1"
+# gRPC errors that indicate a transient failure of the Cloud Run API rather
+# than a terminal state of the underlying job. Re-polling is the right move:
+# any of these surfacing from get_operation while the Cloud Run execution is
+# still progressing would otherwise crash the trigger and have Airflow's
+# task-level retry re-submit the whole job. Anything outside this tuple
+# (NotFound, PermissionDenied, auth failures, ...) still propagates.
+_RETRYABLE_GRPC_EXCEPTIONS = (
+ ServiceUnavailable, # 503 / UNAVAILABLE
+ InternalServerError, # 500 / INTERNAL
+ DeadlineExceeded, # 504 / DEADLINE_EXCEEDED
+ GatewayTimeout, # 504 / gateway timeout
+ ResourceExhausted, # 429 / RESOURCE_EXHAUSTED
+ Aborted, # ABORTED — usually retryable concurrency conflict
+)
+
class RunJobStatus(Enum):
"""Enum to represent the status of a job run."""
@@ -112,11 +135,27 @@ class CloudRunJobFinishedTrigger(BaseTrigger):
timeout = self.timeout
self.hook = self._get_async_hook()
while timeout is None or timeout > 0:
- operation = await self.hook.get_operation(
- operation_name=self.operation_name,
- location=self.location,
- use_regional_endpoint=self.use_regional_endpoint,
- )
+ try:
+ operation = await self.hook.get_operation(
+ operation_name=self.operation_name,
+ location=self.location,
+ use_regional_endpoint=self.use_regional_endpoint,
+ )
+ except _RETRYABLE_GRPC_EXCEPTIONS as e:
+ self.log.warning(
+ "Transient error from Cloud Run get_operation (%s). Retrying... (%s)",
+ type(e).__name__,
+ e,
+ )
+
+ if timeout is not None:
+ timeout -= self.polling_period_seconds
+
+ if timeout is None or timeout > 0:
+ await asyncio.sleep(self.polling_period_seconds)
+
+ continue
+
if operation.done:
# An operation can only have one of those two combinations: if it is failed, then
# the error field will be populated, else, then the response field will be.
diff --git
a/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
b/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
index 0988444cfb5..18d4f9e0d8e 100644
--- a/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
+++ b/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
@@ -20,6 +20,15 @@ from __future__ import annotations
from unittest import mock
import pytest
+from google.api_core.exceptions import (
+ Aborted,
+ DeadlineExceeded,
+ GatewayTimeout,
+ InternalServerError,
+ PermissionDenied,
+ ResourceExhausted,
+ ServiceUnavailable,
+)
from google.cloud.run_v2 import Execution
from google.protobuf.any_pb2 import Any
from google.rpc.status_pb2 import Status
@@ -202,6 +211,105 @@ class TestCloudBatchJobFinishedTrigger:
== actual
)
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize(
+ "transient_exception",
+ [
+ pytest.param(ServiceUnavailable("Service is currently
unavailable."), id="ServiceUnavailable"),
+ pytest.param(InternalServerError("Internal server error."),
id="InternalServerError"),
+ pytest.param(DeadlineExceeded("Deadline exceeded."),
id="DeadlineExceeded"),
+ pytest.param(GatewayTimeout("Gateway timeout."),
id="GatewayTimeout"),
+ pytest.param(ResourceExhausted("Quota exceeded."),
id="ResourceExhausted"),
+ pytest.param(Aborted("Aborted."), id="Aborted"),
+ ],
+ )
+ @mock.patch(
+ "airflow.providers.google.cloud.triggers.cloud_run.asyncio.sleep",
new_callable=mock.AsyncMock
+ )
+
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+ async def test_trigger_continues_polling_after_retryable_grpc_error(
+ self, mock_hook, mock_sleep, transient_exception, trigger:
CloudRunJobFinishedTrigger
+ ):
+ """
+ Transient gRPC errors from ``get_operation`` should be absorbed at the polling boundary
+ so the triggerer keeps re-polling instead of failing the deferred task (which would
+ otherwise cascade into Airflow re-submitting the whole Cloud Run job on task-level
+ retry). Covers every error class in the retryable tuple.
+ """
+ done_operation = mock.MagicMock()
+ done_operation.done = True
+ done_operation.error = Any()
+ done_operation.error.ParseFromString(b"")
+ done_operation.response = _packed_execution_response(task_count=1,
succeeded_count=1, failed_count=0)
+
+ mock_hook.return_value.get_operation = mock.AsyncMock(
+ side_effect=[transient_exception, done_operation]
+ )
+
+ generator = trigger.run()
+ actual = await generator.asend(None) # type:ignore[attr-defined]
+
+ assert (
+ TriggerEvent(
+ {
+ "status": RunJobStatus.SUCCESS.value,
+ "job_name": JOB_NAME,
+ }
+ )
+ == actual
+ )
+ assert mock_hook.return_value.get_operation.await_count == 2
+ mock_sleep.assert_awaited_once_with(POLL_SLEEP)
+
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize(
+ "fatal_exception",
+ [
+ pytest.param(PermissionDenied("Permission denied."),
id="PermissionDenied"),
+ pytest.param(RuntimeError("boom"), id="RuntimeError"),
+ ],
+ )
+ @mock.patch(
+ "airflow.providers.google.cloud.triggers.cloud_run.asyncio.sleep",
new_callable=mock.AsyncMock
+ )
+
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+ async def test_trigger_propagates_unexpected_polling_exception(
+ self, mock_hook, mock_sleep, fatal_exception, trigger:
CloudRunJobFinishedTrigger
+ ):
+ """
+ Only the retryable gRPC error set should be absorbed. Anything outside that tuple
+ (auth failures, permission denied, unexpected runtime errors, ...) must propagate so
+ Airflow's task-level retry can take over — and the trigger must NOT have backed off
+ before re-raising (otherwise we'd be swallowing the exception class boundary).
+ """
+ mock_hook.return_value.get_operation =
mock.AsyncMock(side_effect=fatal_exception)
+
+ generator = trigger.run()
+ with pytest.raises(type(fatal_exception)):
+ await generator.asend(None) # type:ignore[attr-defined]
+
+ mock_sleep.assert_not_awaited()
+
+ @pytest.mark.asyncio
+ @mock.patch(
+ "airflow.providers.google.cloud.triggers.cloud_run.asyncio.sleep",
new_callable=mock.AsyncMock
+ )
+
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+ async def test_trigger_yields_timeout_when_retries_exhaust_timeout(
+ self, mock_hook, mock_sleep, trigger: CloudRunJobFinishedTrigger
+ ):
+ """
+ Retries during a perpetual transient outage must charge against the trigger's
+ timeout budget the same way a normal poll does, so the trigger eventually yields
+ TIMEOUT instead of extending the deferral indefinitely.
+ """
+ mock_hook.return_value.get_operation =
mock.AsyncMock(side_effect=ServiceUnavailable("Always 503"))
+
+ generator = trigger.run()
+ actual = await generator.asend(None) # type:ignore[attr-defined]
+
+ assert TriggerEvent({"status": RunJobStatus.TIMEOUT.value, "job_name":
JOB_NAME}) == actual
+
@pytest.mark.asyncio
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
async def test_trigger_timeout(self, mock_hook, trigger:
CloudRunJobFinishedTrigger):