Repository: incubator-airflow
Updated Branches:
  refs/heads/master 961aa0581 -> 250faad0f


[AIRFLOW-2442][AIRFLOW-2] Airflow run command leaves database connections open

Closes #3336 from afernandez/afernandez_db


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/250faad0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/250faad0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/250faad0

Branch: refs/heads/master
Commit: 250faad0f557bb8deac8bd0b948112bcaf48004a
Parents: 961aa05
Author: Alejandro Fernandez <[email protected]>
Authored: Thu May 10 15:29:33 2018 -0700
Committer: Maxime Beauchemin <[email protected]>
Committed: Thu May 10 15:29:33 2018 -0700

----------------------------------------------------------------------
 airflow/bin/cli.py                           | 15 +++++++----
 airflow/config_templates/default_airflow.cfg |  5 ++--
 airflow/settings.py                          | 31 ++++++++++++++++++-----
 3 files changed, 38 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/250faad0/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 3df8166..1615e76 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -419,10 +419,6 @@ def _run(args, dag, ti):
 
 @cli_utils.action_logging
 def run(args, dag=None):
-    # Disable connection pooling to reduce the # of connections on the DB
-    # while it's waiting for the task to finish.
-    settings.configure_orm(disable_connection_pool=True)
-
     if dag:
         args.dag_id = dag.dag_id
 
@@ -436,11 +432,20 @@ def run(args, dag=None):
         if os.path.exists(args.cfg_path):
             os.remove(args.cfg_path)
 
+        # Do not log these properties since some may contain passwords.
+        # This may also set default values for database properties like
+        # core.sql_alchemy_pool_size
+        # core.sql_alchemy_pool_recycle
         for section, config in conf_dict.items():
             for option, value in config.items():
                 conf.set(section, option, value)
         settings.configure_vars()
-        settings.configure_orm()
+
+    # IMPORTANT, have to use the NullPool, otherwise, each "run" command may leave
+    # behind multiple open sleeping connections while heartbeating, which could
+    # easily exceed the database connection limit when
+    # processing hundreds of simultaneous tasks.
+    settings.configure_orm(disable_connection_pool=True)
 
     if not args.pickle and not dag:
         dag = get_dag(args)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/250faad0/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 33b99ff..d2e77a7 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -89,8 +89,9 @@ sql_alchemy_pool_size = 5
 
 # The SqlAlchemy pool recycle is the number of seconds a connection
 # can be idle in the pool before it is invalidated. This config does
-# not apply to sqlite.
-sql_alchemy_pool_recycle = 3600
+# not apply to sqlite. If the number of DB connections is ever exceeded,
+# a lower config value will allow the system to recover faster.
+sql_alchemy_pool_recycle = 1800
 
 # How many seconds to retry re-establishing a DB connection after
 # disconnects. Setting this to 0 disables retries.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/250faad0/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 3ee0d5d..2abe568 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -151,11 +151,29 @@ def configure_orm(disable_connection_pool=False):
     pool_connections = conf.getboolean('core', 'SQL_ALCHEMY_POOL_ENABLED')
     if disable_connection_pool or not pool_connections:
         engine_args['poolclass'] = NullPool
+        log.info("settings.configure_orm(): Using NullPool")
     elif 'sqlite' not in SQL_ALCHEMY_CONN:
-        # Engine args not supported by sqlite
-        engine_args['pool_size'] = conf.getint('core', 'SQL_ALCHEMY_POOL_SIZE')
-        engine_args['pool_recycle'] = conf.getint('core',
-                                                  'SQL_ALCHEMY_POOL_RECYCLE')
+        # Engine args not supported by sqlite.
+        # If no config value is defined for the pool size, select a reasonable value.
+        # 0 means no limit, which could lead to exceeding the Database connection limit.
+        try:
+            pool_size = conf.getint('core', 'SQL_ALCHEMY_POOL_SIZE')
+        except conf.AirflowConfigException:
+            pool_size = 5
+
+        # The DB server already has a value for wait_timeout (number of seconds after
+        # which an idle sleeping connection should be killed). Since other DBs may
+        # co-exist on the same server, SQLAlchemy should set its
+        # pool_recycle to an equal or smaller value.
+        try:
+            pool_recycle = conf.getint('core', 'SQL_ALCHEMY_POOL_RECYCLE')
+        except conf.AirflowConfigException:
+            pool_recycle = 1800
+
+        log.info("setting.configure_orm(): Using pool settings. pool_size={}, "
+                 "pool_recycle={}".format(pool_size, pool_recycle))
+        engine_args['pool_size'] = pool_size
+        engine_args['pool_recycle'] = pool_recycle
 
     engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
     reconnect_timeout = conf.getint('core', 'SQL_ALCHEMY_RECONNECT_TIMEOUT')
@@ -211,6 +229,7 @@ except:
 configure_logging()
 configure_vars()
 configure_adapters()
+# The webservers import this file from models.py with the default settings.
 configure_orm()
 configure_action_logging()
 

Reply via email to