deepaktripathi1997 opened a new issue, #37657:
URL: https://github.com/apache/airflow/issues/37657

   ### Apache Airflow version
   
   2.8.1
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   I encountered a deadlock error when trying to execute a task in Airflow. The 
full error message is as follows:
   
   `Traceback (most recent call last):
     File "/home/airflow/.local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/__main__.py", line 
57, in main
       args.func(args)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_config.py", 
line 49, in command
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 
114, in wrapper
       return f(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/providers_configuration_loader.py",
 line 55, in wrapped_function
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py",
 line 67, in scheduler
       run_command_with_daemon_option(
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/daemon_utils.py",
 line 86, in run_command_with_daemon_option
       callback()
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py",
 line 70, in <lambda>
       callback=lambda: _run_scheduler_job(args),
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py",
 line 52, in _run_scheduler_job
       run_job(job=job_runner.job, execute_callable=job_runner._execute)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", 
line 79, in wrapper
       return func(*args, session=session, **kwargs)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/job.py", line 
393, in run_job
       return execute_job(job, execute_callable=execute_callable)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/job.py", line 
422, in execute_job
       ret = execute_callable()
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 855, in _execute
       self._run_scheduler_loop()
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 987, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1061, in _do_scheduling
       self._create_dagruns_for_dags(guard, session)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/retries.py", 
line 91, in wrapped_function
       for attempt in run_with_db_retries(max_retries=retries, logger=logger, 
**retry_kwargs):
     File 
"/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 
347, in __iter__
       do = self.iter(retry_state=retry_state)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 
314, in iter
       return fut.result()
     File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in 
result
       return self.__get_result()
     File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in 
__get_result
       raise self._exception
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/retries.py", 
line 100, in wrapped_function
       return func(*args, **kwargs)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1133, in _create_dagruns_for_dags
       self._create_dag_runs(non_dataset_dags, session)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job_runner.py",
 line 1167, in _create_dag_runs
       dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py",
 line 487, in __get__
       return self.impl.get(state, dict_)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py",
 line 959, in get
       value = self._fire_loader_callables(state, key, passive)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py",
 line 990, in _fire_loader_callables
       return state._load_expired(state, passive)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/state.py", 
line 712, in _load_expired
       self.manager.expired_attribute_loader(self, toload, passive)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py", 
line 1451, in load_scalar_attributes
       result = load_on_ident(
     File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py", 
line 407, in load_on_ident
       return load_on_pk_identity(
     File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/loading.py", 
line 530, in load_on_pk_identity
       session.execute(
     File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", 
line 1716, in execute
       conn = self._connection_for_bind(bind)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", 
line 1555, in _connection_for_bind
       return self._transaction._connection_for_bind(
     File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", 
line 724, in _connection_for_bind
       self._assert_active()
     File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", 
line 604, in _assert_active
       raise sa_exc.PendingRollbackError(
   sqlalchemy.exc.PendingRollbackError: This Session's transaction has been 
rolled back due to a previous exception during flush. To begin a new 
transaction with this Session, first issue Session.rollback(). Original 
exception was: (MySQLdb.OperationalError) (1213, 'Deadlock found when trying to 
get lock; try restarting transaction')
   [SQL: INSERT INTO dag_run (dag_id, queued_at, execution_date, start_date, 
end_date, state, run_id, creating_job_id, external_trigger, run_type, conf, 
data_interval_start, data_interval_end, last_scheduling_decision, dag_hash, 
log_template_id, updated_at, clear_number) VALUES (%s, %s, %s, %s, %s, %s, %s, 
%s, %s, %s, %s, %s, %s, %s, %s, (SELECT max(log_template.id) AS max_1 
   FROM log_template), %s, %s)]
   [parameters: ('realtime_payubiz_partner_apiV2_partner_refund_transactions', 
datetime.datetime(2024, 2, 23, 15, 6, 38, 564549), datetime.datetime(2024, 2, 
23, 14, 15), None, None, <DagRunState.QUEUED: 'queued'>, 
'scheduled__2024-02-23T14:15:00+00:00', 5703314, 0, <DagRunType.SCHEDULED: 
'scheduled'>, b'\x80\x05}\x94.', datetime.datetime(2024, 2, 23, 14, 15), 
datetime.datetime(2024, 2, 23, 14, 45), None, 
'7b50292c2f5c842c2dcffe8abdfe30a3', datetime.datetime(2024, 2, 23, 15, 6, 38, 
567950), 0)]
   (Background on this error at: https://sqlalche.me/e/14/e3q8) (Background on 
this error at: https://sqlalche.me/e/14/7s2a)`
   
   
   Some of the tasks are failing after waiting for between 5 minutes and 1 hour.
   <img width="1460" alt="image" 
