akshetpandey commented on code in PR #67219:
URL: https://github.com/apache/airflow/pull/67219#discussion_r3290974578
##########
providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py:
##########
@@ -202,6 +211,77 @@ async def _mock_operation(operation_name, location,
use_regional_endpoint):
== actual
)
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize(
+ "transient_exception",
+ [
+ pytest.param(ServiceUnavailable("Service is currently
unavailable."), id="ServiceUnavailable"),
+ pytest.param(InternalServerError("Internal server error."),
id="InternalServerError"),
+ pytest.param(DeadlineExceeded("Deadline exceeded."),
id="DeadlineExceeded"),
+ pytest.param(GatewayTimeout("Gateway timeout."),
id="GatewayTimeout"),
+ pytest.param(ResourceExhausted("Quota exceeded."),
id="ResourceExhausted"),
+ pytest.param(Aborted("Aborted."), id="Aborted"),
+ ],
+ )
+
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.asyncio.sleep",
new_callable=mock.AsyncMock)
+
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+ async def test_trigger_continues_polling_after_retryable_grpc_error(
+ self, mock_hook, mock_sleep, transient_exception, trigger:
CloudRunJobFinishedTrigger
+ ):
+ """
+ Transient gRPC errors from ``get_operation`` should be absorbed at the
polling boundary
+ so the triggerer keeps re-polling instead of failing the deferred task
(which would
+ otherwise cascade into Airflow re-submitting the whole Cloud Run job
on task-level
+ retry). Covers every error class in the retryable tuple.
+ """
+ done_operation = mock.MagicMock()
+ done_operation.done = True
+ done_operation.error = Any()
+ done_operation.error.ParseFromString(b"")
+ done_operation.response = _packed_execution_response(task_count=1,
succeeded_count=1, failed_count=0)
+
+ mock_hook.return_value.get_operation = mock.AsyncMock(
+ side_effect=[transient_exception, done_operation]
+ )
+
+ generator = trigger.run()
+ actual = await generator.asend(None) # type:ignore[attr-defined]
+
+ assert (
+ TriggerEvent(
+ {
+ "status": RunJobStatus.SUCCESS.value,
+ "job_name": JOB_NAME,
+ }
+ )
+ == actual
+ )
+ assert mock_hook.return_value.get_operation.await_count == 2
+ mock_sleep.assert_awaited_once_with(POLL_SLEEP)
+
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize(
+ "fatal_exception",
+ [
+ pytest.param(PermissionDenied("Permission denied."),
id="PermissionDenied"),
+ pytest.param(RuntimeError("boom"), id="RuntimeError"),
+ ],
+ )
+
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+ async def test_trigger_propagates_unexpected_polling_exception(
+ self, mock_hook, fatal_exception, trigger: CloudRunJobFinishedTrigger
+ ):
+ """
+ Only the retryable gRPC error set should be absorbed. Anything outside
that tuple
+ (auth failures, permission denied, unexpected runtime errors, ...)
must propagate so
+ Airflow's task-level retry can take over.
+ """
+ mock_hook.return_value.get_operation =
mock.AsyncMock(side_effect=fatal_exception)
+
+ generator = trigger.run()
+ with pytest.raises(type(fatal_exception)):
+ await generator.asend(None) # type:ignore[attr-defined]
+
Review Comment:
Fixed: the test now asserts that non-retryable errors do not trigger a sleep.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]