tirkarthi opened a new issue, #31431:
URL: https://github.com/apache/airflow/issues/31431

   ### Apache Airflow version
   
   main (development)
   
   ### What happened
   
   Clearing a previously executed TaskFlow function's task instance, after the task has been changed to a mapped task, crashes the scheduler. It seems the stored TaskMap row holds a foreign key reference to the task instance that includes `map_index`, and this row needs to be cleared before execution.
   
   ```
   airflow scheduler       
   
/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/cli/cli_config.py:1001
 DeprecationWarning: The namespace option in [kubernetes] has been moved to the 
namespace option in [kubernetes_executor] - the old setting has been used, but 
please update your config.
     ____________       _____________
    ____    |__( )_________  __/__  /________      __
   ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
   ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
    _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   
/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py:196
 DeprecationWarning: The '[celery] task_adoption_timeout' config option is 
deprecated. Please update your config to use '[scheduler] task_queued_timeout' 
instead.
   
/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py:201
 DeprecationWarning: The worker_pods_pending_timeout option in [kubernetes] has 
been moved to the worker_pods_pending_timeout option in [kubernetes_executor] - 
the old setting has been used, but please update your config.
   
/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py:206
 DeprecationWarning: The '[kubernetes_executor] worker_pods_pending_timeout' 
config option is deprecated. Please update your config to use '[scheduler] 
task_queued_timeout' instead.
   [2023-05-19T23:41:07.907+0530] {executor_loader.py:114} INFO - Loaded 
executor: SequentialExecutor
   [2023-05-19 23:41:07 +0530] [15527] [INFO] Starting gunicorn 20.1.0
   [2023-05-19 23:41:07 +0530] [15527] [INFO] Listening at: http://[::]:8793 
(15527)
   [2023-05-19 23:41:07 +0530] [15527] [INFO] Using worker: sync
   [2023-05-19 23:41:07 +0530] [15528] [INFO] Booting worker with pid: 15528
   [2023-05-19T23:41:07.952+0530] {scheduler_job_runner.py:789} INFO - Starting 
the scheduler
   [2023-05-19T23:41:07.952+0530] {scheduler_job_runner.py:796} INFO - 
Processing each file at most -1 times
   [2023-05-19T23:41:07.954+0530] {scheduler_job_runner.py:1542} INFO - 
Resetting orphaned tasks for active dag runs
   [2023-05-19 23:41:07 +0530] [15529] [INFO] Booting worker with pid: 15529
   [2023-05-19T23:41:08.567+0530] {scheduler_job_runner.py:853} ERROR - 
Exception when executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 836, in _execute
       self._run_scheduler_loop()
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 970, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1052, in _do_scheduling
       callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/utils/retries.py",
 line 90, in wrapped_function
       for attempt in run_with_db_retries(max_retries=retries, logger=logger, 
**retry_kwargs):
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/tenacity/__init__.py",
 line 382, in __iter__
       do = self.iter(retry_state=retry_state)
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/tenacity/__init__.py",
 line 349, in iter
       return fut.result()
     File "/usr/lib/python3.10/concurrent/futures/_base.py", line 451, in result
       return self.__get_result()
     File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in 
__get_result
       raise self._exception
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/utils/retries.py",
 line 99, in wrapped_function
       return func(*args, **kwargs)
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1347, in _schedule_all_dag_runs
       callback_tuples = [(run, self._schedule_dag_run(run, session=session)) 
for run in dag_runs]
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/query.py",
 line 2811, in __iter__
       return self._iter().__iter__()
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/query.py",
 line 2818, in _iter
       result = self.session.execute(
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
 line 1669, in execute
       conn = self._connection_for_bind(bind, close_with_result=True)
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
 line 1519, in _connection_for_bind
       return self._transaction._connection_for_bind(
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
 line 721, in _connection_for_bind
       self._assert_active()
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
 line 601, in _assert_active
       raise sa_exc.PendingRollbackError(
   sqlalchemy.exc.PendingRollbackError: This Session's transaction has been 
rolled back due to a previous exception during flush. To begin a new 
transaction with this Session, first issue Session.rollback(). Original 
exception was: (psycopg2.errors.ForeignKeyViolation) update or delete on table 
"task_instance" violates foreign key constraint "task_map_task_instance_fkey" 
on table "task_map"
   DETAIL:  Key (dag_id, task_id, run_id, map_index)=(bash_simple, get_command, 
manual__2023-05-18T13:54:01.345016+00:00, -1) is still referenced from table 
"task_map".
   
   [SQL: UPDATE task_instance SET map_index=%(map_index)s, 
updated_at=%(updated_at)s WHERE task_instance.dag_id = %(task_instance_dag_id)s 
AND task_instance.task_id = %(task_instance_task_id)s AND task_instance.run_id 
= %(task_instance_run_id)s AND task_instance.map_index = 
%(task_instance_map_index)s]
   [parameters: {'map_index': 0, 'updated_at': datetime.datetime(2023, 5, 19, 
18, 11, 8, 90512, tzinfo=Timezone('UTC')), 'task_instance_dag_id': 
'bash_simple', 'task_instance_task_id': 'get_command', 'task_instance_run_id': 
'manual__2023-05-18T13:54:01.345016+00:00', 'task_instance_map_index': -1}]
   (Background on this error at: http://sqlalche.me/e/14/gkpj) (Background on 
this error at: http://sqlalche.me/e/14/7s2a)
   [2023-05-19T23:41:08.572+0530] {scheduler_job_runner.py:865} INFO - Exited 
execute loop
   Traceback (most recent call last):
     File "/home/karthikeyan/stuff/python/airflow/.env/bin/airflow", line 8, in 
<module>
       sys.exit(main())
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/__main__.py",
 line 48, in main
       args.func(args)
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/cli/cli_config.py",
 line 51, in command
       return func(*args, **kwargs)
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/utils/cli.py",
 line 112, in wrapper
       return f(*args, **kwargs)
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py",
 line 77, in scheduler
       _run_scheduler_job(job_runner, skip_serve_logs=args.skip_serve_logs)
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py",
 line 42, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/utils/session.py",
 line 76, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/job.py",
 line 284, in run_job
       return execute_job(job, execute_callable=execute_callable)
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/job.py",
 line 313, in execute_job
       ret = execute_callable()
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 836, in _execute
       self._run_scheduler_loop()
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 970, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1052, in _do_scheduling
       callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/utils/retries.py",
 line 90, in wrapped_function
       for attempt in run_with_db_retries(max_retries=retries, logger=logger, 
**retry_kwargs):
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/tenacity/__init__.py",
 line 382, in __iter__
       do = self.iter(retry_state=retry_state)
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/tenacity/__init__.py",
 line 349, in iter
       return fut.result()
     File "/usr/lib/python3.10/concurrent/futures/_base.py", line 451, in result
       return self.__get_result()
     File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in 
__get_result
       raise self._exception
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/utils/retries.py",
 line 99, in wrapped_function
       return func(*args, **kwargs)
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1347, in _schedule_all_dag_runs
       callback_tuples = [(run, self._schedule_dag_run(run, session=session)) 
for run in dag_runs]
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/query.py",
 line 2811, in __iter__
       return self._iter().__iter__()
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/query.py",
 line 2818, in _iter
       result = self.session.execute(
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
 line 1669, in execute
       conn = self._connection_for_bind(bind, close_with_result=True)
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
 line 1519, in _connection_for_bind
       return self._transaction._connection_for_bind(
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
 line 721, in _connection_for_bind
       self._assert_active()
     File 
"/home/karthikeyan/stuff/python/airflow/.env/lib/python3.10/site-packages/sqlalchemy/orm/session.py",
 line 601, in _assert_active
       raise sa_exc.PendingRollbackError(
   sqlalchemy.exc.PendingRollbackError: This Session's transaction has been 
rolled back due to a previous exception during flush. To begin a new 
transaction with this Session, first issue Session.rollback(). Original 
exception was: (psycopg2.errors.ForeignKeyViolation) update or delete on table 
"task_instance" violates foreign key constraint "task_map_task_instance_fkey" 
on table "task_map"
   DETAIL:  Key (dag_id, task_id, run_id, map_index)=(bash_simple, get_command, 
manual__2023-05-18T13:54:01.345016+00:00, -1) is still referenced from table 
"task_map".
   
   [SQL: UPDATE task_instance SET map_index=%(map_index)s, 
updated_at=%(updated_at)s WHERE task_instance.dag_id = %(task_instance_dag_id)s 
AND task_instance.task_id = %(task_instance_task_id)s AND task_instance.run_id 
= %(task_instance_run_id)s AND task_instance.map_index = 
%(task_instance_map_index)s]
   [parameters: {'map_index': 0, 'updated_at': datetime.datetime(2023, 5, 19, 
18, 11, 8, 90512, tzinfo=Timezone('UTC')), 'task_instance_dag_id': 
'bash_simple', 'task_instance_task_id': 'get_command', 'task_instance_run_id': 
'manual__2023-05-18T13:54:01.345016+00:00', 'task_instance_map_index': -1}]
   (Background on this error at: http://sqlalche.me/e/14/gkpj) (Background on 
this error at: http://sqlalche.me/e/14/7s2a)
   ```
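   
   The failing UPDATE in the log can be reproduced outside Airflow. The sketch below models the constraint with simplified sqlite tables (not Airflow's real schema; the key values are placeholders): `task_map` carries a composite foreign key into `task_instance` that includes `map_index`, so re-mapping an unmapped task instance (`map_index` -1 to 0) violates the constraint while a stale `task_map` row still references the old key.
   
   ```python
   import sqlite3
   
   conn = sqlite3.connect(":memory:")
   conn.execute("PRAGMA foreign_keys = ON")
   conn.executescript("""
   CREATE TABLE task_instance (
       dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER,
       PRIMARY KEY (dag_id, task_id, run_id, map_index)
   );
   CREATE TABLE task_map (
       dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER,
       FOREIGN KEY (dag_id, task_id, run_id, map_index)
           REFERENCES task_instance (dag_id, task_id, run_id, map_index)
   );
   -- A finished unmapped TI plus the TaskMap row recorded from its return value.
   INSERT INTO task_instance VALUES ('bash_simple', 'get_command', 'manual__x', -1);
   INSERT INTO task_map VALUES ('bash_simple', 'get_command', 'manual__x', -1);
   """)
   
   try:
       # Mirrors the UPDATE the scheduler issues when expanding the cleared task.
       conn.execute("UPDATE task_instance SET map_index = 0 WHERE map_index = -1")
   except sqlite3.IntegrityError as exc:
       print("FK violation:", exc)
   
   # Removing the stale task_map row first lets the re-mapping succeed.
   conn.execute("DELETE FROM task_map WHERE task_id = 'get_command'")
   conn.execute("UPDATE task_instance SET map_index = 0 WHERE map_index = -1")
   ```
   
   This suggests that clearing a task instance should also drop (or re-key) its associated `task_map` rows before the scheduler rewrites `map_index`.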
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   1. Create the DAG with `command = get_command(1, 1)`, trigger a DAG run, and wait for it to complete.
   2. Change this to `command = get_command.partial(arg1=[1]).expand(arg2=[1, 2, 3, 4])` so that the task is now mapped.
   3. Clear the existing task instance; this causes the scheduler to crash.
   
   ```python
   import datetime, time
   
   from airflow.operators.bash import BashOperator
   from airflow import DAG
   from airflow.decorators import task
   
   with DAG(
       dag_id="bash_simple",
       start_date=datetime.datetime(2022, 1, 1),
       schedule=None,
       catchup=False,
   ) as dag:
   
       @task
       def get_command(arg1, arg2):
           for i in range(10):
               time.sleep(1)
               print(i)
   
           return ["echo hello"]
   
       command = get_command(1, 1)
       # command = get_command.partial(arg1=[1]).expand(arg2=[1, 2, 3, 4])
   
       t1 = BashOperator.partial(task_id="bash").expand(bash_command=command)
   
   
   if __name__ == "__main__":
       dag.test()
   ```
   
   ### Operating System
   
   Ubuntu
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

