KulykDmytro opened a new issue #18843:
URL: https://github.com/apache/airflow/issues/18843


   ### Apache Airflow version
   
   2.1.4 (latest released)
   
   ### Operating System
   
   Linux 5.4.149-73.259.amzn2.x86_64
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   AWS EKS over own helm chart
   
   ### What happened
   
   We are still seeing the issue reported for 2.0.x in #13504.
   Each time the scheduler is restarted, it fails with the following error:
   ```
   [2021-10-08 20:19:40,683] {kubernetes_executor.py:761} INFO - Shutting down Kubernetes executor
   [2021-10-08 20:19:41,705] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 32
   [2021-10-08 20:19:42,207] {process_utils.py:207} INFO - Waiting up to 5 seconds for processes to exit...
   [2021-10-08 20:19:42,223] {process_utils.py:66} INFO - Process psutil.Process(pid=32, status='terminated', exitcode=0, started='20:19:40') (32) terminated with exit code 0
   [2021-10-08 20:19:42,225] {process_utils.py:66} INFO - Process psutil.Process(pid=40, status='terminated', started='20:19:40') (40) terminated with exit code None
   [2021-10-08 20:19:42,226] {process_utils.py:66} INFO - Process psutil.Process(pid=36, status='terminated', started='20:19:40') (36) terminated with exit code None
   [2021-10-08 20:19:42,226] {scheduler_job.py:722} INFO - Exited execute loop
   Traceback (most recent call last):
     File "/home/airflow/.local/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/__main__.py", line 40, in main
       args.func(args)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
       return f(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 70, in scheduler
       job.run()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 245, in run
       self._execute()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 695, in _execute
       self._run_scheduler_loop()
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 788, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 927, in _do_scheduling
       num_queued_tis = self._critical_section_execute_task_instances(session=session)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 551, in _critical_section_execute_task_instances
       queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 67, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 431, in _executable_task_instances_to_queued
       serialized_dag = self.dagbag.get_dag(dag_id, session=session)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 67, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 186, in get_dag
       self._add_dag_from_db(dag_id=dag_id, session=session)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 258, in _add_dag_from_db
       raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
   airflow.exceptions.SerializedDagNotFound: DAG 'aws_transforms_player_hourly' not found in serialized_dag table
   ```
   This leaves all DAGs absent from the serialized_dag table, so opening a DAG in the webserver fails as well:
   ```
   Python version: 3.9.7
   Airflow version: 2.1.4
   Node: airflow-webserver-7b45758f99-rk8dg
   
-------------------------------------------------------------------------------
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 2447, in wsgi_app
       response = self.full_dispatch_request()
     File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1952, in full_dispatch_request
       rv = self.handle_user_exception(e)
     File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1821, in handle_user_exception
       reraise(exc_type, exc_value, tb)
     File "/home/airflow/.local/lib/python3.9/site-packages/flask/_compat.py", line 39, in reraise
       raise value
     File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1950, in full_dispatch_request
       rv = self.dispatch_request()
     File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1936, in dispatch_request
       return self.view_functions[rule.endpoint](**req.view_args)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/auth.py", line 49, in decorated
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/decorators.py", line 97, in view_func
       return f(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/decorators.py", line 60, in wrapper
       return f(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/views.py", line 2027, in tree
       dag = current_app.dag_bag.get_dag(dag_id)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
       return func(*args, session=session, **kwargs)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 186, in get_dag
       self._add_dag_from_db(dag_id=dag_id, session=session)
     File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 258, in _add_dag_from_db
       raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
   airflow.exceptions.SerializedDagNotFound: DAG 'canary_dag' not found in serialized_dag table
   ```
   
   ### What you expected to happen
   
   Scheduler shouldn't fail
   
   ### How to reproduce
   
   1. Restart the scheduler pod.
   2. Observe that the scheduler fails.
   3. Open a DAG in the webserver.
   4. Observe the error.
   
   ### Anything else
   
   The issue goes away temporarily, until the next scheduler restart, after I run the following "serialize" script from the webserver pod:
   ```python
   from airflow.models import DagBag
   from airflow.models.serialized_dag import SerializedDagModel
   
   dag_bag = DagBag()
   
   # Check DB for missing serialized DAGs, and add them if missing
   for dag_id in dag_bag.dag_ids:
       if not SerializedDagModel.get(dag_id):
           dag = dag_bag.get_dag(dag_id)
           SerializedDagModel.write_dag(dag)
   
   ```
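
To see which DAGs are affected before rewriting anything, a read-only variant of the script above can be sketched as follows. This is only a sketch, not a fix: `find_unserialized` and `report_missing_from_airflow` are hypothetical helper names, and it assumes `SerializedDagModel.has_dag(dag_id)` is available in the installed Airflow version.

```python
from typing import Callable, Iterable, List


def find_unserialized(
    dag_ids: Iterable[str], is_serialized: Callable[[str], bool]
) -> List[str]:
    """Return, sorted, the DAG ids for which is_serialized() reports no row."""
    return sorted(d for d in set(dag_ids) if not is_serialized(d))


def report_missing_from_airflow() -> List[str]:
    """List parsed DAG ids that have no row in the serialized_dag table."""
    # Imported lazily so find_unserialized() works without Airflow installed.
    from airflow.models import DagBag
    from airflow.models.serialized_dag import SerializedDagModel

    dag_bag = DagBag()  # parses the DAG files, as in the script above
    # Assumption: has_dag(dag_id) returns True when a serialized row exists.
    return find_unserialized(dag_bag.dag_ids, SerializedDagModel.has_dag)
```

Calling `report_missing_from_airflow()` from the webserver pod right after a scheduler restart should show whether every DAG is missing or only some of them, without writing to the database.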
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.