tanvn commented on issue #39088: URL: https://github.com/apache/airflow/issues/39088#issuecomment-2093473579
I think I found the cause of this issue.

The logic of `adopt_or_reset_orphaned_tasks` ran twice within a very short time because it is wrapped in `run_with_db_retries`:
https://github.com/apache/airflow/blob/2.8.4/airflow/jobs/scheduler_job_runner.py#L1610

When the first attempt tried to update the task instances in my MySQL database, a `MySQLdb.OperationalError` was raised, and the second attempt was carried out almost immediately (after 0.4467290363932428 seconds, as shown in the log):
https://github.com/apache/airflow/blob/2.8.4/airflow/jobs/scheduler_job_runner.py#L1651-L1679

In the second attempt, the pods had already been adopted, so the corresponding task instances were not removed from `tis_to_flush_by_key` and were added to `tis_to_flush` by `tis_to_flush.extend(tis_to_flush_by_key.values())`:
https://github.com/apache/airflow/blob/2.8.4/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py#L553-L575

Here are the logs:

```
[2024-05-03T17:18:59.326+0000] {scheduler_job_runner.py:1617} INFO - Running SchedulerJob.adopt_or_reset_orphaned_tasks with retries. Try 1 of 3
...
[2024-05-03T17:19:07.223+0000] {before_sleep.py:65} INFO - Retrying <unknown> in 0.4467290363932428 seconds as it raised OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query') [SQL: UPDATE task_instance SET queued_by_job_id=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s] [parameters: ((7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221361), 'test__orphaned_test_dag', 'select_next_17', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221383), 'test__orphaned_test_dag', 'select_next_18', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221391), 'test__orphaned_test_dag', 'select_next_20', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221398), 'test__orphaned_test_dag', 'select_next_22', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221405), 'test__orphaned_test_dag', 'select_next_23', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221411), 'test__orphaned_test_dag', 'select_next_24', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221418), 'test__orphaned_test_dag', 'select_next_28 ', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221424), 'test__orphaned_test_dag', 'select_next_29', 'scheduled__2024-04-24T02:30:00+00:00', -1) ... displaying 10 of 31 total bound parameter sets ... 
(7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221560), 'test__orphaned_test_dag', 'select_next_75', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221566), 'test__orphaned_test_dag', 'select_next_76', 'scheduled__2024-04-24T02:30:00+00:00', -1))] (Background on this error at: https://sqlalche.me/e/14/e3q8). Traceback (most recent call last): File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context self.dialect.do_executemany( File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 183, in do_executemany rowcount = cursor.executemany(statement, parameters) File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in executemany self.rowcount = sum(self.execute(query, arg) for arg in args) File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in <genexpr> self.rowcount = sum(self.execute(query, arg) for arg in args) File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute res = self._query(mogrified_query) File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query db.query(q) File "/usr/local/lib/python3.10/site-packages/MySQLdb/connections.py", line 261, in query _mysql.connection.query(self, query) MySQLdb.OperationalError: (2013, 'Lost connection to MySQL server during query') The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/opt/airflow_src/airflow/jobs/scheduler_job_runner.py", line 1684, in adopt_or_reset_orphaned_tasks session.flush() File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3449, in flush self._flush(objects) File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush with util.safe_reraise(): File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__ 
compat.raise_( File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_ raise exception File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush flush_context.execute() File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute rec.execute(self) File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute util.preloaded.orm_persistence.save_obj( File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj _emit_update_statements( File "/usr/local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1001, in _emit_update_statements c = connection._execute_20( File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20 return meth(self, args_10style, kwargs_10style, execution_options) File "/usr/local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection return connection._execute_clauseelement( File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement ret = self._execute_context( File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context self._handle_dbapi_exception( File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception util.raise_( File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 211, in raise_ raise exception File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1890, in _execute_context self.dialect.do_executemany( File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 183, in do_executemany rowcount = cursor.executemany(statement, parameters) File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in executemany 
self.rowcount = sum(self.execute(query, arg) for arg in args) File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 250, in <genexpr> self.rowcount = sum(self.execute(query, arg) for arg in args) File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 179, in execute res = self._query(mogrified_query) File "/usr/local/lib/python3.10/site-packages/MySQLdb/cursors.py", line 330, in _query db.query(q) File "/usr/local/lib/python3.10/site-packages/MySQLdb/connections.py", line 261, in query _mysql.connection.query(self, query) sqlalchemy.exc.OperationalError: (MySQLdb.OperationalError) (2013, 'Lost connection to MySQL server during query') [SQL: UPDATE task_instance SET queued_by_job_id=%s, updated_at=%s WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s] [parameters: ((7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221361), 'test__orphaned_test_dag', 'select_next_17', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221383), 'test__orphaned_test_dag', 'select_next_18', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221391), 'test__orphaned_test_dag', 'select_next_20', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221398), 'test__orphaned_test_dag', 'select_next_22', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221405), 'test__orphaned_test_dag', 'select_next_23', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221411), 'test__orphaned_test_dag', 'select_next_24', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221418), 'test__orphaned_test_dag', 'select_next_28 ', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221424), 'test__orphaned_test_dag', 
'select_next_29', 'scheduled__2024-04-24T02:30:00+00:00', -1) ... displaying 10 of 31 total bound parameter sets ... (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221560), 'test__orphaned_test_dag', 'select_next_75', 'scheduled__2024-04-24T02:30:00+00:00', -1), (7430, datetime.datetime(2024, 5, 3, 17, 19, 7, 221566), 'test__orphaned_test_dag', 'select_next_76', 'scheduled__2024-04-24T02:30:00+00:00', -1))] (Background on this error at: https://sqlalche.me/e/14/e3q8)
[2024-05-03T17:19:07.727+0000] {scheduler_job_runner.py:1617} INFO - Running SchedulerJob.adopt_or_reset_orphaned_tasks with retries. Try 2 of 3
...
```
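To make the failure mode easier to follow, here is a minimal toy model of it. This is not Airflow's code: `FakeExecutor`, `LostConnection`, `adopt_or_reset`, `run_with_retries` and `flaky_flush` are all hypothetical stand-ins, and the sketch assumes the executor returns the task instances it could not adopt. It only shows why retrying a step whose first attempt already had a side effect (the adoption) can make the second attempt treat healthy tasks as orphans.

```python
class LostConnection(Exception):
    """Hypothetical stand-in for a transient DB error such as MySQL error 2013."""


class FakeExecutor:
    """Toy executor: a pod can be adopted only once."""

    def __init__(self, running_pods):
        self.unadopted = set(running_pods)

    def try_adopt_task_instances(self, tis):
        # Assumed contract: return the task instances that could NOT be adopted.
        not_adopted = [ti for ti in tis if ti not in self.unadopted]
        # Side effect that survives a failed attempt: the pods are now adopted.
        self.unadopted -= set(tis)
        return not_adopted


def adopt_or_reset(executor, tis, flush):
    to_reset = executor.try_adopt_task_instances(tis)
    flush()  # may raise AFTER the adoption side effect has already happened
    return to_reset


def run_with_retries(fn, attempts=3):
    """Naive retry loop, standing in for a DB retry wrapper."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except LostConnection:
            if attempt == attempts:
                raise


calls = {"n": 0}


def flaky_flush():
    # Fail only on the first call, like the lost connection in the log above.
    calls["n"] += 1
    if calls["n"] == 1:
        raise LostConnection("Lost connection to MySQL server during query")


tis = ["select_next_17", "select_next_18"]
executor = FakeExecutor(running_pods=tis)
reset = run_with_retries(lambda: adopt_or_reset(executor, tis, flaky_flush))
print(reset)
```

In this toy model the first attempt adopts both pods and then fails on the flush. The second attempt can no longer adopt them, so both task instances come back as "to reset" even though their pods are running fine.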
