pmcquighan-camus opened a new issue, #57359:
URL: https://github.com/apache/airflow/issues/57359

   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-google==18.0.0
   
   ### Apache Airflow version
   
   3.1.0
   
   ### Operating System
   
   Debian
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   The provider was polling for job status every 30 seconds for about 15 minutes, as expected. The Dataflow API then returned a 503 Service Unavailable error and the task failed. Retries of the task also failed: because the task ran in deferrable mode, each retry saw that the trigger had already completed with an exception and immediately failed as well.
   
   The Google Dataflow API seems to return 503s with some regularity (roughly weekly) under our current workload, resulting in a failed DAG run every several days. The run can be recovered by clearing the full task state, but that is wasteful: the Dataflow job itself has likely already succeeded, yet any needed XCom values are never written.
   
   Initial try:
   ```
   [2025-10-26 22:39:17] ERROR - Exception occurred while checking for job 
completion. 
source=airflow.providers.google.cloud.triggers.dataflow.TemplateJobStartTrigger 
loc=dataflow.py:149
   ServiceUnavailable: 503 The service is currently unavailable.
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/triggers/dataflow.py",
 line 113 in run
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/hooks/dataflow.py",
 line 1480 in get_job_status
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/hooks/dataflow.py",
 line 1457 in get_job
   File 
"/home/airflow/.local/lib/python3.12/site-packages/google/cloud/dataflow_v1beta3/services/jobs_v1_beta3/async_client.py",
 line 478 in get_job
   File 
"/home/airflow/.local/lib/python3.12/site-packages/google/api_core/grpc_helpers_async.py",
 line 88 in __await__
   AioRpcError: <AioRpcError of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "The service is currently unavailable."
        debug_error_string = "UNKNOWN:Error received from peer 
ipv4:74.125.132.95:443 {created_time:"2025-10-27T05:39:17.82102088+00:00", 
grpc_status:14, grpc_message:"The service is currently unavailable."}"
   >
   File 
"/home/airflow/.local/lib/python3.12/site-packages/google/api_core/grpc_helpers_async.py",
 line 85 in __await__
   File 
"/home/airflow/.local/lib/python3.12/site-packages/grpc/aio/_interceptor.py", 
line 472 in __await__
   File "/home/airflow/.local/lib/python3.12/site-packages/grpc/aio/_call.py", 
line 327 in __await__
   ```
   
   Task retry a few minutes later:
   ```
   [2025-10-26 22:42:11] DEBUG - Result from 'on_task_instance_running': [] 
source=airflow.listeners.listener loc=listener.py:42
   [2025-10-26 22:42:11] INFO - status: error, msg: 503 The service is 
currently unavailable. 
source=airflow.task.operators.airflow.providers.google.cloud.operators.dataflow.DataflowStartFlexTemplateOperator
 loc=dataflow.py:648
   [2025-10-26 22:42:11] ERROR - Task failed with exception source=task 
loc=task_runner.py:972
   AirflowException: 503 The service is currently unavailable.
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py",
 line 920 in run
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py",
 line 1307 in _execute_task
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py",
 line 1632 in resume_execution
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/operators/dataflow.py",
 line 649 in execute_complete
   [2025-10-26 22:42:11] ERROR - Top level error source=task 
loc=task_runner.py:1457
   ValueError: dictionary update sequence element #0 has length 1; 2 is required
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py",
 line 1452 in main
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py",
 line 1390 in finalize
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/links/base.py",
 line 112 in get_link
   File 
"/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/links/base.py",
 line 92 in get_config
   [2025-10-26 22:42:11] WARNING - Process exited abnormally exit_code=1 
source=task
   ```
   
   ### What you think should happen instead
   
   Since a 503 is a retryable error, the trigger should retry several times (or perhaps indefinitely) before failing, just as it would if the job were still running.
   
   ### How to reproduce
   
   Run a Dataflow flex template via the `DataflowStartFlexTemplateOperator`. If the Google Dataflow API returns a 503 while the trigger is polling for status, the error will recur.
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

