potiuk opened a new issue #11788:
URL: https://github.com/apache/airflow/issues/11788


   Scheduler seems to be stuck and finally exits with a deadlock error (in MySQL) in the current master setup in Breeze
   
   **What happened**:
   
   When you run the example DAGs in Breeze on the current master, you get a "missing scheduler" warning and the scheduler seems to be stuck. With MySQL it also fails after a while with a lock wait timeout (MySQL error 1205). With Postgres it seems to stay stuck indefinitely.
   
   
   **How to reproduce it**:
   
   `./breeze  start-airflow --backend mysql --db-reset --load-example-dags`
   
   or
   
   `./breeze  start-airflow --backend postgres --db-reset --load-example-dags`
   
   
   When you try to run the example DAGs, they seem to start at first, but they remain in the "SCHEDULED" state and do not continue to run. The scheduler log stops at:
   
   ```
     ____________       _____________
    ____    |__( )_________  __/__  /________      __
   ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
   ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
    _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   [2020-10-23 16:53:08,823] {scheduler_job.py:1270} INFO - Starting the scheduler
   [2020-10-23 16:53:08,823] {scheduler_job.py:1275} INFO - Processing each file at most -1 times
   [2020-10-23 16:53:08,824] {scheduler_job.py:1297} INFO - Resetting orphaned tasks for active dag runs
   [2020-10-23 16:53:08,831] {dag_processing.py:250} INFO - Launched DagFileProcessorManager with pid: 504
   [2020-10-23 16:53:08,836] {settings.py:49} INFO - Configured default timezone Timezone('UTC')
   /usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py:593: Warning: (1300, "Invalid utf8mb4 character string: '800495'")
     cursor.execute(statement, parameters)
   /usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py:322: Warning: (1300, "Invalid utf8mb4 character string: '800495'")
     rows += self.execute(sql + postfix)
   [2020-10-23 16:53:29,490] {scheduler_job.py:976} INFO - 4 tasks up for execution:
           <TaskInstance: example_bash_operator.also_run_this 2020-10-21 00:00:00+00:00 [scheduled]>
           <TaskInstance: example_bash_operator.runme_0 2020-10-21 00:00:00+00:00 [scheduled]>
           <TaskInstance: example_bash_operator.runme_1 2020-10-21 00:00:00+00:00 [scheduled]>
           <TaskInstance: example_bash_operator.runme_2 2020-10-21 00:00:00+00:00 [scheduled]>
   [2020-10-23 16:53:29,491] {scheduler_job.py:1011} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 4 task instances ready to be queued
   [2020-10-23 16:53:29,491] {scheduler_job.py:1038} INFO - DAG example_bash_operator has 0/16 running and queued tasks
   [2020-10-23 16:53:29,491] {scheduler_job.py:1038} INFO - DAG example_bash_operator has 1/16 running and queued tasks
   [2020-10-23 16:53:29,491] {scheduler_job.py:1038} INFO - DAG example_bash_operator has 2/16 running and queued tasks
   [2020-10-23 16:53:29,491] {scheduler_job.py:1038} INFO - DAG example_bash_operator has 3/16 running and queued tasks
   [2020-10-23 16:53:29,491] {scheduler_job.py:1090} INFO - Setting the following tasks to queued state:
           <TaskInstance: example_bash_operator.runme_0 2020-10-21 00:00:00+00:00 [scheduled]>
           <TaskInstance: example_bash_operator.runme_1 2020-10-21 00:00:00+00:00 [scheduled]>
           <TaskInstance: example_bash_operator.runme_2 2020-10-21 00:00:00+00:00 [scheduled]>
           <TaskInstance: example_bash_operator.also_run_this 2020-10-21 00:00:00+00:00 [scheduled]>
   [2020-10-23 16:53:29,493] {scheduler_job.py:1137} INFO - Sending TaskInstanceKey(dag_id='example_bash_operator', task_id='runme_0', execution_date=datetime.datetime(2020, 10, 21, 0, 0, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 3 and queue default
   [2020-10-23 16:53:29,493] {base_executor.py:78} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_0', '2020-10-21T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/airflow/example_dags/example_bash_operator.py']
   [2020-10-23 16:53:29,493] {scheduler_job.py:1137} INFO - Sending TaskInstanceKey(dag_id='example_bash_operator', task_id='runme_1', execution_date=datetime.datetime(2020, 10, 21, 0, 0, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 3 and queue default
   [2020-10-23 16:53:29,494] {base_executor.py:78} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_1', '2020-10-21T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/airflow/example_dags/example_bash_operator.py']
   [2020-10-23 16:53:29,494] {scheduler_job.py:1137} INFO - Sending TaskInstanceKey(dag_id='example_bash_operator', task_id='runme_2', execution_date=datetime.datetime(2020, 10, 21, 0, 0, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 3 and queue default
   [2020-10-23 16:53:29,494] {base_executor.py:78} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_2', '2020-10-21T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/airflow/example_dags/example_bash_operator.py']
   [2020-10-23 16:53:29,494] {scheduler_job.py:1137} INFO - Sending TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this', execution_date=datetime.datetime(2020, 10, 21, 0, 0, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 2 and queue default
   [2020-10-23 16:53:29,494] {base_executor.py:78} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_bash_operator', 'also_run_this', '2020-10-21T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/airflow/example_dags/example_bash_operator.py']
   [2020-10-23 16:53:29,494] {sequential_executor.py:57} INFO - Executing command: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_0', '2020-10-21T00:00:00+00:00', '--local', '--pool', 'default_pool', '--subdir', '/opt/airflow/airflow/example_dags/example_bash_operator.py']
   [2020-10-23 16:53:31,139] {dagbag.py:436} INFO - Filling up the DagBag from /opt/airflow/airflow/example_dags/example_bash_operator.py
   ```
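The scheduler log ends right after `sequential_executor.py` starts the first queued command. The following stdlib-only Python sketch is an illustration, not Airflow code. It models that sequential, synchronous pattern: each queued command runs to completion in the calling process before the next one starts, so a single command that never returns stalls everything queued behind it. The function name `run_sequentially` and the `timeout_s` cut-off are assumptions made for the demo. Nothing in the log suggests the real executor applies such a timeout.

```python
import subprocess
import sys
from typing import List, Optional


def run_sequentially(commands: List[List[str]], timeout_s: Optional[float] = None) -> List[str]:
    """Toy model of sequential execution (hypothetical helper, not Airflow code).

    Each command runs to completion in the calling process before the next one
    starts. timeout_s is a demo-only assumption; None means wait forever.
    """
    results = []
    for cmd in commands:
        try:
            # Blocks the caller until this command exits; nothing else runs meanwhile.
            subprocess.run(cmd, check=True, timeout=timeout_s)
            results.append("success")
        except subprocess.TimeoutExpired:
            # subprocess.run kills the child before raising this.
            results.append("timeout")
        except subprocess.CalledProcessError:
            # Non-zero exit status.
            results.append("failed")
    return results


if __name__ == "__main__":
    py = sys.executable
    queue = [
        [py, "-c", "pass"],                         # exits normally
        [py, "-c", "import time; time.sleep(60)"],  # stands in for a task that hangs
        [py, "-c", "pass"],                         # starts only after the one above is killed
    ]
    print(run_sequentially(queue, timeout_s=1.0))  # ['success', 'timeout', 'success']
```

With `timeout_s=None`, the loop would wait on the second command forever, and the third command would never start.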
   
   With MySQL you additionally get a lock wait timeout exception after a while:
   
   ```
   Running <TaskInstance: example_bash_operator.runme_1 2020-10-21T00:00:00+00:00 [scheduled]> on host 7216b1a127c3
   
   Traceback (most recent call last):
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
       cursor, statement, parameters, context
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
       cursor.execute(statement, parameters)
     File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 255, in execute
       self.errorhandler(self, exc, value)
     File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
       raise errorvalue
     File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 252, in execute
       res = self._query(query)
     File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 378, in _query
       db.query(q)
     File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line 280, in query
       _mysql.connection.query(self, query)
   _mysql_exceptions.OperationalError: (1205, 'Lock wait timeout exceeded; try restarting transaction')
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/usr/local/bin/airflow", line 33, in <module>
       sys.exit(load_entry_point('apache-airflow', 'console_scripts', 'airflow')())
     File "/opt/airflow/airflow/__main__.py", line 40, in main
       args.func(args)
     File "/opt/airflow/airflow/cli/cli_parser.py", line 53, in command
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/utils/cli.py", line 84, in wrapper
       return f(*args, **kwargs)
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 206, in task_run
       _run_task_by_selected_method(args, dag, ti)
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 59, in _run_task_by_selected_method
       _run_task_by_local_task_job(args, ti)
     File "/opt/airflow/airflow/cli/commands/task_command.py", line 113, in _run_task_by_local_task_job
       run_job.run()
     File "/opt/airflow/airflow/jobs/base_job.py", line 245, in run
       self._execute()
     File "/opt/airflow/airflow/jobs/local_task_job.py", line 91, in _execute
       pool=self.pool):
     File "/opt/airflow/airflow/utils/session.py", line 63, in wrapper
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/models/taskinstance.py", line 945, in check_and_change_state_before_execution
       self.refresh_from_db(session=session, lock_for_update=True)
     File "/opt/airflow/airflow/utils/session.py", line 59, in wrapper
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/models/taskinstance.py", line 534, in refresh_from_db
       ti = qry.with_for_update().first()
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3402, in first
       ret = list(self[0:1])
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3176, in __getitem__
       return list(res)
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3508, in __iter__
       return self._execute_and_instances(context)
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3533, in _execute_and_instances
       result = conn.execute(querycontext.statement, self._params)
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
       return meth(self, multiparams, params)
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
       return connection._execute_clauseelement(self, multiparams, params)
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
       distilled_params,
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
       e, statement, parameters, cursor, context
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
       sqlalchemy_exception, with_traceback=exc_info[2], from_=e
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
       cursor, statement, parameters, context
     File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
       cursor.execute(statement, parameters)
     File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 255, in execute
       self.errorhandler(self, exc, value)
     File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
       raise errorvalue
     File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 252, in execute
       res = self._query(query)
     File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 378, in _query
       db.query(q)
     File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line 280, in query
       _mysql.connection.query(self, query)
   sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (1205, 'Lock wait timeout exceeded; try restarting transaction')
   [SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id
   FROM task_instance
   WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.execution_date = %s
    LIMIT %s FOR UPDATE]
   [parameters: ('example_bash_operator', 'runme_1', datetime.datetime(2020, 10, 21, 0, 0), 1)]
   ```
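Error 1205 means that the `SELECT ... FOR UPDATE` issued by `refresh_from_db(lock_for_update=True)` waited for a lock on the same `task_instance` row. Another open transaction held that lock, and the query gave up when InnoDB's `innodb_lock_wait_timeout` expired. The traceback does not show which transaction held the lock.

As a stdlib-only illustration of that wait-then-time-out behaviour, the following sketch uses SQLite as a stand-in for MySQL. It is an analogy under stated assumptions:

- SQLite locks the whole database file rather than a single row.
- SQLite has no `FOR UPDATE`.
- SQLite's wait limit is the `timeout=` argument of `sqlite3.connect`.

The table layout, the `holder`/`waiter` connection names and the helper `try_lock_while_held` are all hypothetical.

```python
import os
import sqlite3
import tempfile


def try_lock_while_held(wait_s: float, release_first: bool = False) -> str:
    """Hypothetical demo: 'holder' opens a write transaction on a task_instance
    row and keeps it open; 'waiter' then asks for the same write lock.

    Returns "acquired", or the error message the waiter gets once wait_s expires.
    SQLite locks the whole database file here, not a single row as InnoDB does.
    """
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        # isolation_level=None: transactions are started and ended explicitly below.
        holder = sqlite3.connect(path, isolation_level=None)
        holder.execute("CREATE TABLE task_instance (task_id TEXT PRIMARY KEY, state TEXT)")
        holder.execute("INSERT INTO task_instance VALUES ('runme_1', 'scheduled')")

        # Take the write lock and modify the row without committing.
        holder.execute("BEGIN IMMEDIATE")
        holder.execute("UPDATE task_instance SET state = 'queued' WHERE task_id = 'runme_1'")
        if release_first:
            holder.execute("COMMIT")  # releasing the lock lets the waiter in

        # timeout= plays the role of innodb_lock_wait_timeout in this analogy.
        waiter = sqlite3.connect(path, timeout=wait_s, isolation_level=None)
        try:
            waiter.execute("BEGIN IMMEDIATE")
            waiter.execute("ROLLBACK")
            return "acquired"
        except sqlite3.OperationalError as exc:
            return str(exc)
        finally:
            waiter.close()
            if holder.in_transaction:
                holder.execute("ROLLBACK")
            holder.close()


if __name__ == "__main__":
    print(try_lock_while_held(wait_s=0.5))                      # database is locked
    print(try_lock_while_held(wait_s=0.5, release_first=True))  # acquired
```

Committing (or rolling back) the holding transaction before the waiter asks for the lock, as `release_first=True` does, lets the second connection proceed immediately instead of timing out.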
   
   
   