src="https://github.com/apache/airflow/assets/25430062/f21b17f4-06f6-418d-83f4-3a2877b5e313";>
   
   Inconsistent run durations:
   Sometimes the same DAG completes within 1 minute,
   and sometimes it takes more than 10 minutes.
   Most of the time stuck on a task with these logs:
   
   
   
   
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   Unfortunately, I am unable to provide exact steps to reproduce the issue as 
it occurs sporadically and without any direct intervention from my side. It 
seems to happen under normal operation of Airflow when tasks are being 
executed, but there is no clear pattern or specific task that consistently 
triggers the deadlock error.
   
   ### Operating System
   
   CentOS Linux 7 (Core)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   Docker Image: apache/airflow:2.8.1
   Kubernetes Deployment with CeleryExecutor
   
   Database: MySQL 8
   DB Config:
   ```ini
   sql_engine_encoding = utf-8
   sql_alchemy_pool_enabled = True
   sql_alchemy_pool_size = 500
   sql_alchemy_max_overflow = 200
   sql_alchemy_pool_recycle = 3600
   sql_alchemy_pool_pre_ping = True
   sql_alchemy_schema = 
   load_default_connections = False
   max_db_retries = 3
   ```
   
   Scheduler Pods : 
   Memory: 3GB
   CPU: 4
   Min Replicas: 5
   Max Replicas: 8
   
   Webserver Pods:
   Memory: 4GB
   CPU: 1
   Replicas: 5
   
   Worker Pods:
   Memory: 13.5GB
   CPU: 3.5
   MinReplicas: 12
   MaxReplicas:18
   
   
   Worker Configuration:
   
   ```ini
   [celery]
   celery_app_name = airflow.executors.celery_executor
   # worker_concurrency = 48
   worker_autoscale = 64, 32
   worker_prefetch_multiplier = 1
   worker_enable_remote_control = True
   broker_url = redis://localhost:6379/0
   flower_host = 0.0.0.0
   flower_url_prefix = /flower
   flower_port = 5555
   flower_basic_auth = admin,admin
   sync_parallelism = 0
   celery_config_options = 
airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
   ssl_active = False
   ssl_key = 
   ssl_cert = 
   ssl_cacert = 
   pool = prefork
   operation_timeout = 1.0
   task_track_started = True
   task_adoption_timeout = 600
   stalled_task_timeout = 0
   task_publish_max_retries = 3
   worker_precheck = False
   worker_umask = 0o077
   task_acks_late = False
   
   [celery_broker_transport_options]
   visibility_timeout = 3600
   ```
   
   Scheduler Configuration
   
   ```ini
   [scheduler]
   job_heartbeat_sec = 5
   local_task_job_heartbeat_sec = 10
   scheduler_heartbeat_sec = 3
   num_runs = -1
   scheduler_idle_sleep_time = 1
   min_file_process_interval = 600
   parsing_cleanup_interval = 60
   dag_dir_list_interval = 600
   print_stats_interval = 30
   pool_metrics_interval = 5.0
   scheduler_health_check_threshold = 30
   enable_health_check = True
   scheduler_health_check_server_port = 8974
   orphaned_tasks_check_interval = 300.0
   child_process_log_directory = /opt/airflow/dags/efs_dags/logs/scheduler
   scheduler_zombie_task_threshold = 240
   zombie_detection_interval = 300.0
   catchup_by_default = True
   ignore_first_depends_on_past_by_default = True
   max_tis_per_query = 256
   use_row_level_locking = True
   max_dagruns_to_create_per_loop = 64
   max_dagruns_per_loop_to_schedule = 64
   schedule_after_task_execution = True
   parsing_processes = 4
   file_parsing_sort_mode = modified_time
   standalone_dag_processor = False
   max_callbacks_per_loop = 64
   dag_stale_not_seen_duration = 600
   use_job_schedule = True
   allow_trigger_in_future = False
   trigger_timeout_check_interval = 15
   deactivate_stale_dags_interval = 60
   dependency_detector = 
airflow.serialization.serialized_objects.DependencyDetector
   task_queued_timeout = 60
   ```
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to