ferruzzi commented on code in PR #37948:
URL: https://github.com/apache/airflow/pull/37948#discussion_r1520307917


##########
airflow/jobs/job.py:
##########
@@ -211,35 +211,29 @@ def heartbeat(
                 heartbeat_callback(session)
                 self.log.debug("[heartbeat]")
             except OperationalError:
-                Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)
+                class_name = self.__class__.__name__
+                Stats.incr(convert_camel_to_snake(class_name) + "_heartbeat_failure", 1, 1)
                 if not self.heartbeat_failed:
-                    self.log.exception("%s heartbeat got an exception", self.__class__.__name__)
+                    msg = f"{class_name} heartbeat got an exception"
+                    self.log.exception(msg)
                     self.heartbeat_failed = True
-                    s.add_event(
+                    span.add_event(
                         name="error",
-                        attributes={"message": f"{self.__class__.__name__} heartbeat got an exception"},
+                        attributes={"message": msg},

Review Comment:
   Personal preference, feel free to ignore:  with `"message": msg` being so 
short now (here and below), I think this call should fit on one line and still 
look clean.  Removing the trailing `,` at the end of this line will let the 
linter collapse it.  Pretty sure that's purely a style thing though, so you do 
you. :P



##########
airflow/jobs/job.py:
##########
@@ -211,35 +211,29 @@ def heartbeat(
                 heartbeat_callback(session)
                 self.log.debug("[heartbeat]")
             except OperationalError:
-                Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)
+                class_name = self.__class__.__name__
+                Stats.incr(convert_camel_to_snake(class_name) + "_heartbeat_failure", 1, 1)
                 if not self.heartbeat_failed:
-                    self.log.exception("%s heartbeat got an exception", self.__class__.__name__)
+                    msg = f"{class_name} heartbeat got an exception"

Review Comment:
   Thanks, I think this looks much cleaner, and it ensures the log message and 
the span event message always stay consistent.



##########
airflow/traces/utils.py:
##########
@@ -18,52 +18,66 @@
 from __future__ import annotations
 
 import logging
+from typing import TYPE_CHECKING
 
 from airflow.utils.hashlib_wrapper import md5
 
+if TYPE_CHECKING:
+    from airflow.models import DagRun, TaskInstance
+    from airflow.models.taskinstancekey import TaskInstanceKey
+
 log = logging.getLogger(__name__)
 
 
-def gen_trace_id(dag_run) -> str:
+def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int:
+    """Generate trace id from DagRun."""
     dag_id = dag_run.dag_id
     run_id = dag_run.run_id
     start_dt = dag_run.start_date
     hash_seed = f"{dag_id}_{run_id}_{start_dt.timestamp()}"
     hash_hex = md5(hash_seed.encode("utf-8")).hexdigest()
+    if as_int is True:
+        return int(hash_hex, 16)

Review Comment:
   I know this was my suggestion, but seeing it in use, I wonder whether 
`as_int` is the right name or whether it should be `as_decimal`.  I'll leave 
it to you.  I think both work, and I'm not sure one is better than the other.



##########
airflow/executors/base_executor.py:
##########
@@ -311,12 +313,10 @@ def trigger_tasks(self, open_slots: int) -> None:
 
     @span
     def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
-        from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
-
         for key, command, queue, executor_config in task_tuples:
-            qt = self.queued_tasks[key][3]
-            trace_id = int(gen_trace_id(qt.dag_run), 16)  # TaskInstance in fourth element
-            span_id = int(gen_span_id_from_ti_key(key), 16)
+            task_instance = self.queued_tasks[key][3]
+            trace_id = int(gen_trace_id(task_instance.dag_run, as_int=True))  # TaskInstance in fourth element

Review Comment:
   Here and elsewhere:  with `as_int=True` the function already returns an 
`int`, so you shouldn't have to wrap the result in `int(...)` again here.



##########
airflow/jobs/job.py:
##########
@@ -211,35 +211,29 @@ def heartbeat(
                 heartbeat_callback(session)
                 self.log.debug("[heartbeat]")
             except OperationalError:
-                Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)
+                class_name = self.__class__.__name__
+                Stats.incr(convert_camel_to_snake(class_name) + "_heartbeat_failure", 1, 1)

Review Comment:
   `Stats.incr()` should default to `1, 1` for those two arguments, so you 
should be able to drop them and make this just 
`Stats.incr(convert_camel_to_snake(class_name) + "_heartbeat_failure")`, 
but I'll leave it up to you if you think it's nicer to be explicit there.



##########
airflow/executors/base_executor.py:
##########
@@ -311,12 +313,10 @@ def trigger_tasks(self, open_slots: int) -> None:
 
     @span
     def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
-        from airflow.traces.utils import gen_span_id_from_ti_key, gen_trace_id
-
         for key, command, queue, executor_config in task_tuples:
-            qt = self.queued_tasks[key][3]
-            trace_id = int(gen_trace_id(qt.dag_run), 16)  # TaskInstance in fourth element
-            span_id = int(gen_span_id_from_ti_key(key), 16)
+            task_instance = self.queued_tasks[key][3]
+            trace_id = int(gen_trace_id(task_instance.dag_run, as_int=True))  # TaskInstance in fourth element

Review Comment:
   I think this comment would be more useful on the previous line, explaining 
what the [3] is for, right?



##########
airflow/traces/otel_tracer.py:
##########
@@ -104,36 +98,34 @@ def start_span(
         links=None,
         start_time=None,
     ):
-        """Start a span. if service_name is not given, otel_service is used."""
+        """Start a span; if service_name is not given, otel_service is used."""
         if component is None:
             component = self.otel_service
 
         trace_id = self.get_current_span().get_span_context().trace_id
         if span_id is not None:
-            tracer = self.get_tracer_with_id(component=component, trace_id=trace_id, span_id=span_id)
+            tracer = self.get_tracer(component=component, trace_id=trace_id, span_id=span_id)
         else:
             tracer = self.get_tracer(component)
 
-        kvs = {}
-        if self.tags is not None:
-            kvs = parse_tracestate(self.tags)
+        attributes = parse_tracestate(self.tag_string) if self.tag_string else {}
 
         if links is not None:
             _links = gen_links_from_kv_list(links)
         else:
             _links = []

Review Comment:
   Consider replacing these four lines with one:
   ```suggestion
               _links = gen_links_from_kv_list(links) if links else []
   ```



##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1391,11 +1392,11 @@ def _start_queued_dagruns(self, session: Session) -> None:
 
         @span
         def _update_state(dag: DAG, dag_run: DagRun):

Review Comment:
   I see what you mean here.  That double underscore is a little awkward but I 
think it feels better than just `s`.   What do you think?



##########
airflow/traces/otel_tracer.py:
##########
@@ -104,36 +98,34 @@ def start_span(
         links=None,
         start_time=None,
     ):
-        """Start a span. if service_name is not given, otel_service is used."""
+        """Start a span; if service_name is not given, otel_service is used."""
         if component is None:
             component = self.otel_service
 
         trace_id = self.get_current_span().get_span_context().trace_id
         if span_id is not None:
-            tracer = self.get_tracer_with_id(component=component, trace_id=trace_id, span_id=span_id)
+            tracer = self.get_tracer(component=component, trace_id=trace_id, span_id=span_id)
         else:
             tracer = self.get_tracer(component)

Review Comment:
   I may be missing something here, but now that the two different get_tracer 
methods are combined, and the beginning of the new get_tracer method is 
checking if the span_id exists, do we need this?  Why not just pass trace_id 
and span_id, and let get_tracer decide how to handle it?



##########
airflow/traces/otel_tracer.py:
##########
@@ -104,36 +98,34 @@ def start_span(
         links=None,
         start_time=None,
     ):
-        """Start a span. if service_name is not given, otel_service is used."""
+        """Start a span; if service_name is not given, otel_service is used."""
         if component is None:
             component = self.otel_service
 
         trace_id = self.get_current_span().get_span_context().trace_id
         if span_id is not None:
-            tracer = self.get_tracer_with_id(component=component, trace_id=trace_id, span_id=span_id)
+            tracer = self.get_tracer(component=component, trace_id=trace_id, span_id=span_id)
         else:
             tracer = self.get_tracer(component)
 
-        kvs = {}
-        if self.tags is not None:
-            kvs = parse_tracestate(self.tags)
+        attributes = parse_tracestate(self.tag_string) if self.tag_string else {}
 
         if links is not None:
             _links = gen_links_from_kv_list(links)
         else:
             _links = []
 
         if start_time is not None:
-            start_time = int(start_time.timestamp() * 1000000000)
+            start_time = datetime_to_nano(start_time)
 
         if parent_sc is not None:
             ctx = trace.set_span_in_context(NonRecordingSpan(parent_sc))
             span = tracer.start_as_current_span(
-                span_name, context=ctx, attributes=kvs, links=_links, start_time=start_time
+                span_name, context=ctx, attributes=attributes, links=_links, start_time=start_time
             )
         else:
             span = tracer.start_as_current_span(
-                span_name, attributes=kvs, links=_links, start_time=start_time
+                span_name, attributes=attributes, links=_links, start_time=start_time
             )

Review Comment:
   The OTel SDK docs are unbearable, so you'll need to double-check this, since 
it depends on how `start_as_current_span()` handles the context value, but I 
think you can condense these 9 lines down to
   
   ```
   span = tracer.start_as_current_span(
       span_name,
       attributes=attributes,
       links=_links,
       start_time=start_time,
       context=trace.set_span_in_context(NonRecordingSpan(parent_sc)) if parent_sc else None,
   )
   ```
   
   or maybe
   
   ```
   # Assign context in both cases so the name is always defined.
   context = trace.set_span_in_context(NonRecordingSpan(parent_sc)) if parent_sc else None
   
   span = tracer.start_as_current_span(
       span_name,
       attributes=attributes,
       links=_links,
       start_time=start_time,
       context=context,
   )
   ```
   
   But that depends on how `tracer.start_as_current_span()` handles the `None` 
value there.  May work, may not.
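   
   To make the shape of that refactor concrete, here is a minimal sketch that 
does not touch the OTel SDK at all.  `FakeTracer`, `make_context` and 
`start_span_sketch` are hypothetical stand-ins invented for illustration, and 
the sketch simply assumes that passing `context=None` behaves the same as 
omitting the argument, which is exactly the point that still needs checking 
against the real `start_as_current_span()`.

```python
from __future__ import annotations

from typing import Any


class FakeTracer:
    """Hypothetical stand-in for a tracer; it only records the arguments it receives."""

    def start_as_current_span(self, name: str, context: Any = None, **kwargs: Any) -> dict[str, Any]:
        return {"name": name, "context": context, **kwargs}


def make_context(parent_sc: Any) -> dict[str, Any]:
    """Hypothetical stand-in for trace.set_span_in_context(NonRecordingSpan(parent_sc))."""
    return {"parent": parent_sc}


def start_span_sketch(tracer: FakeTracer, span_name: str, parent_sc: Any = None, **kwargs: Any) -> dict[str, Any]:
    # Decide the context once, then make a single call instead of two near-identical branches.
    context = make_context(parent_sc) if parent_sc is not None else None
    return tracer.start_as_current_span(span_name, context=context, **kwargs)
```

   The only branch left is the one that builds the context, so the keyword 
arguments shared by both calls are written once.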



##########
airflow/traces/otel_tracer.py:
##########
@@ -143,25 +135,24 @@ def start_span_from_dagrun(
         """Produce a span from dag run."""
         # check if dagrun has configs
         conf = dagrun.conf
-        trace_id = int(gen_trace_id(dag_run=dagrun), 16)
-        span_id = int(gen_dag_span_id(dag_run=dagrun), 16)
+        trace_id = int(gen_trace_id(dag_run=dagrun, as_int=True))
+        span_id = int(gen_dag_span_id(dag_run=dagrun, as_int=True))
 
         if conf is not None:
             traceparent = conf.get(TRACEPARENT)
             tracestate = conf.get(TRACESTATE)
 
-        tracer = self.get_tracer_with_id(component=component, span_id=span_id, trace_id=trace_id)
+        tracer = self.get_tracer(component=component, span_id=span_id, trace_id=trace_id)
 
-        kvstr = None
+        tag_string = None
         # merge attributes from tags and tracestate
-        if self.tags is not None:
-            kvstr = self.tags
+        if self.tag_string is not None:
+            tag_string = self.tag_string

Review Comment:
   These three lines boil down to:
   
   ```
   tag_string = self.tag_string
   ```
   
   



##########
airflow/traces/utils.py:
##########
@@ -18,52 +18,66 @@
 from __future__ import annotations
 
 import logging
+from typing import TYPE_CHECKING
 
 from airflow.utils.hashlib_wrapper import md5
 
+if TYPE_CHECKING:
+    from airflow.models import DagRun, TaskInstance
+    from airflow.models.taskinstancekey import TaskInstanceKey
+
 log = logging.getLogger(__name__)
 
 

Review Comment:
   I feel like all these `gen_foo` functions can be consolidated.  I'll put 
some more thought into how, but maybe something like this?
   
   (Untested code, just thinking out loud here.)
   ```
   def gen_id(seeds: list[object], as_int: bool = False, shorten: bool = False) -> str | int:
       # Stringify every seed so non-str values (timestamps, try numbers) can be joined.
       seed_str = "_".join(str(seed) for seed in seeds).encode("utf-8")
       hash_hex = md5(seed_str).hexdigest()
       
       if shorten:
           hash_hex = hash_hex[16:]
       
       return int(hash_hex, 16) if as_int else hash_hex
       
       
   def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int:
       return gen_id([dag_run.dag_id, dag_run.run_id, dag_run.start_date.timestamp()], as_int, shorten=False)
       
   def gen_span_id_from_ti_key(ti_key: TaskInstanceKey, as_int: bool = False) -> str | int:
       return gen_id([ti_key.dag_id, ti_key.run_id, ti_key.task_id, ti_key.try_number], as_int)
   
   def gen_dag_span_id(dag_run: DagRun, as_int: bool = False) -> str | int:
       return gen_id([dag_run.dag_id, dag_run.run_id, dag_run.start_date.timestamp()], as_int)
       
   <and so on>    
   ```
   
   or maybe if that is too "magic" then the gen_id method could accept the 
assembled seed string, so in use it would look like:
   
   ```
   def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int:
       seed = f"{dag_run.dag_id}_{dag_run.run_id}_{dag_run.start_date.timestamp()}"
       return gen_id(seed, as_int, shorten=False)
   ```
   
   Either of those would replace around 50 lines of repetitive code with 
around 20 lines and be just as easy to read (IMHO), easier to modify later if 
all of those need to change at some point, and just as easy to change if only 
one needs adjusting later.  No disadvantage IMHO.
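   
   For anyone who wants to try the first idea outside of Airflow, here is a 
self-contained sketch.  It makes several assumptions: it uses the standard 
library `hashlib.md5` in place of `airflow.utils.hashlib_wrapper.md5`, 
`FakeDagRun` is a hypothetical stand-in for `DagRun`, and passing 
`shorten=True` for the span ID is a guess at the intended behaviour rather than 
something taken from the PR.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from hashlib import md5  # assumption: stdlib md5 instead of Airflow's wrapper


def gen_id(seeds: list[object], as_int: bool = False, shorten: bool = False) -> str | int:
    """Hash the joined seeds with MD5; optionally keep 16 hex chars and/or return an int."""
    seed_str = "_".join(str(seed) for seed in seeds).encode("utf-8")
    hash_hex = md5(seed_str).hexdigest()  # 32 hex characters
    if shorten:
        hash_hex = hash_hex[16:]  # keep the last 16 hex characters
    return int(hash_hex, 16) if as_int else hash_hex


@dataclass
class FakeDagRun:
    """Hypothetical stand-in exposing only the three fields the helpers read."""

    dag_id: str
    run_id: str
    start_date: datetime


def gen_trace_id(dag_run: FakeDagRun, as_int: bool = False) -> str | int:
    return gen_id([dag_run.dag_id, dag_run.run_id, dag_run.start_date.timestamp()], as_int)


def gen_dag_span_id(dag_run: FakeDagRun, as_int: bool = False) -> str | int:
    # shorten=True is an assumption here; it yields a 16-hex-character ID.
    return gen_id([dag_run.dag_id, dag_run.run_id, dag_run.start_date.timestamp()], as_int, shorten=True)


run = FakeDagRun("example_dag", "manual__1", datetime(2024, 3, 11, tzinfo=timezone.utc))
print(gen_trace_id(run))
print(gen_dag_span_id(run, as_int=True))
```

   Each public helper shrinks to a one-line description of its seeds, and the 
hashing, shortening and int conversion live in one place.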



##########
airflow/traces/otel_tracer.py:
##########
@@ -143,25 +135,24 @@ def start_span_from_dagrun(
         """Produce a span from dag run."""
         # check if dagrun has configs
         conf = dagrun.conf
-        trace_id = int(gen_trace_id(dag_run=dagrun), 16)
-        span_id = int(gen_dag_span_id(dag_run=dagrun), 16)
+        trace_id = int(gen_trace_id(dag_run=dagrun, as_int=True))
+        span_id = int(gen_dag_span_id(dag_run=dagrun, as_int=True))
 
         if conf is not None:
             traceparent = conf.get(TRACEPARENT)
             tracestate = conf.get(TRACESTATE)
 
-        tracer = self.get_tracer_with_id(component=component, span_id=span_id, trace_id=trace_id)
+        tracer = self.get_tracer(component=component, span_id=span_id, trace_id=trace_id)
 
-        kvstr = None
+        tag_string = None
         # merge attributes from tags and tracestate
-        if self.tags is not None:
-            kvstr = self.tags
+        if self.tag_string is not None:
+            tag_string = self.tag_string
         if tracestate is not None:
-            if kvstr is None:
-                kvstr = tracestate
+            if tag_string is None:
+                tag_string = tracestate
             else:
-                kvstr = kvstr + "," + tracestate
-        kvs = parse_tracestate(kvstr)
+                tag_string = tag_string + "," + tracestate
 

Review Comment:
   Consider simplifying all of L145 - L155 with the following?
   
   ```
   if self.tag_string and tracestate:
       tag_string = self.tag_string + "," + tracestate
   else:
       tag_string = self.tag_string or tracestate
   ```
   
   That last line is equivalent to the following (apart from treating an empty 
string the same as `None`):
   
   ```
   if self.tag_string is not None:
       tag_string = self.tag_string
   elif tracestate is not None:
       tag_string = tracestate
   else:
       tag_string = None
   ```
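   
   A quick way to sanity-check the simplified merge is to pull it into a small 
pure function and run the four combinations through it.  `merge_tag_strings` is 
a hypothetical name used only for this sketch; it is not a helper from the PR.

```python
from __future__ import annotations


def merge_tag_strings(tag_string: str | None, tracestate: str | None) -> str | None:
    """Join two comma-separated key/value strings, skipping whichever is unset or empty."""
    if tag_string and tracestate:
        return tag_string + "," + tracestate
    # At most one side is truthy here; `or` returns it if present,
    # otherwise it falls through to the second operand.
    return tag_string or tracestate


print(merge_tag_strings("a=1", "b=2"))
print(merge_tag_strings("a=1", None))
print(merge_tag_strings(None, "b=2"))
print(merge_tag_strings(None, None))
```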



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.