arorasachin9 opened a new issue, #60517:
URL: https://github.com/apache/airflow/issues/60517

   ### Apache Airflow version
   
   Other Airflow 3 version (please specify below)
   
   ### If "Other Airflow 3 version" selected, which one?
   
   3.1.3
   
   ### What happened?
   
   We are using the EMR Containers operators in deferrable mode, with an `execution_timeout` of 20 minutes on the task. Sometimes the task takes longer than 20 minutes. After the timeout, control goes back from the triggerer to the worker pod, but the deferral timeout exception is not caught when the task resumes on the worker. As a result, the job keeps running on EMR instead of being cancelled, as it would be for a normal `AirflowTaskTimeout`. On retry, this leads to parallel execution of the same workload, which is not expected.
   In `task_runner.py`:
   ```python
   if task.execution_timeout:
       from airflow.sdk.execution_time.timeout import timeout

       # TODO: handle timeout in case of deferral
       timeout_seconds = task.execution_timeout.total_seconds()
       try:
           # It's possible we're already timed out, so fast-fail if true
           if timeout_seconds <= 0:
               raise AirflowTaskTimeout()
           # Run task in timeout wrapper
           with timeout(timeout_seconds):
               result = ctx.run(execute, context=context)
       except AirflowTaskTimeout:
           task.on_kill()
           raise
   else:
       result = ctx.run(execute, context=context)
   ```
   We are only catching the `AirflowTaskTimeout` exception, but the exception raised while resuming the task on the worker is `TaskDeferralTimeout`.
   File: `operator.py`
   ```python
   def resume_execution(self, next_method: str, next_kwargs: dict[str, Any] | None, context: Context):
       """Entrypoint method called by the Task Runner (instead of execute) when this task is resumed."""
       from airflow.sdk.exceptions import TaskDeferralError, TaskDeferralTimeout

       if next_kwargs is None:
           next_kwargs = {}
       # __fail__ is a special signal value for next_method that indicates
       # this task was scheduled specifically to fail.

       if next_method == TRIGGER_FAIL_REPR:
           next_kwargs = next_kwargs or {}
           traceback = next_kwargs.get("traceback")
           if traceback is not None:
               self.log.error("Trigger failed:\n%s", "\n".join(traceback))
           if (error := next_kwargs.get("error", "Unknown")) == TriggerFailureReason.TRIGGER_TIMEOUT:
               raise TaskDeferralTimeout(error)
           raise TaskDeferralError(error)
       # Grab the callable off the Operator/Task and add in any kwargs
       execute_callable = getattr(self, next_method)
       return execute_callable(context, **next_kwargs)
   ```
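   A possible direction for a fix (only a sketch on my side, not a confirmed patch): also catch `TaskDeferralTimeout` in the `task_runner.py` block above, so `task.on_kill()` runs and the operator gets a chance to cancel the remote EMR job, the same way the `AirflowTaskTimeout` path does today.
   ```python
   # Sketch only; assumes TaskDeferralTimeout is importable from airflow.sdk.exceptions
   # and that task.on_kill() is the right cleanup hook for the deferred case too.
   from airflow.sdk.exceptions import TaskDeferralTimeout

   try:
       if timeout_seconds <= 0:
           raise AirflowTaskTimeout()
       with timeout(timeout_seconds):
           result = ctx.run(execute, context=context)
   except (AirflowTaskTimeout, TaskDeferralTimeout):
       # Give the operator a chance to cancel the remote job before re-raising.
       task.on_kill()
       raise
   ```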
   
   ### What you think should happen instead?
   
   The expectation is that the EMR job is cancelled when the task times out.
   
   ### How to reproduce
   
   1. Add an EMRContainersOperator task to an Airflow DAG with `deferrable=True` (a minimal sketch follows the list).
   2. Let the task exceed its `execution_timeout` while it is deferred.
   3. The job keeps running on EMR.
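   A minimal DAG sketch for reproduction, assuming the Amazon provider's `EmrContainerOperator`; the cluster id, role ARN and job driver are placeholders:
   ```python
   from datetime import datetime, timedelta

   from airflow.providers.amazon.aws.operators.emr import EmrContainerOperator
   from airflow.sdk import DAG

   with DAG(
       dag_id="emr_containers_deferrable_timeout",
       start_date=datetime(2025, 1, 1),
       schedule=None,
       catchup=False,
   ):
       EmrContainerOperator(
           task_id="run_emr_containers_job",
           name="long-running-spark-job",
           virtual_cluster_id="vc-xxxxxxxx",  # placeholder
           execution_role_arn="arn:aws:iam::123456789012:role/emr-job-role",  # placeholder
           release_label="emr-6.15.0-latest",
           job_driver={
               "sparkSubmitJobDriver": {"entryPoint": "s3://my-bucket/jobs/long_job.py"}  # placeholder
           },
           deferrable=True,
           # Shorter than the job's real runtime, so the task times out while deferred.
           execution_timeout=timedelta(minutes=20),
           retries=1,  # on retry the still-running EMR job overlaps with the new attempt
       )
   ```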
   
   ### Operating System
   
   Linux
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

