arorasachin9 opened a new issue, #60517:
URL: https://github.com/apache/airflow/issues/60517
### Apache Airflow version
Other Airflow 3 version (please specify below)
### If "Other Airflow 3 version" selected, which one?
3.1.3
### What happened?
We are using the EMR Containers operators in deferrable mode, with a 20-minute execution timeout on the task. Sometimes the task takes more than 20 minutes. When the timeout fires, control passes from the triggerer back to the worker pod, but the deferral-timeout exception is not caught while the task resumes on the worker. As a result, the job keeps running on EMR instead of being cancelled, which is what happens for a normal `AirflowTaskTimeout`. With retries enabled, this leads to parallel execution of the same workload, which is not expected.
In file `task_runner.py`:
```python
if task.execution_timeout:
    from airflow.sdk.execution_time.timeout import timeout

    # TODO: handle timeout in case of deferral
    timeout_seconds = task.execution_timeout.total_seconds()
    try:
        # It's possible we're already timed out, so fast-fail if true
        if timeout_seconds <= 0:
            raise AirflowTaskTimeout()
        # Run task in timeout wrapper
        with timeout(timeout_seconds):
            result = ctx.run(execute, context=context)
    except AirflowTaskTimeout:
        task.on_kill()
        raise
else:
    result = ctx.run(execute, context=context)
```
Only the `AirflowTaskTimeout` exception is caught here, but the exception thrown while resuming the task on the worker is `TaskDeferralTimeout`.
File: `operator.py`
```python
def resume_execution(self, next_method: str, next_kwargs: dict[str, Any] | None, context: Context):
    """Entrypoint method called by the Task Runner (instead of execute) when this task is resumed."""
    from airflow.sdk.exceptions import TaskDeferralError, TaskDeferralTimeout

    if next_kwargs is None:
        next_kwargs = {}
    # __fail__ is a special signal value for next_method that indicates
    # this task was scheduled specifically to fail.
    if next_method == TRIGGER_FAIL_REPR:
        next_kwargs = next_kwargs or {}
        traceback = next_kwargs.get("traceback")
        if traceback is not None:
            self.log.error("Trigger failed:\n%s", "\n".join(traceback))
        if (error := next_kwargs.get("error", "Unknown")) == TriggerFailureReason.TRIGGER_TIMEOUT:
            raise TaskDeferralTimeout(error)
        raise TaskDeferralError(error)
    # Grab the callable off the Operator/Task and add in any kwargs
    execute_callable = getattr(self, next_method)
    return execute_callable(context, **next_kwargs)
```
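A sketch of one possible change to the `try`/`except` shown above (names such as `task`, `ctx`, `execute`, `timeout_seconds`, and `timeout` refer to the quoted `task_runner.py` code; this is an illustration of the idea, not a final patch): catching `TaskDeferralTimeout` alongside `AirflowTaskTimeout` would let `on_kill()` run and cancel the EMR job on resume, mirroring the non-deferred timeout path.
```python
# Sketch: also treat the deferral-timeout path as a timeout so on_kill() is invoked.
from airflow.sdk.exceptions import TaskDeferralTimeout

try:
    # It's possible we're already timed out, so fast-fail if true
    if timeout_seconds <= 0:
        raise AirflowTaskTimeout()
    # Run task in timeout wrapper
    with timeout(timeout_seconds):
        result = ctx.run(execute, context=context)
except (AirflowTaskTimeout, TaskDeferralTimeout):
    # Give the operator a chance to clean up external resources
    # (e.g. cancel the running EMR Containers job) before re-raising.
    task.on_kill()
    raise
```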
### What you think should happen instead?
The expectation is that the EMR job gets cancelled when the task hits its execution timeout.
### How to reproduce
1. Add an EMR Containers operator task to an Airflow DAG with `deferrable=True` (a minimal DAG sketch follows the list).
2. Let the task exceed its `execution_timeout`.
3. The job keeps running on EMR instead of being cancelled.
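A minimal DAG sketch to reproduce, assuming the operator in question is `EmrContainerOperator` from the Amazon provider; the cluster ID, role ARN, release label, and job driver values below are placeholders, not real resources.
```python
from datetime import timedelta

from airflow.sdk import DAG
from airflow.providers.amazon.aws.operators.emr import EmrContainerOperator

with DAG(dag_id="emr_containers_deferred_timeout_repro", schedule=None) as dag:
    EmrContainerOperator(
        task_id="run_emr_containers_job",
        name="long-running-job",
        virtual_cluster_id="<virtual-cluster-id>",  # placeholder
        execution_role_arn="<execution-role-arn>",  # placeholder
        release_label="emr-6.15.0-latest",  # placeholder
        job_driver={"sparkSubmitJobDriver": {"entryPoint": "s3://bucket/job.py"}},
        deferrable=True,
        # Shorter than the job's real runtime so the task times out while deferred
        execution_timeout=timedelta(minutes=20),
    )
```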
### Operating System
Linux
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)