This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 48029203f7d fix: sanitize Dag processor metric file names (#67029)
48029203f7d is described below

commit 48029203f7d44d8d8d36f8a50ab6273ba8ae01e3
Author: Anmol Mishra <[email protected]>
AuthorDate: Sun Jun 7 18:21:57 2026 +0530

    fix: sanitize Dag processor metric file names (#67029)
    
    * fix: sanitize Dag processor metric file names
    
    * Apply suggestion from @SameerMesiah97
    
    Co-authored-by: SameerMesiah97 <[email protected]>
    
    * Sanitize Dag processor file path stats tags
    
    * Trigger CI rerun
    
    ---------
    
    Co-authored-by: Anmol Mishra <[email protected]>
    Co-authored-by: Jens Scheffler <[email protected]>
    Co-authored-by: SameerMesiah97 <[email protected]>
---
 airflow-core/src/airflow/dag_processing/manager.py | 27 +++++---
 .../tests/unit/dag_processing/test_manager.py      | 75 ++++++++++++++++++++--
 2 files changed, 88 insertions(+), 14 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py 
b/airflow-core/src/airflow/dag_processing/manager.py
index 37f0496a89c..783de174758 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -142,6 +142,11 @@ class DagFileInfo:
         """Return the stable file identity used for presence checks."""
         return self.bundle_name, self.rel_path
 
+    @property
+    def normalized_file_path_for_stats(self) -> str:
+        """Return the relative file path normalized for use in stats tags."""
+        return normalize_name_for_stats(str(self.rel_path))
+
 
 def _config_int_factory(section: str, key: str):
     return functools.partial(conf.getint, section, key)
@@ -1011,7 +1016,7 @@ class DagFileProcessorManager(LoggingMixin):
                 proc = self._processors.get(file)
                 num_dags = stat.num_dags
                 num_errors = stat.import_errors
-                file_name = Path(file.rel_path).stem
+                file_name = normalize_name_for_stats(Path(file.rel_path).stem)
                 processor_pid = proc.pid if proc else None
                 processor_start_time = proc.start_time if proc else None
                 runtime = (now - processor_start_time) if processor_start_time 
else None
@@ -1116,7 +1121,10 @@ class DagFileProcessorManager(LoggingMixin):
                     continue
                 file_name = str(file.rel_path)
                 self.log.warning("Stopping processor for %s", file_name)
-                stats.decr("dag_processing.processes", tags={"file_path": 
file_name, "action": "stop"})
+                stats.decr(
+                    "dag_processing.processes",
+                    tags={"file_path": file.normalized_file_path_for_stats, 
"action": "stop"},
+                )
                 processor.kill(signal.SIGKILL)
                 processor.logger_filehandle.close()
                 self._file_stats.pop(file, None)
@@ -1335,7 +1343,10 @@ class DagFileProcessorManager(LoggingMixin):
                 continue
 
             processor = self._create_process(file)
-            stats.incr("dag_processing.processes", tags={"file_path": 
str(file.rel_path), "action": "start"})
+            stats.incr(
+                "dag_processing.processes",
+                tags={"file_path": file.normalized_file_path_for_stats, 
"action": "start"},
+            )
 
             self._processors[file] = processor
             stats.gauge("dag_processing.file_path_queue_size", 
len(self._file_queue))
@@ -1510,9 +1521,9 @@ class DagFileProcessorManager(LoggingMixin):
                     duration,
                     self.processor_timeout,
                 )
-                file_name = str(file.rel_path)
-                stats.decr("dag_processing.processes", tags={"file_path": 
file_name, "action": "timeout"})
-                stats.incr("dag_processing.processor_timeouts", 
tags={"file_path": file_name})
+                file_path_tag = file.normalized_file_path_for_stats
+                stats.decr("dag_processing.processes", tags={"file_path": 
file_path_tag, "action": "timeout"})
+                stats.incr("dag_processing.processor_timeouts", 
tags={"file_path": file_path_tag})
                 processor.kill(signal.SIGKILL)
 
                 processors_to_remove.append(file)
@@ -1574,9 +1585,9 @@ class DagFileProcessorManager(LoggingMixin):
     def terminate(self):
         """Stop all running processors."""
         for file, processor in self._processors.items():
-            # todo: AIP-66 what to do about file_path tag? replace with bundle 
name and rel path?
             stats.decr(
-                "dag_processing.processes", tags={"file_path": 
str(file.rel_path), "action": "terminate"}
+                "dag_processing.processes",
+                tags={"file_path": file.normalized_file_path_for_stats, 
"action": "terminate"},
             )
             # SIGTERM, wait 5s, SIGKILL if still alive
             processor.kill(signal.SIGTERM, escalation_delay=5.0)
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py 
b/airflow-core/tests/unit/dag_processing/test_manager.py
index 5b6fc1608e2..c0c42fae1d0 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -371,16 +371,26 @@ class TestDagFileProcessorManager:
             manager._dag_bundles = 
list(DagBundlesManager().get_all_dag_bundles())
 
         file_1 = DagFileInfo(bundle_name="testing", 
rel_path=Path("file_1.py"), bundle_path=TEST_DAGS_FOLDER)
-        file_2 = DagFileInfo(bundle_name="testing", 
rel_path=Path("file_2.py"), bundle_path=TEST_DAGS_FOLDER)
+        file_2 = DagFileInfo(
+            bundle_name="testing", rel_path=Path("folder/file 2.py"), 
bundle_path=TEST_DAGS_FOLDER
+        )
         file_3 = DagFileInfo(bundle_name="testing", 
rel_path=Path("file_3.py"), bundle_path=TEST_DAGS_FOLDER)
         manager._file_queue = OrderedDict.fromkeys([file_1, file_2, file_3])
 
         # Mock that only one processor exists. This processor runs with 
'file_1'
         manager._processors[file_1] = MagicMock()
         # Start New Processes
-        with mock.patch.object(DagFileProcessorManager, "_create_process"):
+        with (
+            mock.patch.object(DagFileProcessorManager, "_create_process"),
+            mock.patch("airflow.dag_processing.manager.stats.incr") as 
stats_incr_mock,
+        ):
             manager._start_new_processes()
 
+        stats_incr_mock.assert_called_once_with(
+            "dag_processing.processes",
+            tags={"file_path": "folder_file_2.py", "action": "start"},
+        )
+
         # Because of the config: '[dag_processor] parsing_processes = 2'
         # verify that only one extra process is created
         # and since a processor with 'file_1' already exists,
@@ -472,14 +482,19 @@ class TestDagFileProcessorManager:
 
     def 
test_terminate_orphan_processes_kills_processor_when_file_is_truly_absent(self):
         manager = DagFileProcessorManager(max_runs=1)
-        versioned_file = _get_versioned_file_info("callbacks.py")
+        versioned_file = _get_versioned_file_info("callbacks with spaces.py")
         processor = MagicMock()
 
         manager._processors[versioned_file] = processor
 
-        manager.terminate_orphan_processes(present=set())
+        with mock.patch("airflow.dag_processing.manager.stats.decr") as 
stats_decr_mock:
+            manager.terminate_orphan_processes(present=set())
 
         assert manager._processors == {}
+        stats_decr_mock.assert_called_once_with(
+            "dag_processing.processes",
+            tags={"file_path": "callbacks_with_spaces.py", "action": "stop"},
+        )
         processor.kill.assert_called_once_with(signal.SIGKILL)
 
     def 
test_remove_orphaned_file_stats_keeps_versioned_callback_stats_when_unversioned_file_is_present(self):
@@ -1149,12 +1164,24 @@ class TestDagFileProcessorManager:
         processor, _ = self.mock_processor(start_time=start_time)
         manager._processors = {
             DagFileInfo(
-                bundle_name="testing", rel_path=Path("abc.txt"), 
bundle_path=TEST_DAGS_FOLDER
+                bundle_name="testing", rel_path=Path("folder/abc txt.py"), 
bundle_path=TEST_DAGS_FOLDER
             ): processor
         }
-        with mock.patch.object(type(processor), "kill") as mock_kill:
+        with (
+            mock.patch.object(type(processor), "kill") as mock_kill,
+            mock.patch("airflow.dag_processing.manager.stats.decr") as 
stats_decr_mock,
+            mock.patch("airflow.dag_processing.manager.stats.incr") as 
stats_incr_mock,
+        ):
             manager._kill_timed_out_processors()
         mock_kill.assert_called_once_with(signal.SIGKILL)
+        stats_decr_mock.assert_called_once_with(
+            "dag_processing.processes",
+            tags={"file_path": "folder_abc_txt.py", "action": "timeout"},
+        )
+        stats_incr_mock.assert_called_once_with(
+            "dag_processing.processor_timeouts",
+            tags={"file_path": "folder_abc_txt.py"},
+        )
         assert len(manager._processors) == 0
         processor.logger_filehandle.close.assert_called()
 
@@ -1175,6 +1202,23 @@ class TestDagFileProcessorManager:
             manager._kill_timed_out_processors()
         mock_kill.assert_not_called()
 
+    def test_terminate_normalizes_file_path_stats_tag(self):
+        manager = DagFileProcessorManager(max_runs=1)
+        dag_file = DagFileInfo(
+            bundle_name="testing", rel_path=Path("folder/abc txt.py"), 
bundle_path=TEST_DAGS_FOLDER
+        )
+        processor = MagicMock()
+        manager._processors[dag_file] = processor
+
+        with mock.patch("airflow.dag_processing.manager.stats.decr") as 
stats_decr_mock:
+            manager.terminate()
+
+        stats_decr_mock.assert_called_once_with(
+            "dag_processing.processes",
+            tags={"file_path": "folder_abc_txt.py", "action": "terminate"},
+        )
+        processor.kill.assert_called_once_with(signal.SIGTERM, 
escalation_delay=5.0)
+
     def 
test_handle_parsing_result_provides_its_own_session_when_caller_omits(self):
         """``handle_parsing_result`` is wrapped in ``@provide_session`` so 
subclasses overriding it can run without a caller-supplied session."""
         manager = DagFileProcessorManager(max_runs=1)
@@ -1416,6 +1460,25 @@ class TestDagFileProcessorManager:
             tags={"bundle_name": bundle_name, "file_name": dag_filename[:-3]},
         )
 
+    @mock.patch("airflow.dag_processing.manager.stats.gauge")
+    def test_log_file_processing_stats_normalizes_metric_name(self, 
statsd_gauge_mock):
+        manager = DagFileProcessorManager(max_runs=1)
+        dag_file = DagFileInfo(
+            bundle_name="testing",
+            rel_path=Path("test_of sprak_opertaor.py"),
+            bundle_path=TEST_DAGS_FOLDER,
+        )
+        manager._file_stats[dag_file] = DagFileStat(
+            last_finish_time=timezone.utcnow() - timedelta(seconds=5),
+        )
+
+        manager._log_file_processing_stats({"testing": {dag_file}})
+
+        statsd_gauge_mock.assert_any_call(
+            "dag_processing.last_run.seconds_ago.test_of_sprak_opertaor",
+            mock.ANY,
+        )
+
     @pytest.mark.usefixtures("testing_dag_bundle")
     def test_refresh_dags_dir_doesnt_delete_zipped_dags(
         self, tmp_path, session, configure_testing_dag_bundle, test_zip_path

Reply via email to