Repository: incubator-airflow Updated Branches: refs/heads/master 7d11444a5 -> 23a16f7ad
[AIRFLOW-1005] Improve Airflow startup time Airflow's startup time can be reduced by 50% by deferring imports of Cryptography (and relatedly, not generating Fernet keys unless we have to) and Alembic. Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/996dd309 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/996dd309 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/996dd309 Branch: refs/heads/master Commit: 996dd309331b010b15e34b99222430283ad7d8a4 Parents: a8027a3 Author: Jeremiah Lowin <[email protected]> Authored: Fri Mar 17 18:25:24 2017 -0400 Committer: Jeremiah Lowin <[email protected]> Committed: Sun Mar 19 10:06:33 2017 -0400 ---------------------------------------------------------------------- airflow/configuration.py | 30 ++++++++++++++++------------ airflow/models.py | 46 ++++++++++++++++++++++++++++--------------- airflow/utils/db.py | 11 ++++++----- 3 files changed, 53 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/996dd309/airflow/configuration.py ---------------------------------------------------------------------- diff --git a/airflow/configuration.py b/airflow/configuration.py index 895c08d..9c7a03e 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -43,18 +43,16 @@ warnings.filterwarnings( action='default', category=PendingDeprecationWarning, module='airflow') -try: - from cryptography.fernet import Fernet -except ImportError: - pass - - def generate_fernet_key(): try: - FERNET_KEY = Fernet.generate_key().decode() + from cryptography.fernet import Fernet + except ImportError: + pass + try: + key = Fernet.generate_key().decode() except NameError: - FERNET_KEY = "cryptography_not_found_storing_passwords_in_plain_text" - return FERNET_KEY + key = 
"cryptography_not_found_storing_passwords_in_plain_text" + return key def expand_env_var(env_var): @@ -774,14 +772,19 @@ def parameterized_config(template): current scope :param template: a config content templated with {{variables}} """ - FERNET_KEY = generate_fernet_key() all_vars = {k: v for d in [globals(), locals()] for k, v in d.items()} + if 'FERNET_KEY' not in all_vars: + all_vars['FERNET_KEY'] = '' return template.format(**all_vars) TEST_CONFIG_FILE = AIRFLOW_HOME + '/unittests.cfg' +# only generate a Fernet key if we need to create a new config file +if not os.path.isfile(TEST_CONFIG_FILE) or not os.path.isfile(AIRFLOW_CONFIG): + FERNET_KEY = generate_fernet_key() if not os.path.isfile(TEST_CONFIG_FILE): - logging.info("Creating new airflow config file for unit tests in: " + - TEST_CONFIG_FILE) + logging.info( + 'Creating new Airflow config file for unit tests in: {}'.format( + TEST_CONFIG_FILE)) with open(TEST_CONFIG_FILE, 'w') as f: f.write(parameterized_config(TEST_CONFIG)) @@ -789,7 +792,8 @@ if not os.path.isfile(AIRFLOW_CONFIG): # These configuration options are used to generate a default configuration # when it is missing. The right way to change your configuration is to # alter your configuration file, not this code. - logging.info("Creating new airflow config file in: " + AIRFLOW_CONFIG) + logging.info('Creating new Airflow config file in: {}'.format( + AIRFLOW_CONFIG)) with open(AIRFLOW_CONFIG, 'w') as f: f.write(parameterized_config(DEFAULT_CONFIG)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/996dd309/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 561b002..f2d955b 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+ from __future__ import absolute_import from __future__ import division from __future__ import print_function @@ -44,7 +45,6 @@ import textwrap import traceback import warnings import hashlib - from urllib.parse import urlparse from sqlalchemy import ( @@ -66,6 +66,7 @@ from airflow.dag.base_dag import BaseDag, BaseDagBag from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep + from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS from airflow.utils.dates import cron_presets, date_range as utils_date_range from airflow.utils.db import provide_session @@ -85,13 +86,17 @@ XCOM_RETURN_KEY = 'return_value' Stats = settings.Stats -ENCRYPTION_ON = False -try: + +def get_fernet(): + """ + Deferred load of Fernet key. + + This function could fail either because Cryptography is not installed + or because the Fernet key is invalid. 
+ """ from cryptography.fernet import Fernet - FERNET = Fernet(configuration.get('core', 'FERNET_KEY').encode('utf-8')) - ENCRYPTION_ON = True -except: - pass + return Fernet(configuration.get('core', 'FERNET_KEY').encode('utf-8')) + if 'mysql' in settings.SQL_ALCHEMY_CONN: LongText = LONGTEXT @@ -572,18 +577,21 @@ class Connection(Base): def get_password(self): if self._password and self.is_encrypted: - if not ENCRYPTION_ON: + try: + fernet = get_fernet() + except: raise AirflowException( "Can't decrypt encrypted password for login={}, \ FERNET_KEY configuration is missing".format(self.login)) - return FERNET.decrypt(bytes(self._password, 'utf-8')).decode() + return fernet.decrypt(bytes(self._password, 'utf-8')).decode() else: return self._password def set_password(self, value): if value: try: - self._password = FERNET.encrypt(bytes(value, 'utf-8')).decode() + fernet = get_fernet() + self._password = fernet.encrypt(bytes(value, 'utf-8')).decode() self.is_encrypted = True except NameError: self._password = value @@ -596,18 +604,21 @@ class Connection(Base): def get_extra(self): if self._extra and self.is_extra_encrypted: - if not ENCRYPTION_ON: + try: + fernet = get_fernet() + except: raise AirflowException( "Can't decrypt `extra` params for login={},\ FERNET_KEY configuration is missing".format(self.login)) - return FERNET.decrypt(bytes(self._extra, 'utf-8')).decode() + return fernet.decrypt(bytes(self._extra, 'utf-8')).decode() else: return self._extra def set_extra(self, value): if value: try: - self._extra = FERNET.encrypt(bytes(value, 'utf-8')).decode() + fernet = get_fernet() + self._extra = fernet.encrypt(bytes(value, 'utf-8')).decode() self.is_extra_encrypted = True except NameError: self._extra = value @@ -3556,18 +3567,21 @@ class Variable(Base): def get_val(self): if self._val and self.is_encrypted: - if not ENCRYPTION_ON: + try: + fernet = get_fernet() + except: raise AirflowException( "Can't decrypt _val for key={}, FERNET_KEY configuration \ 
missing".format(self.key)) - return FERNET.decrypt(bytes(self._val, 'utf-8')).decode() + return fernet.decrypt(bytes(self._val, 'utf-8')).decode() else: return self._val def set_val(self, value): if value: try: - self._val = FERNET.encrypt(bytes(value, 'utf-8')).decode() + fernet = get_fernet() + self._val = fernet.encrypt(bytes(value, 'utf-8')).decode() self.is_encrypted = True except NameError: self._val = value http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/996dd309/airflow/utils/db.py ---------------------------------------------------------------------- diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 977a949..49a8d62 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -22,16 +22,11 @@ from functools import wraps import logging import os -from alembic.config import Config -from alembic import command -from alembic.migration import MigrationContext - from sqlalchemy import event, exc from sqlalchemy.pool import Pool from airflow import settings - def provide_session(func): """ Function decorator that provides a session if it isn't provided. @@ -284,6 +279,10 @@ def initdb(): def upgradedb(): + # alembic adds significant import time, so we import it lazily + from alembic import command + from alembic.config import Config + logging.info("Creating tables") current_dir = os.path.dirname(os.path.abspath(__file__)) package_dir = os.path.normpath(os.path.join(current_dir, '..')) @@ -299,6 +298,8 @@ def resetdb(): Clear out the database ''' from airflow import models + # alembic adds significant import time, so we import it lazily + from alembic.migration import MigrationContext logging.info("Dropping tables that exist") models.Base.metadata.drop_all(settings.engine)
