This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 7b2e0d8bc9c [v3-1-test] fix: corrects otel serialization of file paths
in dag processor (#56665) (#56718)
7b2e0d8bc9c is described below
commit 7b2e0d8bc9c10e955bddce3b56efdb3870c969b4
Author: Jarek Potiuk <[email protected]>
AuthorDate: Thu Oct 16 13:27:54 2025 +0200
[v3-1-test] fix: corrects otel serialization of file paths in dag processor
(#56665) (#56718)
(cherry picked from commit 501da16d44817f23e209be98400f9578d8f3e578)
Co-authored-by: codecae <[email protected]>
---
airflow-core/src/airflow/dag_processing/manager.py | 16 ++++++++++------
1 file changed, 10 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 73f74c415ab..df4f84d0ce5 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -801,8 +801,9 @@ class DagFileProcessorManager(LoggingMixin):
processor = self._processors.pop(file, None)
if not processor:
continue
- self.log.warning("Stopping processor for %s", file)
- Stats.decr("dag_processing.processes", tags={"file_path": file, "action": "stop"})
+ file_name = str(file.rel_path)
+ self.log.warning("Stopping processor for %s", file_name)
+ Stats.decr("dag_processing.processes", tags={"file_path": file_name, "action": "stop"})
processor.kill(signal.SIGKILL)
processor.logger_filehandle.close()
self._file_stats.pop(file, None)
@@ -922,7 +923,7 @@ class DagFileProcessorManager(LoggingMixin):
continue
processor = self._create_process(file)
- Stats.incr("dag_processing.processes", tags={"file_path": file, "action": "start"})
+ Stats.incr("dag_processing.processes", tags={"file_path": str(file.rel_path), "action": "start"})
self._processors[file] = processor
Stats.gauge("dag_processing.file_path_queue_size", len(self._file_queue))
@@ -1033,8 +1034,9 @@ class DagFileProcessorManager(LoggingMixin):
processor.pid,
duration,
)
- Stats.decr("dag_processing.processes", tags={"file_path": file, "action": "timeout"})
- Stats.incr("dag_processing.processor_timeouts", tags={"file_path": file})
+ file_name = str(file.rel_path)
+ Stats.decr("dag_processing.processes", tags={"file_path": file_name, "action": "timeout"})
+ Stats.incr("dag_processing.processor_timeouts", tags={"file_path": file_name})
processor.kill(signal.SIGKILL)
processors_to_remove.append(file)
@@ -1075,7 +1077,9 @@ class DagFileProcessorManager(LoggingMixin):
"""Stop all running processors."""
for file, processor in self._processors.items():
# todo: AIP-66 what to do about file_path tag? replace with bundle name and rel path?
- Stats.decr("dag_processing.processes", tags={"file_path": file, "action": "terminate"})
+ Stats.decr(
+ "dag_processing.processes", tags={"file_path": str(file.rel_path), "action": "terminate"}
+ )
# SIGTERM, wait 5s, SIGKILL if still alive
processor.kill(signal.SIGTERM, escalation_delay=5.0)