howardyoo commented on code in PR #40802:
URL: https://github.com/apache/airflow/pull/40802#discussion_r1683583354
##########
airflow/dag_processing/manager.py:
##########
@@ -562,118 +568,144 @@ def _run_parsing_loop(self):
        # in sync mode we need to be told to start a "loop"
        self.start_new_processes()
        while True:
-            loop_start_time = time.monotonic()
-            ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
-            self.heartbeat()
-            if self._direct_scheduler_conn is not None and self._direct_scheduler_conn in ready:
-                agent_signal = self._direct_scheduler_conn.recv()
-
-                self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
-                if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
-                    self.terminate()
-                    break
-                elif agent_signal == DagParsingSignal.END_MANAGER:
-                    self.end()
-                    sys.exit(os.EX_OK)
-                elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE:
-                    # continue the loop to parse dags
-                    pass
-                elif isinstance(agent_signal, CallbackRequest):
-                    self._add_callback_to_queue(agent_signal)
-                else:
-                    raise ValueError(f"Invalid message {type(agent_signal)}")
-
-            if not ready and not self._async_mode:
-                # In "sync" mode we don't want to parse the DAGs until we
-                # are told to (as that would open another connection to the
-                # SQLite DB which isn't a good practice
-
-                # This shouldn't happen, as in sync mode poll should block for
-                # ever. Lets be defensive about that.
-                self.log.warning(
-                    "wait() unexpectedly returned nothing ready after infinite timeout (%r)!", poll_time
-                )
-
-                continue
-
-            for sentinel in ready:
-                if sentinel is not self._direct_scheduler_conn:
-                    processor = self.waitables.get(sentinel)
-                    if processor:
-                        self._collect_results_from_processor(processor)
-                        self.waitables.pop(sentinel)
-                        self._processors.pop(processor.file_path)
-
-            if self.standalone_dag_processor:
-                self._fetch_callbacks(max_callbacks_per_loop)
-            self._scan_stale_dags()
-            DagWarning.purge_inactive_dag_warnings()
-            refreshed_dag_dir = self._refresh_dag_dir()
-
-            self._kill_timed_out_processors()
-
-            # Generate more file paths to process if we processed all the files already. Note for this
-            # to clear down, we must have cleared all files found from scanning the dags dir _and_ have
-            # cleared all files added as a result of callbacks
-            if not self._file_path_queue:
-                self.emit_metrics()
-                self.prepare_file_path_queue()
-
-            # if new files found in dag dir, add them
-            elif refreshed_dag_dir:
-                self.add_new_file_path_to_queue()
-
-            self._refresh_requested_filelocs()
-            self.start_new_processes()
-
-            # Update number of loop iteration.
-            self._num_run += 1
-
-            if not self._async_mode:
-                self.log.debug("Waiting for processors to finish since we're using sqlite")
-                # Wait until the running DAG processors are finished before
-                # sending a DagParsingStat message back. This means the Agent
-                # can tell we've got to the end of this iteration when it sees
-                # this type of message
-                self.wait_until_finished()
-
-            # Collect anything else that has finished, but don't kick off any more processors
-            self.collect_results()
-
-            self._print_stat()
-
-            all_files_processed = all(self.get_last_finish_time(x) is not None for x in self.file_paths)
-            max_runs_reached = self.max_runs_reached()
+            with Trace.start_span(span_name="dag_parsing_loop", component="DagFileProcessorManager") as span:
+                loop_start_time = time.monotonic()
+                ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
+                self.heartbeat()
+                if span.is_recording():
+                    span.add_event(name="heartbeat()")
+                if self._direct_scheduler_conn is not None and self._direct_scheduler_conn in ready:
+                    agent_signal = self._direct_scheduler_conn.recv()
+
+                    self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
+                    if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
+                        self.terminate()
+                        if span.is_recording():
+                            span.add_event(name="terminate()")
+                        break
+                    elif agent_signal == DagParsingSignal.END_MANAGER:
+                        self.end()
+                        if span.is_recording():
+                            span.add_event(name="end()")
+                        sys.exit(os.EX_OK)
+                    elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE:
+                        # continue the loop to parse dags
+                        pass
+                    elif isinstance(agent_signal, CallbackRequest):
+                        self._add_callback_to_queue(agent_signal)
+                    else:
+                        raise ValueError(f"Invalid message {type(agent_signal)}")
+
+                if not ready and not self._async_mode:
+                    # In "sync" mode we don't want to parse the DAGs until we
+                    # are told to (as that would open another connection to the
+                    # SQLite DB which isn't a good practice
+
+                    # This shouldn't happen, as in sync mode poll should block for
+                    # ever. Lets be defensive about that.
+                    self.log.warning(
+                        "wait() unexpectedly returned nothing ready after infinite timeout (%r)!", poll_time
+                    )
+
+                    continue
+
+                for sentinel in ready:
+                    if sentinel is not self._direct_scheduler_conn:
+                        processor = self.waitables.get(sentinel)
+                        if processor:
+                            self._collect_results_from_processor(processor)
+                            self.waitables.pop(sentinel)
+                            self._processors.pop(processor.file_path)
+
+                if self.standalone_dag_processor:
+                    self._fetch_callbacks(max_callbacks_per_loop)
+                self._scan_stale_dags()
+                DagWarning.purge_inactive_dag_warnings()
+                refreshed_dag_dir = self._refresh_dag_dir()
+
+                self._kill_timed_out_processors()
+                if span.is_recording():
+                    span.add_event(name="_kill_timed_out_processors()")
Review Comment:
> This pattern is repeated a LOT. For future consideration, I wonder if there might be a way to add a decorator to these classes which adds an event whenever the decorated method is called. It would mean storing the span as an instance attribute (as `self.span`) and maybe something roughly along the lines of
>
> ```
> def addBasicTraceEvent(func):
>     def wrapper(self, *args, **kwargs):
>         if self.span.is_recording():
>             self.span.add_event(name=func.__name__)
>         return func(self, *args, **kwargs)
>     return wrapper
> ```
>
> then just adding the `@addBasicTraceEvent` decorator to each of these methods whose event doesn't need an `attributes` value. Maybe that could be a future project for someone.
Yes, I definitely think the pattern can be wrapped in a decorator. I'd like to defer this to the next project :-)
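
For whoever picks that project up, here is a rough sketch of what a reusable version might look like. This is illustrative only and not part of this PR: it assumes the manager keeps its active span in a hypothetical `self.span` attribute, whereas the PR only holds the span as a local variable inside `_run_parsing_loop`.

```
import functools


def add_basic_trace_event(func):
    """Add a span event named after the wrapped method, if a span is recording."""

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        # ``self.span`` is hypothetical here; adopting this pattern would mean
        # storing the span on the manager instead of a loop-local variable.
        span = getattr(self, "span", None)
        if span is not None and span.is_recording():
            span.add_event(name=f"{func.__name__}()")
        return func(self, *args, **kwargs)

    return wrapper


class Manager:
    """Stand-in for DagFileProcessorManager, just to show usage."""

    def __init__(self, span=None):
        self.span = span  # would be set from Trace.start_span(...) each loop

    @add_basic_trace_event
    def _kill_timed_out_processors(self):
        pass  # existing logic unchanged; the decorator emits the event
```

Using `f"{func.__name__}()"` keeps the event names consistent with the literal strings in the diff (e.g. `"_kill_timed_out_processors()"`), so dashboards built against those names would not need to change.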
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]