ldacey opened a new issue, #25060:
URL: https://github.com/apache/airflow/issues/25060
### Apache Airflow version
2.3.3 (latest released)
### What happened
Our schedulers have crashed on two occasions since upgrading to Airflow
2.3.3. The same DAG is responsible each time, but that is likely because it is
the only dynamic task mapping DAG running right now (it is catching up on some
historical data). The DAG uses the same imported @task function that many
other DAGs have used successfully with no errors. The issue has only occurred
after upgrading to Airflow 2.3.3.
### What you think should happen instead
This error should not be raised: according to the UI, the task has not run
yet, so there should be no record of this task instance. The extract task is
green, but the transform task which raised the error is blank. The DAG run is
stuck in the running state until eventually the scheduler dies and the Airflow
banner notifies me that there is no scheduler heartbeat.
Also, this same DAG (and others which use the same imported external @task
function) ran for hours before the upgrade to Airflow 2.3.3.
```
[2022-07-13 22:49:25,323] {scheduler_job.py:780} INFO - Exited execute loop
[2022-07-13 22:49:55,880] {process_utils.py:75} INFO - Process psutil.Process(pid=143, status='terminated', started='22:49:52') (143) terminated with exit code None
[2022-07-13 22:49:56,001] {process_utils.py:240} INFO - Waiting up to 5 seconds for processes to exit...
[2022-07-13 22:50:23,909] {scheduler_job.py:780} INFO - Exited execute loop
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/create.py", line 590, in connect
    return dialect.connect(*cargs, **cparams)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 476, in checkout
    rec = pool._do_get()
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/pool/impl.py", line 145, in _do_get
    compat.raise_(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
[SQL: UPDATE task_instance SET map_index=%(map_index)s WHERE task_instance.task_id = %(task_instance_task_id)s AND task_instance.dag_id = %(task_instance_dag_id)s AND task_instance.run_id = %(task_instance_run_id)s AND task_instance.map_index = %(task_instance_map_index)s]
[parameters: {'map_index': 0, 'task_instance_task_id': 'transform', 'task_instance_dag_id': 'dag-id', 'task_instance_run_id': 'scheduled__2022-06-04T14:05:00+00:00', 'task_instance_map_index': -1}]
(Background on this error at: https://sqlalche.me/e/14/gkpj)
```
### How to reproduce
Run a dynamic task mapping DAG in Airflow 2.3.3
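Any DAG that uses dynamic task mapping seems to exercise the code path. A minimal sketch of such a DAG (the DAG name, task names, and values here are made up for illustration, not the actual failing DAG) would be:
```python
import pendulum
from airflow.decorators import dag, task


@dag(
    schedule_interval="@daily",
    start_date=pendulum.datetime(2022, 6, 1, tz="UTC"),
    catchup=True,
    max_active_runs=1,
)
def mapping_repro():
    @task
    def extract():
        # Returns a list; each element becomes one mapped task instance.
        return ["path/a.csv", "path/b.csv"]

    @task
    def transform(source):
        return f"processed {source}"

    # Dynamic task mapping: one "transform" task instance per extracted path.
    transform.expand(source=extract())


mapping_repro_dag = mapping_repro()
```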
### Operating System
Ubuntu 20.04
### Versions of Apache Airflow Providers
_No response_
### Deployment
Other Docker-based deployment
### Deployment details
I am using two schedulers which run on separate nodes.
### Anything else
The DAG is limited to one active DAG run at a time.
The extract() task returns a list of one or more file paths which have been
saved on cloud storage. The transform task processes each of these paths
dynamically. I have used these same tasks (imported from another file) in over
15 different DAGs so far without issue. The problem only occurred yesterday,
sometime after updating Airflow to 2.3.3.
```
def dag_name():
    retrieved = extract()
    transform = transform_files(retrieved)
    finalize = finalize_dataset(transform)
    consolidate = consolidate_staging(transform)
    retrieved >> transform >> finalize >> consolidate
```
My `transform_files` helper is just a function which expands over the XComArg
returned by the extract task and transforms each file. Nearly everything is
driven by DAG params, which are customized in each DAG.
```
transform_file_task = task(process_data)


def transform_files(source):
    return (
        transform_file_task.override(task_id="transform")
        .partial(
            destination=f"{{{{ params.container }}}}/{{{{ dag.dag_id | dag_name }}}}/{{{{ ti.task_id }}}}",
            wasb_conn_id="{{ params.wasb_conn_id }}",
            pandas_options="{{ params.pandas_options }}",
            meta_columns="{{ params.meta_columns }}",
            script="{{ params.script }}",
            function_name="{{ params.function }}",
            schema_name="{{ params.schema }}",
            template=f"{{{{ dag.dag_id | dag_version }}}}-{{{{ ti.run_id }}}}-{{{{ ti.map_index }}}}",
            existing_data_behavior="overwrite_or_ignore",
            partition_columns="{{ params.partition_columns }}",
            dag_name="{{ dag.dag_id | dag_name }}",
            failure_recipients="{{ params.recipients }}",
            success_recipients="{{ params.recipients }}",
        )
        .expand(source=source)
    )
```
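For context, `process_data` is a plain function imported from a shared module; wrapping it with `task()` means its keyword arguments line up with what is passed through `.partial()` and `.expand()`. A rough, hypothetical outline of its signature (not the actual implementation) is:
```python
def process_data(
    source,                   # mapped argument: one path from the extract() task
    destination,
    wasb_conn_id,
    pandas_options,
    meta_columns,
    script,
    function_name,
    schema_name,
    template,
    existing_data_behavior,
    partition_columns,
    dag_name,
    failure_recipients,
    success_recipients,
):
    """Download `source` from cloud storage, transform it, and write it to `destination`."""
    ...
```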
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)