tanvn commented on issue #39088:
URL: https://github.com/apache/airflow/issues/39088#issuecomment-2093473579

   I think I found the cause of this issue.
   The logic of `adopt_or_reset_orphaned_tasks` runs twice within a very short time because it is wrapped in `run_with_db_retries`:
   
https://github.com/apache/airflow/blob/2.8.4/airflow/jobs/scheduler_job_runner.py#L1610
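`run_with_db_retries` returns a tenacity `Retrying` object. Each retry re-executes the whole guarded block, not just the statement that failed. As a result, any side effect performed before the failure is performed again. The sketch below shows that pattern using only the standard library. The helper `run_with_retries`, the stand-in `OperationalError`, and the wait bound are hypothetical and are not Airflow's or tenacity's API:

```python
import random
import time


class OperationalError(Exception):
    """Hypothetical stand-in for sqlalchemy.exc.OperationalError."""


def run_with_retries(body, max_attempts=3, max_wait=0.5):
    """Re-run the *entire* body until it succeeds or attempts run out.

    Simplified stand-in for the tenacity loop built by run_with_db_retries;
    the random wait bound here is an assumption for illustration.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return body(attempt)
        except OperationalError:
            if attempt == max_attempts:
                raise
            # Short random back-off before re-running the whole body.
            time.sleep(random.uniform(0, max_wait))


calls = []


def adopt_or_reset(attempt):
    # Every side effect in this body happens again on the next attempt.
    calls.append(attempt)
    if attempt == 1:
        raise OperationalError("Lost connection to MySQL server during query")
    return "done"


result = run_with_retries(adopt_or_reset, max_wait=0.01)
print(result, calls)  # done [1, 2]
```

The body runs twice, which matches the "Try 1 of 3" / "Try 2 of 3" lines in the log. Because the body is retried as a unit, work done before the failure is repeated. In the scheduler, that work is adopting pods.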
   
   When the first attempt tried to update the task instances in my MySQL database, it failed with a `MySQLdb.OperationalError`. The second attempt started almost immediately afterwards (after 0.4467290363932428 seconds, as shown in the log):
   
   
https://github.com/apache/airflow/blob/2.8.4/airflow/jobs/scheduler_job_runner.py#L1651-L1679
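The same task instances are presumably picked up again because the failed `UPDATE ... SET queued_by_job_id` was never committed. The rows therefore still point at the old scheduler job when the second attempt queries them. The minimal sketch below uses the standard `sqlite3` module in place of MySQL. The table is trimmed to two columns, and the old job id `7000` is hypothetical; only `7430` comes from the log:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (task_id TEXT PRIMARY KEY, queued_by_job_id INTEGER)"
)
# Hypothetical: both TIs were queued by an old (dead) scheduler job 7000.
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?)",
    [("select_next_17", 7000), ("select_next_18", 7000)],
)
conn.commit()

NEW_JOB_ID = 7430  # scheduler job id seen in the log


def adopt(fail):
    try:
        conn.executemany(
            "UPDATE task_instance SET queued_by_job_id = ? WHERE task_id = ?",
            [(NEW_JOB_ID, "select_next_17"), (NEW_JOB_ID, "select_next_18")],
        )
        if fail:
            # Simulate losing the connection before the flush/commit completes.
            raise sqlite3.OperationalError("Lost connection to MySQL server during query")
        conn.commit()
    except sqlite3.OperationalError:
        conn.rollback()  # the uncommitted UPDATE is discarded
        raise


try:
    adopt(fail=True)
except sqlite3.OperationalError:
    pass

# The update was rolled back, so the retry still sees the old job id.
rows = conn.execute(
    "SELECT task_id, queued_by_job_id FROM task_instance ORDER BY task_id"
).fetchall()
print(rows)  # [('select_next_17', 7000), ('select_next_18', 7000)]
```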
   
   In the second attempt, the pods had already been adopted (by the first attempt). As a result, the corresponding task instances are not removed from `tis_to_flush_by_key`, and they are added to `tis_to_flush` by `tis_to_flush.extend(tis_to_flush_by_key.values())`:
   
https://github.com/apache/airflow/blob/2.8.4/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py#L553-L575
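The effect on the flush list can be reproduced with a simplified, hypothetical model of the adoption step. The names `Pod`, `try_adopt`, and the `worker` field are illustrative and are not the provider's API. The model assumes two things: adopting a pod relabels it with the new scheduler's job id, and pods are looked up by the old job id. Under those assumptions, the retry finds no pods, and every entry left in `tis_to_flush_by_key` ends up flushed:

```python
from dataclasses import dataclass


@dataclass
class Pod:
    ti_key: str
    worker: str  # stands in for the pod's scheduler-job-id label


def try_adopt(tis, pods, new_job_id):
    """Return the TI keys that would be flushed (reset) instead of adopted.

    tis: mapping of ti_key -> queued_by_job_id as read from the database.
    pods: pods currently in the cluster; adopted pods are mutated in place.
    """
    tis_to_flush_by_key = dict(tis)
    for old_job_id in set(tis.values()):
        # Only pods still labelled with the *old* scheduler id are considered.
        for pod in [p for p in pods if p.worker == old_job_id]:
            if pod.ti_key in tis_to_flush_by_key:
                pod.worker = new_job_id  # "adopt": relabel the pod
                del tis_to_flush_by_key[pod.ti_key]
    # Whatever was not adopted is flushed.
    return sorted(tis_to_flush_by_key)


pods = [Pod("select_next_17", "7000"), Pod("select_next_18", "7000")]
# The UPDATE was rolled back, so both attempts read the same DB rows.
tis = {"select_next_17": "7000", "select_next_18": "7000"}

first = try_adopt(tis, pods, new_job_id="7430")   # attempt 1 adopts both pods
second = try_adopt(tis, pods, new_job_id="7430")  # attempt 2 (the retry)
print(first)   # []
print(second)  # ['select_next_17', 'select_next_18']
```

The second call matches nothing, because the first call already relabeled the pods. Both task instances are therefore returned for flushing even though their pods are still running.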
   
   Here are the logs:
   
   ```
   [2024-05-03T17:18:59.326+0000] {scheduler_job_runner.py:1617} INFO - Running SchedulerJob.adopt_or_reset_orphaned_tasks with retries. Try 1 of 3
   ...
   [2024-05-03T17:19:07.223+0000] {before_sleep.py:65} INFO - Retrying <unknown> in 0.4467290363932428 seconds as it raised OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query')
   [SQL: UPDATE task_instance SET queued_by_job_id=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
   [parameters: ((7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221361), 'test__orphaned_test_dag', 'select_next_17', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221383), 'test__orphaned_test_dag', 'select_next_18', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221391), 'test__orphaned_test_dag', 'select_next_20', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221398), 'test__orphaned_test_dag', 'select_next_22', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221405), 'test__orphaned_test_dag', 'select_next_23', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221411), 'test__orphaned_test_dag', 'select_next_24', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221418), 'test__orphaned_test_dag', 'select_next_28', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221424), 'test__orphaned_test_dag', 'select_next_29', 'scheduled__2024-04-24T02:30:00+00:00', -1)  ... displaying 10 of 31 total bound parameter sets ...  (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221560), 'test__orphaned_test_dag', 'select_next_75', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221566), 'test__orphaned_test_dag', 'select_next_76', 'scheduled__2024-04-24T02:30:00+00:00', -1))]
   (Background on this error at: https://sqlalche.me/e/14/e3q8).
   Traceback (most recent call last):
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
       self.dialect.do_executemany(
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 183, in do_executemany
       rowcount = cursor.executemany(statement, parameters)
     File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in executemany
       self.rowcount = sum(self.execute(query, arg) for arg in args)
     File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in <genexpr>
       self.rowcount = sum(self.execute(query, arg) for arg in args)
     File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute
       res = self._query(mogrified_query)
     File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query
       db.query(q)
     File "/usr/local/lib/python3.10/site-packages/MySQLdb/connections.py", line 261, in query
       _mysql.connection.query(self, query)
   MySQLdb.OperationalError: (2013, 'Lost connection to MySQL server during query')
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/opt/airflow_src/airflow/jobs/scheduler_job_runner.py", line 1684, in adopt_or_reset_orphaned_tasks
       session.flush()
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
       self._flush(objects)
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
       with util.safe_reraise():
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       compat.raise_(
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
       raise exception
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
       flush_context.execute()
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
       rec.execute(self)
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
       util.preloaded.orm_persistence.save_obj(
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
       _emit_update_statements(
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1001, in _emit_update_statements
       c = connection._execute_20(
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
       return connection._execute_clauseelement(
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
       ret = self._execute_context(
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
       self._handle_dbapi_exception(
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
       util.raise_(
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
       raise exception
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context
       self.dialect.do_executemany(
     File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 183, in do_executemany
       rowcount = cursor.executemany(statement, parameters)
     File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in executemany
       self.rowcount = sum(self.execute(query, arg) for arg in args)
     File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in <genexpr>
       self.rowcount = sum(self.execute(query, arg) for arg in args)
     File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute
       res = self._query(mogrified_query)
     File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query
       db.query(q)
     File "/usr/local/lib/python3.10/site-packages/MySQLdb/connections.py", line 261, in query
       _mysql.connection.query(self, query)
   sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query')
   [SQL: UPDATE task_instance SET queued_by_job_id=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
   [parameters: ((7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221361), 'test__orphaned_test_dag', 'select_next_17', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221383), 'test__orphaned_test_dag', 'select_next_18', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221391), 'test__orphaned_test_dag', 'select_next_20', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221398), 'test__orphaned_test_dag', 'select_next_22', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221405), 'test__orphaned_test_dag', 'select_next_23', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221411), 'test__orphaned_test_dag', 'select_next_24', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221418), 'test__orphaned_test_dag', 'select_next_28', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221424), 'test__orphaned_test_dag', 'select_next_29', 'scheduled__2024-04-24T02:30:00+00:00', -1)  ... displaying 10 of 31 total bound parameter sets ...  (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221560), 'test__orphaned_test_dag', 'select_next_75', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221566), 'test__orphaned_test_dag', 'select_next_76', 'scheduled__2024-04-24T02:30:00+00:00', -1))]
   (Background on this error at: https://sqlalche.me/e/14/e3q8)
   [2024-05-03T17:19:07.727+0000] {scheduler_job_runner.py:1617} INFO - Running SchedulerJob.adopt_or_reset_orphaned_tasks with retries. Try 2 of 3
   ...
   ```
   


-- 
This is an automated message from the Apache Git Service.