nesp159de opened a new issue, #46015:
URL: https://github.com/apache/airflow/issues/46015

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.6.3
   
   ### What happened?
   
   We used deferred operators, each waiting for something, in several task instances during 
high MySQL database server load.
   We hit the following Airflow core issue twice:
   
   ```
   Exception when executing TriggererJobRunner._run_trigger_loop
   Traceback (most recent call last):
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
       self.dialect.do_execute(
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
       cursor.execute(statement, parameters)
     File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/cursors.py", line 153, in execute
       result = self._query(query)
     File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/cursors.py", line 322, in _query
       conn.query(q)
     File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/connections.py", line 558, in query
       self._affected_rows = self._read_query_result(unbuffered=unbuffered)
     File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/connections.py", line 822, in _read_query_result
       result.read()
     File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/connections.py", line 1200, in read
       first_packet = self.connection._read_packet()
     File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/connections.py", line 772, in _read_packet
       packet.raise_for_error()
     File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/protocol.py", line 221, in raise_for_error
       err.raise_mysql_exception(self._data)
     File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/err.py", line 143, in raise_mysql_exception
       raise errorclass(errno, errval)
   pymysql.err.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py", line 333, in _execute
       self._run_trigger_loop()
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py", line 362, in _run_trigger_loop
       self.handle_events()
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py", line 390, in handle_events
       Trigger.submit_event(trigger_id=trigger_id, event=event)
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/airflow/api_internal/internal_api_call.py", line 112, in wrapper
       return func(*args, **kwargs)
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/airflow/utils/session.py", line 76, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/gts/3pp/python/lib/python3.9/contextlib.py", line 126, in __exit__
       next(self.gen)
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/airflow/utils/session.py", line 37, in create_session
       session.commit()
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1454, in commit
       self._transaction.commit(_to_root=self.future)
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 832, in commit
       self._prepare_impl()
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 811, in _prepare_impl
       self.session.flush()
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
       self._flush(objects)
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3589, in _flush
       transaction.rollback(_capture_exception=True)
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       compat.raise_(
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
       raise exception
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
       flush_context.execute()
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
       rec.execute(self)
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
       util.preloaded.orm_persistence.save_obj(
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
       _emit_update_statements(
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 1001, in _emit_update_statements
       c = connection._execute_20(
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
       return connection._execute_clauseelement(
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
       ret = self._execute_context(
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
       self._handle_dbapi_exception(
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
       util.raise_(
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
       raise exception
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
       self.dialect.do_execute(
     File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
       cursor.execute(statement, parameters)
     File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/cursors.py", line 153, in execute
       result = self._query(query)
     File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/cursors.py", line 322, in _query
       conn.query(q)
     File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/connections.py", line 558, in query
       self._affected_rows = self._read_query_result(unbuffered=unbuffered)
     File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/connections.py", line 822, in _read_query_result
       result.read()
     File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/connections.py", line 1200, in read
       first_packet = self.connection._read_packet()
     File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/connections.py", line 772, in _read_packet
       packet.raise_for_error()
     File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/protocol.py", line 221, in raise_for_error
       err.raise_mysql_exception(self._data)
     File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/err.py", line 143, in raise_mysql_exception
       raise errorclass(errno, errval)
   sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
   
   [SQL: UPDATE task_instance SET state=%(state)s, updated_at=%(updated_at)s, 
         trigger_id=%(trigger_id)s, next_kwargs=%(next_kwargs)s 
         WHERE task_instance.dag_id = %(task_instance_dag_id)s 
           AND task_instance.task_id = %(task_instance_task_id)s 
           AND task_instance.run_id = %(task_instance_run_id)s 
           AND task_instance.map_index = %(task_instance_map_index)s]
   [parameters: {
       'state': <TaskInstanceState.SCHEDULED: 'scheduled'>, 
       'updated_at': datetime.datetime(2024, 12, 12, 10, 37, 41, 847579), 
       'trigger_id': None, 
       'next_kwargs': '{"__var": {"event": {"__var": 1733999860.954081, "__type": "datetime"}}, "__type": "dict"}', 
       'task_instance_dag_id': 'AE_executor', 
       'task_instance_task_id': 'RFF.RFF_WAIT_FOR_IVS_DATA_CLEANSING_STATUS', 
       'task_instance_run_id': 'manual__2024-12-12T08:01:19+00:00', 
       'task_instance_map_index': -1
   }]
   ```
   
   
   ### What you think should happen instead?
   
   MySQL's own recommendation is to retry the operation. Airflow already does this 
in some places, for example in the fix for 
https://github.com/apache/airflow/issues/41428
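   The retry pattern is simple enough to sketch. The snippet below is a minimal illustration, not Airflow's actual code: the `OperationalError` class and `flaky_commit` function are invented stand-ins (real code would catch `sqlalchemy.exc.OperationalError` and inspect the wrapped MySQL error number), and Airflow's codebase already carries a similar helper, `retry_db_transaction` in `airflow.utils.retries`, that could plausibly wrap `Trigger.submit_event`.

```python
import time


# Hypothetical stand-in for the DB-API error; real code would catch
# sqlalchemy.exc.OperationalError and check the wrapped MySQL errno.
class OperationalError(Exception):
    def __init__(self, errno, message):
        super().__init__(errno, message)
        self.errno = errno


def retry_on_deadlock(fn, retries=3, backoff=0.05):
    """Re-run fn when MySQL reports a deadlock (errno 1213); any other
    error propagates immediately, and the last attempt re-raises."""
    for attempt in range(retries):
        try:
            return fn()
        except OperationalError as exc:
            if exc.errno != 1213 or attempt == retries - 1:
                raise
            time.sleep(backoff * (attempt + 1))  # linear backoff between retries


# Hypothetical transaction that deadlocks once, then succeeds on retry.
calls = {"n": 0}

def flaky_commit():
    calls["n"] += 1
    if calls["n"] == 1:
        raise OperationalError(1213, "Deadlock found when trying to get lock")
    return "committed"


result = retry_on_deadlock(flaky_commit)
print(result)  # -> committed (after one retry)
```

   The key design point is to retry only on error 1213: other operational errors (lost connection, timeout) may not be safe to blindly re-run.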
   
   ### How to reproduce
   
   I wrote code that exercises the same code path that raised the issue:
   
   ```
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.models import BaseOperator
   from airflow.operators.bash import BashOperator
   from airflow.triggers.temporal import TimeDeltaTrigger
   
   
   class EmptyOperator(BashOperator):
       '''
       Empty operator does literally nothing. To be used, for example, as a
       placeholder for a not-yet-implemented job.
       '''
   
       def __init__(self, *args, **kwargs):
           super().__init__(bash_command="echo 'No action'", *args, **kwargs)
   
   
   class WaitForOperator(BaseOperator):
       def __init__(self, *,
                    poll_interval=0.5,
                    fail_on_timeout=False,
                    message_on_timeout="Deadline reached",
                    **kwargs):
           self.timeout_deadline = datetime.now() + timedelta(days=1)
           self.poll_interval = timedelta(seconds=poll_interval)
           self.fail_on_timeout = fail_on_timeout
           self.message_on_timeout = message_on_timeout
           super().__init__(**kwargs)
   
       def execute(self, context):
           now = datetime.now()
           deadline = self.timeout_deadline
           if deadline < now:
               self.log.info("Deadline time is in the past. Exiting.")
               return
   
           self.log.info("Setting deadline to: %s", deadline)
           time_diff = (deadline - now).total_seconds()
           self.xcom_push(context, key="timeout_rerun_sec", value=time_diff)
           self.xcom_push(context, key="start_time", value=now.isoformat())
           self.xcom_push(context, key="deadline", value=deadline.isoformat())
   
           self.deferred(context)
   
       def evaluate_condition(self):
           return False
   
       # pylint: disable=unused-argument
       def deferred(self, context, event=None):
           """Resume point after deferral; keeps deferring until the deadline."""
           if self.evaluate_condition():
               return
   
           now = datetime.now()
           deadline = datetime.fromisoformat(self.xcom_pull(context, key="deadline"))
           self.log.info("Woken from deferral; deadline: %s", deadline)
           diff = deadline - now
           if diff >= timedelta(seconds=0):
               self.log.info("Remaining time to deadline: %s", diff)
           elif self.fail_on_timeout:
               raise RuntimeError(self.message_on_timeout)
           else:
               self.log.info("Deadline reached, considering as SUCCESS")
               return
   
           defer_interval = min(self.poll_interval, deadline - now)
           self.log.info("Deferring for %s (until %s)", defer_interval, now + defer_interval)
           self.defer(trigger=TimeDeltaTrigger(defer_interval), method_name="deferred")
   
   
   # This sample is to be deleted soon
   with DAG(
       dag_id="wait_for_drill",
       start_date=datetime(2024, 1, 1),
       schedule=None,
       description="Spawns several instances of WaitForOperator in order to investigate PRISMA-10686",
       tags=["prisma-samples"],
   ) as dag:
       beg = EmptyOperator(task_id="BEGIN")
       end = EmptyOperator(task_id="END")
   
       for i in range(30):
           wait_for = WaitForOperator(task_id=f"WAITFOR_{i}")
           beg >> wait_for >> end
   ```
   
   But even with this example I was not able to reproduce it.
   The issue is probably related to the sheer complexity of our application, which 
involves more than 300 tasks spread across 30 DAGs.
   
   ### Operating System
   
   Red Hat Enterprise Linux 8.10 (Ootpa)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   LocalExecutor; Airflow is installed on a RHEL machine.
   
   ### Anything else?
   
   We saw the issue twice during one day of testing.
   We share the MySQL server between Airflow and the application itself. 
   We run two Airflow instances (each with its own database), accessed via 
SQLAlchemy.
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

