This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9aa2cea9534 Remove unused `dag_ids` argument to DagFile processor
classes. (#44845)
9aa2cea9534 is described below
commit 9aa2cea953480ef69f950d6d82088ad81b509287
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Wed Dec 11 17:52:28 2024 +0000
Remove unused `dag_ids` argument to DagFile processor classes. (#44845)
This hasn't been possible to set in a while (since sometime before 2.0,
possibly even before 1.8), and the docstring gives a clue to the intended
behaviour: to schedule only certain DAGs. But that is not the job of the
DAG processor, which hasn't been involved in that flow since 2.0.
Time to go.
---
.../local_commands/dag_processor_command.py | 1 -
airflow/dag_processing/manager.py | 20 +-----
airflow/dag_processing/processor.py | 13 +---
airflow/jobs/scheduler_job_runner.py | 1 -
tests/dag_processing/test_manager.py | 74 ++++++----------------
tests/dag_processing/test_processor.py | 26 ++------
tests/listeners/test_dag_import_error_listener.py | 4 +-
7 files changed, 31 insertions(+), 108 deletions(-)
diff --git a/airflow/cli/commands/local_commands/dag_processor_command.py
b/airflow/cli/commands/local_commands/dag_processor_command.py
index 03513df17a0..f0c3bc5060c 100644
--- a/airflow/cli/commands/local_commands/dag_processor_command.py
+++ b/airflow/cli/commands/local_commands/dag_processor_command.py
@@ -43,7 +43,6 @@ def _create_dag_processor_job_runner(args: Any) ->
DagProcessorJobRunner:
processor_timeout=processor_timeout,
dag_directory=args.subdir,
max_runs=args.num_runs,
- dag_ids=[],
),
)
diff --git a/airflow/dag_processing/manager.py
b/airflow/dag_processing/manager.py
index 7d9c9298a99..f60377d4966 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -116,7 +116,6 @@ class DagFileProcessorAgent(LoggingMixin,
MultiprocessingStartMethodMixin):
:param max_runs: The number of times to parse and schedule each file. -1
for unlimited.
:param processor_timeout: How long to wait before timing out a DAG file
processor
- :param dag_ids: if specified, only schedule tasks with these DAG IDs
"""
def __init__(
@@ -124,13 +123,11 @@ class DagFileProcessorAgent(LoggingMixin,
MultiprocessingStartMethodMixin):
dag_directory: os.PathLike,
max_runs: int,
processor_timeout: timedelta,
- dag_ids: list[str] | None,
):
super().__init__()
self._dag_directory: os.PathLike = dag_directory
self._max_runs = max_runs
self._processor_timeout = processor_timeout
- self._dag_ids = dag_ids
# Map from file path to the processor
self._processors: dict[str, DagFileProcessorProcess] = {}
# Pipe for communicating signals
@@ -156,7 +153,6 @@ class DagFileProcessorAgent(LoggingMixin,
MultiprocessingStartMethodMixin):
self._max_runs,
self._processor_timeout,
child_signal_conn,
- self._dag_ids,
),
)
self._process = process
@@ -177,26 +173,19 @@ class DagFileProcessorAgent(LoggingMixin,
MultiprocessingStartMethodMixin):
max_runs: int,
processor_timeout: timedelta,
signal_conn: MultiprocessingConnection,
- dag_ids: list[str] | None,
) -> None:
# Make this process start as a new process group - that makes it easy
# to kill all sub-process of this at the OS-level, rather than having
# to iterate the child processes
set_new_process_group()
span = Trace.get_current_span()
- span.set_attributes(
- {
- "dag_directory": str(dag_directory),
- "dag_ids": str(dag_ids),
- }
- )
+ span.set_attribute("dag_directory", str(dag_directory))
setproctitle("airflow scheduler -- DagFileProcessorManager")
reload_configuration_for_dag_processing()
processor_manager = DagFileProcessorManager(
dag_directory=dag_directory,
max_runs=max_runs,
processor_timeout=processor_timeout,
- dag_ids=dag_ids,
signal_conn=signal_conn,
)
processor_manager.start()
@@ -307,7 +296,6 @@ class DagFileProcessorManager(LoggingMixin):
for unlimited.
:param processor_timeout: How long to wait before timing out a DAG file
processor
:param signal_conn: connection to communicate signal with processor agent.
- :param dag_ids: if specified, only schedule tasks with these DAG IDs
"""
def __init__(
@@ -315,7 +303,6 @@ class DagFileProcessorManager(LoggingMixin):
dag_directory: os.PathLike[str],
max_runs: int,
processor_timeout: timedelta,
- dag_ids: list[str] | None,
signal_conn: MultiprocessingConnection | None = None,
):
super().__init__()
@@ -325,7 +312,6 @@ class DagFileProcessorManager(LoggingMixin):
self._max_runs = max_runs
# signal_conn is None for dag_processor_standalone mode.
self._direct_scheduler_conn = signal_conn
- self._dag_ids = dag_ids
self._parsing_start_time: float | None = None
self._dag_directory = dag_directory
# Set the signal conn in to non-blocking mode, so that attempting to
@@ -1001,11 +987,10 @@ class DagFileProcessorManager(LoggingMixin):
self.log.debug("%s file paths queued for processing",
len(self._file_path_queue))
@staticmethod
- def _create_process(file_path, dag_ids, dag_directory, callback_requests):
+ def _create_process(file_path, dag_directory, callback_requests):
"""Create DagFileProcessorProcess instance."""
return DagFileProcessorProcess(
file_path=file_path,
- dag_ids=dag_ids,
dag_directory=dag_directory,
callback_requests=callback_requests,
)
@@ -1026,7 +1011,6 @@ class DagFileProcessorManager(LoggingMixin):
callback_to_execute_for_file = self._callback_to_execute[file_path]
processor = self._create_process(
file_path,
- self._dag_ids,
self.get_dag_directory(),
callback_to_execute_for_file,
)
diff --git a/airflow/dag_processing/processor.py
b/airflow/dag_processing/processor.py
index 840c17300f5..b3e6ff770b8 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -92,7 +92,6 @@ class DagFileProcessorProcess(LoggingMixin,
MultiprocessingStartMethodMixin):
Runs DAG processing in a separate process using DagFileProcessor.
:param file_path: a Python file containing Airflow DAG definitions
- :param dag_ids: If specified, only look at these DAG ID's
:param callback_requests: failure callback to execute
"""
@@ -102,13 +101,11 @@ class DagFileProcessorProcess(LoggingMixin,
MultiprocessingStartMethodMixin):
def __init__(
self,
file_path: str,
- dag_ids: list[str] | None,
dag_directory: str,
callback_requests: list[CallbackRequest],
):
super().__init__()
self._file_path = file_path
- self._dag_ids = dag_ids
self._dag_directory = dag_directory
self._callback_requests = callback_requests
@@ -136,7 +133,6 @@ class DagFileProcessorProcess(LoggingMixin,
MultiprocessingStartMethodMixin):
result_channel: MultiprocessingConnection,
parent_channel: MultiprocessingConnection,
file_path: str,
- dag_ids: list[str] | None,
thread_name: str,
dag_directory: str,
callback_requests: list[CallbackRequest],
@@ -147,8 +143,6 @@ class DagFileProcessorProcess(LoggingMixin,
MultiprocessingStartMethodMixin):
:param result_channel: the connection to use for passing back the
result
:param parent_channel: the parent end of the channel to close in the
child
:param file_path: the file to process
- :param dag_ids: if specified, only examine DAG ID's that are
- in this list
:param thread_name: the name to use for the process that is launched
:param callback_requests: failure callback to execute
:return: the process that was launched
@@ -174,7 +168,7 @@ class DagFileProcessorProcess(LoggingMixin,
MultiprocessingStartMethodMixin):
threading.current_thread().name = thread_name
log.info("Started process (PID=%s) to work on %s", os.getpid(),
file_path)
- dag_file_processor = DagFileProcessor(dag_ids=dag_ids,
dag_directory=dag_directory, log=log)
+ dag_file_processor = DagFileProcessor(dag_directory=dag_directory,
log=log)
result: tuple[int, int, int] = dag_file_processor.process_file(
file_path=file_path,
callback_requests=callback_requests,
@@ -241,7 +235,6 @@ class DagFileProcessorProcess(LoggingMixin,
MultiprocessingStartMethodMixin):
_child_channel,
_parent_channel,
self.file_path,
- self._dag_ids,
f"DagFileProcessor{self._instance_id}",
self._dag_directory,
self._callback_requests,
@@ -415,15 +408,13 @@ class DagFileProcessor(LoggingMixin):
Returns a tuple of 'number of dags found' and 'the count of import errors'
- :param dag_ids: If specified, only look at these DAG ID's
:param log: Logger to save the processing process
"""
UNIT_TEST_MODE: bool = conf.getboolean("core", "UNIT_TEST_MODE")
- def __init__(self, dag_ids: list[str] | None, dag_directory: str, log:
logging.Logger):
+ def __init__(self, dag_directory: str, log: logging.Logger):
super().__init__()
- self.dag_ids = dag_ids
self._log = log
self._dag_directory = dag_directory
self.dag_warnings: set[tuple[str, str]] = set()
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index 21fa41aa2c5..0254b417a71 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -934,7 +934,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
dag_directory=Path(self.subdir),
max_runs=self.num_times_parse_dags,
processor_timeout=processor_timeout,
- dag_ids=[],
)
reset_signals = self.register_signals()
diff --git a/tests/dag_processing/test_manager.py
b/tests/dag_processing/test_manager.py
index 4a338e164d6..fedc15a7437 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -80,8 +80,8 @@ DEFAULT_DATE = timezone.datetime(2016, 1, 1)
class FakeDagFileProcessorRunner(DagFileProcessorProcess):
# This fake processor will return the zombies it received in constructor
# as its processing result w/o actually parsing anything.
- def __init__(self, file_path, dag_ids, dag_directory, callbacks):
- super().__init__(file_path, dag_ids, dag_directory, callbacks)
+ def __init__(self, file_path, dag_directory, callbacks):
+ super().__init__(file_path, dag_directory, callbacks)
# We need a "real" selectable handle for waitable_handle to work
readable, writable = multiprocessing.Pipe(duplex=False)
writable.send("abc")
@@ -109,10 +109,9 @@ class FakeDagFileProcessorRunner(DagFileProcessorProcess):
return self._result
@staticmethod
- def _create_process(file_path, callback_requests, dag_ids, dag_directory):
+ def _create_process(file_path, callback_requests, dag_directory):
return FakeDagFileProcessorRunner(
file_path,
- dag_ids,
dag_directory,
callback_requests,
)
@@ -169,7 +168,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=child_pipe,
- dag_ids=[],
)
with create_session() as session:
@@ -199,7 +197,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=child_pipe,
- dag_ids=[],
)
self.run_processor_manager_one_loop(manager, parent_pipe)
@@ -217,7 +214,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=MagicMock(),
- dag_ids=[],
)
file_1 = "file_1.py"
@@ -246,7 +242,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=MagicMock(),
- dag_ids=[],
)
mock_processor = MagicMock()
@@ -266,7 +261,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=MagicMock(),
- dag_ids=[],
)
mock_processor = MagicMock()
@@ -295,7 +289,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=MagicMock(),
- dag_ids=[],
)
manager.set_file_paths(dag_files)
@@ -320,7 +313,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=MagicMock(),
- dag_ids=[],
)
manager.set_file_paths(dag_files)
@@ -380,7 +372,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=MagicMock(),
- dag_ids=[],
)
manager.set_file_paths(dag_files)
@@ -413,7 +404,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=MagicMock(),
- dag_ids=[],
)
manager.set_file_paths(dag_files)
@@ -445,7 +435,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=MagicMock(),
- dag_ids=[],
)
manager.set_file_paths(dag_files)
@@ -486,7 +475,6 @@ class TestDagProcessorJobRunner:
max_runs=3,
processor_timeout=timedelta(days=365),
signal_conn=MagicMock(),
- dag_ids=[],
)
# let's say the DAG was just parsed 10 seconds before the Freezed time
@@ -542,7 +530,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=MagicMock(),
- dag_ids=[],
)
manager.set_file_paths(dag_files)
@@ -563,7 +550,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(minutes=10),
signal_conn=MagicMock(),
- dag_ids=[],
)
test_dag_path = str(TEST_DAG_FOLDER / "test_example_bash_operator.py")
@@ -630,7 +616,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(minutes=10),
signal_conn=MagicMock(),
- dag_ids=[],
)
test_dag_path = str(TEST_DAG_FOLDER / "test_example_bash_operator.py")
@@ -682,12 +667,10 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(seconds=5),
signal_conn=MagicMock(),
- dag_ids=[],
)
processor = DagFileProcessorProcess(
file_path="abc.txt",
- dag_ids=[],
dag_directory=TEST_DAG_FOLDER,
callback_requests=[],
)
@@ -709,12 +692,10 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(seconds=5),
signal_conn=MagicMock(),
- dag_ids=[],
)
processor = DagFileProcessorProcess(
file_path="abc.txt",
- dag_ids=[],
dag_directory=str(TEST_DAG_FOLDER),
callback_requests=[],
)
@@ -741,7 +722,6 @@ class TestDagProcessorJobRunner:
manager = DagFileProcessorManager(
dag_directory=dag_directory,
- dag_ids=[],
max_runs=1,
processor_timeout=timedelta(seconds=5),
signal_conn=child_pipe,
@@ -782,7 +762,6 @@ class TestDagProcessorJobRunner:
manager = DagFileProcessorManager(
dag_directory=processor_dir_1,
- dag_ids=[],
max_runs=1,
signal_conn=child_pipe,
processor_timeout=timedelta(seconds=5),
@@ -798,7 +777,6 @@ class TestDagProcessorJobRunner:
manager = DagFileProcessorManager(
dag_directory=processor_dir_2,
- dag_ids=[],
max_runs=1,
signal_conn=child_pipe,
processor_timeout=timedelta(seconds=5),
@@ -863,7 +841,6 @@ class TestDagProcessorJobRunner:
manager = DagFileProcessorManager(
dag_directory=dag_filepath,
- dag_ids=[],
# A reasonable large number to ensure that we trigger the deadlock
max_runs=100,
processor_timeout=timedelta(seconds=5),
@@ -904,7 +881,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=child_pipe,
- dag_ids=[],
)
self.run_processor_manager_one_loop(manager, parent_pipe)
@@ -933,7 +909,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=MagicMock(),
- dag_ids=[],
)
dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
@@ -957,7 +932,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=MagicMock(),
- dag_ids=[],
)
dagbag = DagBag(dag_folder=tmp_path, include_examples=False)
zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
@@ -999,7 +973,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=MagicMock(),
- dag_ids=[],
)
manager.last_dag_dir_refresh_time = timezone.utcnow() -
timedelta(minutes=10)
@@ -1044,7 +1017,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=child_pipe,
- dag_ids=[],
)
with create_session() as session:
@@ -1086,7 +1058,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=child_pipe,
- dag_ids=[],
)
with create_session() as session:
@@ -1121,7 +1092,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=child_pipe,
- dag_ids=[],
)
with create_session() as session:
@@ -1157,7 +1127,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=child_pipe,
- dag_ids=[],
)
with create_session() as session:
@@ -1175,7 +1144,6 @@ class TestDagProcessorJobRunner:
max_runs=1,
processor_timeout=timedelta(days=365),
signal_conn=MagicMock(),
- dag_ids=[],
)
dag1_req1 = DagCallbackRequest(
@@ -1273,7 +1241,7 @@ class TestDagFileProcessorAgent:
os.remove(log_file_loc)
# Starting dag processing with 0 max_runs to avoid redundant
operations.
- processor_agent = DagFileProcessorAgent(test_dag_path, 0,
timedelta(days=365), [])
+ processor_agent = DagFileProcessorAgent(test_dag_path, 0,
timedelta(days=365))
processor_agent.start()
processor_agent._process.join()
@@ -1288,7 +1256,7 @@ class TestDagFileProcessorAgent:
clear_db_dags()
test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py"
- processor_agent = DagFileProcessorAgent(test_dag_path, 1,
timedelta(days=365), [])
+ processor_agent = DagFileProcessorAgent(test_dag_path, 1,
timedelta(days=365))
processor_agent.start()
while not processor_agent.done:
processor_agent.heartbeat()
@@ -1311,7 +1279,7 @@ class TestDagFileProcessorAgent:
os.remove(log_file_loc)
# Starting dag processing with 0 max_runs to avoid redundant
operations.
- processor_agent = DagFileProcessorAgent(test_dag_path, 0,
timedelta(days=365), [])
+ processor_agent = DagFileProcessorAgent(test_dag_path, 0,
timedelta(days=365))
processor_agent.start()
processor_agent._process.join()
@@ -1319,25 +1287,25 @@ class TestDagFileProcessorAgent:
assert os.path.isfile(log_file_loc)
def test_get_callbacks_pipe(self):
- processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
processor_agent._parent_signal_conn = Mock()
retval = processor_agent.get_callbacks_pipe()
assert retval == processor_agent._parent_signal_conn
def test_get_callbacks_pipe_no_parent_signal_conn(self):
- processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
processor_agent._parent_signal_conn = None
with pytest.raises(ValueError, match="Process not started"):
processor_agent.get_callbacks_pipe()
def test_heartbeat_no_parent_signal_conn(self):
- processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
processor_agent._parent_signal_conn = None
with pytest.raises(ValueError, match="Process not started"):
processor_agent.heartbeat()
def test_heartbeat_poll_eof_error(self):
- processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
processor_agent._parent_signal_conn = Mock()
processor_agent._parent_signal_conn.poll.return_value = True
processor_agent._parent_signal_conn.recv = Mock()
@@ -1346,7 +1314,7 @@ class TestDagFileProcessorAgent:
assert ret_val is None
def test_heartbeat_poll_connection_error(self):
- processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
processor_agent._parent_signal_conn = Mock()
processor_agent._parent_signal_conn.poll.return_value = True
processor_agent._parent_signal_conn.recv = Mock()
@@ -1355,7 +1323,7 @@ class TestDagFileProcessorAgent:
assert ret_val is None
def test_heartbeat_poll_process_message(self):
- processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
processor_agent._parent_signal_conn = Mock()
processor_agent._parent_signal_conn.poll.side_effect = [True, False]
processor_agent._parent_signal_conn.recv = Mock()
@@ -1366,19 +1334,19 @@ class TestDagFileProcessorAgent:
def test_process_message_invalid_type(self):
message = "xyz"
- processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
with pytest.raises(RuntimeError, match="Unexpected message received of
type str"):
processor_agent._process_message(message)
def test_heartbeat_manager(self):
- processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
processor_agent._parent_signal_conn = None
with pytest.raises(ValueError, match="Process not started"):
processor_agent._heartbeat_manager()
@mock.patch("airflow.utils.process_utils.reap_process_group")
def test_heartbeat_manager_process_restart(self, mock_pg):
- processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
processor_agent._parent_signal_conn = Mock()
processor_agent._process = MagicMock()
processor_agent.start = Mock()
@@ -1392,7 +1360,7 @@ class TestDagFileProcessorAgent:
@mock.patch("time.monotonic")
@mock.patch("airflow.dag_processing.manager.reap_process_group")
def test_heartbeat_manager_process_reap(self, mock_pg,
mock_time_monotonic, mock_stats):
- processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
processor_agent._parent_signal_conn = Mock()
processor_agent._process = Mock()
processor_agent._process.pid = 12345
@@ -1413,7 +1381,7 @@ class TestDagFileProcessorAgent:
processor_agent.start.assert_called()
def test_heartbeat_manager_terminate(self):
- processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
processor_agent._parent_signal_conn = Mock()
processor_agent._process = Mock()
processor_agent._process.is_alive.return_value = True
@@ -1423,7 +1391,7 @@ class TestDagFileProcessorAgent:
processor_agent._parent_signal_conn.send.assert_called_with(DagParsingSignal.TERMINATE_MANAGER)
def test_heartbeat_manager_terminate_conn_err(self):
- processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
processor_agent._process = Mock()
processor_agent._process.is_alive.return_value = True
processor_agent._parent_signal_conn = Mock()
@@ -1434,7 +1402,7 @@ class TestDagFileProcessorAgent:
processor_agent._parent_signal_conn.send.assert_called_with(DagParsingSignal.TERMINATE_MANAGER)
def test_heartbeat_manager_end_no_process(self):
- processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365), [])
+ processor_agent = DagFileProcessorAgent("", 1, timedelta(days=365))
processor_agent._process = Mock()
processor_agent._process.__bool__ = Mock(return_value=False)
processor_agent._process.side_effect = [None]
@@ -1449,7 +1417,7 @@ class TestDagFileProcessorAgent:
test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py"
# Starting dag processing with 0 max_runs to avoid redundant
operations.
- processor_agent = DagFileProcessorAgent(test_dag_path, 0,
timedelta(days=365), [])
+ processor_agent = DagFileProcessorAgent(test_dag_path, 0,
timedelta(days=365))
processor_agent.start()
processor_agent._process.join()
@@ -1464,7 +1432,7 @@ class TestDagFileProcessorAgent:
test_dag_path = TEST_DAG_FOLDER / "test_scheduler_dags.py"
# Starting dag processing with 0 max_runs to avoid redundant
operations.
- processor_agent = DagFileProcessorAgent(test_dag_path, 0,
timedelta(days=365), [])
+ processor_agent = DagFileProcessorAgent(test_dag_path, 0,
timedelta(days=365))
processor_agent.start()
processor_agent._process.join()
diff --git a/tests/dag_processing/test_processor.py
b/tests/dag_processing/test_processor.py
index c2962ea0411..b23cd44f959 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -107,18 +107,14 @@ class TestDagFileProcessor:
self.clean_db()
def _process_file(self, file_path, dag_directory, session):
- dag_file_processor = DagFileProcessor(
- dag_ids=[], dag_directory=str(dag_directory), log=mock.MagicMock()
- )
+ dag_file_processor =
DagFileProcessor(dag_directory=str(dag_directory), log=mock.MagicMock())
dag_file_processor.process_file(file_path, [])
@patch.object(TaskInstance, "handle_failure")
def test_execute_on_failure_callbacks(self, mock_ti_handle_failure):
dagbag = DagBag(dag_folder="/dev/null", include_examples=True,
read_dags_from_db=False)
- dag_file_processor = DagFileProcessor(
- dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock()
- )
+ dag_file_processor = DagFileProcessor(dag_directory=TEST_DAGS_FOLDER,
log=mock.MagicMock())
with create_session() as session:
session.query(TaskInstance).delete()
dag = dagbag.get_dag("example_branch_operator")
@@ -152,9 +148,7 @@ class TestDagFileProcessor:
@patch.object(TaskInstance, "handle_failure")
def test_execute_on_failure_callbacks_without_dag(self,
mock_ti_handle_failure, has_serialized_dag):
dagbag = DagBag(dag_folder="/dev/null", include_examples=True,
read_dags_from_db=False)
- dag_file_processor = DagFileProcessor(
- dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock()
- )
+ dag_file_processor = DagFileProcessor(dag_directory=TEST_DAGS_FOLDER,
log=mock.MagicMock())
with create_session() as session:
session.query(TaskInstance).delete()
dag = dagbag.get_dag("example_branch_operator")
@@ -188,9 +182,7 @@ class TestDagFileProcessor:
def test_failure_callbacks_should_not_drop_hostname(self):
dagbag = DagBag(dag_folder="/dev/null", include_examples=True,
read_dags_from_db=False)
- dag_file_processor = DagFileProcessor(
- dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock()
- )
+ dag_file_processor = DagFileProcessor(dag_directory=TEST_DAGS_FOLDER,
log=mock.MagicMock())
dag_file_processor.UNIT_TEST_MODE = False
with create_session() as session:
@@ -224,9 +216,7 @@ class TestDagFileProcessor:
callback_file = tmp_path.joinpath("callback.txt")
callback_file.touch()
monkeypatch.setenv("AIRFLOW_CALLBACK_FILE", str(callback_file))
- dag_file_processor = DagFileProcessor(
- dag_ids=[], dag_directory=TEST_DAGS_FOLDER, log=mock.MagicMock()
- )
+ dag_file_processor = DagFileProcessor(dag_directory=TEST_DAGS_FOLDER,
log=mock.MagicMock())
dag = get_test_dag("test_on_failure_callback")
task = dag.get_task(task_id="test_on_failure_callback_task")
@@ -576,7 +566,6 @@ class TestDagFileProcessor:
def test_dag_parser_output_when_logging_to_stdout(self,
mock_redirect_stdout_for_file):
processor = DagFileProcessorProcess(
file_path="abc.txt",
- dag_ids=[],
dag_directory=[],
callback_requests=[],
)
@@ -584,7 +573,6 @@ class TestDagFileProcessor:
result_channel=MagicMock(),
parent_channel=MagicMock(),
file_path="fake_file_path",
- dag_ids=[],
thread_name="fake_thread_name",
callback_requests=[],
dag_directory=[],
@@ -597,7 +585,6 @@ class TestDagFileProcessor:
def test_dag_parser_output_when_logging_to_file(self,
mock_redirect_stdout_for_file):
processor = DagFileProcessorProcess(
file_path="abc.txt",
- dag_ids=[],
dag_directory=[],
callback_requests=[],
)
@@ -605,7 +592,6 @@ class TestDagFileProcessor:
result_channel=MagicMock(),
parent_channel=MagicMock(),
file_path="fake_file_path",
- dag_ids=[],
thread_name="fake_thread_name",
callback_requests=[],
dag_directory=[],
@@ -622,7 +608,6 @@ class TestDagFileProcessor:
processor = DagFileProcessorProcess(
file_path=zip_filename,
- dag_ids=[],
dag_directory=[],
callback_requests=[],
)
@@ -638,7 +623,6 @@ class TestDagFileProcessor:
processor = DagFileProcessorProcess(
file_path=dag_filename,
- dag_ids=[],
dag_directory=[],
callback_requests=[],
)
diff --git a/tests/listeners/test_dag_import_error_listener.py
b/tests/listeners/test_dag_import_error_listener.py
index ff63d141c78..aa085d3cfd7 100644
--- a/tests/listeners/test_dag_import_error_listener.py
+++ b/tests/listeners/test_dag_import_error_listener.py
@@ -95,9 +95,7 @@ class TestDagFileProcessor:
self.clean_db()
def _process_file(self, file_path, dag_directory, session):
- dag_file_processor = DagFileProcessor(
- dag_ids=[], dag_directory=str(dag_directory), log=mock.MagicMock()
- )
+ dag_file_processor =
DagFileProcessor(dag_directory=str(dag_directory), log=mock.MagicMock())
dag_file_processor.process_file(file_path, [])