nathadfield opened a new issue, #37013:
URL: https://github.com/apache/airflow/issues/37013
### Apache Airflow version
2.8.1
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
Defining a task policy that affects an operator in a mapped task causes a
DAG Import Error.
### What you think should happen instead?
_No response_
### How to reproduce
Consider the following Airflow task policy, which sets the
`execution_timeout` of every `BashOperator` task to 30 seconds.
```
from datetime import timedelta
from airflow.models.baseoperator import BaseOperator
from abc import ABC


class TimedOperator(BaseOperator, ABC):
    timeout: timedelta


def task_policy(task: TimedOperator):
    if task.operator_name == 'BashOperator':
        task.execution_timeout = timedelta(seconds=30)
```
If we then create the following DAG and run it, we can see that the task
does indeed get killed after 30 seconds according to the policy.
```
from datetime import datetime
from airflow import models
from airflow.operators.bash import BashOperator

with models.DAG(
    dag_id='policy_test',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    schedule='0 0 * * *',
) as dag:
    task1 = BashOperator(
        task_id='single_task',
        bash_command='echo "Hello World!"; sleep 60; echo "Goodbye World!"',
    )
```
```
[2024-01-25, 15:35:21 UTC] {task_command.py:423} INFO - Running <TaskInstance: policy_test.single_task scheduled__2024-01-24T00:00:00+00:00 [running]> on host 67fd7cade381
[2024-01-25, 15:35:21 UTC] {taskinstance.py:2480} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='policy_test' AIRFLOW_CTX_TASK_ID='single_task' AIRFLOW_CTX_EXECUTION_DATE='2024-01-24T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-01-24T00:00:00+00:00'
[2024-01-25, 15:35:21 UTC] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2024-01-25, 15:35:21 UTC] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'echo "Hello World!"; sleep 60; echo "Goodbye World!"']
[2024-01-25, 15:35:21 UTC] {subprocess.py:86} INFO - Output:
[2024-01-25, 15:35:21 UTC] {subprocess.py:93} INFO - Hello World!
[2024-01-25, 15:35:51 UTC] {timeout.py:68} ERROR - Process timed out, PID: 4315
[2024-01-25, 15:35:51 UTC] {subprocess.py:104} INFO - Sending SIGTERM signal to process group
[2024-01-25, 15:35:51 UTC] {taskinstance.py:2698} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 428, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/operators/bash.py", line 203, in execute
    result = self.subprocess_hook.run_command(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/hooks/subprocess.py", line 91, in run_command
    for raw_line in iter(self.sub_process.stdout.readline, b""):
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/timeout.py", line 69, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 4315
```
However, if we add the following dynamically mapped `BashOperator` task to
the same DAG, the DAG fails to load with a DAG Import Error.
```
    mapped_task = BashOperator.partial(
        task_id='mapped_tasks',
    ).expand(
        bash_command=[
            'echo "Hello World!"; sleep 60; echo "Goodbye World!"',
            'echo "Hello World!"; sleep 60; echo "Goodbye World!"'
        ]
    )
```
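One guess at the cause, offered as an assumption rather than a confirmed diagnosis: if the mapped operator exposes `execution_timeout` as a read-only property, then the plain attribute assignment in the policy would raise an `AttributeError` at parse time. The sketch below reproduces that pattern without Airflow. `PlainTask`, `MappedTask` and `tolerant_task_policy` are hypothetical stand-ins invented for illustration, and the `partial_kwargs` fallback has not been verified against Airflow 2.8.1.

```python
from datetime import timedelta
from typing import Optional


class PlainTask:
    """Hypothetical stand-in for a regular operator: a plain, writable attribute."""

    operator_name = "BashOperator"

    def __init__(self) -> None:
        self.execution_timeout: Optional[timedelta] = None


class MappedTask:
    """Hypothetical stand-in for a mapped operator (assumption, not Airflow code):
    the timeout is a read-only property backed by a dict of partial kwargs."""

    operator_name = "BashOperator"

    def __init__(self) -> None:
        self.partial_kwargs: dict = {}

    @property
    def execution_timeout(self) -> Optional[timedelta]:
        return self.partial_kwargs.get("execution_timeout")


def task_policy(task) -> None:
    # Same logic as the policy in this report.
    if task.operator_name == "BashOperator":
        task.execution_timeout = timedelta(seconds=30)


def tolerant_task_policy(task) -> None:
    # Possible workaround sketch: fall back to the partial kwargs when the
    # attribute cannot be assigned directly.
    if task.operator_name != "BashOperator":
        return
    try:
        task.execution_timeout = timedelta(seconds=30)
    except AttributeError:
        task.partial_kwargs["execution_timeout"] = timedelta(seconds=30)
```

With these stand-ins, `task_policy(PlainTask())` succeeds, `task_policy(MappedTask())` raises `AttributeError`, and `tolerant_task_policy(MappedTask())` stores the timeout in `partial_kwargs` instead.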

### Operating System
PRETTY_NAME="Debian GNU/Linux 11 (bullseye)" NAME="Debian GNU/Linux"
VERSION_ID="11" VERSION="11 (bullseye)" VERSION_CODENAME=bullseye ID=debian
HOME_URL="https://www.debian.org/" SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"
### Versions of Apache Airflow Providers
_No response_
### Deployment
Astronomer
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)