JosephABC commented on issue #37154:
URL: https://github.com/apache/airflow/issues/37154#issuecomment-2151677374
@nathadfield, we were able to replicate the issue on Airflow v2.9.1 with the
following code, which is slightly modified from the one above. It should
create the scenario automatically; you only need to enable the DAG and let it
complete at least 2 DAG runs.
```python
from airflow.models import DAG
from datetime import date, datetime, timedelta
from airflow.operators.python_operator import PythonOperator

default_args = {
    "owner": "joseph.ang",
    "depends_on_past": True,
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
    "max_active_runs": 1,
}

dag = DAG(
    "mapped_task_issue",
    description="Testing mapped task issue in Airflow v2.9.1",
    schedule_interval="0 0 * * *",
    catchup=True,
    start_date=datetime(2024, 6, 4),
    default_args=default_args,
)


def generate_records(**context):
    # The list gets one element shorter with every additional try of this task.
    try_number = context['task_instance'].try_number
    try_number = 5 - try_number
    generated_list = [{'i': i} for i in range(try_number)]
    print(f'Generated list: {generated_list}')
    return generated_list


def mapped_task_method(i: int):
    print(i)
    # Force the mapped instance that receives i == 3 to fail.
    if i == 3:
        exit(1)


generate_records_task = PythonOperator(
    task_id='generate_records_task',
    python_callable=generate_records,
    provide_context=True,
    dag=dag,
)

# One mapped task instance is created per dict returned by generate_records_task.
mapped_task = PythonOperator.partial(
    task_id='mapped_task_instance',
    python_callable=mapped_task_method,
    dag=dag,
).expand(op_kwargs=generate_records_task.output)
```
<img width="1896" alt="image"
src="https://github.com/apache/airflow/assets/25327813/201e2d02-554b-47f7-845d-0b36802e17cd">
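
For reference, the list sizes the repro depends on can be checked without Airflow. This is only a minimal sketch of the arithmetic in `generate_records`, assuming `try_number` is 1 on the first attempt and 2 on the second; the helper names `records_for_try` and `failing_indices` are hypothetical and not part of the DAG above.

```python
def records_for_try(try_number: int) -> list:
    """Mirror generate_records: return 5 - try_number dicts of the form {'i': n}."""
    return [{"i": i} for i in range(5 - try_number)]


def failing_indices(records: list) -> list:
    """Mirror mapped_task_method: only the record with i == 3 exits with an error."""
    return [r["i"] for r in records if r["i"] == 3]


if __name__ == "__main__":
    # Assumed try numbers for a first attempt and one further attempt.
    for attempt in (1, 2):
        records = records_for_try(attempt)
        print(f"try {attempt}: {len(records)} mapped instances, "
              f"failing i values: {failing_indices(records)}")
```

Under that assumption, the first attempt maps over four records and the one with `i == 3` fails, while a second attempt of `generate_records_task` maps over only three records, so the `i == 3` instance no longer has an input.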