wolfier opened a new issue #15496:
URL: https://github.com/apache/airflow/issues/15496


   **Apache Airflow version**: 2.0.0
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
   
   ```
   Client Version: version.Info{Major:"1", Minor:"19", GitVersion:"v1.19.3", GitCommit:"1e11e4a2108024935ecfcb2912226cedeafd99df", GitTreeState:"clean", BuildDate:"2020-10-14T12:50:19Z", GoVersion:"go1.15.2", Compiler:"gc", Platform:"darwin/amd64"}
   Server Version: version.Info{Major:"1", Minor:"18+", GitVersion:"v1.18.16-gke.502", GitCommit:"a2a88ab32201dca596d0cdb116bbba3f765ebd36", GitTreeState:"clean", BuildDate:"2021-03-08T22:06:24Z", GoVersion:"go1.13.15b4", Compiler:"gc", Platform:"linux/amd64"}
   ```
   
   **What happened**:
   
   I am experiencing scheduler restarts when I encounter an `InvalidatePoolError`.
   
   ```
   [2021-04-20 00:12:15,444] {scheduler_job.py:1305} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 817, in _checkout
       raise exc.InvalidatePoolError()
   sqlalchemy.exc.InvalidatePoolError: ()
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2336, in _wrap_pool_connect
       return fn()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 364, in connect
       return _ConnectionFairy._checkout(self)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 847, in _checkout
       fairy._connection_record._checkin_failed(err)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       with_traceback=exc_tb,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 843, in _checkout
       fairy._connection_record.get_connection()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 610, in get_connection
       self.__connect()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 661, in __connect
       pool.logger.debug("Error on connect(): %s", e)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       with_traceback=exc_tb,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 656, in __connect
       connection = pool._invoke_creator(self)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/strategies.py", line 114, in connect
       return dialect.connect(*cargs, **cparams)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 508, in connect
       return self.dbapi.connect(*cargs, **cparams)
     File "/usr/local/lib/python3.7/site-packages/psycopg2/__init__.py", line 127, in connect
       conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
   psycopg2.OperationalError: could not connect to server: Connection refused
    Is the server running on host "planetic-orbit-1785-pgbouncer.astronomer-planetic-orbit-1785.svc.cluster.local." (10.98.3.123) and accepting
    TCP/IP connections on port 6543?
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1287, in _execute
       self._run_scheduler_loop()
     File "/usr/local/lib/python3.7/site-packages/astronomer/airflow/version_check/plugin.py", line 29, in run_before
       fn(*args, **kwargs)
     File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1389, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1486, in _do_scheduling
       self._create_dag_runs(query.all(), session)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3373, in all
       return list(self)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
       return self._execute_and_instances(context)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3557, in _execute_and_instances
       querycontext, self._connection_from_session, close_with_result=True
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3572, in _get_bind_args
       mapper=self._bind_mapper(), clause=querycontext.statement, **kw
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3550, in _connection_from_session
       conn = self.session.connection(**kw)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1145, in connection
       execution_options=execution_options,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1151, in _connection_for_bind
       engine, execution_options
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 433, in _connection_for_bind
       conn = bind._contextual_connect()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2302, in _contextual_connect
       self._wrap_pool_connect(self.pool.connect, None),
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2340, in _wrap_pool_connect
       e, dialect, self
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1584, in _handle_dbapi_exception_noconnection
       sqlalchemy_exception, with_traceback=exc_info[2], from_=e
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2336, in _wrap_pool_connect
       return fn()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 364, in connect
       return _ConnectionFairy._checkout(self)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 847, in _checkout
       fairy._connection_record._checkin_failed(err)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       with_traceback=exc_tb,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 843, in _checkout
       fairy._connection_record.get_connection()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 610, in get_connection
       self.__connect()
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 661, in __connect
       pool.logger.debug("Error on connect(): %s", e)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       with_traceback=exc_tb,
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
       raise exception
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/pool/base.py", line 656, in __connect
       connection = pool._invoke_creator(self)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/strategies.py", line 114, in connect
       return dialect.connect(*cargs, **cparams)
     File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 508, in connect
       return self.dbapi.connect(*cargs, **cparams)
     File "/usr/local/lib/python3.7/site-packages/psycopg2/__init__.py", line 127, in connect
       conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
   sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) could not connect to server: Connection refused
    Is the server running on host "planetic-orbit-1785-pgbouncer.astronomer-planetic-orbit-1785.svc.cluster.local." (10.98.3.123) and accepting
    TCP/IP connections on port 6543?
   
   (Background on this error at: http://sqlalche.me/e/13/e3q8)
   [2021-04-20 00:12:16,452] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 60
   [2021-04-20 00:12:16,706] {process_utils.py:66} INFO - Process psutil.Process(pid=60, status='terminated', exitcode=0, started='00:07:36') (60) terminated with exit code 0
   [2021-04-20 00:12:16,707] {scheduler_job.py:1308} INFO - Exited execute loop
   ```
   
   **What you expected to happen**:
   
   I expect Airflow to retry the connection. The SQLAlchemy documentation advises that [InvalidatePoolError](https://docs.sqlalchemy.org/en/14/core/exceptions.html#sqlalchemy.exc.InvalidatePoolError) (and `DisconnectionError` in general) be caught and handled, instead of bubbling all the way up and crashing the entire app.
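
To illustrate the kind of retry I have in mind, here is a minimal sketch. It is not Airflow's or SQLAlchemy's actual code: `retry_on` and `flaky_query` are hypothetical names, and the built-in `ConnectionRefusedError` stands in for `sqlalchemy.exc.OperationalError` so the snippet runs without a database.

```python
import time
from functools import wraps


def retry_on(exc_types, attempts=3, base_delay=0.1, sleep=time.sleep):
    """Retry the wrapped callable when it raises one of exc_types.

    Hypothetical helper for illustration only; not part of Airflow
    or SQLAlchemy.
    """
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except exc_types:
                    if attempt == attempts:
                        raise  # out of retries: let the error propagate
                    # Exponential backoff: base_delay, 2 * base_delay, ...
                    sleep(base_delay * 2 ** (attempt - 1))
        return wrapper
    return decorator


calls = {"count": 0}


# Assumption: in the scheduler the caught type would be
# sqlalchemy.exc.OperationalError rather than ConnectionRefusedError.
@retry_on(ConnectionRefusedError, attempts=3, base_delay=0.01)
def flaky_query():
    """Stand-in for a database query: fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionRefusedError("could not connect to server")
    return "ok"


result = flaky_query()
```

With something along these lines around the scheduler loop's database access, a short pgbouncer outage would cost a few delayed attempts rather than a scheduler restart. The error would still propagate once the attempts are exhausted.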


