This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new 30cfb7345a Incorrect try number subtraction producing invalid span id
for OTEL airflow (issue #41501) (#41502) (#41535)
30cfb7345a is described below
commit 30cfb7345ae793e3687597e6d499f1657ed8ece3
Author: Jarek Potiuk <[email protected]>
AuthorDate: Fri Aug 16 18:10:51 2024 +0200
Incorrect try number subtraction producing invalid span id for OTEL airflow
(issue #41501) (#41502) (#41535)
* Fix for issue #39336
* removed unnecessary import
(cherry picked from commit dd3c3a7a43102c967d76cdcfe1f2f8ebeef4e212)
Co-authored-by: Howard Yoo <[email protected]>
---
airflow/executors/base_executor.py | 2 +-
airflow/executors/local_executor.py | 2 +-
airflow/executors/sequential_executor.py | 2 +-
airflow/jobs/scheduler_job_runner.py | 2 +-
airflow/traces/utils.py | 7 +------
5 files changed, 5 insertions(+), 10 deletions(-)
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index dd0b8a66d2..57568af199 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -467,7 +467,7 @@ class BaseExecutor(LoggingMixin):
span.set_attribute("dag_id", key.dag_id)
span.set_attribute("run_id", key.run_id)
span.set_attribute("task_id", key.task_id)
- span.set_attribute("try_number", key.try_number - 1)
+ span.set_attribute("try_number", key.try_number)
self.change_state(key, TaskInstanceState.SUCCESS, info)
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index afa51b1d86..32bba42082 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -277,7 +277,7 @@ class LocalExecutor(BaseExecutor):
span.set_attribute("dag_id", key.dag_id)
span.set_attribute("run_id", key.run_id)
span.set_attribute("task_id", key.task_id)
- span.set_attribute("try_number", key.try_number - 1)
+ span.set_attribute("try_number", key.try_number)
span.set_attribute("commands_to_run", str(command))
local_worker = LocalWorker(self.executor.result_queue, key=key, command=command)
diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py
index 1b145892eb..5e9542d915 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -76,7 +76,7 @@ class SequentialExecutor(BaseExecutor):
span.set_attribute("dag_id", key.dag_id)
span.set_attribute("run_id", key.run_id)
span.set_attribute("task_id", key.task_id)
- span.set_attribute("try_number", key.try_number - 1)
+ span.set_attribute("try_number", key.try_number)
span.set_attribute("commands_to_run", str(self.commands_to_run))
def sync(self) -> None:
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 163bf5b714..ba5f90c68b 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -837,7 +837,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
span.set_attribute("hostname", ti.hostname)
span.set_attribute("log_url", ti.log_url)
span.set_attribute("operator", str(ti.operator))
- span.set_attribute("try_number", ti.try_number - 1)
+ span.set_attribute("try_number", ti.try_number)
span.set_attribute("executor_state", state)
span.set_attribute("job_id", ti.job_id)
span.set_attribute("pool", ti.pool)
diff --git a/airflow/traces/utils.py b/airflow/traces/utils.py
index afab2591d5..9932c249f0 100644
--- a/airflow/traces/utils.py
+++ b/airflow/traces/utils.py
@@ -22,7 +22,6 @@ from typing import TYPE_CHECKING
from airflow.traces import NO_TRACE_ID
from airflow.utils.hashlib_wrapper import md5
-from airflow.utils.state import TaskInstanceState
if TYPE_CHECKING:
from airflow.models import DagRun, TaskInstance
@@ -75,12 +74,8 @@ def gen_dag_span_id(dag_run: DagRun, as_int: bool = False) -> str | int:
def gen_span_id(ti: TaskInstance, as_int: bool = False) -> str | int:
"""Generate span id from the task instance."""
dag_run = ti.dag_run
- if ti.state == TaskInstanceState.SUCCESS or ti.state == TaskInstanceState.FAILED:
- try_number = ti.try_number - 1
- else:
- try_number = ti.try_number
return _gen_id(
- [dag_run.dag_id, dag_run.run_id, ti.task_id, str(try_number)],
+ [dag_run.dag_id, dag_run.run_id, ti.task_id, str(ti.try_number)],
as_int,
SPAN_ID,
)