This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 48029203f7d fix: sanitize Dag processor metric file names (#67029)
48029203f7d is described below
commit 48029203f7d44d8d8d36f8a50ab6273ba8ae01e3
Author: Anmol Mishra <[email protected]>
AuthorDate: Sun Jun 7 18:21:57 2026 +0530
fix: sanitize Dag processor metric file names (#67029)
* fix: sanitize Dag processor metric file names
* Apply suggestion from @SameerMesiah97
Co-authored-by: SameerMesiah97 <[email protected]>
* Sanitize Dag processor file path stats tags
* Trigger CI rerun
---------
Co-authored-by: Anmol Mishra <[email protected]>
Co-authored-by: Jens Scheffler <[email protected]>
Co-authored-by: SameerMesiah97 <[email protected]>
---
airflow-core/src/airflow/dag_processing/manager.py | 27 +++++---
.../tests/unit/dag_processing/test_manager.py | 75 ++++++++++++++++++++--
2 files changed, 88 insertions(+), 14 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py
b/airflow-core/src/airflow/dag_processing/manager.py
index 37f0496a89c..783de174758 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -142,6 +142,11 @@ class DagFileInfo:
"""Return the stable file identity used for presence checks."""
return self.bundle_name, self.rel_path
+ @property
+ def normalized_file_path_for_stats(self) -> str:
+ """Return the relative file path normalized for use in stats tags."""
+ return normalize_name_for_stats(str(self.rel_path))
+
def _config_int_factory(section: str, key: str):
return functools.partial(conf.getint, section, key)
@@ -1011,7 +1016,7 @@ class DagFileProcessorManager(LoggingMixin):
proc = self._processors.get(file)
num_dags = stat.num_dags
num_errors = stat.import_errors
- file_name = Path(file.rel_path).stem
+ file_name = normalize_name_for_stats(Path(file.rel_path).stem)
processor_pid = proc.pid if proc else None
processor_start_time = proc.start_time if proc else None
runtime = (now - processor_start_time) if processor_start_time
else None
@@ -1116,7 +1121,10 @@ class DagFileProcessorManager(LoggingMixin):
continue
file_name = str(file.rel_path)
self.log.warning("Stopping processor for %s", file_name)
- stats.decr("dag_processing.processes", tags={"file_path":
file_name, "action": "stop"})
+ stats.decr(
+ "dag_processing.processes",
+ tags={"file_path": file.normalized_file_path_for_stats,
"action": "stop"},
+ )
processor.kill(signal.SIGKILL)
processor.logger_filehandle.close()
self._file_stats.pop(file, None)
@@ -1335,7 +1343,10 @@ class DagFileProcessorManager(LoggingMixin):
continue
processor = self._create_process(file)
- stats.incr("dag_processing.processes", tags={"file_path":
str(file.rel_path), "action": "start"})
+ stats.incr(
+ "dag_processing.processes",
+ tags={"file_path": file.normalized_file_path_for_stats,
"action": "start"},
+ )
self._processors[file] = processor
stats.gauge("dag_processing.file_path_queue_size",
len(self._file_queue))
@@ -1510,9 +1521,9 @@ class DagFileProcessorManager(LoggingMixin):
duration,
self.processor_timeout,
)
- file_name = str(file.rel_path)
- stats.decr("dag_processing.processes", tags={"file_path":
file_name, "action": "timeout"})
- stats.incr("dag_processing.processor_timeouts",
tags={"file_path": file_name})
+ file_path_tag = file.normalized_file_path_for_stats
+ stats.decr("dag_processing.processes", tags={"file_path":
file_path_tag, "action": "timeout"})
+ stats.incr("dag_processing.processor_timeouts",
tags={"file_path": file_path_tag})
processor.kill(signal.SIGKILL)
processors_to_remove.append(file)
@@ -1574,9 +1585,9 @@ class DagFileProcessorManager(LoggingMixin):
def terminate(self):
"""Stop all running processors."""
for file, processor in self._processors.items():
- # todo: AIP-66 what to do about file_path tag? replace with bundle
name and rel path?
stats.decr(
- "dag_processing.processes", tags={"file_path":
str(file.rel_path), "action": "terminate"}
+ "dag_processing.processes",
+ tags={"file_path": file.normalized_file_path_for_stats,
"action": "terminate"},
)
# SIGTERM, wait 5s, SIGKILL if still alive
processor.kill(signal.SIGTERM, escalation_delay=5.0)
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py
b/airflow-core/tests/unit/dag_processing/test_manager.py
index 5b6fc1608e2..c0c42fae1d0 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -371,16 +371,26 @@ class TestDagFileProcessorManager:
manager._dag_bundles =
list(DagBundlesManager().get_all_dag_bundles())
file_1 = DagFileInfo(bundle_name="testing",
rel_path=Path("file_1.py"), bundle_path=TEST_DAGS_FOLDER)
- file_2 = DagFileInfo(bundle_name="testing",
rel_path=Path("file_2.py"), bundle_path=TEST_DAGS_FOLDER)
+ file_2 = DagFileInfo(
+ bundle_name="testing", rel_path=Path("folder/file 2.py"),
bundle_path=TEST_DAGS_FOLDER
+ )
file_3 = DagFileInfo(bundle_name="testing",
rel_path=Path("file_3.py"), bundle_path=TEST_DAGS_FOLDER)
manager._file_queue = OrderedDict.fromkeys([file_1, file_2, file_3])
# Mock that only one processor exists. This processor runs with
'file_1'
manager._processors[file_1] = MagicMock()
# Start New Processes
- with mock.patch.object(DagFileProcessorManager, "_create_process"):
+ with (
+ mock.patch.object(DagFileProcessorManager, "_create_process"),
+ mock.patch("airflow.dag_processing.manager.stats.incr") as
stats_incr_mock,
+ ):
manager._start_new_processes()
+ stats_incr_mock.assert_called_once_with(
+ "dag_processing.processes",
+ tags={"file_path": "folder_file_2.py", "action": "start"},
+ )
+
# Because of the config: '[dag_processor] parsing_processes = 2'
# verify that only one extra process is created
# and since a processor with 'file_1' already exists,
@@ -472,14 +482,19 @@ class TestDagFileProcessorManager:
def
test_terminate_orphan_processes_kills_processor_when_file_is_truly_absent(self):
manager = DagFileProcessorManager(max_runs=1)
- versioned_file = _get_versioned_file_info("callbacks.py")
+ versioned_file = _get_versioned_file_info("callbacks with spaces.py")
processor = MagicMock()
manager._processors[versioned_file] = processor
- manager.terminate_orphan_processes(present=set())
+ with mock.patch("airflow.dag_processing.manager.stats.decr") as
stats_decr_mock:
+ manager.terminate_orphan_processes(present=set())
assert manager._processors == {}
+ stats_decr_mock.assert_called_once_with(
+ "dag_processing.processes",
+ tags={"file_path": "callbacks_with_spaces.py", "action": "stop"},
+ )
processor.kill.assert_called_once_with(signal.SIGKILL)
def
test_remove_orphaned_file_stats_keeps_versioned_callback_stats_when_unversioned_file_is_present(self):
@@ -1149,12 +1164,24 @@ class TestDagFileProcessorManager:
processor, _ = self.mock_processor(start_time=start_time)
manager._processors = {
DagFileInfo(
- bundle_name="testing", rel_path=Path("abc.txt"),
bundle_path=TEST_DAGS_FOLDER
+ bundle_name="testing", rel_path=Path("folder/abc txt.py"),
bundle_path=TEST_DAGS_FOLDER
): processor
}
- with mock.patch.object(type(processor), "kill") as mock_kill:
+ with (
+ mock.patch.object(type(processor), "kill") as mock_kill,
+ mock.patch("airflow.dag_processing.manager.stats.decr") as
stats_decr_mock,
+ mock.patch("airflow.dag_processing.manager.stats.incr") as
stats_incr_mock,
+ ):
manager._kill_timed_out_processors()
mock_kill.assert_called_once_with(signal.SIGKILL)
+ stats_decr_mock.assert_called_once_with(
+ "dag_processing.processes",
+ tags={"file_path": "folder_abc_txt.py", "action": "timeout"},
+ )
+ stats_incr_mock.assert_called_once_with(
+ "dag_processing.processor_timeouts",
+ tags={"file_path": "folder_abc_txt.py"},
+ )
assert len(manager._processors) == 0
processor.logger_filehandle.close.assert_called()
@@ -1175,6 +1202,23 @@ class TestDagFileProcessorManager:
manager._kill_timed_out_processors()
mock_kill.assert_not_called()
+ def test_terminate_normalizes_file_path_stats_tag(self):
+ manager = DagFileProcessorManager(max_runs=1)
+ dag_file = DagFileInfo(
+ bundle_name="testing", rel_path=Path("folder/abc txt.py"),
bundle_path=TEST_DAGS_FOLDER
+ )
+ processor = MagicMock()
+ manager._processors[dag_file] = processor
+
+ with mock.patch("airflow.dag_processing.manager.stats.decr") as
stats_decr_mock:
+ manager.terminate()
+
+ stats_decr_mock.assert_called_once_with(
+ "dag_processing.processes",
+ tags={"file_path": "folder_abc_txt.py", "action": "terminate"},
+ )
+ processor.kill.assert_called_once_with(signal.SIGTERM,
escalation_delay=5.0)
+
def
test_handle_parsing_result_provides_its_own_session_when_caller_omits(self):
"""``handle_parsing_result`` is wrapped in ``@provide_session`` so
subclasses overriding it can run without a caller-supplied session."""
manager = DagFileProcessorManager(max_runs=1)
@@ -1416,6 +1460,25 @@ class TestDagFileProcessorManager:
tags={"bundle_name": bundle_name, "file_name": dag_filename[:-3]},
)
+ @mock.patch("airflow.dag_processing.manager.stats.gauge")
+ def test_log_file_processing_stats_normalizes_metric_name(self,
statsd_gauge_mock):
+ manager = DagFileProcessorManager(max_runs=1)
+ dag_file = DagFileInfo(
+ bundle_name="testing",
+ rel_path=Path("test_of sprak_opertaor.py"),
+ bundle_path=TEST_DAGS_FOLDER,
+ )
+ manager._file_stats[dag_file] = DagFileStat(
+ last_finish_time=timezone.utcnow() - timedelta(seconds=5),
+ )
+
+ manager._log_file_processing_stats({"testing": {dag_file}})
+
+ statsd_gauge_mock.assert_any_call(
+ "dag_processing.last_run.seconds_ago.test_of_sprak_opertaor",
+ mock.ANY,
+ )
+
@pytest.mark.usefixtures("testing_dag_bundle")
def test_refresh_dags_dir_doesnt_delete_zipped_dags(
self, tmp_path, session, configure_testing_dag_bundle, test_zip_path