This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3f6d9b6 Handle case of nonexistent file when preparing file path queue (#18998)
3f6d9b6 is described below
commit 3f6d9b6e3421ca36c2320e4ee1c63c71ca0cb85e
Author: Sam Wheating <[email protected]>
AuthorDate: Thu Oct 14 19:10:32 2021 -0700
Handle case of nonexistent file when preparing file path queue (#18998)
---
airflow/dag_processing/manager.py | 6 +++++-
tests/dag_processing/test_manager.py | 28 ++++++++++++++++++++++++++++
2 files changed, 33 insertions(+), 1 deletion(-)
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 64f8609..19a97ff 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -975,7 +975,11 @@ class DagFileProcessorManager(LoggingMixin):
for file_path in self._file_paths:
if is_mtime_mode:
- files_with_mtime[file_path] = os.path.getmtime(file_path)
+ try:
+ files_with_mtime[file_path] = os.path.getmtime(file_path)
+ except FileNotFoundError:
+ self.log.warning("Skipping processing of missing file: %s", file_path)
+ continue
file_modified_time = timezone.make_aware(datetime.fromtimestamp(files_with_mtime[file_path]))
else:
file_paths.append(file_path)
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 1dbd1c9..43038092 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -372,6 +372,34 @@ class TestDagFileProcessorManager:
@mock.patch("airflow.utils.file.find_path_from_directory", return_value=True)
@mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
@mock.patch("airflow.utils.file.os.path.getmtime")
+ def test_file_paths_in_queue_excludes_missing_file(
+ self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile
+ ):
+ """Check that a file is not enqueued for processing if it has been deleted"""
+ dag_files = ["file_3.py", "file_2.py", "file_4.py"]
+ mock_getmtime.side_effect = [1.0, 2.0, FileNotFoundError()]
+ mock_find_path.return_value = dag_files
+
+ manager = DagFileProcessorManager(
+ dag_directory='directory',
+ max_runs=1,
+ processor_timeout=timedelta.max,
+ signal_conn=MagicMock(),
+ dag_ids=[],
+ pickle_dags=False,
+ async_mode=True,
+ )
+
+ manager.set_file_paths(dag_files)
+ manager.prepare_file_path_queue()
+ assert manager._file_path_queue == ['file_2.py', 'file_3.py']
+
+ @conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
+ @mock.patch("zipfile.is_zipfile", return_value=True)
+ @mock.patch("airflow.utils.file.might_contain_dag", return_value=True)
+ @mock.patch("airflow.utils.file.find_path_from_directory", return_value=True)
+ @mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
+ @mock.patch("airflow.utils.file.os.path.getmtime")
def test_recently_modified_file_is_parsed_with_mtime_mode(
self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile
):