This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
     new 30cfb7345a Incorrect try number subtraction producing invalid span id for OTEL airflow (issue #41501) (#41502) (#41535)
30cfb7345a is described below

commit 30cfb7345ae793e3687597e6d499f1657ed8ece3
Author: Jarek Potiuk <[email protected]>
AuthorDate: Fri Aug 16 18:10:51 2024 +0200

    Incorrect try number subtraction producing invalid span id for OTEL airflow (issue #41501) (#41502) (#41535)
    
    * Fix for issue #39336
    
    * removed unnecessary import
    
    (cherry picked from commit dd3c3a7a43102c967d76cdcfe1f2f8ebeef4e212)
    
    Co-authored-by: Howard Yoo <[email protected]>
---
 airflow/executors/base_executor.py       | 2 +-
 airflow/executors/local_executor.py      | 2 +-
 airflow/executors/sequential_executor.py | 2 +-
 airflow/jobs/scheduler_job_runner.py     | 2 +-
 airflow/traces/utils.py                  | 7 +------
 5 files changed, 5 insertions(+), 10 deletions(-)

diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index dd0b8a66d2..57568af199 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -467,7 +467,7 @@ class BaseExecutor(LoggingMixin):
                 span.set_attribute("dag_id", key.dag_id)
                 span.set_attribute("run_id", key.run_id)
                 span.set_attribute("task_id", key.task_id)
-                span.set_attribute("try_number", key.try_number - 1)
+                span.set_attribute("try_number", key.try_number)
 
         self.change_state(key, TaskInstanceState.SUCCESS, info)
 
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index afa51b1d86..32bba42082 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -277,7 +277,7 @@ class LocalExecutor(BaseExecutor):
                 span.set_attribute("dag_id", key.dag_id)
                 span.set_attribute("run_id", key.run_id)
                 span.set_attribute("task_id", key.task_id)
-                span.set_attribute("try_number", key.try_number - 1)
+                span.set_attribute("try_number", key.try_number)
                 span.set_attribute("commands_to_run", str(command))
 
             local_worker = LocalWorker(self.executor.result_queue, key=key, command=command)
diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py
index 1b145892eb..5e9542d915 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -76,7 +76,7 @@ class SequentialExecutor(BaseExecutor):
             span.set_attribute("dag_id", key.dag_id)
             span.set_attribute("run_id", key.run_id)
             span.set_attribute("task_id", key.task_id)
-            span.set_attribute("try_number", key.try_number - 1)
+            span.set_attribute("try_number", key.try_number)
             span.set_attribute("commands_to_run", str(self.commands_to_run))
 
     def sync(self) -> None:
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 163bf5b714..ba5f90c68b 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -837,7 +837,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 span.set_attribute("hostname", ti.hostname)
                 span.set_attribute("log_url", ti.log_url)
                 span.set_attribute("operator", str(ti.operator))
-                span.set_attribute("try_number", ti.try_number - 1)
+                span.set_attribute("try_number", ti.try_number)
                 span.set_attribute("executor_state", state)
                 span.set_attribute("job_id", ti.job_id)
                 span.set_attribute("pool", ti.pool)
diff --git a/airflow/traces/utils.py b/airflow/traces/utils.py
index afab2591d5..9932c249f0 100644
--- a/airflow/traces/utils.py
+++ b/airflow/traces/utils.py
@@ -22,7 +22,6 @@ from typing import TYPE_CHECKING
 
 from airflow.traces import NO_TRACE_ID
 from airflow.utils.hashlib_wrapper import md5
-from airflow.utils.state import TaskInstanceState
 
 if TYPE_CHECKING:
     from airflow.models import DagRun, TaskInstance
@@ -75,12 +74,8 @@ def gen_dag_span_id(dag_run: DagRun, as_int: bool = False) -> str | int:
 def gen_span_id(ti: TaskInstance, as_int: bool = False) -> str | int:
     """Generate span id from the task instance."""
     dag_run = ti.dag_run
-    if ti.state == TaskInstanceState.SUCCESS or ti.state == TaskInstanceState.FAILED:
-        try_number = ti.try_number - 1
-    else:
-        try_number = ti.try_number
     return _gen_id(
-        [dag_run.dag_id, dag_run.run_id, ti.task_id, str(try_number)],
+        [dag_run.dag_id, dag_run.run_id, ti.task_id, str(ti.try_number)],
         as_int,
         SPAN_ID,
     )

Reply via email to