This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ea515e6022 Rename @span decorator to @add_span to avoid collisions (#41444)
ea515e6022 is described below
commit ea515e6022658bb295bb8d060f4e212c6c1da8fb
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Aug 13 14:49:16 2024 -0700
Rename @span decorator to @add_span to avoid collisions (#41444)
Previously, if we ever needed to get the current span and manipulate it, we had to
name the local variable with leading or trailing underscores to avoid a collision
with the span decorator's function name, which was a bit awkward.
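For illustration, a minimal sketch of the collision the rename avoids; the function
name and attribute value below are made up for the example, while the imports,
Trace.get_current_span(), and set_attribute() come from the diff:

    from airflow.traces.tracer import Trace, add_span

    @add_span
    def do_scheduling_work():
        # With the decorator named add_span, the local variable can simply be
        # called "span"; before the rename it had to be "_span" or "__span"
        # so it would not shadow the imported decorator.
        span = Trace.get_current_span()
        span.set_attribute("component", "example")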
---
airflow/dag_processing/manager.py | 6 +++---
airflow/executors/base_executor.py | 8 ++++----
airflow/executors/local_executor.py | 18 +++++++++---------
airflow/executors/sequential_executor.py | 4 ++--
airflow/jobs/job.py | 4 ++--
airflow/jobs/scheduler_job_runner.py | 28 ++++++++++++++--------------
airflow/jobs/triggerer_job_runner.py | 10 +++++-----
airflow/traces/tracer.py | 2 +-
8 files changed, 40 insertions(+), 40 deletions(-)
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index c03bc074d0..1f12f3cb03 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -53,7 +53,7 @@ from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.secrets.cache import SecretCache
from airflow.stats import Stats
-from airflow.traces.tracer import Trace, span
+from airflow.traces.tracer import Trace, add_span
from airflow.utils import timezone
from airflow.utils.dates import datetime_to_nano
from airflow.utils.file import list_py_file_paths, might_contain_dag
@@ -1210,7 +1210,7 @@ class DagFileProcessorManager(LoggingMixin):
callback_requests=callback_requests,
)
- @span
+ @add_span
def start_new_processes(self):
"""Start more processors if we have enough slots and files to
process."""
# initialize cache to mutualize calls to Variable.get in DAGs
@@ -1248,7 +1248,7 @@ class DagFileProcessorManager(LoggingMixin):
Stats.gauge("dag_processing.file_path_queue_size",
len(self._file_path_queue))
- @span
+ @add_span
def add_new_file_path_to_queue(self):
for file_path in self.file_paths:
if file_path not in self._file_stats:
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index dd0b8a66d2..2c5cbc4b57 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -34,7 +34,7 @@ from airflow.executors.executor_loader import ExecutorLoader
from airflow.models import Log
from airflow.stats import Stats
from airflow.traces import NO_TRACE_ID
-from airflow.traces.tracer import Trace, gen_context, span
+from airflow.traces.tracer import Trace, add_span, gen_context
from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState
@@ -228,7 +228,7 @@ class BaseExecutor(LoggingMixin):
Executors should override this to perform gather statuses.
"""
- @span
+ @add_span
def heartbeat(self) -> None:
"""Heartbeat sent to trigger new jobs."""
if not self.parallelism:
@@ -321,7 +321,7 @@ class BaseExecutor(LoggingMixin):
reverse=True,
)
- @span
+ @add_span
def trigger_tasks(self, open_slots: int) -> None:
"""
Initiate async execution of the queued tasks, up to the number of available slots.
@@ -381,7 +381,7 @@ class BaseExecutor(LoggingMixin):
if task_tuples:
self._process_tasks(task_tuples)
- @span
+ @add_span
def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
for key, command, queue, executor_config in task_tuples:
task_instance = self.queued_tasks[key][3]  # TaskInstance in fourth element
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index afa51b1d86..81b8ad7628 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -39,7 +39,7 @@ from setproctitle import getproctitle, setproctitle
from airflow import settings
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import PARALLELISM, BaseExecutor
-from airflow.traces.tracer import Trace, span
+from airflow.traces.tracer import Trace, add_span
from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState
@@ -79,7 +79,7 @@ class LocalWorkerBase(Process, LoggingMixin):
setproctitle("airflow worker -- LocalExecutor")
return super().run()
- @span
+ @add_span
def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None:
"""
Execute command received and stores result state in queue.
@@ -103,7 +103,7 @@ class LocalWorkerBase(Process, LoggingMixin):
# Remove the command since the worker is done executing the task
setproctitle("airflow worker -- LocalExecutor")
- @span
+ @add_span
def _execute_work_in_subprocess(self, command: CommandType) -> TaskInstanceState:
try:
subprocess.check_call(command, close_fds=True)
@@ -112,7 +112,7 @@ class LocalWorkerBase(Process, LoggingMixin):
self.log.error("Failed to execute task %s.", e)
return TaskInstanceState.FAILED
- @span
+ @add_span
def _execute_work_in_fork(self, command: CommandType) -> TaskInstanceState:
pid = os.fork()
if pid:
@@ -172,7 +172,7 @@ class LocalWorker(LocalWorkerBase):
self.key: TaskInstanceKey = key
self.command: CommandType = command
- @span
+ @add_span
def do_work(self) -> None:
self.execute_work(key=self.key, command=self.command)
@@ -192,7 +192,7 @@ class QueuedLocalWorker(LocalWorkerBase):
super().__init__(result_queue=result_queue)
self.task_queue = task_queue
- @span
+ @add_span
def do_work(self) -> None:
while True:
try:
@@ -253,7 +253,7 @@ class LocalExecutor(BaseExecutor):
self.executor.workers_used = 0
self.executor.workers_active = 0
- @span
+ @add_span
def execute_async(
self,
key: TaskInstanceKey,
@@ -329,7 +329,7 @@ class LocalExecutor(BaseExecutor):
for worker in self.executor.workers:
worker.start()
- @span
+ @add_span
def execute_async(
self,
key: TaskInstanceKey,
@@ -391,7 +391,7 @@ class LocalExecutor(BaseExecutor):
self.impl.start()
- @span
+ @add_span
def execute_async(
self,
key: TaskInstanceKey,
diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py
index 1b145892eb..98fc901a13 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -29,7 +29,7 @@ import subprocess
from typing import TYPE_CHECKING, Any
from airflow.executors.base_executor import BaseExecutor
-from airflow.traces.tracer import Trace, span
+from airflow.traces.tracer import Trace, add_span
if TYPE_CHECKING:
from airflow.executors.base_executor import CommandType
@@ -60,7 +60,7 @@ class SequentialExecutor(BaseExecutor):
super().__init__()
self.commands_to_run = []
- @span
+ @add_span
def execute_async(
self,
key: TaskInstanceKey,
diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index 9384821807..03bf92d4e3 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -34,7 +34,7 @@ from airflow.listeners.listener import get_listener_manager
from airflow.models.base import ID_LEN, Base
from airflow.serialization.pydantic.job import JobPydantic
from airflow.stats import Stats
-from airflow.traces.tracer import Trace, span
+from airflow.traces.tracer import Trace, add_span
from airflow.utils import timezone
from airflow.utils.helpers import convert_camel_to_snake
from airflow.utils.log.logging_mixin import LoggingMixin
@@ -459,7 +459,7 @@ def execute_job(job: Job, execute_callable: Callable[[], int | None]) -> int | N
return ret
-@span
+@add_span
def perform_heartbeat(
job: Job, heartbeat_callback: Callable[[Session], None], only_if_necessary: bool
) -> None:
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 163bf5b714..44938c91b1 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -61,7 +61,7 @@ from airflow.stats import Stats
from airflow.ti_deps.dependencies_states import EXECUTION_STATES
from airflow.timetables.simple import DatasetTriggeredTimetable
from airflow.traces import utils as trace_utils
-from airflow.traces.tracer import Trace, span
+from airflow.traces.tracer import Trace, add_span
from airflow.utils import timezone
from airflow.utils.dates import datetime_to_nano
from airflow.utils.event_scheduler import EventScheduler
@@ -1304,7 +1304,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
guard.commit()
# END: create dagruns
- @span
+ @add_span
def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None:
"""Create a DAG run and update the dag_model to control if/when the
next DAGRun should be created."""
# Bulk Fetch DagRuns with dag_id and execution_date same
@@ -1512,7 +1512,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
return False
return True
- @span
+ @add_span
def _start_queued_dagruns(self, session: Session) -> None:
"""Find DagRuns in queued state and decide moving them to running
state."""
# added all() to save runtime, otherwise query is executed more than once
@@ -1522,13 +1522,13 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
DagRun.active_runs_of_dags((dr.dag_id for dr in dag_runs), only_running=True, session=session),
)
- @span
+ @add_span
def _update_state(dag: DAG, dag_run: DagRun):
- __span = Trace.get_current_span()
- __span.set_attribute("state", str(DagRunState.RUNNING))
- __span.set_attribute("run_id", dag_run.run_id)
- __span.set_attribute("type", dag_run.run_type)
- __span.set_attribute("dag_id", dag_run.dag_id)
+ span = Trace.get_current_span()
+ span.set_attribute("state", str(DagRunState.RUNNING))
+ span.set_attribute("run_id", dag_run.run_id)
+ span.set_attribute("type", dag_run.run_type)
+ span.set_attribute("dag_id", dag_run.dag_id)
dag_run.state = DagRunState.RUNNING
dag_run.start_date = timezone.utcnow()
@@ -1549,8 +1549,8 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
schedule_delay,
tags={"dag_id": dag.dag_id},
)
- if __span.is_recording():
- __span.add_event(
+ if span.is_recording():
+ span.add_event(
name="schedule_delay",
attributes={"dag_id": dag.dag_id, "schedule_delay":
str(schedule_delay)},
)
@@ -1560,7 +1560,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
partial(self.dagbag.get_dag, session=session)
)
- _span = Trace.get_current_span()
+ span = Trace.get_current_span()
for dag_run in dag_runs:
dag = dag_run.dag = cached_get_dag(dag_run.dag_id)
@@ -1577,8 +1577,8 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
dag_run.execution_date,
)
else:
- if _span.is_recording():
- _span.add_event(
+ if span.is_recording():
+ span.add_event(
name="dag_run",
attributes={
"run_id": dag_run.run_id,
diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index 080323a1d1..b41af29f37 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -37,7 +37,7 @@ from airflow.jobs.base_job_runner import BaseJobRunner
from airflow.jobs.job import perform_heartbeat
from airflow.models.trigger import Trigger
from airflow.stats import Stats
-from airflow.traces.tracer import Trace, span
+from airflow.traces.tracer import Trace, add_span
from airflow.triggers.base import TriggerEvent
from airflow.typing_compat import TypedDict
from airflow.utils import timezone
@@ -392,14 +392,14 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
# Idle sleep
time.sleep(1)
- @span
+ @add_span
def load_triggers(self):
"""Query the database for the triggers we're supposed to be running
and update the runner."""
Trigger.assign_unassigned(self.job.id, self.capacity, self.health_check_threshold)
ids = Trigger.ids_for_triggerer(self.job.id)
self.trigger_runner.update_triggers(set(ids))
- @span
+ @add_span
def handle_events(self):
"""Dispatch outbound events to the Trigger model which pushes them to
the relevant task instances."""
while self.trigger_runner.events:
@@ -410,7 +410,7 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
# Emit stat event
Stats.incr("triggers.succeeded")
- @span
+ @add_span
def handle_failed_triggers(self):
"""
Handle "failed" triggers. - ones that errored or exited before they
sent an event.
@@ -424,7 +424,7 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
# Emit stat event
Stats.incr("triggers.failed")
- @span
+ @add_span
def emit_metrics(self):
Stats.gauge(f"triggers.running.{self.job.hostname}",
len(self.trigger_runner.triggers))
Stats.gauge(
diff --git a/airflow/traces/tracer.py b/airflow/traces/tracer.py
index 1d58717287..79272d43d0 100644
--- a/airflow/traces/tracer.py
+++ b/airflow/traces/tracer.py
@@ -42,7 +42,7 @@ def gen_links_from_kv_list(list):
return gen_links_from_kv_list(list)
-def span(func):
+def add_span(func):
"""Decorate a function with span."""
def wrapper(*args, **kwargs):
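For context, a minimal sketch of how a decorator of this shape can be completed,
assuming the tracer's Trace.start_span(span_name=...) context manager from the same
module; the actual implementation may name the span differently or attach more
attributes:

    def add_span(func):
        """Decorate a function with span."""

        def wrapper(*args, **kwargs):
            # Assumed behavior: open a span named after the wrapped function,
            # run the function inside that span, and return its result.
            with Trace.start_span(span_name=func.__name__):
                return func(*args, **kwargs)

        return wrapper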