[AIRFLOW-1665] Reconnect on database errors. This change enables the scheduler to recover from temporary database errors and downtime. The same holds true for the webserver when it runs without its regular worker refresh.
The reconnect logic is based on a truncated binary exponential backoff to ensure reconnect attempts don't overload the database. Included changes: * Switch to the recommended pessimistic disconnect handling for engines http://docs.sqlalchemy.org/en/rel_1_1/core/pooling.html#disconnect-handling-pessimistic * Remove legacy pool-based disconnect handling. * Ensure event handlers are registered for each newly created engine. Engines are re-initialized in child processes, so this is crucial for correctness. This commit is based on a contribution by @vklogin https://github.com/apache/incubator-airflow/pull/2744 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/94deac34 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/94deac34 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/94deac34 Branch: refs/heads/master Commit: 94deac34eca869a0accbc6affe7640b09dab1530 Parents: 6bf1a6e Author: Stephan Erb <[email protected]> Authored: Sun Nov 26 21:28:12 2017 +0100 Committer: Stephan Erb <[email protected]> Committed: Wed Nov 29 12:29:47 2017 +0100 ---------------------------------------------------------------------- airflow/bin/cli.py | 1 - airflow/config_templates/default_airflow.cfg | 4 + airflow/jobs.py | 4 +- airflow/settings.py | 4 + airflow/utils/db.py | 34 ------- airflow/utils/sqlalchemy.py | 108 ++++++++++++++++++++++ 6 files changed, 117 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/94deac34/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 4e56d54..c0ab5e3 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -328,7 +328,6 @@ def run(args, dag=None): # while it's waiting for the task to finish. 
settings.configure_orm(disable_connection_pool=True) - db_utils.pessimistic_connection_handling() if dag: args.dag_id = dag.dag_id http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/94deac34/airflow/config_templates/default_airflow.cfg ---------------------------------------------------------------------- diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 7a8ddf2..32af0a3 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -79,6 +79,10 @@ sql_alchemy_pool_size = 5 # not apply to sqlite. sql_alchemy_pool_recycle = 3600 +# How many seconds to retry re-establishing a DB connection after +# disconnects. Setting this to 0 disables retries. +sql_alchemy_reconnect_timeout = 300 + # The amount of parallelism as a setting to the executor. This defines # the max number of task instances that should run simultaneously # on this airflow installation http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/94deac34/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 91ab96c..ed6da60 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -54,8 +54,7 @@ from airflow.utils.dag_processing import (AbstractDagFileProcessor, SimpleDag, SimpleDagBag, list_py_file_paths) -from airflow.utils.db import ( - create_session, provide_session, pessimistic_connection_handling) +from airflow.utils.db import create_session, provide_session from airflow.utils.email import send_email from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter from airflow.utils.state import State @@ -1508,7 +1507,6 @@ class SchedulerJob(BaseJob): def _execute(self): self.log.info("Starting the scheduler") - pessimistic_connection_handling() # DAGs can be pickled for easier remote execution by some executors pickle_dags = False 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/94deac34/airflow/settings.py ---------------------------------------------------------------------- diff --git a/airflow/settings.py b/airflow/settings.py index 04d3548..929663b 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -28,6 +28,7 @@ from sqlalchemy.pool import NullPool from airflow import configuration as conf from airflow.logging_config import configure_logging +from airflow.utils.sqlalchemy import setup_event_handlers log = logging.getLogger(__name__) @@ -152,6 +153,9 @@ def configure_orm(disable_connection_pool=False): 'SQL_ALCHEMY_POOL_RECYCLE') engine = create_engine(SQL_ALCHEMY_CONN, **engine_args) + reconnect_timeout = conf.getint('core', 'SQL_ALCHEMY_RECONNECT_TIMEOUT') + setup_event_handlers(engine, reconnect_timeout) + Session = scoped_session( sessionmaker(autocommit=False, autoflush=False, bind=engine)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/94deac34/airflow/utils/db.py ---------------------------------------------------------------------- diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 9c924d1..64ce220 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -22,9 +22,6 @@ from functools import wraps import os import contextlib -from sqlalchemy import event, exc -from sqlalchemy.pool import Pool - from airflow import settings from airflow.utils.log.logging_mixin import LoggingMixin @@ -74,21 +71,6 @@ def provide_session(func): return wrapper -def pessimistic_connection_handling(): - @event.listens_for(Pool, "checkout") - def ping_connection(dbapi_connection, connection_record, connection_proxy): - ''' - Disconnect Handling - Pessimistic, taken from: - http://docs.sqlalchemy.org/en/rel_0_9/core/pooling.html - ''' - cursor = dbapi_connection.cursor() - try: - cursor.execute("SELECT 1") - except: - raise exc.DisconnectionError() - cursor.close() - - @provide_session def merge_conn(conn, session=None): from airflow import models @@ 
-98,22 +80,6 @@ def merge_conn(conn, session=None): session.commit() [email protected]_for(settings.engine, "connect") -def connect(dbapi_connection, connection_record): - connection_record.info['pid'] = os.getpid() - - [email protected]_for(settings.engine, "checkout") -def checkout(dbapi_connection, connection_record, connection_proxy): - pid = os.getpid() - if connection_record.info['pid'] != pid: - connection_record.connection = connection_proxy.connection = None - raise exc.DisconnectionError( - "Connection record belongs to pid {}, " - "attempting to check out in pid {}".format(connection_record.info['pid'], pid) - ) - - def initdb(): session = settings.Session() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/94deac34/airflow/utils/sqlalchemy.py ---------------------------------------------------------------------- diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py new file mode 100644 index 0000000..fa07437 --- /dev/null +++ b/airflow/utils/sqlalchemy.py @@ -0,0 +1,108 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
#
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import os
import time
import random

