This is an automated email from the ASF dual-hosted git repository. ash pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 734f1dc3f45712d4702e09e0e68c76f5a8327f7b Author: Kaxil Naik <[email protected]> AuthorDate: Wed May 26 11:29:35 2021 +0100 Parse recently modified files even if just parsed (#16075) This commit adds an optimization where recently modified files (detected by mtime) will be parsed even though they have not reached `min_file_process_interval`. This way you can increase `[scheduler] min_file_process_interval` to a higher value like `600` or so when you have a large number of files, to avoid unnecessary reparsing if files haven't changed, while still making sure that modified files are taken care of. (cherry picked from commit add7490145fabd097d605d85a662dccd02b600de) --- airflow/utils/dag_processing.py | 8 ++++- tests/utils/test_dag_processing.py | 60 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index 4b85234..676b610 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -1054,14 +1054,20 @@ class DagFileProcessorManager(LoggingMixin): # pylint: disable=too-many-instanc if is_mtime_mode: files_with_mtime[file_path] = os.path.getmtime(file_path) + file_modified_time = timezone.make_aware(datetime.fromtimestamp(files_with_mtime[file_path])) else: file_paths.append(file_path) + file_modified_time = None - # Find file paths that were recently processed + # Find file paths that were recently processed to exclude them + # from being added to file_path_queue + # unless they were modified recently and parsing mode is "modified_time" + # in which case we don't honor "self._file_process_interval" (min_file_process_interval) last_finish_time = self.get_last_finish_time(file_path) if ( last_finish_time is not None and (now - last_finish_time).total_seconds() < self._file_process_interval + and not (is_mtime_mode and file_modified_time and (file_modified_time > last_finish_time)) ): 
file_paths_recently_processed.append(file_path) diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py index 78cd988..3242cf3 100644 --- a/tests/utils/test_dag_processing.py +++ b/tests/utils/test_dag_processing.py @@ -31,6 +31,7 @@ from unittest import mock from unittest.mock import MagicMock, PropertyMock import pytest +from freezegun import freeze_time from airflow.configuration import conf from airflow.jobs.local_task_job import LocalTaskJob as LJ @@ -324,6 +325,65 @@ class TestDagFileProcessorManager(unittest.TestCase): manager.prepare_file_path_queue() assert manager._file_path_queue == ['file_4.py', 'file_1.py', 'file_3.py', 'file_2.py'] + @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"}) + @mock.patch("zipfile.is_zipfile", return_value=True) + @mock.patch("airflow.utils.file.might_contain_dag", return_value=True) + @mock.patch("airflow.utils.file.find_path_from_directory", return_value=True) + @mock.patch("airflow.utils.file.os.path.isfile", return_value=True) + @mock.patch("airflow.utils.file.os.path.getmtime") + def test_recently_modified_file_is_parsed_with_mtime_mode( + self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile + ): + """ + Test recently updated files are processed even if min_file_process_interval is not reached + """ + freezed_base_time = timezone.datetime(2020, 1, 5, 0, 0, 0) + initial_file_1_mtime = (freezed_base_time - timedelta(minutes=5)).timestamp() + dag_files = ["file_1.py"] + mock_getmtime.side_effect = [initial_file_1_mtime] + mock_find_path.return_value = dag_files + + manager = DagFileProcessorManager( + dag_directory='directory', + max_runs=3, + processor_factory=MagicMock().return_value, + processor_timeout=timedelta.max, + signal_conn=MagicMock(), + dag_ids=[], + pickle_dags=False, + async_mode=True, + ) + + # let's say the DAG was just parsed 2 seconds before the Freezed time + last_finish_time = freezed_base_time - timedelta(seconds=10) + 
manager._file_stats = { + "file_1.py": DagFileStat(1, 0, last_finish_time, 1.0, 1), + } + with freeze_time(freezed_base_time): + manager.set_file_paths(dag_files) + assert manager._file_path_queue == [] + # File Path Queue will be empty as the "modified time" < "last finish time" + manager.prepare_file_path_queue() + assert manager._file_path_queue == [] + + # Simulate the DAG modification by using modified_time which is greater + # than the last_parse_time but still less than now - min_file_process_interval + file_1_new_mtime = freezed_base_time - timedelta(seconds=5) + file_1_new_mtime_ts = file_1_new_mtime.timestamp() + with freeze_time(freezed_base_time): + manager.set_file_paths(dag_files) + assert manager._file_path_queue == [] + # File Path Queue will be empty as the "modified time" < "last finish time" + mock_getmtime.side_effect = [file_1_new_mtime_ts] + manager.prepare_file_path_queue() + # Check that file is added to the queue even though file was just recently passed + assert manager._file_path_queue == ["file_1.py"] + assert last_finish_time < file_1_new_mtime + assert ( + manager._file_process_interval + > (freezed_base_time - manager.get_last_finish_time("file_1.py")).total_seconds() + ) + def test_find_zombies(self): manager = DagFileProcessorManager( dag_directory='directory',
