This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit a0fc25593925ab33c8a147ecc0fa987f043353ec Author: Kaxil Naik <[email protected]> AuthorDate: Mon Jun 29 12:46:06 2020 +0100 Fix the default value for store_dag_code (#9554) related to #8255 (fixes the issue mentioned with `store_dag_code` but does not address Config interpolation) The default value of `store_dag_code` should be same as `store_serialized_dags` setting. But if the value is set it should use that value (cherry picked from commit 57c722b65c1ddfe527924448291f29ff7036ad0a) --- airflow/config_templates/config.yml | 4 ++-- airflow/config_templates/default_airflow.cfg | 3 ++- airflow/models/dag.py | 2 +- airflow/models/dagcode.py | 4 ++-- airflow/settings.py | 5 +++++ airflow/utils/dag_processing.py | 4 ++-- tests/test_configuration.py | 20 ++++++++++++++++++++ 7 files changed, 34 insertions(+), 8 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 61491d8..e32b8cc 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -445,8 +445,8 @@ ``store_serialized_dags`` setting. version_added: 1.10.10 type: string - example: ~ - default: "%(store_serialized_dags)s" + example: "False" + default: ~ - name: max_num_rendered_ti_fields_per_task description: | Maximum number of Rendered Task Instance Fields (Template Fields) per task to store diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 2cc97e2..c75d3ae 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -232,7 +232,8 @@ min_serialized_dag_update_interval = 30 # If set to True, Webserver reads file contents from DB instead of # trying to access files in a DAG folder. Defaults to same as the # ``store_serialized_dags`` setting. -store_dag_code = %(store_serialized_dags)s +# Example: store_dag_code = False +# store_dag_code = # Maximum number of Rendered Task Instance Fields (Template Fields) per task to store # in the Database. diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 7759cb3..933dc10 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1529,7 +1529,7 @@ class DAG(BaseDag, LoggingMixin): orm_dag.schedule_interval = self.schedule_interval orm_dag.tags = self.get_dagtags(session=session) - if conf.getboolean('core', 'store_dag_code', fallback=False): + if settings.STORE_DAG_CODE: DagCode.bulk_sync_to_db([orm_dag.fileloc]) session.commit() diff --git a/airflow/models/dagcode.py b/airflow/models/dagcode.py index 513ec18..6aa7b6a 100644 --- a/airflow/models/dagcode.py +++ b/airflow/models/dagcode.py @@ -21,9 +21,9 @@ from datetime import datetime from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists -from airflow.configuration import conf from airflow.exceptions import AirflowException, DagCodeNotFound from airflow.models import Base +from airflow.settings import STORE_DAG_CODE from airflow.utils import timezone from airflow.utils.file import correct_maybe_zipped, open_maybe_zipped from airflow.utils.db import provide_session @@ -178,7 +178,7 @@ class DagCode(Base): :return: source code as string """ - if conf.getboolean('core', 'store_dag_code', fallback=False): + if STORE_DAG_CODE: return cls._get_code_from_db(fileloc) else: return cls._get_code_from_file(fileloc) diff --git a/airflow/settings.py b/airflow/settings.py index 513f192..c86d4b2 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -401,6 +401,11 @@ STORE_SERIALIZED_DAGS = conf.getboolean('core', 'store_serialized_dags', fallbac MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint( 'core', 'min_serialized_dag_update_interval', fallback=30) +# Whether to persist DAG files code in DB. If set to True, Webserver reads file contents +# from DB instead of trying to access files in a DAG folder. +# Defaults to same as the store_serialized_dags setting. +STORE_DAG_CODE = conf.getboolean("core", "store_dag_code", fallback=STORE_SERIALIZED_DAGS) + # If donot_modify_handlers=True, we do not modify logging handlers in task_run command # If the flag is set to False, we remove all handlers from the root logger # and add all handlers from 'airflow.task' logger to the root Logger. This is done diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index 6e4e045..3aac8fd 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -50,7 +50,7 @@ from airflow.dag.base_dag import BaseDag, BaseDagBag from airflow.exceptions import AirflowException from airflow.settings import Stats from airflow.models import errors -from airflow.settings import STORE_SERIALIZED_DAGS +from airflow.settings import STORE_DAG_CODE, STORE_SERIALIZED_DAGS from airflow.utils import timezone from airflow.utils.helpers import reap_process_group from airflow.utils.db import provide_session @@ -914,7 +914,7 @@ class DagFileProcessorManager(LoggingMixin): SerializedDagModel.remove_deleted_dags(self._file_paths) DagModel.deactivate_deleted_dags(self._file_paths) - if conf.getboolean('core', 'store_dag_code', fallback=False): + if STORE_DAG_CODE: from airflow.models.dagcode import DagCode DagCode.remove_deleted_code(self._file_paths) diff --git a/tests/test_configuration.py b/tests/test_configuration.py index af1df69..5c40cad 100644 --- a/tests/test_configuration.py +++ b/tests/test_configuration.py @@ -32,6 +32,7 @@ import six from airflow import configuration from airflow.configuration import conf, AirflowConfigParser, parameterized_config from tests.compat import mock +from tests.test_utils.config import conf_vars from tests.test_utils.reset_warning_registry import reset_warning_registry if six.PY2: @@ -494,3 +495,22 @@ notacommand = OK conf.write(string_file) content = string_file.getvalue() self.assertIn("dags_folder = /tmp/test_folder", content) + + @conf_vars({("core", "store_serialized_dags"): "True"}) + def test_store_dag_code_default_config(self): + store_serialized_dags = conf.getboolean('core', 'store_serialized_dags', fallback=False) + store_dag_code = conf.getboolean("core", "store_dag_code", fallback=store_serialized_dags) + self.assertFalse(conf.has_option("core", "store_dag_code")) + self.assertTrue(store_serialized_dags) + self.assertTrue(store_dag_code) + + @conf_vars({ + ("core", "store_serialized_dags"): "True", + ("core", "store_dag_code"): "False" + }) + def test_store_dag_code_config_when_set(self): + store_serialized_dags = conf.getboolean('core', 'store_serialized_dags', fallback=False) + store_dag_code = conf.getboolean("core", "store_dag_code", fallback=store_serialized_dags) + self.assertTrue(conf.has_option("core", "store_dag_code")) + self.assertTrue(store_serialized_dags) + self.assertFalse(store_dag_code)
