nathadfield opened a new issue, #37013:
URL: https://github.com/apache/airflow/issues/37013

   ### Apache Airflow version
   
   2.8.1
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   Defining a task policy that affects an operator in a mapped task causes a 
DAG Import Error.
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   Consider the following Airflow Task Policy which changes the 
`execution_timeout` for `BashOperators` to 30 seconds.
   
   ```
   from datetime import timedelta
   from airflow.models.baseoperator import BaseOperator
   from abc import ABC
   
   
   class TimedOperator(BaseOperator, ABC):
       timeout: timedelta
   
   
   def task_policy(task: TimedOperator):
       if task.operator_name == 'BashOperator':
           task.execution_timeout = timedelta(seconds=30)
   
   ```
   
   If we then create the following DAG and run it, we can see that the task 
does indeed get killed after 30 seconds according to the policy.
   
   ```
   from datetime import datetime
   from airflow import models
   
   from airflow.operators.bash import BashOperator
   
   with models.DAG(
       dag_id='policy_test',
       start_date=datetime(2024, 1, 1),
       catchup=False,
       schedule='0 0 * * *',
   ) as dag:
   
       task1 = BashOperator(
           task_id='single_task',
           bash_command='echo "Hello World!"; sleep 60; echo "Goodbye World!"',
       )
   ```
   
   ```
   [2024-01-25, 15:35:21 UTC] {task_command.py:423} INFO - Running 
<TaskInstance: policy_test.single_task scheduled__2024-01-24T00:00:00+00:00 
[running]> on host 67fd7cade381
   [2024-01-25, 15:35:21 UTC] {taskinstance.py:2480} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='policy_test' 
AIRFLOW_CTX_TASK_ID='single_task' 
AIRFLOW_CTX_EXECUTION_DATE='2024-01-24T00:00:00+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-01-24T00:00:00+00:00'
   [2024-01-25, 15:35:21 UTC] {subprocess.py:63} INFO - Tmp dir root location: 
/tmp
   [2024-01-25, 15:35:21 UTC] {subprocess.py:75} INFO - Running command: 
['/bin/bash', '-c', 'echo "Hello World!"; sleep 60; echo "Goodbye World!"']
   [2024-01-25, 15:35:21 UTC] {subprocess.py:86} INFO - Output:
   [2024-01-25, 15:35:21 UTC] {subprocess.py:93} INFO - Hello World!
   [2024-01-25, 15:35:51 UTC] {timeout.py:68} ERROR - Process timed out, PID: 
4315
   [2024-01-25, 15:35:51 UTC] {subprocess.py:104} INFO - Sending SIGTERM signal 
to process group
   [2024-01-25, 15:35:51 UTC] {taskinstance.py:2698} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 
428, in _execute_task
       result = execute_callable(context=context, **execute_callable_kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/airflow/operators/bash.py", 
line 203, in execute
       result = self.subprocess_hook.run_command(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/airflow/hooks/subprocess.py", line 91, 
in run_command
       for raw_line in iter(self.sub_process.stdout.readline, b""):
     File "/usr/local/lib/python3.11/site-packages/airflow/utils/timeout.py", 
line 69, in handle_timeout
       raise AirflowTaskTimeout(self.error_message)
   airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 4315
   ```
   
   However, if we introduce a dynamically mapped BashOperator task, then we are 
presented with the following error.
   
   ```
   mapped_task = BashOperator.partial(
           task_id='mapped_tasks',
       ).expand(
           bash_command=[
               'echo "Hello World!"; sleep 60; echo "Goodbye World!"',
               'echo "Hello World!"; sleep 60; echo "Goodbye World!"'
           ]
       )
   ```
   
   ![Screenshot 2024-01-25 at 15 37 
48](https://github.com/apache/airflow/assets/967119/03cd4b56-a412-44fb-a98b-411388eb04d7)
   
   
   ### Operating System
   
   PRETTY_NAME="Debian GNU/Linux 11 (bullseye)" NAME="Debian GNU/Linux" 
VERSION_ID="11" VERSION="11 (bullseye)" VERSION_CODENAME=bullseye ID=debian 
HOME_URL="https://www.debian.org/"; SUPPORT_URL="https://www.debian.org/support"; 
BUG_REPORT_URL="https://bugs.debian.org/";
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to