from sqlalchemy import event, exc, select

from airflow.utils.log.logging_mixin import LoggingMixin

log = LoggingMixin().log


def setup_event_handlers(
        engine,
        reconnect_timeout_seconds,
        initial_backoff_seconds=0.2,
        max_backoff_seconds=120):
    """
    Register disconnect-handling and per-process listeners on ``engine``.

    The listeners are bound to this engine object only, so this has to be
    called for every newly created engine (engines are re-created in child
    processes).

    :param engine: the SQLAlchemy engine to instrument.
    :param reconnect_timeout_seconds: how many seconds ``ping_connection``
        keeps retrying an invalidated connection before re-raising the
        error. ``0`` disables retries.
    :param initial_backoff_seconds: seed value for the retry delay.
    :param max_backoff_seconds: upper bound for a single retry delay.
    """

    @event.listens_for(engine, "engine_connect")
    def ping_connection(connection, branch):
        """
        Pessimistic SQLAlchemy disconnect handling. Ensures that each
        connection returned from the pool is properly connected to the database.

        If the ping fails because the connection was invalidated, the ping is
        retried with a truncated binary exponential backoff (with jitter)
        until ``reconnect_timeout_seconds`` have elapsed. Any other DBAPI
        error, or running out of time, re-raises the original error.

        http://docs.sqlalchemy.org/en/rel_1_1/core/pooling.html#disconnect-handling-pessimistic
        """
        if branch:
            # "branch" refers to a sub-connection of a connection,
            # we don't want to bother pinging on these.
            return

        start = time.time()
        backoff = initial_backoff_seconds

        # turn off "close with result". This flag is only used with
        # "connectionless" execution, otherwise will be False in any case
        save_should_close_with_result = connection.should_close_with_result
        connection.should_close_with_result = False

        try:
            while True:
                try:
                    connection.scalar(select([1]))
                    # If we made it here then the connection appears to be healthy
                    return
                except exc.DBAPIError as err:
                    # Only a detected disconnect is worth retrying; anything
                    # else is re-raised immediately. Checking this first keeps
                    # the log message accurate even when retries are disabled.
                    if not err.connection_invalidated:
                        log.error(
                            "Unknown database connection error. Not retrying: %s",
                            err)
                        raise

                    elapsed = time.time() - start
                    if elapsed >= reconnect_timeout_seconds:
                        log.error(
                            "Failed to re-establish DB connection within %s secs: %s",
                            reconnect_timeout_seconds,
                            err)
                        raise

                    log.warning("DB connection invalidated. Reconnecting...")

                    # Use a truncated binary exponential backoff. Also includes
                    # a jitter to prevent the thundering herd problem of
                    # simultaneous client reconnects. The stored delay is
                    # truncated too, so it cannot grow without bound.
                    backoff = min(backoff * (1 + random.random()),
                                  max_backoff_seconds)
                    # Never sleep past the configured reconnect deadline.
                    time.sleep(min(backoff, reconnect_timeout_seconds - elapsed))

                    # Loop and run the same SELECT again - the connection will
                    # re-validate itself and establish a new connection. The
                    # disconnect detection here also causes the whole
                    # connection pool to be invalidated so that all stale
                    # connections are discarded.
        finally:
            # restore "close with result"
            connection.should_close_with_result = save_should_close_with_result

    @event.listens_for(engine, "connect")
    def connect(dbapi_connection, connection_record):
        # Remember which process opened this DBAPI connection.
        connection_record.info['pid'] = os.getpid()

    @event.listens_for(engine, "checkout")
    def checkout(dbapi_connection, connection_record, connection_proxy):
        # Refuse to hand out a pooled connection that was opened by another
        # process (e.g. a parent before a fork). The references are dropped
        # rather than closed so the other process's connection is left
        # untouched; raising DisconnectionError makes the pool retry the
        # checkout with a new connection.
        pid = os.getpid()
        if connection_record.info['pid'] != pid:
            connection_record.connection = connection_proxy.connection = None
            raise exc.DisconnectionError(
                "Connection record belongs to pid {}, "
                "attempting to check out in pid {}".format(
                    connection_record.info['pid'], pid)
            )
