ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1520307917
##########
airflow/jobs/job.py:
##########
@@ -211,35 +211,29 @@ def heartbeat(
heartbeat_callback(session)
self.log.debug("[heartbeat]")
except OperationalError:
- Stats.incr(convert_camel_to_snake(self.__class__.__name__) +
"_heartbeat_failure", 1, 1)
+ class_name = self.__class__.__name__
+ Stats.incr(convert_camel_to_snake(class_name) +
"_heartbeat_failure", 1, 1)
if not self.heartbeat_failed:
- self.log.exception("%s heartbeat got an exception",
self.__class__.__name__)
+ msg = f"{class_name} heartbeat got an exception"
+ self.log.exception(msg)
self.heartbeat_failed = True
- s.add_event(
+ span.add_event(
name="error",
- attributes={"message": f"{self.__class__.__name__}
heartbeat got an exception"},
+ attributes={"message": msg},
Review Comment:
Personal preference, feel free to ignore: With `"message": msg` being so
short now (here and below), I think this should all fit on one line and look
clean. Removing the trailing `,` at the end of this line will let the linter
do that. Pretty sure that's just a personal style thing though, you do you.
:P
##########
airflow/jobs/job.py:
##########
@@ -211,35 +211,29 @@ def heartbeat(
heartbeat_callback(session)
self.log.debug("[heartbeat]")
except OperationalError:
- Stats.incr(convert_camel_to_snake(self.__class__.__name__) +
"_heartbeat_failure", 1, 1)
+ class_name = self.__class__.__name__
+ Stats.incr(convert_camel_to_snake(class_name) +
"_heartbeat_failure", 1, 1)
if not self.heartbeat_failed:
- self.log.exception("%s heartbeat got an exception",
self.__class__.__name__)
+ msg = f"{class_name} heartbeat got an exception"
Review Comment:
Thanks, I think this looks much cleaner and will make sure those messages
will always be consistent.
##########
airflow/traces/utils.py:
##########
@@ -18,52 +18,66 @@
from __future__ import annotations
import logging
+from typing import TYPE_CHECKING
from airflow.utils.hashlib_wrapper import md5
+if TYPE_CHECKING:
+ from airflow.models import DagRun, TaskInstance
+ from airflow.models.taskinstancekey import TaskInstanceKey
+
log = logging.getLogger(__name__)
-def gen_trace_id(dag_run) -> str:
+def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int:
+ """Generate trace id from DagRun."""
dag_id = dag_run.dag_id
run_id = dag_run.run_id
start_dt = dag_run.start_date
hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}"
hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()
+ if as_int is True:
+ return int(hash_hex, 16)
Review Comment:
I know this was my suggestion, but seeing it in use, I wonder whether `as_int` is
right, or whether it should be `as_decimal`. I'll leave it to you; I think both
work, and I'm not sure one is better than the other.
##########
airflow/executors/base_executor.py:
##########
@@ -311,12 +313,10 @@ def trigger_tasks(self, open_slots: int) -> None:
@span
def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
- from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
-
for key, command, queue, executor_config in task_tuples:
- qt = self.queued_tasks[key][3]
- trace_id = int(gen_trace_id(qt.dag_run), 16) # TaskInstance in
fourth element
- span_id = int(gen_span_id_from_ti_key(key), 16)
+ task_instance = self.queued_tasks[key][3]
+ trace_id = int(gen_trace_id(task_instance.dag_run, as_int=True))
# TaskInstance in fourth element
Review Comment:
Here and elsewhere: With `as_int` casting the return value, you shouldn't
have to cast it again here.
##########
airflow/jobs/job.py:
##########
@@ -211,35 +211,29 @@ def heartbeat(
heartbeat_callback(session)
self.log.debug("[heartbeat]")
except OperationalError:
- Stats.incr(convert_camel_to_snake(self.__class__.__name__) +
"_heartbeat_failure", 1, 1)
+ class_name = self.__class__.__name__
+ Stats.incr(convert_camel_to_snake(class_name) +
"_heartbeat_failure", 1, 1)
Review Comment:
`Stats.incr()` should default to `1, 1`, so you should be able to drop those and
make this just `Stats.incr(convert_camel_to_snake(class_name) +
"_heartbeat_failure")`, but I'll leave it up to you whether you think it's nicer to
be explicit there.
##########
airflow/executors/base_executor.py:
##########
@@ -311,12 +313,10 @@ def trigger_tasks(self, open_slots: int) -> None:
@span
def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
- from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
-
for key, command, queue, executor_config in task_tuples:
- qt = self.queued_tasks[key][3]
- trace_id = int(gen_trace_id(qt.dag_run), 16) # TaskInstance in
fourth element
- span_id = int(gen_span_id_from_ti_key(key), 16)
+ task_instance = self.queued_tasks[key][3]
+ trace_id = int(gen_trace_id(task_instance.dag_run, as_int=True))
# TaskInstance in fourth element
Review Comment:
I think this comment would be more useful on the previous line, explaining
what the [3] is for, right?
##########
airflow/traces/otel_tracer.py:
##########
@@ -104,36 +98,34 @@ def start_span(
links=None,
start_time=None,
):
- """Start a span. if service_name is not given, otel_service is used."""
+ """Start a span; if service_name is not given, otel_service is used."""
if component is None:
component = self.otel_service
trace_id = self.get_current_span().get_span_context().trace_id
if span_id is not None:
- tracer = self.get_tracer_with_id(component=component,
trace_id=trace_id, span_id=span_id)
+ tracer = self.get_tracer(component=component, trace_id=trace_id,
span_id=span_id)
else:
tracer = self.get_tracer(component)
- kvs = {}
- if self.tags is not None:
- kvs = parse_tracestate(self.tags)
+ attributes = parse_tracestate(self.tag_string) if self.tag_string else
{}
if links is not None:
_links = gen_links_from_kv_list(links)
else:
_links = []
Review Comment:
Consider replacing these four lines with one:
```suggestion
_links = gen_links_from_kv_list(links) if links else []
```
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1391,11 +1392,11 @@ def _start_queued_dagruns(self, session: Session) ->
None:
@span
def _update_state(dag: DAG, dag_run: DagRun):
Review Comment:
I see what you mean here. That double underscore is a little awkward but I
think it feels better than just `s`. What do you think?
##########
airflow/traces/otel_tracer.py:
##########
@@ -104,36 +98,34 @@ def start_span(
links=None,
start_time=None,
):
- """Start a span. if service_name is not given, otel_service is used."""
+ """Start a span; if service_name is not given, otel_service is used."""
if component is None:
component = self.otel_service
trace_id = self.get_current_span().get_span_context().trace_id
if span_id is not None:
- tracer = self.get_tracer_with_id(component=component,
trace_id=trace_id, span_id=span_id)
+ tracer = self.get_tracer(component=component, trace_id=trace_id,
span_id=span_id)
else:
tracer = self.get_tracer(component)
Review Comment:
I may be missing something here, but now that the two different get_tracer
methods are combined, and the beginning of the new get_tracer method is
checking if the span_id exists, do we need this? Why not just pass trace_id
and span_id, and let get_tracer decide how to handle it?
##########
airflow/traces/otel_tracer.py:
##########
@@ -104,36 +98,34 @@ def start_span(
links=None,
start_time=None,
):
- """Start a span. if service_name is not given, otel_service is used."""
+ """Start a span; if service_name is not given, otel_service is used."""
if component is None:
component = self.otel_service
trace_id = self.get_current_span().get_span_context().trace_id
if span_id is not None:
- tracer = self.get_tracer_with_id(component=component,
trace_id=trace_id, span_id=span_id)
+ tracer = self.get_tracer(component=component, trace_id=trace_id,
span_id=span_id)
else:
tracer = self.get_tracer(component)
- kvs = {}
- if self.tags is not None:
- kvs = parse_tracestate(self.tags)
+ attributes = parse_tracestate(self.tag_string) if self.tag_string else
{}
if links is not None:
_links = gen_links_from_kv_list(links)
else:
_links = []
if start_time is not None:
- start_time = int(start_time.timestamp() * 1000000000)
+ start_time = datetime_to_nano(start_time)
if parent_sc is not None:
ctx = trace.set_span_in_context(NonRecordingSpan(parent_sc))
span = tracer.start_as_current_span(
- span_name, context=ctx, attributes=kvs, links=_links,
start_time=start_time
+ span_name, context=ctx, attributes=attributes, links=_links,
start_time=start_time
)
else:
span = tracer.start_as_current_span(
- span_name, attributes=kvs, links=_links, start_time=start_time
+ span_name, attributes=attributes, links=_links,
start_time=start_time
)
Review Comment:
The OTel SDK docs are unbearable... You'll need to double-check this,
depending on how `start_as_current_span()` handles the context value, but I think
you can condense these nine lines down to:
```
span = tracer.start_as_current_span(
span_name,
attributes=attributes,
links=_links,
start_time=start_time,
context=trace.set_span_in_context(NonRecordingSpan(parent_sc)) if parent_sc
else None
)
```
or maybe
```
if parent_sc:
context=trace.set_span_in_context(NonRecordingSpan(parent_sc))
span = tracer.start_as_current_span(
span_name,
attributes=attributes,
links=_links,
start_time=start_time,
context=context or None
)
```
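Either way, that depends on how `tracer.start_as_current_span()` handles a `None`
context value there; it may work, it may not. (In the second version, `context`
would also need to be initialized to `None` before the `if`; otherwise it is
undefined when `parent_sc` is not set.)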
##########
airflow/traces/otel_tracer.py:
##########
@@ -143,25 +135,24 @@ def start_span_from_dagrun(
"""Produce a span from dag run."""
# check if dagrun has configs
conf = dagrun.conf
- trace_id = int(gen_trace_id(dag_run=dagrun), 16)
- span_id = int(gen_dag_span_id(dag_run=dagrun), 16)
+ trace_id = int(gen_trace_id(dag_run=dagrun, as_int=True))
+ span_id = int(gen_dag_span_id(dag_run=dagrun, as_int=True))
if conf is not None:
traceparent = conf.get(TRACEPARENT)
tracestate = conf.get(TRACESTATE)
- tracer = self.get_tracer_with_id(component=component, span_id=span_id,
trace_id=trace_id)
+ tracer = self.get_tracer(component=component, span_id=span_id,
trace_id=trace_id)
- kvstr = None
+ tag_string = None
# merge attributes from tags and tracestate
- if self.tags is not None:
- kvstr = self.tags
+ if self.tag_string is not None:
+ tag_string = self.tag_string
Review Comment:
These three lines boil down to:
```
tag_string = self.tag_string
```
##########
airflow/traces/utils.py:
##########
@@ -18,52 +18,66 @@
from __future__ import annotations
import logging
+from typing import TYPE_CHECKING
from airflow.utils.hashlib_wrapper import md5
+if TYPE_CHECKING:
+ from airflow.models import DagRun, TaskInstance
+ from airflow.models.taskinstancekey import TaskInstanceKey
+
log = logging.getLogger(__name__)
Review Comment:
I feel like all these `gen_foo` methods can be consolidated.... I'll put
some thought into how. I think maybe something like this..... maybe??
((untested code, just thinking here))
```
def gen_id(seeds: List[str], as_int: bool = False, shorten: bool = False) ->
str | int:
seed_str = "_".join(seeds).encode("utf-8")
hash_hex = md5(seed_str).hexdigest()
if shorten:
hash_hex = hash_hex[16:]
return int(hash_hex, 16) if as_int else hash_hex
def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int:
return gen_id([dag_run.dag_id, dag_run.run_id,
dag_run.start_date.timestamp], as_int, shorten=False)
def gen_span_id_from_ti_key(ti_key: TaskInstanceKey, as_int: bool = False)
-> str | int:
return gen_id([ti_key.dag_id, ti_key.run_id, ti_key.task_id,
ti_key.try_number], as_int)
def gen_dag_span_id(dag_run: DagRun, as_int: bool = False) -> str | int:
return gen_id([dag_run.dag_id,
dag_run.run_id,dag_run.start_date.timestamp()], as_int)
<and so on>
```
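(Note that the list-based version above would need to `str()` non-string seeds
such as `try_number` and the timestamp, since `str.join` only accepts strings, and
`timestamp` needs to be called as `timestamp()` in `gen_trace_id`.)
Or, if that is too "magic", the `gen_id` method could accept the already-assembled
seed string instead, so in use it would look like: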
```
def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int:
seed =
f"{dag_run.dag_id}_{dag_run.run_id}_{dag_run.start_date.timestamp()}"
return gen_id(seed, as_int, shorten=False)
```
Either of those would replace around 50 lines of repetitive code with around
20 lines and be just as easy to read (IMHO), easier to modify later if all of
them need to be changed at some point, and just as easy to change if only
one needs to be adjusted later. No disadvantage IMHO.
##########
airflow/traces/otel_tracer.py:
##########
@@ -143,25 +135,24 @@ def start_span_from_dagrun(
"""Produce a span from dag run."""
# check if dagrun has configs
conf = dagrun.conf
- trace_id = int(gen_trace_id(dag_run=dagrun), 16)
- span_id = int(gen_dag_span_id(dag_run=dagrun), 16)
+ trace_id = int(gen_trace_id(dag_run=dagrun, as_int=True))
+ span_id = int(gen_dag_span_id(dag_run=dagrun, as_int=True))
if conf is not None:
traceparent = conf.get(TRACEPARENT)
tracestate = conf.get(TRACESTATE)
- tracer = self.get_tracer_with_id(component=component, span_id=span_id,
trace_id=trace_id)
+ tracer = self.get_tracer(component=component, span_id=span_id,
trace_id=trace_id)
- kvstr = None
+ tag_string = None
# merge attributes from tags and tracestate
- if self.tags is not None:
- kvstr = self.tags
+ if self.tag_string is not None:
+ tag_string = self.tag_string
if tracestate is not None:
- if kvstr is None:
- kvstr = tracestate
+ if tag_string is None:
+ tag_string = tracestate
else:
- kvstr = kvstr + "," + tracestate
- kvs = parse_tracestate(kvstr)
+ tag_string = tag_string + "," + tracestate
Review Comment:
Consider simplifying all of L145 - L155 with the following?
```
if self.tag_string and tracestate:
tag_string = self.tag_string + "," + tracestate
else:
tag_string = self.tag_string or tracestate
```
That last line is equivalent to the following (except that `or` also treats an empty string as missing, not just `None`):
```
if self.tag_string is not None:
tag_string = self.tag_string
elif tracestate is not None:
tag_string = tracestate
else:
tag_string = None
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]