This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new add7490  Parse recently modified files even if just parsed (#16075)
add7490 is described below

commit add7490145fabd097d605d85a662dccd02b600de
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed May 26 11:29:35 2021 +0100

    Parse recently modified files even if just parsed (#16075)
    
    This commit adds an optimization where the recently modified files
    (detected by mtime) will be parsed even though they have not reached
    `min_file_process_interval`.
    
    This way you can increase `[scheduler] min_file_process_interval` to
    a higher value like `600` or so when you have large number of files to
    avoid unnecessary reparsing if files haven't changed, while still making
    sure that modified files are taken care of.
---
 airflow/utils/dag_processing.py    |  8 ++++-
 tests/utils/test_dag_processing.py | 60 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 4b85234..676b610 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -1054,14 +1054,20 @@ class DagFileProcessorManager(LoggingMixin):  # pylint: disable=too-many-instanc
 
             if is_mtime_mode:
                 files_with_mtime[file_path] = os.path.getmtime(file_path)
+                file_modified_time = timezone.make_aware(datetime.fromtimestamp(files_with_mtime[file_path]))
             else:
                 file_paths.append(file_path)
+                file_modified_time = None
 
-            # Find file paths that were recently processed
+            # Find file paths that were recently processed to exclude them
+            # from being added to file_path_queue
+            # unless they were modified recently and parsing mode is "modified_time"
+            # in which case we don't honor "self._file_process_interval" (min_file_process_interval)
             last_finish_time = self.get_last_finish_time(file_path)
             if (
                 last_finish_time is not None
                 and (now - last_finish_time).total_seconds() < self._file_process_interval
+                and not (is_mtime_mode and file_modified_time and (file_modified_time > last_finish_time))
             ):
                 file_paths_recently_processed.append(file_path)
 
diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
index 78cd988..3242cf3 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -31,6 +31,7 @@ from unittest import mock
 from unittest.mock import MagicMock, PropertyMock
 
 import pytest
+from freezegun import freeze_time
 
 from airflow.configuration import conf
 from airflow.jobs.local_task_job import LocalTaskJob as LJ
@@ -324,6 +325,65 @@ class TestDagFileProcessorManager(unittest.TestCase):
         manager.prepare_file_path_queue()
         assert manager._file_path_queue == ['file_4.py', 'file_1.py', 'file_3.py', 'file_2.py']
 
+    @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
+    @mock.patch("zipfile.is_zipfile", return_value=True)
+    @mock.patch("airflow.utils.file.might_contain_dag", return_value=True)
+    @mock.patch("airflow.utils.file.find_path_from_directory", return_value=True)
+    @mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
+    @mock.patch("airflow.utils.file.os.path.getmtime")
+    def test_recently_modified_file_is_parsed_with_mtime_mode(
+        self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile
+    ):
+        """
+        Test recently updated files are processed even if min_file_process_interval is not reached
+        """
+        freezed_base_time = timezone.datetime(2020, 1, 5, 0, 0, 0)
+        initial_file_1_mtime = (freezed_base_time - timedelta(minutes=5)).timestamp()
+        dag_files = ["file_1.py"]
+        mock_getmtime.side_effect = [initial_file_1_mtime]
+        mock_find_path.return_value = dag_files
+
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            max_runs=3,
+            processor_factory=MagicMock().return_value,
+            processor_timeout=timedelta.max,
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        # let's say the DAG was just parsed 10 seconds before the freezed time
+        last_finish_time = freezed_base_time - timedelta(seconds=10)
+        manager._file_stats = {
+            "file_1.py": DagFileStat(1, 0, last_finish_time, 1.0, 1),
+        }
+        with freeze_time(freezed_base_time):
+            manager.set_file_paths(dag_files)
+            assert manager._file_path_queue == []
+            # File Path Queue will be empty as the "modified time" < "last finish time"
+            manager.prepare_file_path_queue()
+            assert manager._file_path_queue == []
+
+        # Simulate the DAG modification by using modified_time which is greater
+        # than the last_parse_time but still less than now - min_file_process_interval
+        file_1_new_mtime = freezed_base_time - timedelta(seconds=5)
+        file_1_new_mtime_ts = file_1_new_mtime.timestamp()
+        with freeze_time(freezed_base_time):
+            manager.set_file_paths(dag_files)
+            assert manager._file_path_queue == []
+            # File Path Queue will be empty as the "modified time" < "last finish time"
+            mock_getmtime.side_effect = [file_1_new_mtime_ts]
+            manager.prepare_file_path_queue()
+            # Check that file is added to the queue even though file was just recently parsed
+            assert manager._file_path_queue == ["file_1.py"]
+            assert last_finish_time < file_1_new_mtime
+            assert (
+                manager._file_process_interval
+                > (freezed_base_time - manager.get_last_finish_time("file_1.py")).total_seconds()
+            )
+
     def test_find_zombies(self):
         manager = DagFileProcessorManager(
             dag_directory='directory',

Reply via email to