wilsonhooi86 opened a new issue, #59075: URL: https://github.com/apache/airflow/issues/59075
### Apache Airflow Provider(s)

amazon

### Versions of Apache Airflow Providers

apache-airflow-providers-amazon==9.0.0

### Apache Airflow version

2.10.3

### Operating System

Amazon Linux 2023

### Deployment

Amazon (AWS) MWAA

### Deployment details

MWAA in production and mwaa-local-runner for local testing. The bug exists in both.

### What happened

When a deferred `GlueJobOperator` task fails with an internal error (for example, a throttling exception or a credentials timeout raised in the triggerer), the task is marked as `failed`, but the existing Glue `job_run_id` keeps running. On the task's next retry, `GlueJobOperator` unconditionally starts a new Glue `job_run_id` while the previous one is still running. This causes duplicate job runs on the AWS Glue side.

<img width="1707" height="685" alt="Image" src="https://github.com/user-attachments/assets/bd1616d3-2507-459d-acef-051d204b0f2f" />

Example of internal errors:

```
[2025-12-05, 01:10:33 UTC] {baseoperator.py:1798} ERROR - Trigger failed:
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 558, in cleanup_finished_triggers
    result = details["task"].result()
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 630, in run_trigger
    async for event in trigger.run():
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/triggers/glue.py", line 73, in run
    await hook.async_job_completion(self.job_name, self.run_id, self.verbose)
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 315, in async_job_completion
    ret = self._handle_state(job_run_state, job_name, run_id, verbose, next_log_tokens)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 334, in _handle_state
    self.print_job_logs(
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 278, in print_job_logs
    continuation_tokens.output_stream_continuation = display_logs_from(
                                                     ^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 245, in display_logs_from
    for response in paginator.paginate(
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/paginate.py", line 269, in __iter__
    response = self._make_request(current_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/paginate.py", line 357, in _make_request
    return self._method(**current_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/client.py", line 569, in _api_call
    return self._make_api_call(operation_name, kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/client.py", line 1023, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.errorfactory.ThrottlingException: An error occurred (ThrottlingException) when calling the FilterLogEvents operation (reached max retries: 4): Rate exceeded
[2025-12-05, 01:10:33 UTC] {taskinstance.py:3311} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 1799, in resume_execution
    raise TaskDeferralError(next_kwargs.get("error", "Unknown"))
airflow.exceptions.TaskDeferralError: Trigger failure
```

or

```
The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/aiobotocore/httpsession.py", line 222, in send
    response = await session.request(
               ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/aiohttp/client.py", line 667, in _request
    raise ConnectionTimeoutError(
aiohttp.client_exceptions.ConnectionTimeoutError: Connection timeout to host http://169.254.170.2/v2/credentials/c9ab61ee-ea14-42a2-9a16-0b4aebdedad7

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/aiobotocore/utils.py", line 722, in _get_response
    response = await session.send(request.prepare())
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/aiobotocore/httpsession.py", line 259, in send
    raise ConnectTimeoutError(endpoint_url=request.url, error=e)
botocore.exceptions.ConnectTimeoutError: Connect timeout on endpoint URL: "http://169.254.170.2/v2/credentials/c9ab61ee-ea14-42a2-9a16-0b4aebdedad7"
```

### What you think should happen instead

On the next retry, `GlueJobOperator` should check whether a previous Glue `job_run_id` exists and, if so, inspect its state:

- If the state is in progress or completed, `GlueJobOperator` should **not create a new Glue** `job_run_id`.
- If the state is failed, it should **create a new Glue** `job_run_id`.

### How to reproduce

1.
Create a GlueJobOperator task.
2. Set `deferrable=True` and `verbose=False`.
3. Run the task and watch the Airflow logs while it is in a deferred state.

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
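The retry behaviour proposed above could be sketched roughly as follows. This is an illustration only, not the actual provider code: the function name `resume_or_start_job_run` and the `client` parameter are hypothetical, and a real fix would live inside `GlueJobOperator` using the provider's `GlueJobHook`. The state names are the `JobRunState` values the Glue `GetJobRun` API returns.

```python
# Hypothetical sketch of the proposed retry logic: reuse the previous Glue job
# run when it is still in progress (or already succeeded), and only start a
# fresh run when there is no previous run or it ended in a failure state.
# `client` is any object exposing the boto3 Glue API subset used below.

# Glue JobRunState values for which a new run should NOT be started.
REUSABLE_STATES = {"STARTING", "RUNNING", "STOPPING", "WAITING", "SUCCEEDED"}


def resume_or_start_job_run(client, job_name, previous_run_id=None):
    """Return (run_id, reused): the run to monitor and whether it was reused."""
    if previous_run_id is not None:
        # Look up the state of the run left over from the previous try.
        job_run = client.get_job_run(JobName=job_name, RunId=previous_run_id)
        if job_run["JobRun"]["JobRunState"] in REUSABLE_STATES:
            return previous_run_id, True
    # No previous run, or it ended in FAILED/ERROR/TIMEOUT/STOPPED: start anew.
    new_run = client.start_job_run(JobName=job_name)
    return new_run["JobRunId"], False
```

On retry, the operator would first recover the `job_run_id` persisted from the previous attempt (for example via XCom) and call something like the above before `start_job_run`, so a run that is still `RUNNING` is monitored again instead of duplicated.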
