Repository: incubator-airflow
Updated Branches:
  refs/heads/master 961aa0581 -> 250faad0f
[AIRFLOW-2442][AIRFLOW-2] Airflow run command leaves database connections open

Closes #3336 from afernandez/afernandez_db

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/250faad0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/250faad0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/250faad0

Branch: refs/heads/master
Commit: 250faad0f557bb8deac8bd0b948112bcaf48004a
Parents: 961aa05
Author: Alejandro Fernandez <[email protected]>
Authored: Thu May 10 15:29:33 2018 -0700
Committer: Maxime Beauchemin <[email protected]>
Committed: Thu May 10 15:29:33 2018 -0700

----------------------------------------------------------------------
 airflow/bin/cli.py                           | 15 +++++++----
 airflow/config_templates/default_airflow.cfg |  5 ++--
 airflow/settings.py                          | 31 ++++++++++++++++++-----
 3 files changed, 38 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/250faad0/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 3df8166..1615e76 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -419,10 +419,6 @@ def _run(args, dag, ti):
 
 @cli_utils.action_logging
 def run(args, dag=None):
-    # Disable connection pooling to reduce the # of connections on the DB
-    # while it's waiting for the task to finish.
-    settings.configure_orm(disable_connection_pool=True)
-
     if dag:
         args.dag_id = dag.dag_id
 
@@ -436,11 +432,20 @@ def run(args, dag=None):
         if os.path.exists(args.cfg_path):
             os.remove(args.cfg_path)
 
+        # Do not log these properties since some may contain passwords.
+        # This may also set default values for database properties like
+        # core.sql_alchemy_pool_size
+        # core.sql_alchemy_pool_recycle
         for section, config in conf_dict.items():
             for option, value in config.items():
                 conf.set(section, option, value)
         settings.configure_vars()
-        settings.configure_orm()
+
+        # IMPORTANT, have to use the NullPool, otherwise, each "run" command may leave
+        # behind multiple open sleeping connections while heartbeating, which could
+        # easily exceed the database connection limit when
+        # processing hundreds of simultaneous tasks.
+        settings.configure_orm(disable_connection_pool=True)
 
     if not args.pickle and not dag:
         dag = get_dag(args)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/250faad0/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 33b99ff..d2e77a7 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -89,8 +89,9 @@ sql_alchemy_pool_size = 5
 
 # The SqlAlchemy pool recycle is the number of seconds a connection
 # can be idle in the pool before it is invalidated. This config does
-# not apply to sqlite.
-sql_alchemy_pool_recycle = 3600
+# not apply to sqlite. If the number of DB connections is ever exceeded,
+# a lower config value will allow the system to recover faster.
+sql_alchemy_pool_recycle = 1800
 
 # How many seconds to retry re-establishing a DB connection after
 # disconnects. Setting this to 0 disables retries.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/250faad0/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 3ee0d5d..2abe568 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License. You may obtain a copy of the License at
-#
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-#
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -151,11 +151,29 @@ def configure_orm(disable_connection_pool=False):
     pool_connections = conf.getboolean('core', 'SQL_ALCHEMY_POOL_ENABLED')
     if disable_connection_pool or not pool_connections:
         engine_args['poolclass'] = NullPool
+        log.info("settings.configure_orm(): Using NullPool")
     elif 'sqlite' not in SQL_ALCHEMY_CONN:
-        # Engine args not supported by sqlite
-        engine_args['pool_size'] = conf.getint('core', 'SQL_ALCHEMY_POOL_SIZE')
-        engine_args['pool_recycle'] = conf.getint('core',
-                                                  'SQL_ALCHEMY_POOL_RECYCLE')
+        # Engine args not supported by sqlite.
+        # If no config value is defined for the pool size, select a reasonable value.
+        # 0 means no limit, which could lead to exceeding the Database connection limit.
+        try:
+            pool_size = conf.getint('core', 'SQL_ALCHEMY_POOL_SIZE')
+        except conf.AirflowConfigException:
+            pool_size = 5
+
+        # The DB server already has a value for wait_timeout (number of seconds after
+        # which an idle sleeping connection should be killed). Since other DBs may
+        # co-exist on the same server, SQLAlchemy should set its
+        # pool_recycle to an equal or smaller value.
+        try:
+            pool_recycle = conf.getint('core', 'SQL_ALCHEMY_POOL_RECYCLE')
+        except conf.AirflowConfigException:
+            pool_recycle = 1800
+
+        log.info("setting.configure_orm(): Using pool settings. pool_size={}, "
+                 "pool_recycle={}".format(pool_size, pool_recycle))
+        engine_args['pool_size'] = pool_size
+        engine_args['pool_recycle'] = pool_recycle
 
     engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
     reconnect_timeout = conf.getint('core', 'SQL_ALCHEMY_RECONNECT_TIMEOUT')
@@ -211,6 +229,7 @@ except:
 configure_logging()
 configure_vars()
 configure_adapters()
+# The webservers import this file from models.py with the default settings.
 configure_orm()
 configure_action_logging()
 
