deepaktripathi1997 opened a new issue, #37657:
URL: https://github.com/apache/airflow/issues/37657
### Apache Airflow version
2.8.1
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
I encountered a deadlock error when trying to execute a task in Airflow. The
full error message is as follows:
`Traceback (most recent call last):
File "/home/airflow/.local/bin/airflow", line 8, in <module>
sys.exit(main())
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/__main__.py", line
57, in main
args.func(args)
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_config.py",
line 49, in command
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line
114, in wrapper
return f(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/providers_configuration_loader.py",
line 55, in wrapped_function
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py",
line 67, in scheduler
run_command_with_daemon_option(
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/daemon_utils.py",
line 86, in run_command_with_daemon_option
callback()
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py",
line 70, in <lambda>
callback=lambda: _run_scheduler_job(args),
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py",
line 52, in _run_scheduler_job
run_job(job=job_runner.job, execute_callable=job_runner._execute)
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py",
line 79, in wrapper
return func(*args, session=session, **kwargs)
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/job.py", line
393, in run_job
return execute_job(job, execute_callable=execute_callable)
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/job.py", line
422, in execute_job
ret = execute_callable()
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job_runner.py",
line 855, in _execute
self._run_scheduler_loop()
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job_runner.py",
line 987, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1061, in _do_scheduling
self._create_dagruns_for_dags(guard, session)
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/retries.py",
line 91, in wrapped_function
for attempt in run_with_db_retries(max_retries=retries, logger=logger,
**retry_kwargs):
File
"/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line
347, in __iter__
do = self.iter(retry_state=retry_state)
File
"/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line
314, in iter
return fut.result()
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in
result
return self.__get_result()
File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in
__get_result
raise self._exception
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/retries.py",
line 100, in wrapped_function
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1133, in _create_dagruns_for_dags
self._create_dag_runs(non_dataset_dags, session)
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job_runner.py",
line 1167, in _create_dag_runs
dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py",
line 487, in __get__
return self.impl.get(state, dict_)
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py",
line 959, in get
value = self._fire_loader_callables(state, key, passive)
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py",
line 990, in _fire_loader_callables
return state._load_expired(state, passive)
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/state.py",
line 712, in _load_expired
self.manager.expired_attribute_loader(self, toload, passive)
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py",
line 1451, in load_scalar_attributes
result = load_on_ident(
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py",
line 407, in load_on_ident
return load_on_pk_identity(
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py",
line 530, in load_on_pk_identity
session.execute(
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py",
line 1716, in execute
conn = self._connection_for_bind(bind)
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py",
line 1555, in _connection_for_bind
return self._transaction._connection_for_bind(
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py",
line 724, in _connection_for_bind
self._assert_active()
File
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py",
line 604, in _assert_active
raise sa_exc.PendingRollbackError(
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been
rolled back due to a previous exception during flush. To begin a new
transaction with this Session, first issue Session.rollback(). Original
exception was: (MySQLdb.OperationalError) (1213, 'Deadlock found when trying to
get lock; try restarting transaction')
[SQL: INSERT INTO dag_run (dag_id, queued_at, execution_date, start_date,
end_date, state, run_id, creating_job_id, external_trigger, run_type, conf,
data_interval_start, data_interval_end, last_scheduling_decision, dag_hash,
log_template_id, updated_at, clear_number) VALUES (%s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s, %s, (SELECT max(log_template.id) AS max_1
FROM log_template), %s, %s)]
[parameters: ('realtime_payubiz_partner_apiV2_partner_refund_transactions',
datetime.datetime(2024, 2, 23, 15, 6, 38, 564549), datetime.datetime(2024, 2,
23, 14, 15), None, None, <DagRunState.QUEUED: 'queued'>,
'scheduled__2024-02-23T14:15:00+00:00', 5703314, 0, <DagRunType.SCHEDULED:
'scheduled'>, b'\x80\x05}\x94.', datetime.datetime(2024, 2, 23, 14, 15),
datetime.datetime(2024, 2, 23, 14, 45), None,
'7b50292c2f5c842c2dcffe8abdfe30a3', datetime.datetime(2024, 2, 23, 15, 6, 38,
567950), 0)]
(Background on this error at: https://sqlalche.me/e/14/e3q8) (Background on
this error at: https://sqlalche.me/e/14/7s2a)`
Some of the tasks are failing after waiting anywhere from 5 minutes to 1 hour
<img width="1460" alt="image"
src="https://github.com/apache/airflow/assets/25430062/f21b17f4-06f6-418d-83f4-3a2877b5e313">
Inconsistent run durations:
Sometimes the same DAG completes within 1 minute,
and sometimes it takes more than 10 minutes.
Most of the time it is stuck on a task with the logs shown above.
### What you think should happen instead?
_No response_
### How to reproduce
Unfortunately, I am unable to provide exact steps to reproduce the issue as
it occurs sporadically and without any direct intervention from my side. It
seems to happen under normal operation of Airflow when tasks are being
executed, but there is no clear pattern or specific task that consistently
triggers the deadlock error.
### Operating System
CentOS Linux 7 (Core)
### Versions of Apache Airflow Providers
_No response_
### Deployment
Other Docker-based deployment
### Deployment details
Docker Image: apache/airflow:2.8.1
Kubernetes Deployment with CeleryExecutor
Database: MySQL 8
DB Config:
```ini
sql_engine_encoding = utf-8
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 500
sql_alchemy_max_overflow = 200
sql_alchemy_pool_recycle = 3600
sql_alchemy_pool_pre_ping = True
sql_alchemy_schema =
load_default_connections = False
max_db_retries = 3
```
Scheduler Pods :
Memory: 3GB
CPU: 4
Min Replicas: 5
Max Replicas: 8
Webserver Pods:
Memory: 4GB
CPU: 1
Replicas: 5
Worker Pods:
Memory: 13.5GB
CPU: 3.5
MinReplicas: 12
MaxReplicas: 18
Worker Configuration:
```ini
[celery]
celery_app_name = airflow.executors.celery_executor
# worker_concurrency = 48
worker_autoscale = 64, 32
worker_prefetch_multiplier = 1
worker_enable_remote_control = True
broker_url = redis://localhost:6379/0
flower_host = 0.0.0.0
flower_url_prefix = /flower
flower_port = 5555
flower_basic_auth = admin,admin
sync_parallelism = 0
celery_config_options =
airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =
pool = prefork
operation_timeout = 1.0
task_track_started = True
task_adoption_timeout = 600
stalled_task_timeout = 0
task_publish_max_retries = 3
worker_precheck = False
worker_umask = 0o077
task_acks_late = False
[celery_broker_transport_options]
visibility_timeout = 3600
```
Scheduler Configuration
```ini
[scheduler]
job_heartbeat_sec = 5
local_task_job_heartbeat_sec = 10
scheduler_heartbeat_sec = 3
num_runs = -1
scheduler_idle_sleep_time = 1
min_file_process_interval = 600
parsing_cleanup_interval = 60
dag_dir_list_interval = 600
print_stats_interval = 30
pool_metrics_interval = 5.0
scheduler_health_check_threshold = 30
enable_health_check = True
scheduler_health_check_server_port = 8974
orphaned_tasks_check_interval = 300.0
child_process_log_directory = /opt/airflow/dags/efs_dags/logs/scheduler
scheduler_zombie_task_threshold = 240
zombie_detection_interval = 300.0
catchup_by_default = True
ignore_first_depends_on_past_by_default = True
max_tis_per_query = 256
use_row_level_locking = True
max_dagruns_to_create_per_loop = 64
max_dagruns_per_loop_to_schedule = 64
schedule_after_task_execution = True
parsing_processes = 4
file_parsing_sort_mode = modified_time
standalone_dag_processor = False
max_callbacks_per_loop = 64
dag_stale_not_seen_duration = 600
use_job_schedule = True
allow_trigger_in_future = False
trigger_timeout_check_interval = 15
deactivate_stale_dags_interval = 60
dependency_detector =
airflow.serialization.serialized_objects.DependencyDetector
task_queued_timeout = 60
```
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]