nesp159de opened a new issue, #46015:
URL: https://github.com/apache/airflow/issues/46015
### Apache Airflow version
Other Airflow 2 version (please specify below)
### If "Other Airflow 2 version" selected, which one?
2.6.3
### What happened?
We used a deferred operator waiting on a condition in several task instances during high MySQL database server load.
We hit the following issue in Airflow core twice:
```
Exception when executing TriggererJobRunner._run_trigger_loop
Traceback (most recent call last):
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/cursors.py", line 153, in execute
    result = self._query(query)
  File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/cursors.py", line 322, in _query
    conn.query(q)
  File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/connections.py", line 558, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/connections.py", line 822, in _read_query_result
    result.read()
  File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/connections.py", line 1200, in read
    first_packet = self.connection._read_packet()
  File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/connections.py", line 772, in _read_packet
    packet.raise_for_error()
  File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/protocol.py", line 221, in raise_for_error
    err.raise_mysql_exception(self._data)
  File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/err.py", line 143, in raise_mysql_exception
    raise errorclass(errno, errval)
pymysql.err.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py", line 333, in _execute
    self._run_trigger_loop()
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py", line 362, in _run_trigger_loop
    self.handle_events()
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py", line 390, in handle_events
    Trigger.submit_event(trigger_id=trigger_id, event=event)
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/airflow/api_internal/internal_api_call.py", line 112, in wrapper
    return func(*args, **kwargs)
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/gts/3pp/python/lib/python3.9/contextlib.py", line 126, in __exit__
    next(self.gen)
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/airflow/utils/session.py", line 37, in create_session
    session.commit()
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1454, in commit
    self._transaction.commit(_to_root=self.future)
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 832, in commit
    self._prepare_impl()
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 811, in _prepare_impl
    self.session.flush()
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
    self._flush(objects)
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3589, in _flush
    transaction.rollback(_capture_exception=True)
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
    flush_context.execute()
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
    _emit_update_statements(
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 1001, in _emit_update_statements
    c = connection._execute_20(
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/opt/gts/3pp/airflow/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/cursors.py", line 153, in execute
    result = self._query(query)
  File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/cursors.py", line 322, in _query
    conn.query(q)
  File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/connections.py", line 558, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/connections.py", line 822, in _read_query_result
    result.read()
  File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/connections.py", line 1200, in read
    first_packet = self.connection._read_packet()
  File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/connections.py", line 772, in _read_packet
    packet.raise_for_error()
  File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/protocol.py", line 221, in raise_for_error
    err.raise_mysql_exception(self._data)
  File "/opt/gts/3pp/python/lib/python3.9/site-packages/pymysql/err.py", line 143, in raise_mysql_exception
    raise errorclass(errno, errval)
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE task_instance SET state=%(state)s, updated_at=%(updated_at)s,
trigger_id=%(trigger_id)s, next_kwargs=%(next_kwargs)s
WHERE task_instance.dag_id = %(task_instance_dag_id)s
AND task_instance.task_id = %(task_instance_task_id)s
AND task_instance.run_id = %(task_instance_run_id)s
AND task_instance.map_index = %(task_instance_map_index)s]
[parameters: {
'state': <TaskInstanceState.SCHEDULED: 'scheduled'>,
'updated_at': datetime.datetime(2024, 12, 12, 10, 37, 41, 847579),
'trigger_id': None,
'next_kwargs': '{"__var": {"event": {"__var": 1733999860.954081,
"__type": "datetime"}}, "__type": "dict"}',
'task_instance_dag_id': 'AE_executor',
'task_instance_task_id': 'RFF.RFF_WAIT_FOR_IVS_DATA_CLEANSING_STATUS',
'task_instance_run_id': 'manual__2024-12-12T08:01:19+00:00',
'task_instance_map_index': -1
}]
```
### What you think should happen instead?
MySQL's recommendation is to retry the operation. Airflow already does this in similar places, for example in the fix for:
https://github.com/apache/airflow/issues/41428
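
To illustrate the recommended behaviour, here is a minimal sketch of the retry-on-deadlock pattern (not Airflow's actual code; the exception class is a stand-in for `pymysql.err.OperationalError`, and a real handler would inspect `err.orig.args[0]` on `sqlalchemy.exc.OperationalError`):

```python
import time
from functools import wraps

MYSQL_DEADLOCK = 1213  # ER_LOCK_DEADLOCK


class OperationalError(Exception):
    """Stand-in for pymysql.err.OperationalError, whose args are (errno, message)."""

    def __init__(self, errno, message):
        super().__init__(errno, message)
        self.errno = errno


def retry_on_deadlock(retries=3, backoff=0.05):
    """Retry the wrapped callable when MySQL reports a deadlock (error 1213).

    MySQL has already rolled the losing transaction back, so re-running the
    whole transaction is safe; any other error is re-raised immediately.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(retries):
                try:
                    return func(*args, **kwargs)
                except OperationalError as err:
                    if err.errno != MYSQL_DEADLOCK or attempt == retries - 1:
                        raise
                    # Linear backoff before retrying the transaction.
                    time.sleep(backoff * (attempt + 1))
        return wrapper
    return decorator
```

Wrapping `Trigger.submit_event` (or the session commit it performs) in something like this would turn the transient error 1213 into a retried transaction instead of a triggerer crash.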
### How to reproduce
I wrote code that exercises the same path as the code that raised the issue:
```
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import BaseOperator
from airflow.operators.bash import BashOperator
from airflow.triggers.temporal import TimeDeltaTrigger

from common.logger import task_logger  # project-local logger


class EmptyOperator(BashOperator):
    '''
    Empty operator does literally nothing. To be used for example as a
    placeholder for a not-yet-implemented job.
    '''

    def __init__(self, *args, **kwargs):
        super().__init__(bash_command="echo 'No action'", *args, **kwargs)


class WaitForOperator(BaseOperator):
    def __init__(self, *,
                 poll_interval=0.5,
                 fail_on_timeout=False,  # defaults added so the snippet runs
                 message_on_timeout="Deadline reached",
                 **kwargs):
        self.timeout_deadline = datetime.now() + timedelta(days=1)
        self.poll_interval = timedelta(seconds=poll_interval)
        self.fail_on_timeout = fail_on_timeout
        self.message_on_timeout = message_on_timeout
        super().__init__(**kwargs)

    def execute(self, context):
        now = datetime.now()
        deadline = self.timeout_deadline
        if deadline < now:
            task_logger.info("Deadline time is in the past. Exiting.")
            return
        task_logger.info(f"Setting deadline to: {deadline}")
        time_diff = (deadline - now).total_seconds()
        self.xcom_push(context, key="timeout_rerun_sec", value=time_diff)
        self.xcom_push(context, key="start_time", value=now.isoformat())
        self.xcom_push(context, key="deadline", value=deadline.isoformat())
        self.deferred(context)

    def evaluate_condition(self):
        return False

    # pylint: disable=unused-argument
    def deferred(self, context, event=None):
        """Callback invoked each time the trigger fires."""
        if self.evaluate_condition():
            return
        now = datetime.now()
        deadline = datetime.fromisoformat(self.xcom_pull(context, key="deadline"))
        task_logger.info(f"Woken from deferral; deadline: {deadline}")
        diff = deadline - now
        if diff >= timedelta(seconds=0):
            task_logger.info(f"Remaining time to deadline: {diff}")
        elif self.fail_on_timeout:
            raise RuntimeError(self.message_on_timeout)
        else:
            task_logger.info("Deadline reached, considering as SUCCESS")
            return
        defer_interval = min(self.poll_interval, deadline - now)
        task_logger.info(f"Deferring for {defer_interval} (until {now + defer_interval})")
        self.defer(trigger=TimeDeltaTrigger(defer_interval), method_name="deferred")


# This sample is to be deleted soon
with DAG(
    dag_id="wait_for_drill",
    default_args=None,
    description="Spawns several instances of WaitForOperator in order to investigate PRISMA-10686",
    tags=["prisma-samples"],
) as dag:
    beg = EmptyOperator(dag=dag, task_id="BEGIN")
    end = EmptyOperator(dag=dag, task_id="END")
    for i in range(30):
        wait_for = WaitForOperator(dag=dag, task_id=f"WAITFOR_{i}")
        beg >> wait_for
        wait_for >> end
```
But even with this example I was not able to reproduce it.
The issue probably relates to the high complexity of our application, which involves more than 300 tasks spread across 30 DAGs.
### Operating System
Red Hat Enterprise Linux 8.10 (Ootpa)
### Versions of Apache Airflow Providers
_No response_
### Deployment
Other
### Deployment details
LocalExecutor; Airflow is installed on a RHEL machine.
### Anything else?
We saw the issue twice during one day of testing.
We share the MySQL server between Airflow and the application itself.
We run two Airflow instances (each with its own database) over SQLAlchemy.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)