This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 32f5953 Stop creating duplicate Dag File Processors (#13662)
32f5953 is described below
commit 32f59534cbdb8188e4c8f49d7dfbb4b915eaeb4d
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri Jan 15 16:40:20 2021 +0000
Stop creating duplicate Dag File Processors (#13662)
When a dag file is being processed by a Dag File Processor and multiple
callbacks are created for it (either via zombies or executor events), the dag
file is added to _file_path_queue again and the manager launches a new process
for it, even though the file is already under processing. Because
self._processors is just a dict keyed by file path, multiple processors for
the same file count as a single entry, so the _parallelism limit is eventually
bypassed, especially when some dag files take a long time to process.
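
A minimal sketch of the undercounting (simplified stand-ins, not the real
DagFileProcessorManager; spawn_processor is a hypothetical placeholder for
launching a parser process):

    parallelism = 2
    processors = {}                              # keyed by file path, like self._processors
    file_path_queue = ['a.py', 'a.py', 'a.py']   # duplicates from repeated callbacks

    def spawn_processor(file_path):
        return object()  # placeholder for a real processor handle

    launched = 0
    while parallelism - len(processors) > 0 and file_path_queue:
        file_path = file_path_queue.pop(0)
        processors[file_path] = spawn_processor(file_path)  # same key overwrites the old entry
        launched += 1

    assert len(processors) == 1  # the dict only "sees" one processor
    assert launched == 3         # but three processes were started, exceeding parallelism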
This addresses the same issue as https://github.com/apache/airflow/pull/11875,
but unlike that PR it does not exclude file paths that were recently processed
or that have reached the run limit (which is only used in tests) when callbacks
are sent by the Agent. This is by design, as the execution of callbacks is
critical. A caveat is added to avoid duplicate processors: if a processor
already exists for a file path, the file path is removed from the queue. The
processor that runs the callback for that file path will still be started when
the file path is added back to the queue in the next loop, as sketched below.
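
In outline, the change adds two guards (a simplified sketch with stand-in
names, not the literal patch):

    file_path_queue = []
    processors = {}  # file path -> running processor handle

    def enqueue_callback_path(full_filepath):
        # Guard 1: drop any queued copies of the path before re-inserting it at
        # the front, so the queue never holds the same path twice.
        file_path_queue[:] = [p for p in file_path_queue if p != full_filepath]
        file_path_queue.insert(0, full_filepath)

    def start_new_processes(parallelism=2):
        while parallelism - len(processors) > 0 and file_path_queue:
            file_path = file_path_queue.pop(0)
            # Guard 2: skip a path that already has a running processor; its
            # callback runs when the path is queued again in the next loop.
            if file_path in processors:
                continue
            processors[file_path] = object()  # stand-in for a spawned processor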
Tests are added to verify this behaviour.
closes https://github.com/apache/airflow/issues/13047
closes https://github.com/apache/airflow/pull/11875
---
airflow/utils/dag_processing.py | 11 ++++++++++-
tests/utils/test_dag_processing.py | 39 ++++++++++++++++++++++++++++++++++++++
2 files changed, 49 insertions(+), 1 deletion(-)
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index e66803a..7e98c11 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -714,7 +714,12 @@ class DagFileProcessorManager(LoggingMixin): # pylint: disable=too-many-instanc
self._callback_to_execute[request.full_filepath].append(request)
# Callback has a higher priority over DAG Run scheduling
if request.full_filepath in self._file_path_queue:
- self._file_path_queue.remove(request.full_filepath)
+            # Remove file paths matching request.full_filepath from self._file_path_queue
+ # Since we are already going to use that filepath to run callback,
+ # there is no need to have same file path again in the queue
+ self._file_path_queue = [
+                file_path for file_path in self._file_path_queue if file_path != request.full_filepath
+ ]
self._file_path_queue.insert(0, request.full_filepath)
def _refresh_dag_dir(self):
@@ -988,6 +993,10 @@ class DagFileProcessorManager(LoggingMixin): # pylint: disable=too-many-instanc
"""Start more processors if we have enough slots and files to
process"""
while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
file_path = self._file_path_queue.pop(0)
+            # Stop creating duplicate processor i.e. processor with the same filepath
+ if file_path in self._processors.keys():
+ continue
+
callback_to_execute_for_file = self._callback_to_execute[file_path]
processor = self._processor_factory(
file_path, callback_to_execute_for_file, self._dag_ids, self._pickle_dags
diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
index ffcf00e..ad8ef5a 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -142,6 +142,45 @@ class TestDagFileProcessorManager(unittest.TestCase):
child_pipe.close()
parent_pipe.close()
+ @pytest.mark.backend("mysql", "postgres")
+ def test_start_new_processes_with_same_filepath(self):
+ """
+        Test that when a processor already exist with a filepath, a new processor won't be created
+ with that filepath. The filepath will just be removed from the list.
+ """
+ processor_factory_mock = MagicMock()
+ manager = DagFileProcessorManager(
+ dag_directory='directory',
+ max_runs=1,
+ processor_factory=processor_factory_mock,
+ processor_timeout=timedelta.max,
+ signal_conn=MagicMock(),
+ dag_ids=[],
+ pickle_dags=False,
+ async_mode=True,
+ )
+
+ file_1 = 'file_1.py'
+ file_2 = 'file_2.py'
+ file_3 = 'file_3.py'
+ manager._file_path_queue = [file_1, file_2, file_3]
+
+        # Mock that only one processor exists. This processor runs with 'file_1'
+ manager._processors[file_1] = MagicMock()
+ # Start New Processes
+ manager.start_new_processes()
+
+ # Because of the config: '[scheduler] parsing_processes = 2'
+ # verify that only one extra process is created
+ # and since a processor with 'file_1' already exists,
+ # even though it is first in '_file_path_queue'
+ # a new processor is created with 'file_2' and not 'file_1'.
+        processor_factory_mock.assert_called_once_with('file_2.py', [], [], False)
+
+ assert file_1 in manager._processors.keys()
+ assert file_2 in manager._processors.keys()
+ assert [file_3] == manager._file_path_queue
+
def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
manager = DagFileProcessorManager(
dag_directory='directory',