pasan1992 opened a new issue #18908:
URL: https://github.com/apache/airflow/issues/18908


   ### Apache Airflow version
   
   2.1.2
   
   ### Operating System
   
   CentOS Linux version 7
   
   ### Versions of Apache Airflow Providers
   
   Airflow 2.1.2
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Deployment 
   >Docker engine community v20.10.9
   >Docker-compose version 3
   >API version 1.41
   >PostgreSQL - 9.2.24
   
   Airflow
   > LocalExecutor with a Postgres DB
   > None of the DAGs are scheduled at the start.
   > All of the DAGs are enabled at the start.
   
   
   
   ### What happened
   
   During initialization of the Airflow container, we experience random 
scheduler failures. The Airflow webserver starts without any issues, but the 
scheduler terminates during initialization with the following error.
   
   ```
   [] {scheduler_job.py:1315} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
       cursor, statement, parameters, context
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
       cursor.execute(statement, parameters)
   psycopg2.OperationalError: server closed the connection unexpectedly
       This probably means the server terminated abnormally
       before or while processing the request.

   The above exception was the direct cause of the following exception:

   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1299, in _execute
       self._run_scheduler_loop()
     File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1392, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1552, in _do_scheduling
       num_queued_tis = self._critical_section_execute_task_instances(session=session)
     File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1157, in _critical_section_execute_task_instances
       queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)
     File "/usr/local/lib/python3.7/site-packages/airflow/utils/session.py", line 67, in wrapper
       return func(*args, **kwargs)
     File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 949, in _executable_task_instances_to_queued
       **skip_locked(session=session),
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3373, in all
       return list(self)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
       return self._execute_and_instances(context)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
       result = conn.execute(querycontext.statement, self._params)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
       return meth(self, multiparams, params)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
       return connection._execute_clauseelement(self, multiparams, params)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
       distilled_params,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
       e, statement, parameters, cursor, context
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
       sqlalchemy_exception, with_traceback=exc_info[2], from_=e
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
       cursor, statement, parameters, context
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
       cursor.execute(statement, parameters)
   sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) server closed the connection unexpectedly
       This probably means the server terminated abnormally
       before or while processing the request.

   [SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id
   FROM task_instance LEFT OUTER JOIN dag_run ON task_instance.dag_id = dag_run.dag_id AND task_instance.execution_date = dag_run.execution_date JOIN dag ON task_instance.dag_id = dag.dag_id
   WHERE (dag_run.run_id IS NULL OR dag_run.run_type != %(run_type_1)s) AND NOT dag.is_paused AND task_instance.state = %(state_1)s ORDER BY -task_instance.priority_weight, task_instance.execution_date
   LIMIT %(param_1)s FOR UPDATE OF task_instance SKIP LOCKED]
   [parameters: {'run_type_1': <DagRunType.BACKFILL_JOB: 'backfill'>, 'state_1': 'scheduled', 'param_1': 4}]
   (Background on this error at: http://sqlalche.me/e/13/e3q8)
   ```
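   As a side note for anyone hitting the same trace: the usual SQLAlchemy-level guard against a pooled connection that the server has silently closed is `pool_pre_ping` (optionally with `pool_recycle`). The sketch below is not Airflow's code; it only demonstrates the engine options, with SQLite standing in for the real `postgresql+psycopg2://` URL so it is runnable anywhere. In Airflow itself these map to the `sql_alchemy_pool_pre_ping` and `sql_alchemy_pool_recycle` settings in `airflow.cfg`.

   ```python
   from sqlalchemy import create_engine, text

   # SQLite stands in for the actual postgresql+psycopg2:// connection URL
   # so this sketch runs without a database server.
   engine = create_engine(
       "sqlite://",
       pool_pre_ping=True,   # test each pooled connection before handing it out
       pool_recycle=1800,    # retire connections older than 30 minutes
   )

   with engine.connect() as conn:
       value = conn.execute(text("SELECT 1")).scalar()
   ```

   With `pool_pre_ping=True`, a connection the server dropped is detected and replaced transparently instead of surfacing as an `OperationalError` mid-query.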
   
   ### What you expected to happen
   
   I expected the scheduler to start and keep running. I was also not able to 
find any logs related to this connection issue on the PostgreSQL side.
   
   ### How to reproduce
   
   This only happens during scheduler initialization, and only at random; I was 
not able to find a reliable way to reproduce it. I also did not find any link 
between high resource usage and the failure.
   
   ### Anything else
   
   As mentioned above, this is a random issue that only happens during Airflow 
container initialization.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

