This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 734f1dc3f45712d4702e09e0e68c76f5a8327f7b
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed May 26 11:29:35 2021 +0100

    Parse recently modified files even if just parsed (#16075)
    
    This commit adds an optimization where recently modified files
    (detected by mtime) will be parsed even though they have not reached
    `min_file_process_interval`.
    
    This way you can increase `[scheduler] min_file_process_interval` to
    a higher value like `600` or so when you have a large number of files to
    avoid unnecessary reparsing if files haven't changed, while still making
    sure that modified files are taken care of.
    
    (cherry picked from commit add7490145fabd097d605d85a662dccd02b600de)
---
 airflow/utils/dag_processing.py    |  8 ++++-
 tests/utils/test_dag_processing.py | 60 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 4b85234..676b610 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -1054,14 +1054,20 @@ class DagFileProcessorManager(LoggingMixin):  # pylint: 
disable=too-many-instanc
 
             if is_mtime_mode:
                 files_with_mtime[file_path] = os.path.getmtime(file_path)
+                file_modified_time = 
timezone.make_aware(datetime.fromtimestamp(files_with_mtime[file_path]))
             else:
                 file_paths.append(file_path)
+                file_modified_time = None
 
-            # Find file paths that were recently processed
+            # Find file paths that were recently processed to exclude them
+            # from being added to file_path_queue
+            # unless they were modified recently and parsing mode is 
"modified_time"
+            # in which case we don't honor "self._file_process_interval" 
(min_file_process_interval)
             last_finish_time = self.get_last_finish_time(file_path)
             if (
                 last_finish_time is not None
                 and (now - last_finish_time).total_seconds() < 
self._file_process_interval
+                and not (is_mtime_mode and file_modified_time and 
(file_modified_time > last_finish_time))
             ):
                 file_paths_recently_processed.append(file_path)
 
diff --git a/tests/utils/test_dag_processing.py 
b/tests/utils/test_dag_processing.py
index 78cd988..3242cf3 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -31,6 +31,7 @@ from unittest import mock
 from unittest.mock import MagicMock, PropertyMock
 
 import pytest
+from freezegun import freeze_time
 
 from airflow.configuration import conf
 from airflow.jobs.local_task_job import LocalTaskJob as LJ
@@ -324,6 +325,65 @@ class TestDagFileProcessorManager(unittest.TestCase):
         manager.prepare_file_path_queue()
         assert manager._file_path_queue == ['file_4.py', 'file_1.py', 
'file_3.py', 'file_2.py']
 
+    @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
+    @mock.patch("zipfile.is_zipfile", return_value=True)
+    @mock.patch("airflow.utils.file.might_contain_dag", return_value=True)
+    @mock.patch("airflow.utils.file.find_path_from_directory", 
return_value=True)
+    @mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
+    @mock.patch("airflow.utils.file.os.path.getmtime")
+    def test_recently_modified_file_is_parsed_with_mtime_mode(
+        self, mock_getmtime, mock_isfile, mock_find_path, 
mock_might_contain_dag, mock_zipfile
+    ):
+        """
+        Test recently updated files are processed even if 
min_file_process_interval is not reached
+        """
+        freezed_base_time = timezone.datetime(2020, 1, 5, 0, 0, 0)
+        initial_file_1_mtime = (freezed_base_time - 
timedelta(minutes=5)).timestamp()
+        dag_files = ["file_1.py"]
+        mock_getmtime.side_effect = [initial_file_1_mtime]
+        mock_find_path.return_value = dag_files
+
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            max_runs=3,
+            processor_factory=MagicMock().return_value,
+            processor_timeout=timedelta.max,
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        # let's say the DAG was just parsed 10 seconds before the frozen time
+        last_finish_time = freezed_base_time - timedelta(seconds=10)
+        manager._file_stats = {
+            "file_1.py": DagFileStat(1, 0, last_finish_time, 1.0, 1),
+        }
+        with freeze_time(freezed_base_time):
+            manager.set_file_paths(dag_files)
+            assert manager._file_path_queue == []
+            # File Path Queue will be empty as the "modified time" < "last 
finish time"
+            manager.prepare_file_path_queue()
+            assert manager._file_path_queue == []
+
+        # Simulate the DAG modification by using modified_time which is greater
+        # than the last_parse_time but still less than now - 
min_file_process_interval
+        file_1_new_mtime = freezed_base_time - timedelta(seconds=5)
+        file_1_new_mtime_ts = file_1_new_mtime.timestamp()
+        with freeze_time(freezed_base_time):
+            manager.set_file_paths(dag_files)
+            assert manager._file_path_queue == []
+            # The new "modified time" > "last finish time", so the file should be re-queued
+            mock_getmtime.side_effect = [file_1_new_mtime_ts]
+            manager.prepare_file_path_queue()
+            # Check that the file is added to the queue even though it was only just recently parsed
+            assert manager._file_path_queue == ["file_1.py"]
+            assert last_finish_time < file_1_new_mtime
+            assert (
+                manager._file_process_interval
+                > (freezed_base_time - 
manager.get_last_finish_time("file_1.py")).total_seconds()
+            )
+
     def test_find_zombies(self):
         manager = DagFileProcessorManager(
             dag_directory='directory',

Reply via email to