This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 7b2e0d8bc9c [v3-1-test] fix: corrects otel serialization of file paths 
in dag processor (#56665) (#56718)
7b2e0d8bc9c is described below

commit 7b2e0d8bc9c10e955bddce3b56efdb3870c969b4
Author: Jarek Potiuk <[email protected]>
AuthorDate: Thu Oct 16 13:27:54 2025 +0200

    [v3-1-test] fix: corrects otel serialization of file paths in dag processor 
(#56665) (#56718)
    
    (cherry picked from commit 501da16d44817f23e209be98400f9578d8f3e578)
    
    Co-authored-by: codecae <[email protected]>
---
 airflow-core/src/airflow/dag_processing/manager.py | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py 
b/airflow-core/src/airflow/dag_processing/manager.py
index 73f74c415ab..df4f84d0ce5 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -801,8 +801,9 @@ class DagFileProcessorManager(LoggingMixin):
                 processor = self._processors.pop(file, None)
                 if not processor:
                     continue
-                self.log.warning("Stopping processor for %s", file)
-                Stats.decr("dag_processing.processes", tags={"file_path": 
file, "action": "stop"})
+                file_name = str(file.rel_path)
+                self.log.warning("Stopping processor for %s", file_name)
+                Stats.decr("dag_processing.processes", tags={"file_path": 
file_name, "action": "stop"})
                 processor.kill(signal.SIGKILL)
                 processor.logger_filehandle.close()
                 self._file_stats.pop(file, None)
@@ -922,7 +923,7 @@ class DagFileProcessorManager(LoggingMixin):
                 continue
 
             processor = self._create_process(file)
-            Stats.incr("dag_processing.processes", tags={"file_path": file, 
"action": "start"})
+            Stats.incr("dag_processing.processes", tags={"file_path": 
str(file.rel_path), "action": "start"})
 
             self._processors[file] = processor
             Stats.gauge("dag_processing.file_path_queue_size", 
len(self._file_queue))
@@ -1033,8 +1034,9 @@ class DagFileProcessorManager(LoggingMixin):
                     processor.pid,
                     duration,
                 )
-                Stats.decr("dag_processing.processes", tags={"file_path": 
file, "action": "timeout"})
-                Stats.incr("dag_processing.processor_timeouts", 
tags={"file_path": file})
+                file_name = str(file.rel_path)
+                Stats.decr("dag_processing.processes", tags={"file_path": 
file_name, "action": "timeout"})
+                Stats.incr("dag_processing.processor_timeouts", 
tags={"file_path": file_name})
                 processor.kill(signal.SIGKILL)
 
                 processors_to_remove.append(file)
@@ -1075,7 +1077,9 @@ class DagFileProcessorManager(LoggingMixin):
         """Stop all running processors."""
         for file, processor in self._processors.items():
             # todo: AIP-66 what to do about file_path tag? replace with bundle 
name and rel path?
-            Stats.decr("dag_processing.processes", tags={"file_path": file, 
"action": "terminate"})
+            Stats.decr(
+                "dag_processing.processes", tags={"file_path": 
str(file.rel_path), "action": "terminate"}
+            )
             # SIGTERM, wait 5s, SIGKILL if still alive
             processor.kill(signal.SIGTERM, escalation_delay=5.0)
 

Reply via email to