liuning89757 commented on issue #27300:
URL: https://github.com/apache/airflow/issues/27300#issuecomment-1413361554

   Hi @potiuk @ashb @ephraimbuddy, I hope my findings offer some clues about this.
   I have hit a similar issue almost every time a dynamic task expands to 30+ indexes (Airflow 2.4.3 and MariaDB 10.3.27).
   The traceback:
   ```
   Traceback (most recent call last):
     File "/usr/local/lib/python3.9/site-packages/airflow/executors/local_executor.py", line 126, in _execute_work_in_fork
       args.func(args)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 52, in command
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py", line 103, in wrapper
       return f(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 382, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 189, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 247, in _run_task_by_local_task_job
       run_job.run()
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 247, in run
       self._execute()
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/local_task_job.py", line 132, in _execute
       self.handle_task_exit(return_code)
     File "/usr/local/lib/python3.9/site-packages/airflow/jobs/local_task_job.py", line 163, in handle_task_exit
       self.task_instance.schedule_downstream_tasks()
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 75, in wrapper
       return func(*args, session=session, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2611, in schedule_downstream_tasks
       info = dag_run.task_instance_scheduling_decisions(session)
     File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 696, in task_instance_scheduling_decisions
       schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
     File "/usr/local/lib/python3.9/site-packages/airflow/models/dagrun.py", line 755, in _get_ready_tis
       expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)
     File "/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 719, in expand_mapped_task
       session.flush()
     File "/usr/local/lib64/python3.9/site-packages/sqlalchemy/orm/session.py", line 3444, in flush
       self._flush(objects)
     File "/usr/local/lib64/python3.9/site-packages/sqlalchemy/orm/session.py", line 3584, in _flush
       transaction.rollback(_capture_exception=True)
     File "/usr/local/lib64/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       compat.raise_(
     File "/usr/local/lib64/python3.9/site-packages/sqlalchemy/util/compat.py", line 210, in raise_
       raise exception
     File "/usr/local/lib64/python3.9/site-packages/sqlalchemy/orm/session.py", line 3544, in _flush
       flush_context.execute()
     File "/usr/local/lib64/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
       rec.execute(self)
     File "/usr/local/lib64/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
       util.preloaded.orm_persistence.save_obj(
     File "/usr/local/lib64/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
       _emit_update_statements(
     File "/usr/local/lib64/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 1035, in _emit_update_statements
       raise orm_exc.StaleDataError(
   sqlalchemy.orm.exc.StaleDataError: UPDATE statement on table 'task_instance' expected to update 1 row(s); 0 were matched.
   ```
   I turned on the general log to capture the SQL at the moment the exception happens, and found that two sessions (537138 and 535083) execute the same statement `UPDATE task_instance SET map_index=0 ... WHERE ... AND task_instance.map_index = -1`. Session 537138 commits, but 535083 rolls back and then raises the StaleDataError.
   ```
   ...
   537138 Query    UPDATE task_instance SET state='removed' WHERE task_instance.dag_id = 'new_monitor_server' AND task_instance.task_id = 'update_server' AND task_instance.run_id = 'scheduled__2023-02-01T08:35:00+00:00' AND task_instance.map_index >= 37
   537138 Query    UPDATE task_instance SET map_index=0 WHERE task_instance.dag_id = 'new_monitor_server' AND task_instance.task_id = 'update_server' AND task_instance.run_id = 'scheduled__2023-02-01T08:35:00+00:00' AND task_instance.map_index = -1
   537138 Query    INSERT INTO task_instance (task_id, dag_id, run_id, map_index, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, pool_slots, queue, priority_weight, operator, queued_dttm, queued_by_job_id, pid, executor_config, external_executor_id, trigger_id, trigger_timeout, next_method, next_kwargs) VALUES ('update_server', 'new_monitor_server', 'scheduled__2023-02-01T08:35:00+00:00', 1, NULL, NULL, NULL, NULL, 0, 0, '', 'root', NULL, 'default_pool', 1, 'default', 1, '_PythonDecoratedOperator', NULL, NULL, NULL, '€}”.', NULL, NULL, NULL, NULL, 'null'),('update_server', 'new_monitor_server', 'scheduled__2023-02-01T08:35:00+00:00', 2, NULL, NULL, NULL, NULL, 0, 0, '', 'root', NULL, 'default_pool', 1, 'default', 1, '_PythonDecoratedOperator', NULL, NULL, NULL, '€}”.', NULL, NULL, NULL, NULL, 'null'),('update_server', 'new_monitor_server', 'scheduled__2023-02-01T08:35:00+00:00', 3, NULL, NULL, NULL, NULL, 0, 0, '', 'root', NULL, 'default_pool', 1, 'default', 1,
   ......
   535083 Query    UPDATE task_instance SET state='removed' WHERE task_instance.dag_id = 'new_monitor_server' AND task_instance.task_id = 'update_server' AND task_instance.run_id = 'scheduled__2023-02-01T08:35:00+00:00' AND task_instance.map_index >= 37
   535083 Query    UPDATE dag_run SET last_scheduling_decision='2023-02-01 08:40:01.318630' WHERE dag_run.id = 4320
   535083 Query    UPDATE task_instance SET map_index=0 WHERE task_instance.dag_id = 'new_monitor_server' AND task_instance.task_id = 'update_server' AND task_instance.run_id = 'scheduled__2023-02-01T08:35:00+00:00' AND task_instance.map_index = -1
   535083 Query    ROLLBACK
   535083 Query    ROLLBACK
   ```
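
To make the log easier to follow, here is a minimal sketch of the same sequence using only Python's stdlib `sqlite3`. It is not Airflow code: SQLite stands in for MariaDB, the table is trimmed to the four key columns, and the helper `run_expansion_update` and the names `session_a` / `session_b` are made up for illustration. It only shows why the second identical `UPDATE` matches 0 rows, which is the condition SQLAlchemy reports as `StaleDataError`.

```python
import os
import sqlite3
import tempfile

# Key values copied from the general log above.
KEY = ("new_monitor_server", "update_server", "scheduled__2023-02-01T08:35:00+00:00")


def run_expansion_update(conn):
    """Send the UPDATE that both sessions issued and return the matched row count."""
    cur = conn.execute(
        "UPDATE task_instance SET map_index = 0 "
        "WHERE dag_id = ? AND task_id = ? AND run_id = ? AND map_index = -1",
        KEY,
    )
    conn.commit()
    return cur.rowcount


# Two independent connections to one database file play the two sessions.
db_path = os.path.join(tempfile.mkdtemp(), "race_demo.db")
session_a = sqlite3.connect(db_path)
session_b = sqlite3.connect(db_path)

# Trimmed, hypothetical version of the task_instance table.
session_a.execute(
    "CREATE TABLE task_instance ("
    "dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER, "
    "PRIMARY KEY (dag_id, task_id, run_id, map_index))"
)
session_a.execute("INSERT INTO task_instance VALUES (?, ?, ?, -1)", KEY)
session_a.commit()

rows_a = run_expansion_update(session_a)  # first session converts the -1 row
rows_b = run_expansion_update(session_b)  # second session finds nothing left
print(rows_a, rows_b)  # prints: 1 0

session_a.close()
session_b.close()
```

In the real case the ORM in session 535083 had already loaded the row with `map_index = -1`, so it expects its `UPDATE` to match exactly 1 row; getting 0 is what triggers the rollback and the exception.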
   
   After digging into the `expand_mapped_task` function in `airflow/models/mappedoperator.py`, I think there is a race condition in this section (maybe between the mini scheduler and the main scheduler?). The query for `unmapped_ti` does not take a row lock with `with_for_update()`, so when two sessions concurrently execute `unmapped_ti.map_index = 0`, the first one commits and the second one may raise this exception when it calls `flush()`.
   ```
   unmapped_ti: TaskInstance | None = (
       session.query(TaskInstance)
       .filter(
           TaskInstance.dag_id == self.dag_id,
           TaskInstance.task_id == self.task_id,
           TaskInstance.run_id == run_id,
           TaskInstance.map_index == -1,
           or_(TaskInstance.state.in_(State.unfinished), TaskInstance.state.is_(None)),
       )
       .one_or_none()
   )

   all_expanded_tis: list[TaskInstance] = []

   if unmapped_ti:
       # The unmapped task instance still exists and is unfinished, i.e. we
       # haven't tried to run it before.
       if total_length is None:
           if self.dag and self.dag.partial:
               # If the DAG is partial, it's likely that the upstream tasks
               # are not done yet, so we do nothing
               indexes_to_map: Iterable[int] = ()
           else:
               # If the map length cannot be calculated (due to unavailable
               # upstream sources), fail the unmapped task.
               unmapped_ti.state = TaskInstanceState.UPSTREAM_FAILED
               indexes_to_map = ()
       elif total_length < 1:
           # If the upstream maps this to a zero-length value, simply mark
           # the unmapped task instance as SKIPPED (if needed).
           self.log.info(
               "Marking %s as SKIPPED since the map has %d values to expand",
               unmapped_ti,
               total_length,
           )
           unmapped_ti.state = TaskInstanceState.SKIPPED
           indexes_to_map = ()
       else:
           # Otherwise convert this into the first mapped index, and create
           # TaskInstance for other indexes.
           unmapped_ti.map_index = 0
           self.log.debug("Updated in place to become %s", unmapped_ti)
           all_expanded_tis.append(unmapped_ti)
           indexes_to_map = range(1, total_length)
   ```
   
   After changing the `unmapped_ti` query to end with `.with_for_update().one_or_none()`, the exception has not reappeared so far.
   

