ferruzzi commented on code in PR #40802:
URL: https://github.com/apache/airflow/pull/40802#discussion_r1683536667


##########
airflow/executors/base_executor.py:
##########
@@ -338,6 +379,20 @@ def fail(self, key: TaskInstanceKey, info=None) -> None:
         :param info: Executor information for the task instance
         :param key: Unique key for the task instance
         """
+        trace_id = Trace.get_current_span().get_span_context().trace_id
+        if trace_id != 1:

Review Comment:
   Here and elsewhere: consider adding a comment or creating a constant with a descriptive name for the `1`. I don't understand the significance of the value of 1 here; it's just some magic number.
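
As a hedged illustration of the reviewer's point (the constant name and helper below are hypothetical, not Airflow's actual code), the check could read:

```python
# Hypothetical sketch only: a named sentinel in place of the bare literal 1.
# NO_TRACE_ID and should_emit_trace are illustrative names, not Airflow's API.
NO_TRACE_ID = 1  # sentinel meaning "no usable trace id was generated"

def should_emit_trace(trace_id: int) -> bool:
    """Only emit trace data when we have a real trace id, not the sentinel."""
    return trace_id != NO_TRACE_ID
```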



##########
airflow/dag_processing/manager.py:
##########
@@ -562,118 +568,144 @@ def _run_parsing_loop(self):
             # in sync mode we need to be told to start a "loop"
             self.start_new_processes()
         while True:
-            loop_start_time = time.monotonic()
-            ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
-            self.heartbeat()
-            if self._direct_scheduler_conn is not None and self._direct_scheduler_conn in ready:
-                agent_signal = self._direct_scheduler_conn.recv()
-
-                self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
-                if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
-                    self.terminate()
-                    break
-                elif agent_signal == DagParsingSignal.END_MANAGER:
-                    self.end()
-                    sys.exit(os.EX_OK)
-                elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE:
-                    # continue the loop to parse dags
-                    pass
-                elif isinstance(agent_signal, CallbackRequest):
-                    self._add_callback_to_queue(agent_signal)
-                else:
-                    raise ValueError(f"Invalid message {type(agent_signal)}")
-
-            if not ready and not self._async_mode:
-                # In "sync" mode we don't want to parse the DAGs until we
-                # are told to (as that would open another connection to the
-                # SQLite DB which isn't a good practice
-
-                # This shouldn't happen, as in sync mode poll should block for
-                # ever. Lets be defensive about that.
-                self.log.warning(
-                    "wait() unexpectedly returned nothing ready after infinite timeout (%r)!", poll_time
-                )
-
-                continue
-
-            for sentinel in ready:
-                if sentinel is not self._direct_scheduler_conn:
-                    processor = self.waitables.get(sentinel)
-                    if processor:
-                        self._collect_results_from_processor(processor)
-                        self.waitables.pop(sentinel)
-                        self._processors.pop(processor.file_path)
-
-            if self.standalone_dag_processor:
-                self._fetch_callbacks(max_callbacks_per_loop)
-            self._scan_stale_dags()
-            DagWarning.purge_inactive_dag_warnings()
-            refreshed_dag_dir = self._refresh_dag_dir()
-
-            self._kill_timed_out_processors()
-
-            # Generate more file paths to process if we processed all the files already. Note for this
-            # to clear down, we must have cleared all files found from scanning the dags dir _and_ have
-            # cleared all files added as a result of callbacks
-            if not self._file_path_queue:
-                self.emit_metrics()
-                self.prepare_file_path_queue()
-
-            # if new files found in dag dir, add them
-            elif refreshed_dag_dir:
-                self.add_new_file_path_to_queue()
-
-            self._refresh_requested_filelocs()
-            self.start_new_processes()
-
-            # Update number of loop iteration.
-            self._num_run += 1
-
-            if not self._async_mode:
-                self.log.debug("Waiting for processors to finish since we're using sqlite")
-                # Wait until the running DAG processors are finished before
-                # sending a DagParsingStat message back. This means the Agent
-                # can tell we've got to the end of this iteration when it sees
-                # this type of message
-                self.wait_until_finished()
-
-            # Collect anything else that has finished, but don't kick off any more processors
-            self.collect_results()
+            with Trace.start_span(span_name="dag_parsing_loop", component="DagFileProcessorManager") as span:
+                loop_start_time = time.monotonic()
+                ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
+                self.heartbeat()
+                if span.is_recording():
+                    span.add_event(name="heartbeat()")

Review Comment:
   Non-blocking question, I don't have the end-user experience to make this call, so feel free to disagree and ignore: I get that you are using the parentheses at the end of the name to indicate the event was a method that was called. It makes sense. Is that some kind of standard? Would there be any downside to dropping the parentheses here and elsewhere? It seems like that would result in a cleaner UX when the user is viewing these.



##########
airflow/jobs/job.py:
##########
@@ -199,52 +200,65 @@ def heartbeat(
         :param session to use for saving the job
         """
         previous_heartbeat = self.latest_heartbeat
-
-        try:
-            # This will cause it to load from the db
-            self._merge_from(Job._fetch_from_db(self, session))
-            previous_heartbeat = self.latest_heartbeat
-
-            if self.state == JobState.RESTARTING:
-                self.kill()
-
-            # Figure out how long to sleep for
-            sleep_for = 0
-            if self.latest_heartbeat:
-                seconds_remaining = (
-                    self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
-                )
-                sleep_for = max(0, seconds_remaining)
-            sleep(sleep_for)
-
-            job = Job._update_heartbeat(job=self, session=session)
-            self._merge_from(job)
-            time_since_last_heartbeat = (timezone.utcnow() - previous_heartbeat).total_seconds()
-            health_check_threshold_value = health_check_threshold(self.job_type, self.heartrate)
-            if time_since_last_heartbeat > health_check_threshold_value:
-                self.log.info("Heartbeat recovered after %.2f seconds", time_since_last_heartbeat)
-            # At this point, the DB has updated.
-            previous_heartbeat = self.latest_heartbeat
-
-            heartbeat_callback(session)
-            self.log.debug("[heartbeat]")
-            self.heartbeat_failed = False
-        except OperationalError:
-            Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)
-            if not self.heartbeat_failed:
-                self.log.exception("%s heartbeat failed with error", self.__class__.__name__)
-                self.heartbeat_failed = True
-            if self.is_alive():
-                self.log.error(
-                    "%s heartbeat failed with error. Scheduler may go into unhealthy state",
-                    self.__class__.__name__,
-                )
-            else:
-                self.log.error(
-                    "%s heartbeat failed with error. Scheduler is in unhealthy state", self.__class__.__name__
-                )
-            # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
-            self.latest_heartbeat = previous_heartbeat
+        with Trace.start_span(span_name="heartbeat", component="Job") as span:
+            try:
+                span.set_attribute("heartbeat", str(self.latest_heartbeat))
+                # This will cause it to load from the db
+                self._merge_from(Job._fetch_from_db(self, session))
+                previous_heartbeat = self.latest_heartbeat
+
+                if self.state == JobState.RESTARTING:
+                    self.kill()
+
+                # Figure out how long to sleep for
+                sleep_for = 0
+                if self.latest_heartbeat:
+                    seconds_remaining = (
+                        self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
+                    )
+                    sleep_for = max(0, seconds_remaining)
+                if span.is_recording():
+                    span.add_event(name="sleep()", attributes={"sleep_for": sleep_for})
+                sleep(sleep_for)
+
+                job = Job._update_heartbeat(job=self, session=session)
+                self._merge_from(job)
+                time_since_last_heartbeat = (timezone.utcnow() - previous_heartbeat).total_seconds()
+                health_check_threshold_value = health_check_threshold(self.job_type, self.heartrate)
+                if time_since_last_heartbeat > health_check_threshold_value:
+                    self.log.info("Heartbeat recovered after %.2f seconds", time_since_last_heartbeat)
+                # At this point, the DB has updated.
+                previous_heartbeat = self.latest_heartbeat
+
+                heartbeat_callback(session)
+                self.log.debug("[heartbeat]")
+                self.heartbeat_failed = False
+            except OperationalError:
+                Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)
+                if not self.heartbeat_failed:
+                    self.log.exception("%s heartbeat failed with error", self.__class__.__name__)
+                    self.heartbeat_failed = True
+                    msg = f"{self.__class__.__name__} heartbeat got an exception"
+                    if span.is_recording():
+                        span.add_event(name="error", attributes={"message": msg})
+                if self.is_alive():
+                    self.log.error(
+                        "%s heartbeat failed with error. Scheduler may go into unhealthy state",
+                        self.__class__.__name__,
+                    )
+                    msg = f"{self.__class__.__name__} heartbeat failed with error. Scheduler may go into unhealthy state"
+                    if span.is_recording():
+                        span.add_event(name="error", attributes={"message": msg})
+                else:
+                    self.log.error(
+                        "%s heartbeat failed with error. Scheduler is in unhealthy state",
+                        self.__class__.__name__,
+                    )
+                    msg = f"{self.__class__.__name__} heartbeat failed with error. Scheduler is in unhealthy state"
+                    if span.is_recording():
+                        span.add_event(name="error", attributes={"message": msg})

Review Comment:
   You may as well combine these:
   ```suggestion
                    msg = f"{self.__class__.__name__} heartbeat failed with error. Scheduler is in unhealthy state"
                    self.log.error(msg)
                    if span.is_recording():
                        span.add_event(name="error", attributes={"message": msg})
   ```



##########
airflow/dag_processing/manager.py:
##########
@@ -562,118 +568,144 @@ def _run_parsing_loop(self):
             # in sync mode we need to be told to start a "loop"
             self.start_new_processes()
         while True:
-            loop_start_time = time.monotonic()
-            ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
-            self.heartbeat()
-            if self._direct_scheduler_conn is not None and self._direct_scheduler_conn in ready:
-                agent_signal = self._direct_scheduler_conn.recv()
-
-                self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
-                if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
-                    self.terminate()
-                    break
-                elif agent_signal == DagParsingSignal.END_MANAGER:
-                    self.end()
-                    sys.exit(os.EX_OK)
-                elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE:
-                    # continue the loop to parse dags
-                    pass
-                elif isinstance(agent_signal, CallbackRequest):
-                    self._add_callback_to_queue(agent_signal)
-                else:
-                    raise ValueError(f"Invalid message {type(agent_signal)}")
-
-            if not ready and not self._async_mode:
-                # In "sync" mode we don't want to parse the DAGs until we
-                # are told to (as that would open another connection to the
-                # SQLite DB which isn't a good practice
-
-                # This shouldn't happen, as in sync mode poll should block for
-                # ever. Lets be defensive about that.
-                self.log.warning(
-                    "wait() unexpectedly returned nothing ready after infinite timeout (%r)!", poll_time
-                )
-
-                continue
-
-            for sentinel in ready:
-                if sentinel is not self._direct_scheduler_conn:
-                    processor = self.waitables.get(sentinel)
-                    if processor:
-                        self._collect_results_from_processor(processor)
-                        self.waitables.pop(sentinel)
-                        self._processors.pop(processor.file_path)
-
-            if self.standalone_dag_processor:
-                self._fetch_callbacks(max_callbacks_per_loop)
-            self._scan_stale_dags()
-            DagWarning.purge_inactive_dag_warnings()
-            refreshed_dag_dir = self._refresh_dag_dir()
-
-            self._kill_timed_out_processors()
-
-            # Generate more file paths to process if we processed all the files already. Note for this
-            # to clear down, we must have cleared all files found from scanning the dags dir _and_ have
-            # cleared all files added as a result of callbacks
-            if not self._file_path_queue:
-                self.emit_metrics()
-                self.prepare_file_path_queue()
-
-            # if new files found in dag dir, add them
-            elif refreshed_dag_dir:
-                self.add_new_file_path_to_queue()
-
-            self._refresh_requested_filelocs()
-            self.start_new_processes()
-
-            # Update number of loop iteration.
-            self._num_run += 1
-
-            if not self._async_mode:
-                self.log.debug("Waiting for processors to finish since we're using sqlite")
-                # Wait until the running DAG processors are finished before
-                # sending a DagParsingStat message back. This means the Agent
-                # can tell we've got to the end of this iteration when it sees
-                # this type of message
-                self.wait_until_finished()
-
-            # Collect anything else that has finished, but don't kick off any more processors
-            self.collect_results()
+            with Trace.start_span(span_name="dag_parsing_loop", component="DagFileProcessorManager") as span:
+                loop_start_time = time.monotonic()
+                ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
+                self.heartbeat()
+                if span.is_recording():
+                    span.add_event(name="heartbeat()")
+                if self._direct_scheduler_conn is not None and self._direct_scheduler_conn in ready:
+                    agent_signal = self._direct_scheduler_conn.recv()
+
+                    self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
+                    if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
+                        self.terminate()
+                        if span.is_recording():
+                            span.add_event(name="terminate()")
+                        break
+                    elif agent_signal == DagParsingSignal.END_MANAGER:
+                        self.end()
+                        if span.is_recording():
+                            span.add_event(name="end()")

Review Comment:
   Same caveat, I haven't seen this in use as a user, so feel free to disagree and resolve. On an instinctual level, it feels like we should call add_event before calling the method. In this case, if heartbeat() fails and causes an issue, that call to heartbeat wouldn't be noted in the span, and that would make it harder to trace back where it actually ran into the issue, no? I see you did it that way in some places below, but not everywhere, so maybe there is a reason for the inconsistency.
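
A minimal, self-contained sketch of that ordering concern (RecordingSpan and flaky_heartbeat are illustrative stand-ins, not Airflow or OpenTelemetry code): recording the event before the call keeps the attempt visible even when the call raises.

```python
# Record the event *before* invoking the method, so a failing call still
# shows up in the span. RecordingSpan is a minimal stand-in for a real span.
class RecordingSpan:
    def __init__(self):
        self.events = []

    def is_recording(self) -> bool:
        return True

    def add_event(self, name: str) -> None:
        self.events.append(name)

def flaky_heartbeat():
    raise RuntimeError("heartbeat failed mid-call")

span = RecordingSpan()
try:
    if span.is_recording():
        span.add_event(name="heartbeat")  # noted even though the call below raises
    flaky_heartbeat()
except RuntimeError:
    pass

# span.events still contains "heartbeat", so the failed attempt is traceable.
```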



##########
airflow/traces/utils.py:
##########
@@ -40,26 +40,32 @@ def _gen_id(seeds: list[str], as_int: bool = False, type: 
int = TRACE_ID) -> str
 
 
 def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int:
+    if dag_run.start_date is None:
+        return 1

Review Comment:
   If this is the answer to my question above about why we are checking whether it is `1`, then I definitely feel that hardcoding the literal integer is the wrong option. Either use something like ArgNotSet or create a constant with a name like `NO_TRACE_ID = 1` and use that everywhere.
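
A hedged sketch of that suggestion (the simplified gen_trace_id body below is an assumption for illustration; the real function in airflow/traces/utils.py derives the id from dag run fields):

```python
# Named sentinel instead of a bare literal 1; the name mirrors the reviewer's
# suggestion and is not part of the actual Airflow codebase.
NO_TRACE_ID = 1

def gen_trace_id(start_date, as_int: bool = False):
    if start_date is None:
        return NO_TRACE_ID
    # Real code derives the id from dag run identifiers; hash() stands in here.
    trace_int = hash(start_date) & ((1 << 128) - 1)
    return trace_int if as_int else format(trace_int, "032x")
```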



##########
airflow/dag_processing/manager.py:
##########
@@ -562,118 +568,144 @@ def _run_parsing_loop(self):
             # in sync mode we need to be told to start a "loop"
             self.start_new_processes()
         while True:
-            loop_start_time = time.monotonic()
-            ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
-            self.heartbeat()
-            if self._direct_scheduler_conn is not None and self._direct_scheduler_conn in ready:
-                agent_signal = self._direct_scheduler_conn.recv()
-
-                self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
-                if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
-                    self.terminate()
-                    break
-                elif agent_signal == DagParsingSignal.END_MANAGER:
-                    self.end()
-                    sys.exit(os.EX_OK)
-                elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE:
-                    # continue the loop to parse dags
-                    pass
-                elif isinstance(agent_signal, CallbackRequest):
-                    self._add_callback_to_queue(agent_signal)
-                else:
-                    raise ValueError(f"Invalid message {type(agent_signal)}")
-
-            if not ready and not self._async_mode:
-                # In "sync" mode we don't want to parse the DAGs until we
-                # are told to (as that would open another connection to the
-                # SQLite DB which isn't a good practice
-
-                # This shouldn't happen, as in sync mode poll should block for
-                # ever. Lets be defensive about that.
-                self.log.warning(
-                    "wait() unexpectedly returned nothing ready after infinite timeout (%r)!", poll_time
-                )
-
-                continue
-
-            for sentinel in ready:
-                if sentinel is not self._direct_scheduler_conn:
-                    processor = self.waitables.get(sentinel)
-                    if processor:
-                        self._collect_results_from_processor(processor)
-                        self.waitables.pop(sentinel)
-                        self._processors.pop(processor.file_path)
-
-            if self.standalone_dag_processor:
-                self._fetch_callbacks(max_callbacks_per_loop)
-            self._scan_stale_dags()
-            DagWarning.purge_inactive_dag_warnings()
-            refreshed_dag_dir = self._refresh_dag_dir()
-
-            self._kill_timed_out_processors()
-
-            # Generate more file paths to process if we processed all the files already. Note for this
-            # to clear down, we must have cleared all files found from scanning the dags dir _and_ have
-            # cleared all files added as a result of callbacks
-            if not self._file_path_queue:
-                self.emit_metrics()
-                self.prepare_file_path_queue()
-
-            # if new files found in dag dir, add them
-            elif refreshed_dag_dir:
-                self.add_new_file_path_to_queue()
-
-            self._refresh_requested_filelocs()
-            self.start_new_processes()
-
-            # Update number of loop iteration.
-            self._num_run += 1
-
-            if not self._async_mode:
-                self.log.debug("Waiting for processors to finish since we're using sqlite")
-                # Wait until the running DAG processors are finished before
-                # sending a DagParsingStat message back. This means the Agent
-                # can tell we've got to the end of this iteration when it sees
-                # this type of message
-                self.wait_until_finished()
-
-            # Collect anything else that has finished, but don't kick off any more processors
-            self.collect_results()
+            with Trace.start_span(span_name="dag_parsing_loop", component="DagFileProcessorManager") as span:
+                loop_start_time = time.monotonic()
+                ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
+                self.heartbeat()
+                if span.is_recording():
+                    span.add_event(name="heartbeat()")
+                if self._direct_scheduler_conn is not None and self._direct_scheduler_conn in ready:
+                    agent_signal = self._direct_scheduler_conn.recv()
+
+                    self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
+                    if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
+                        self.terminate()
+                        if span.is_recording():
+                            span.add_event(name="terminate()")
+                        break
+                    elif agent_signal == DagParsingSignal.END_MANAGER:
+                        self.end()
+                        if span.is_recording():
+                            span.add_event(name="end()")
+                        sys.exit(os.EX_OK)
+                    elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE:
+                        # continue the loop to parse dags
+                        pass
+                    elif isinstance(agent_signal, CallbackRequest):
+                        self._add_callback_to_queue(agent_signal)
+                    else:
+                        raise ValueError(f"Invalid message {type(agent_signal)}")
+
+                if not ready and not self._async_mode:
+                    # In "sync" mode we don't want to parse the DAGs until we
+                    # are told to (as that would open another connection to the
+                    # SQLite DB which isn't a good practice
+
+                    # This shouldn't happen, as in sync mode poll should block for
+                    # ever. Lets be defensive about that.
+                    self.log.warning(
+                        "wait() unexpectedly returned nothing ready after infinite timeout (%r)!", poll_time
+                    )
 
-            self._print_stat()
+                    continue
 
-            all_files_processed = all(self.get_last_finish_time(x) is not None for x in self.file_paths)
-            max_runs_reached = self.max_runs_reached()
+                for sentinel in ready:
+                    if sentinel is not self._direct_scheduler_conn:
+                        processor = self.waitables.get(sentinel)
+                        if processor:
+                            self._collect_results_from_processor(processor)
+                            self.waitables.pop(sentinel)
+                            self._processors.pop(processor.file_path)
+
+                if self.standalone_dag_processor:
+                    self._fetch_callbacks(max_callbacks_per_loop)
+                self._scan_stale_dags()
+                DagWarning.purge_inactive_dag_warnings()
+                refreshed_dag_dir = self._refresh_dag_dir()
+
+                self._kill_timed_out_processors()
+                if span.is_recording():
+                    span.add_event(name="_kill_timed_out_processors()")

Review Comment:
   This pattern is repeated a LOT. For future consideration, I wonder if there might be a way to add a decorator to these classes which adds an event when a method is called. It would mean storing the span on the instance (as `self.span`) and maybe something roughly along the lines of:
   
   ```python
   import functools

   def add_basic_trace_event(func):
       @functools.wraps(func)
       def wrapper(self, *args, **kwargs):
           if self.span.is_recording():
               self.span.add_event(name=func.__name__)
           return func(self, *args, **kwargs)
       return wrapper
   ```
   
   then just adding the `@add_basic_trace_event` decorator to each of these methods that don't have an attributes value. Maybe that could be a future project for someone.
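
Along those lines, a runnable sketch of the idea, assuming the span is stored on the instance as `self.span` (FakeSpan, Manager, and the decorator name are illustrative, not actual Airflow code):

```python
import functools

class FakeSpan:
    """Minimal stand-in for an OpenTelemetry span."""
    def __init__(self):
        self.events = []

    def is_recording(self) -> bool:
        return True

    def add_event(self, name: str) -> None:
        self.events.append(name)

def add_basic_trace_event(func):
    """Record an event named after the wrapped method, then call it."""
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.span.is_recording():
            self.span.add_event(name=func.__name__)
        return func(self, *args, **kwargs)
    return wrapper

class Manager:
    def __init__(self):
        self.span = FakeSpan()

    @add_basic_trace_event
    def heartbeat(self):
        return "ok"

m = Manager()
result = m.heartbeat()
```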



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
