tirkarthi opened a new issue, #31431:
URL: https://github.com/apache/airflow/issues/31431
### Apache Airflow version
main (development)
### What happened
Clearing a TaskFlow task instance that ran earlier, after the task has since
been changed to a mapped task, crashes the scheduler. It seems the stored
TaskMap row references the task instance through a foreign key that includes
map_index, so that row needs to be cleared before the task is executed again.
```
airflow scheduler
/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/cli/cli_config.py:1001
DeprecationWarning: The namespace option in [kubernetes] has been moved to the
namespace option in [kubernetes_executor] - the old setting has been used, but
please update your config.
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py:196
DeprecationWarning: The '[celery] task_adoption_timeout' config option is
deprecated. Please update your config to use '[scheduler] task_queued_timeout'
instead.
/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py:201
DeprecationWarning: The worker_pods_pending_timeout option in [kubernetes] has
been moved to the worker_pods_pending_timeout option in [kubernetes_executor] -
the old setting has been used, but please update your config.
/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py:206
DeprecationWarning: The '[kubernetes_executor] worker_pods_pending_timeout'
config option is deprecated. Please update your config to use '[scheduler]
task_queued_timeout' instead.
[2023-05-19T23:41:07.907+0530] {executor_loader.py:114} INFO - Loaded
executor: SequentialExecutor
[2023-05-19 23:41:07 +0530] [15527] [INFO] Starting gunicorn 20.1.0
[2023-05-19 23:41:07 +0530] [15527] [INFO] Listening at: http://[::]:8793
(15527)
[2023-05-19 23:41:07 +0530] [15527] [INFO] Using worker: sync
[2023-05-19 23:41:07 +0530] [15528] [INFO] Booting worker with pid: 15528
[2023-05-19T23:41:07.952+0530] {scheduler_job_runner.py:789} INFO - Starting
the scheduler
[2023-05-19T23:41:07.952+0530] {scheduler_job_runner.py:796} INFO -
Processing each file at most -1 times
[2023-05-19T23:41:07.954+0530] {scheduler_job_runner.py:1542} INFO -
Resetting orphaned tasks for active dag runs
[2023-05-19 23:41:07 +0530] [15529] [INFO] Booting worker with pid: 15529
[2023-05-19T23:41:08.567+0530] {scheduler_job_runner.py:853} ERROR -
Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
line 836, in _execute
self._run_scheduler_loop()
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
line 970, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1052, in _do_scheduling
callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/utils/retries.py",
line 90, in wrapped_function
for attempt in run_with_db_retries(max_retries=retries, logger=logger,
**retry_kwargs):
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/tenacity/__init__.py",
line 382, in __iter__
do = self.iter(retry_state=retry_state)
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/tenacity/__init__.py",
line 349, in iter
return fut.result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 451, in result
return self.__get_result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in
__get_result
raise self._exception
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/utils/retries.py",
line 99, in wrapped_function
return func(*args, **kwargs)
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1347, in _schedule_all_dag_runs
callback_tuples = [(run, self._schedule_dag_run(run, session=session))
for run in dag_runs]
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/query.py",
line 2811, in __iter__
return self._iter().__iter__()
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/query.py",
line 2818, in _iter
result = self.session.execute(
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
line 1669, in execute
conn = self._connection_for_bind(bind, close_with_result=True)
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
line 1519, in _connection_for_bind
return self._transaction._connection_for_bind(
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
line 721, in _connection_for_bind
self._assert_active()
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
line 601, in _assert_active
raise sa_exc.PendingRollbackError(
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been
rolled back due to a previous exception during flush. To begin a new
transaction with this Session, first issue Session.rollback(). Original
exception was: (psycopg2.errors.ForeignKeyViolation) update or delete on table
"task_instance" violates foreign key constraint "task_map_task_instance_fkey"
on table "task_map"
DETAIL: Key (dag_id, task_id, run_id, map_index)=(bash_simple, get_command,
manual__2023-05-18T13:54:01.345016+00:00, -1) is still referenced from table
"task_map".
[SQL: UPDATE task_instance SET map_index=%(map_index)s,
updated_at=%(updated_at)s WHERE task_instance.dag_id = %(task_instance_dag_id)s
AND task_instance.task_id = %(task_instance_task_id)s AND task_instance.run_id
= %(task_instance_run_id)s AND task_instance.map_index =
%(task_instance_map_index)s]
[parameters: {'map_index': 0, 'updated_at': datetime.datetime(2023, 5, 19,
18, 11, 8, 90512, tzinfo=Timezone('UTC')), 'task_instance_dag_id':
'bash_simple', 'task_instance_task_id': 'get_command', 'task_instance_run_id':
'manual__2023-05-18T13:54:01.345016+00:00', 'task_instance_map_index': -1}]
(Background on this error at: http://sqlalche.me/e/14/gkpj) (Background on
this error at: http://sqlalche.me/e/14/7s2a)
[2023-05-19T23:41:08.572+0530] {scheduler_job_runner.py:865} INFO - Exited
execute loop
Traceback (most recent call last):
File "/home/karthikeyan/stuff/python/airflow/.env/bin/airflow", line 8, in
<module>
sys.exit(main())
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/__main__.py",
line 48, in main
args.func(args)
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/cli/cli_config.py",
line 51, in command
return func(*args, **kwargs)
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/utils/cli.py",
line 112, in wrapper
return f(*args, **kwargs)
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py",
line 77, in scheduler
_run_scheduler_job(job_runner, skip_serve_logs=args.skip_serve_logs)
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py",
line 42, in _run_scheduler_job
run_job(job=job_runner.job, execute_callable=job_runner._execute)
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/utils/session.py",
line 76, in wrapper
return func(*args, session=session, **kwargs)
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/job.py",
line 284, in run_job
return execute_job(job, execute_callable=execute_callable)
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/job.py",
line 313, in execute_job
ret = execute_callable()
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
line 836, in _execute
self._run_scheduler_loop()
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
line 970, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1052, in _do_scheduling
callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/utils/retries.py",
line 90, in wrapped_function
for attempt in run_with_db_retries(max_retries=retries, logger=logger,
**retry_kwargs):
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/tenacity/__init__.py",
line 382, in __iter__
do = self.iter(retry_state=retry_state)
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/tenacity/__init__.py",
line 349, in iter
return fut.result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 451, in result
return self.__get_result()
File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in
__get_result
raise self._exception
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/utils/retries.py",
line 99, in wrapped_function
return func(*args, **kwargs)
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1347, in _schedule_all_dag_runs
callback_tuples = [(run, self._schedule_dag_run(run, session=session))
for run in dag_runs]
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/query.py",
line 2811, in __iter__
return self._iter().__iter__()
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/query.py",
line 2818, in _iter
result = self.session.execute(
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
line 1669, in execute
conn = self._connection_for_bind(bind, close_with_result=True)
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
line 1519, in _connection_for_bind
return self._transaction._connection_for_bind(
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
line 721, in _connection_for_bind
self._assert_active()
File
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
line 601, in _assert_active
raise sa_exc.PendingRollbackError(
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been
rolled back due to a previous exception during flush. To begin a new
transaction with this Session, first issue Session.rollback(). Original
exception was: (psycopg2.errors.ForeignKeyViolation) update or delete on table
"task_instance" violates foreign key constraint "task_map_task_instance_fkey"
on table "task_map"
DETAIL: Key (dag_id, task_id, run_id, map_index)=(bash_simple, get_command,
manual__2023-05-18T13:54:01.345016+00:00, -1) is still referenced from table
"task_map".
[SQL: UPDATE task_instance SET map_index=%(map_index)s,
updated_at=%(updated_at)s WHERE task_instance.dag_id = %(task_instance_dag_id)s
AND task_instance.task_id = %(task_instance_task_id)s AND task_instance.run_id
= %(task_instance_run_id)s AND task_instance.map_index =
%(task_instance_map_index)s]
[parameters: {'map_index': 0, 'updated_at': datetime.datetime(2023, 5, 19,
18, 11, 8, 90512, tzinfo=Timezone('UTC')), 'task_instance_dag_id':
'bash_simple', 'task_instance_task_id': 'get_command', 'task_instance_run_id':
'manual__2023-05-18T13:54:01.345016+00:00', 'task_instance_map_index': -1}]
(Background on this error at: http://sqlalche.me/e/14/gkpj) (Background on
this error at: http://sqlalche.me/e/14/7s2a)
```
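The foreign key failure at the bottom of the log can be reproduced in isolation. The following is a minimal sketch using the standard-library `sqlite3` module, not Airflow's real schema or code: the two tables are simplified stand-ins that keep only the four key columns named in the error (`length` is a placeholder payload column), and the run id `manual__example_run` is made up. It shows that changing `map_index` on the parent row fails while a child row still points at the old key, and that the same update goes through once that child row is removed, which matches the guess above that the TaskMap row has to be cleared first.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite leaves FK enforcement off by default

# Simplified, hypothetical stand-ins for the two tables named in the error.
conn.executescript(
    """
    CREATE TABLE task_instance (
        dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER,
        PRIMARY KEY (dag_id, task_id, run_id, map_index)
    );
    CREATE TABLE task_map (
        dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER,
        length INTEGER,
        PRIMARY KEY (dag_id, task_id, run_id, map_index),
        FOREIGN KEY (dag_id, task_id, run_id, map_index)
            REFERENCES task_instance (dag_id, task_id, run_id, map_index)
    );
    """
)

run_key = ("bash_simple", "get_command", "manual__example_run")

# State after the first, unmapped run: both rows carry map_index = -1.
conn.execute("INSERT INTO task_instance VALUES (?, ?, ?, -1)", run_key)
conn.execute("INSERT INTO task_map VALUES (?, ?, ?, -1, 1)", run_key)
conn.commit()

# Same shape as the UPDATE in the log: move the task instance from -1 to 0.
REMAP_SQL = """
    UPDATE task_instance SET map_index = 0
    WHERE dag_id = ? AND task_id = ? AND run_id = ? AND map_index = -1
"""

fk_error = None
try:
    conn.execute(REMAP_SQL, run_key)
except sqlite3.IntegrityError as exc:
    fk_error = str(exc)
    conn.rollback()
print("first attempt:", fk_error)

# Remove the row that still references map_index = -1, then retry.
conn.execute(
    "DELETE FROM task_map"
    " WHERE dag_id = ? AND task_id = ? AND run_id = ? AND map_index = -1",
    run_key,
)
conn.execute(REMAP_SQL, run_key)
conn.commit()

new_index = conn.execute("SELECT map_index FROM task_instance").fetchone()[0]
print("map_index after removing the task_map row:", new_index)
```

Whether Airflow should delete the stale row, cascade the key update, or handle the remapping some other way is left open here; the sketch only demonstrates the constraint behaviour.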
### What you think should happen instead
_No response_
### How to reproduce
1. Create the DAG with `command = get_command(1, 1)`, trigger a DAG run, and
wait for it to complete.
2. Change this to `command = get_command.partial(arg1=[1]).expand(arg2=[1, 2, 3, 4])`
so that the task is now mapped.
3. Clear the existing task instance. This causes the scheduler to crash.
```python
import datetime
import time

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="bash_simple",
    start_date=datetime.datetime(2022, 1, 1),
    schedule=None,
    catchup=False,
) as dag:

    @task
    def get_command(arg1, arg2):
        # Sleep for ten seconds so the task instance stays running for a while.
        for i in range(10):
            time.sleep(1)
            print(i)
        return ["echo hello"]

    # Step 1: plain (unmapped) call.
    command = get_command(1, 1)
    # Step 2: replace the call above with the mapped call below.
    # command = get_command.partial(arg1=[1]).expand(arg2=[1, 2, 3, 4])
    t1 = BashOperator.partial(task_id="bash").expand(bash_command=command)

if __name__ == "__main__":
    dag.test()
```
### Operating System
Ubuntu
### Versions of Apache Airflow Providers
_No response_
### Deployment
Virtualenv installation
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)