This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit cae8eadb49c904f1114b6167305fb5b3cfa9fa84
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Wed Mar 31 15:48:46 2021 +0100

    Allow pathlib.Path in DagBag and various util fns (#15110)

    We do a lot of path manipulation in this test file, and it's easier to
    understand by using pathlib without all the nested `os.path.*` calls.

    This change adds "support" for passing Path objects to DagBag and util
    functions.

    (cherry picked from commit 6e99ae05642758691361dfe9d7b767cfc9a2b616)
---
 airflow/models/dagbag.py           | 20 ++++++++++++--------
 airflow/utils/dag_processing.py    |  7 +++++--
 airflow/utils/file.py              | 13 ++++++++-----
 tests/utils/test_dag_processing.py | 19 ++++++++++---------
 4 files changed, 35 insertions(+), 24 deletions(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 9ddf013..8228659 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -27,7 +27,7 @@ import traceback
 import warnings
 import zipfile
 from datetime import datetime, timedelta
-from typing import Dict, List, NamedTuple, Optional
+from typing import TYPE_CHECKING, Dict, List, NamedTuple, Optional, Union

 from croniter import CroniterBadCronError, CroniterBadDateError, CroniterNotAlphaError, croniter
 from sqlalchemy.exc import OperationalError
@@ -46,6 +46,9 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import provide_session
 from airflow.utils.timeout import timeout

+if TYPE_CHECKING:
+    import pathlib
+

 class FileLoadStat(NamedTuple):
     """Information about single file"""
@@ -89,7 +92,7 @@ class DagBag(LoggingMixin):

     def __init__(
         self,
-        dag_folder: Optional[str] = None,
+        dag_folder: Union[str, "pathlib.Path", None] = None,
         include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'),
         include_smart_sensor: bool = conf.getboolean('smart_sensor', 'USE_SMART_SENSOR'),
         safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
@@ -424,11 +427,11 @@ class DagBag(LoggingMixin):

     def collect_dags(
         self,
-        dag_folder=None,
-        only_if_updated=True,
-        include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'),
-        include_smart_sensor=conf.getboolean('smart_sensor', 'USE_SMART_SENSOR'),
-        safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
+        dag_folder: Union[str, "pathlib.Path", None] = None,
+        only_if_updated: bool = True,
+        include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'),
+        include_smart_sensor: bool = conf.getboolean('smart_sensor', 'USE_SMART_SENSOR'),
+        safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
     ):
         """
         Given a file path or a folder, this method looks for python modules,
@@ -450,7 +453,8 @@ class DagBag(LoggingMixin):
         # Used to store stats around DagBag processing
         stats = []

-        dag_folder = correct_maybe_zipped(dag_folder)
+        # Ensure dag_folder is a str -- it may have been a pathlib.Path
+        dag_folder = correct_maybe_zipped(str(dag_folder))
         for filepath in list_py_file_paths(
             dag_folder,
             safe_mode=safe_mode,
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 7e98c11..f93847c 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -30,7 +30,7 @@ from collections import defaultdict
 from datetime import datetime, timedelta
 from importlib import import_module
 from multiprocessing.connection import Connection as MultiprocessingConnection
-from typing import Any, Callable, Dict, List, NamedTuple, Optional, Tuple, Union, cast
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, NamedTuple, Optional, Tuple, Union, cast

 from setproctitle import setproctitle  # pylint: disable=no-name-in-module
 from sqlalchemy import or_
@@ -52,6 +52,9 @@ from airflow.utils.process_utils import kill_child_processes_by_pids, reap_proce
 from airflow.utils.session import provide_session
 from airflow.utils.state import State

+if TYPE_CHECKING:
+    import pathlib
+

 class AbstractDagFileProcessorProcess(metaclass=ABCMeta):
     """Processes a DAG file. See SchedulerJob.process_file() for more details."""
@@ -489,7 +492,7 @@ class DagFileProcessorManager(LoggingMixin):  # pylint: disable=too-many-instanc

     def __init__(
         self,
-        dag_directory: str,
+        dag_directory: Union[str, "pathlib.Path"],
         max_runs: int,
         processor_factory: Callable[[str, List[CallbackRequest]], AbstractDagFileProcessorProcess],
         processor_timeout: timedelta,
diff --git a/airflow/utils/file.py b/airflow/utils/file.py
index 553c506..03343cd 100644
--- a/airflow/utils/file.py
+++ b/airflow/utils/file.py
@@ -20,10 +20,13 @@ import os
 import re
 import zipfile
 from pathlib import Path
-from typing import Dict, Generator, List, Optional, Pattern
+from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Pattern, Union

 from airflow.configuration import conf

+if TYPE_CHECKING:
+    import pathlib
+
 log = logging.getLogger(__name__)
@@ -130,7 +133,7 @@ def find_path_from_directory(base_dir_path: str, ignore_file_name: str) -> Gener


 def list_py_file_paths(
-    directory: str,
+    directory: Union[str, "pathlib.Path"],
     safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE', fallback=True),
     include_examples: Optional[bool] = None,
     include_smart_sensor: Optional[bool] = conf.getboolean('smart_sensor', 'use_smart_sensor'),
@@ -158,7 +161,7 @@ def list_py_file_paths(
     if directory is None:
         file_paths = []
     elif os.path.isfile(directory):
-        file_paths = [directory]
+        file_paths = [str(directory)]
     elif os.path.isdir(directory):
         find_dag_file_paths(directory, file_paths, safe_mode)
         if include_examples:
@@ -174,9 +177,9 @@ def list_py_file_paths(
     return file_paths


-def find_dag_file_paths(directory: str, file_paths: list, safe_mode: bool):
+def find_dag_file_paths(directory: Union[str, "pathlib.Path"], file_paths: list, safe_mode: bool):
     """Finds file paths of all DAG files."""
-    for file_path in find_path_from_directory(directory, ".airflowignore"):
+    for file_path in find_path_from_directory(str(directory), ".airflowignore"):
         try:
             if not os.path.isfile(file_path):
                 continue
diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
index dc08210..68a950f 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -18,6 +18,7 @@

 import multiprocessing
 import os
+import pathlib
 import sys
 import unittest
 from datetime import datetime, timedelta
@@ -49,7 +50,7 @@ from tests.core.test_logging_config import SETTINGS_FILE_VALID, settings_context
 from tests.test_utils.config import conf_vars
 from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags

-TEST_DAG_FOLDER = os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, 'dags')
+TEST_DAG_FOLDER = pathlib.Path(__file__).parent.parent / 'dags'

 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
@@ -276,7 +277,7 @@ class TestDagFileProcessorManager(unittest.TestCase):
         Check that the same set of failure callback with zombies are passed to the dag
         file processors until the next zombie detection logic is invoked.
""" - test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_example_bash_operator.py') + test_dag_path = TEST_DAG_FOLDER / 'test_example_bash_operator.py' with conf_vars({('scheduler', 'parsing_processes'): '1', ('core', 'load_examples'): 'False'}): dagbag = DagBag(test_dag_path, read_dags_from_db=False) with create_session() as session: @@ -305,7 +306,7 @@ class TestDagFileProcessorManager(unittest.TestCase): ) ] - test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_example_bash_operator.py') + test_dag_path = TEST_DAG_FOLDER / 'test_example_bash_operator.py' child_pipe, parent_pipe = multiprocessing.Pipe() async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn') @@ -334,12 +335,12 @@ class TestDagFileProcessorManager(unittest.TestCase): if async_mode: # Once for initial parse, and then again for the add_callback_to_queue assert len(fake_processors) == 2 - assert fake_processors[0]._file_path == test_dag_path + assert fake_processors[0]._file_path == str(test_dag_path) assert fake_processors[0]._callback_requests == [] else: assert len(fake_processors) == 1 - assert fake_processors[-1]._file_path == test_dag_path + assert fake_processors[-1]._file_path == str(test_dag_path) callback_requests = fake_processors[-1]._callback_requests assert {zombie.simple_task_instance.key for zombie in expected_failure_callback_requests} == { result.simple_task_instance.key for result in callback_requests @@ -403,7 +404,7 @@ class TestDagFileProcessorManager(unittest.TestCase): from airflow.jobs.scheduler_job import SchedulerJob dag_id = 'exit_test_dag' - dag_directory = os.path.normpath(os.path.join(TEST_DAG_FOLDER, os.pardir, "dags_with_system_exit")) + dag_directory = TEST_DAG_FOLDER.parent / 'dags_with_system_exit' # Delete the one valid DAG/SerializedDAG, and check that it gets re-created clear_db_dags() @@ -465,7 +466,7 @@ class TestDagFileProcessorAgent(unittest.TestCase): with settings_context(SETTINGS_FILE_VALID): # Launch a process through DagFileProcessorAgent, which will try # reload the logging module. - test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py') + test_dag_path = TEST_DAG_FOLDER / 'test_scheduler_dags.py' async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn') log_file_loc = conf.get('logging', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION') @@ -493,7 +494,7 @@ class TestDagFileProcessorAgent(unittest.TestCase): clear_db_serialized_dags() clear_db_dags() - test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py') + test_dag_path = TEST_DAG_FOLDER / 'test_scheduler_dags.py' async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn') processor_agent = DagFileProcessorAgent( test_dag_path, 1, type(self)._processor_factory, timedelta.max, [], False, async_mode @@ -517,7 +518,7 @@ class TestDagFileProcessorAgent(unittest.TestCase): assert dag_ids == [('test_start_date_scheduling',), ('test_task_start_date_scheduling',)] def test_launch_process(self): - test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py') + test_dag_path = TEST_DAG_FOLDER / 'test_scheduler_dags.py' async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn') log_file_loc = conf.get('logging', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')
