Bruschkov opened a new issue #20956:
URL: https://github.com/apache/airflow/issues/20956


   ### Apache Airflow version
   
   2.1.1
   
   ### What happened
   
   Scheduler regularly crashes with error messages like this:
   
   ```
   MySQLdb._exceptions.IntegrityError: (1062, "Duplicate entry 'some-ETL-2022-01-19 14:00:00.000000' for key 'dag_id'")

   The above exception was the direct cause of the following exception:

   Traceback (most recent call last):
     File "/home/airflow/.local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
       args.func(args)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 91, in wrapper
       return f(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py", line 64, in scheduler
       job.run()
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 237, in run
       self._execute()
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1303, in _execute
       self._run_scheduler_loop()
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1396, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1492, in _do_scheduling
       self._create_dagruns_for_dags(guard, session)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/retries.py", line 76, in wrapped_function
       for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
     File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 390, in __iter__
       do = self.iter(retry_state=retry_state)
     File "/home/airflow/.local/lib/python3.8/site-packages/tenacity/__init__.py", line 356, in iter
       return fut.result()
     File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
       return self.__get_result()
     File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
       raise self._exception
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/retries.py", line 85, in wrapped_function
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1583, in _create_dagruns_for_dags
       self._create_dag_runs(query.all(), session)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1625, in _create_dag_runs
       run = dag.create_dagrun(
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/dag.py", line 1796, in create_dagrun
       session.flush()
     File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2523, in flush
       self._flush(objects)
     File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2664, in _flush
       transaction.rollback(_capture_exception=True)
     File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
       compat.raise_(
     File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
       raise exception
     File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2624, in _flush
       flush_context.execute()
     File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
   ...
   ```
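MySQL/MariaDB error 1062 means an `INSERT` violated a unique index. Assuming the key reported as `'dag_id'` is the unique index on the `dag_run` table's `(dag_id, execution_date)` columns (MySQL and MariaDB name an unnamed composite index after its first column), the scheduler was presumably trying to create a second DAG run for a `dag_id`/`execution_date` pair that already existed. The sketch below illustrates only that database-level failure. It uses an in-memory SQLite database as a stand-in for MariaDB, and both the simplified `dag_run` schema and the `create_dagrun` helper are hypothetical; this is not Airflow's code.

```python
import sqlite3

# Hypothetical, simplified stand-in for the dag_run table: only the columns
# covered by the assumed (dag_id, execution_date) unique constraint.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE dag_run (
        id INTEGER PRIMARY KEY,
        dag_id TEXT NOT NULL,
        execution_date TEXT NOT NULL,
        UNIQUE (dag_id, execution_date)
    )
    """
)


def create_dagrun(dag_id: str, execution_date: str) -> bool:
    """Insert a run; return False if one already exists for this pair."""
    try:
        with conn:  # commits on success, rolls back and re-raises on error
            conn.execute(
                "INSERT INTO dag_run (dag_id, execution_date) VALUES (?, ?)",
                (dag_id, execution_date),
            )
        return True
    except sqlite3.IntegrityError as exc:
        # SQLite's counterpart of MySQL error 1062 (duplicate entry).
        print(f"IntegrityError: {exc}")
        return False


first = create_dagrun("some-ETL", "2022-01-19 14:00:00.000000")
second = create_dagrun("some-ETL", "2022-01-19 14:00:00.000000")
```

The first insert succeeds and the second is rejected by the unique constraint, leaving exactly one row. In the scheduler the exception is not caught at this level, so it propagates up through `session.flush()` and terminates the process, as the traceback shows.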
   
   ### What you expected to happen
   
   We would expect these errors not to occur. According to https://github.com/apache/airflow/issues/9148 and https://github.com/apache/airflow/issues/13925, this issue should have been fixed a couple of versions ago.
   
   ### How to reproduce
   
   _No response_
   
   ### Operating System
   
   Kubernetes
   
   ### Versions of Apache Airflow Providers
   
   ```
   apache-airflow-providers-amazon==2.0.0
   apache-airflow-providers-celery==2.0.0
   apache-airflow-providers-cncf-kubernetes==2.0.0
   apache-airflow-providers-docker==2.0.0
   apache-airflow-providers-elasticsearch==2.0.1
   apache-airflow-providers-ftp==2.0.0
   apache-airflow-providers-google==4.0.0
   apache-airflow-providers-grpc==2.0.0
   apache-airflow-providers-hashicorp==2.0.0
   apache-airflow-providers-http==2.0.0
   apache-airflow-providers-imap==2.0.0
   apache-airflow-providers-microsoft-azure==3.0.0
   apache-airflow-providers-mysql==2.0.0
   apache-airflow-providers-odbc==2.0.0
   apache-airflow-providers-postgres==2.0.0
   apache-airflow-providers-redis==2.0.0
   apache-airflow-providers-sendgrid==2.0.0
   apache-airflow-providers-sftp==2.0.0
   apache-airflow-providers-slack==4.0.0
   apache-airflow-providers-sqlite==2.0.0
   apache-airflow-providers-ssh==2.0.0
   
   ```
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   Deployed via https://artifacthub.io/packages/helm/airflow-helm/airflow/8.5.0 to a Kubernetes cluster (Kubernetes 1.18).
   
   The database backend is MariaDB (10.3.31).
   
   Docker image used as base image: apache/airflow:2.1.1-python3.8
   
   Additional Python dependencies installed:
   ```
   airflow-exporter==1.5.2
   boto3==1.18.58
   s3fs==0.4.*
   pandas==1.3.3
   sqlalchemy==1.3.18
   sqlalchemy-redshift==0.8.2
   smart_open[aws]==2.1.*
   # Use PyMySQL as dialect to fix SSL connection error
   PyMySQL==1.0.2
   ```
   
   Relevant parts of the airflow configuration:
   ```
   airflow:
     config:
       # [core]
       AIRFLOW__CORE__PARALLELISM: "24"
       AIRFLOW__CORE__DAG_CONCURRENCY: "20"
       AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG: "1"
       AIRFLOW__CORE__LOAD_EXAMPLES: "False"
       AIRFLOW__CORE__STORE_SERIALIZED_DAGS: "False"
   ```
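The Helm chart passes these settings as environment variables following Airflow's `AIRFLOW__{SECTION}__{KEY}` naming convention, so each one overrides the matching option in `airflow.cfg` (here all in the `[core]` section). A minimal sketch of that mapping, assuming plain variables only (the `_CMD` and `_SECRET` variants are ignored); `parse_airflow_env` is a hypothetical helper for illustration, not part of Airflow's API:

```python
from typing import Dict, Tuple

PREFIX = "AIRFLOW__"


def parse_airflow_env(name: str) -> Tuple[str, str]:
    """Split an AIRFLOW__{SECTION}__{KEY} variable name into (section, key)."""
    if not name.startswith(PREFIX):
        raise ValueError(f"not an Airflow config variable: {name}")
    # Everything before the first remaining "__" is the section name.
    section, sep, key = name[len(PREFIX):].partition("__")
    if not sep or not section or not key:
        raise ValueError(f"expected AIRFLOW__SECTION__KEY, got: {name}")
    # airflow.cfg uses lowercase section and option names.
    return section.lower(), key.lower()


# The values from the Helm chart configuration above.
helm_config: Dict[str, str] = {
    "AIRFLOW__CORE__PARALLELISM": "24",
    "AIRFLOW__CORE__DAG_CONCURRENCY": "20",
    "AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG": "1",
    "AIRFLOW__CORE__LOAD_EXAMPLES": "False",
    "AIRFLOW__CORE__STORE_SERIALIZED_DAGS": "False",
}

for env_name, value in helm_config.items():
    section, key = parse_airflow_env(env_name)
    print(f"[{section}] {key} = {value}")
```

Each line printed corresponds to the `airflow.cfg` option the variable overrides, for example `[core] max_active_runs_per_dag = 1`.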
   
   ### Anything else
   
   On average, the scheduler restarts between 1 and 10 times per hour with the above error message.
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

