This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new add7490 Parse recently modified files even if just parsed (#16075)
add7490 is described below
commit add7490145fabd097d605d85a662dccd02b600de
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed May 26 11:29:35 2021 +0100
Parse recently modified files even if just parsed (#16075)
This commit adds an optimization where recently modified files
(detected by mtime) will be parsed even if they have not yet reached
`min_file_process_interval`.
This way you can increase `[scheduler] min_file_process_interval` to
a higher value like `600` or so when you have a large number of files, to
avoid unnecessary reparsing if files haven't changed, while still making
sure that modified files are taken care of.
---
airflow/utils/dag_processing.py | 8 ++++-
tests/utils/test_dag_processing.py | 60 ++++++++++++++++++++++++++++++++++++++
2 files changed, 67 insertions(+), 1 deletion(-)
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 4b85234..676b610 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -1054,14 +1054,20 @@ class DagFileProcessorManager(LoggingMixin): # pylint:
disable=too-many-instanc
if is_mtime_mode:
files_with_mtime[file_path] = os.path.getmtime(file_path)
+ file_modified_time =
timezone.make_aware(datetime.fromtimestamp(files_with_mtime[file_path]))
else:
file_paths.append(file_path)
+ file_modified_time = None
- # Find file paths that were recently processed
+ # Find file paths that were recently processed to exclude them
+ # from being added to file_path_queue
+ # unless they were modified recently and parsing mode is
"modified_time"
+ # in which case we don't honor "self._file_process_interval"
(min_file_process_interval)
last_finish_time = self.get_last_finish_time(file_path)
if (
last_finish_time is not None
and (now - last_finish_time).total_seconds() <
self._file_process_interval
+ and not (is_mtime_mode and file_modified_time and
(file_modified_time > last_finish_time))
):
file_paths_recently_processed.append(file_path)
diff --git a/tests/utils/test_dag_processing.py
b/tests/utils/test_dag_processing.py
index 78cd988..3242cf3 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -31,6 +31,7 @@ from unittest import mock
from unittest.mock import MagicMock, PropertyMock
import pytest
+from freezegun import freeze_time
from airflow.configuration import conf
from airflow.jobs.local_task_job import LocalTaskJob as LJ
@@ -324,6 +325,65 @@ class TestDagFileProcessorManager(unittest.TestCase):
manager.prepare_file_path_queue()
assert manager._file_path_queue == ['file_4.py', 'file_1.py',
'file_3.py', 'file_2.py']
+ @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
+ @mock.patch("zipfile.is_zipfile", return_value=True)
+ @mock.patch("airflow.utils.file.might_contain_dag", return_value=True)
+ @mock.patch("airflow.utils.file.find_path_from_directory",
return_value=True)
+ @mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
+ @mock.patch("airflow.utils.file.os.path.getmtime")
+ def test_recently_modified_file_is_parsed_with_mtime_mode(
+ self, mock_getmtime, mock_isfile, mock_find_path,
mock_might_contain_dag, mock_zipfile
+ ):
+ """
+ Test recently updated files are processed even if
min_file_process_interval is not reached
+ """
+ freezed_base_time = timezone.datetime(2020, 1, 5, 0, 0, 0)
+ initial_file_1_mtime = (freezed_base_time -
timedelta(minutes=5)).timestamp()
+ dag_files = ["file_1.py"]
+ mock_getmtime.side_effect = [initial_file_1_mtime]
+ mock_find_path.return_value = dag_files
+
+ manager = DagFileProcessorManager(
+ dag_directory='directory',
+ max_runs=3,
+ processor_factory=MagicMock().return_value,
+ processor_timeout=timedelta.max,
+ signal_conn=MagicMock(),
+ dag_ids=[],
+ pickle_dags=False,
+ async_mode=True,
+ )
+
+        # let's say the DAG was just parsed 10 seconds before the frozen time
+ last_finish_time = freezed_base_time - timedelta(seconds=10)
+ manager._file_stats = {
+ "file_1.py": DagFileStat(1, 0, last_finish_time, 1.0, 1),
+ }
+ with freeze_time(freezed_base_time):
+ manager.set_file_paths(dag_files)
+ assert manager._file_path_queue == []
+ # File Path Queue will be empty as the "modified time" < "last
finish time"
+ manager.prepare_file_path_queue()
+ assert manager._file_path_queue == []
+
+ # Simulate the DAG modification by using modified_time which is greater
+ # than the last_parse_time but still less than now -
min_file_process_interval
+ file_1_new_mtime = freezed_base_time - timedelta(seconds=5)
+ file_1_new_mtime_ts = file_1_new_mtime.timestamp()
+ with freeze_time(freezed_base_time):
+ manager.set_file_paths(dag_files)
+ assert manager._file_path_queue == []
+ # File Path Queue will be empty as the "modified time" < "last
finish time"
+ mock_getmtime.side_effect = [file_1_new_mtime_ts]
+ manager.prepare_file_path_queue()
+            # Check that file is added to the queue even though file was just
recently parsed
+ assert manager._file_path_queue == ["file_1.py"]
+ assert last_finish_time < file_1_new_mtime
+ assert (
+ manager._file_process_interval
+ > (freezed_base_time -
manager.get_last_finish_time("file_1.py")).total_seconds()
+ )
+
def test_find_zombies(self):
manager = DagFileProcessorManager(
dag_directory='directory',