ashb commented on code in PR #46677:
URL: https://github.com/apache/airflow/pull/46677#discussion_r1953560138


##########
airflow/jobs/triggerer_job_runner.py:
##########
@@ -314,95 +116,222 @@ def on_kill(self):
 
         Called when there is an external kill command (via the heartbeat 
mechanism, for example).
         """
+        # TODO: signal instead.
         self.trigger_runner.stop = True
 
-    def _kill_listener(self):
-        if self.listener:
-            for h in self.listener.handlers:
-                h.close()
-            self.listener.stop()
-
     def _exit_gracefully(self, signum, frame) -> None:
         # The first time, try to exit nicely
-        if not self.trigger_runner.stop:
+        if self.trigger_runner and not self.trigger_runner.stop:
             self.log.info("Exiting gracefully upon receiving signal %s", 
signum)
             self.trigger_runner.stop = True
-            self._kill_listener()
         else:
             self.log.warning("Forcing exit due to second exit signal %s", 
signum)
+
+            self.trigger_runner.kill(signal.SIGKILL)
             sys.exit(os.EX_SOFTWARE)
 
     def _execute(self) -> int | None:
         self.log.info("Starting the triggerer")
         try:
-            # set job_id so that it can be used in log file names
-            self.trigger_runner.job_id = self.job.id
+            # Kick off runner sub-process without DB access
+            self.trigger_runner = TriggerRunnerMonitor.start(
+                job=self.job, capacity=self.capacity, 
logger=structlog.get_logger(logger_name="triggerer2")
+            )
 
-            # Kick off runner thread
-            self.trigger_runner.start()
-            # Start our own DB loop in the main thread
-            self._run_trigger_loop()
+            # Run the main DB comms loop in this process
+            self.trigger_runner.run_db_loop()
         except Exception:
-            self.log.exception("Exception when executing 
TriggererJobRunner._run_trigger_loop")
+            self.log.exception("Exception when executing 
TriggerRunnerMonitor.run_db_loop")
             raise
         finally:
             self.log.info("Waiting for triggers to clean up")
-            # Tell the subthread to stop and then wait for it.
+            # Tell the subtproc to stop and then wait for it.
             # If the user interrupts/terms again, _graceful_exit will allow 
them
             # to force-kill here.
-            self.trigger_runner.stop = True
-            self.trigger_runner.join(30)
+            # self.trigger_runner.stop = True
+            self.trigger_runner.kill(escalation_delay=10, force=True)
             self.log.info("Exited trigger loop")
         return None
 
-    def _run_trigger_loop(self) -> None:
-        """Run synchronously and handle all database reads/writes; the 
main-thread trigger loop."""
-        while not self.trigger_runner.stop:
-            if not self.trigger_runner.is_alive():
-                self.log.error("Trigger runner thread has died! Exiting.")
+
+log: FilteringBoundLogger = structlog.get_logger(logger_name=__name__)
+
+
+class StartTriggerer(BaseModel):
+    requests_fd: int
+    type: Literal["StartTriggerer"] = "StartTriggerer"
+
+
+class CancelTriggers(BaseModel):
+    ids: Iterable[int]
+    type: Literal["CancelTriggersMessage"] = "CancelTriggersMessage"
+
+
+ToAsyncProcess = Annotated[
+    Union[workloads.RunTrigger, CancelTriggers, StartTriggerer],
+    Field(discriminator="type"),
+]
+
+
+class TriggerStateChanges(BaseModel):
+    """Report state change about triggers back to the TriggerRunnerMonitor."""
+
+    type: Literal["TriggerStateChanges"] = "TriggerStateChanges"
+    events: Annotated[
+        list[tuple[int, events.DiscrimatedTriggerEvent]] | None,
+        # We have to specify a default here, as otherwise Pydantic stuggles to 
deal with the discrimated union
+        # :shrug:
+        Field(default=None),
+    ]
+    failures: list[tuple[int, str]] | None = None
+    finished: list[int] | None = None
+
+
+ToSyncProcess = Annotated[
+    Union[TriggerStateChanges],
+    Field(discriminator="type"),
+]
+
+
[email protected](kw_only=True)
+class TriggerLoggingFactory:
+    log_path: str
+
+    bound_logger: WrappedLogger = attrs.field(init=False)
+
+    def __call__(self, processors: Iterable[structlog.typing.Processor]) -> 
WrappedLogger:
+        if hasattr(self, "bound_logger"):
+            return self.bound_logger
+
+        from airflow.sdk.log import init_log_file
+
+        log_file = init_log_file(self.log_path)
+
+        pretty_logs = False
+        if pretty_logs:
+            underlying_logger: WrappedLogger = 
structlog.WriteLogger(log_file.open("w", buffering=1))
+        else:
+            underlying_logger = structlog.BytesLogger(log_file.open("wb"))
+        logger = structlog.wrap_logger(underlying_logger, 
processors=processors).bind()
+        self.bound_logger = logger
+        return logger
+
+
[email protected](kw_only=True)
+class TriggerRunnerMonitor(WatchedSubprocess):
+    """
+    TriggerRunnerProcess runs an async trigger loop in another thread to 
ensure database isolation.
+
+    For speed of implementation when moving from Airflow 2 to Airflow 3 (in 
process database access to
+    isolated) we have kept the reactor/async loop running in a thread, when in 
practice it could be done
+    directly in the main thread by reading the JSON messages on stdin, but 
that.
+
+    In short: this class and the thread should be avoided -- it's writen this 
way for speed of implementation.
+    Sorry folks. @ashb -- 2025/02/10
+    """
+
+    job: Job
+    capacity: int
+
+    health_check_threshold = conf.getint("triggerer", 
"triggerer_health_check_threshold")
+
+    runner: TriggerRunner | None = None
+    stop: bool = False
+
+    decoder: ClassVar[TypeAdapter[ToSyncProcess]] = TypeAdapter(ToSyncProcess)
+
+    # Maps trigger IDs that we think are running in the sub process
+    running_triggers: set[int] = attrs.field(factory=set, init=False)
+
+    logger_cache: dict[int, TriggerLoggingFactory] = attrs.field(factory=dict, 
init=False)
+
+    # A list of triggers that we have told the async process to cancel. We 
keep them here until we receive the
+    # FinishedTriggers message
+    cancelling_triggers: set[int] = attrs.field(factory=set, init=False)
+
+    # Outbound queue of events
+    events: deque[tuple[int, events.TriggerEvent]] = 
attrs.field(factory=deque, init=False)
+
+    # Outbound queue of failed triggers
+    # failed_triggers: deque[tuple[int, BaseException]] = 
attrs.field(factory=deque, init=False)
+    failed_triggers: deque[tuple[int, str]] = attrs.field(factory=deque, 
init=False)
+
+    def is_alive(self) -> bool:
+        # Set by `_service_subprocess` in the loop
+        return self._exit_code is None
+
+    @classmethod
+    def start(  # type: ignore[override]
+        cls,
+        *,
+        job: Job,
+        logger=None,
+        **kwargs,
+    ):
+        proc = super().start(id=job.id, job=job, target=cls.run_in_process, 
logger=logger, **kwargs)
+
+        msg = 
StartTriggerer(requests_fd=proc._requests_fd).model_dump_json().encode() + b"\n"
+        proc.stdin.write(msg)
+        return proc
+
+    def _handle_request(self, msg: ToSyncProcess, log: FilteringBoundLogger) 
-> None:  # type: ignore[override]
+        if isinstance(msg, TriggerStateChanges):
+            log.debug("State change from async process", state=msg)
+            if msg.events:
+                self.events.extend(msg.events)
+            if msg.failures:
+                self.failed_triggers.extend(msg.failures)
+            for id in msg.finished or tuple():
+                self.running_triggers.discard(id)
+                self.cancelling_triggers.discard(id)
+                # TODO: Close logger? Or is deleting it enough
+                self.logger_cache.pop(id, None)
+            return
+
+        raise ValueError(f"Unknown message type {type(msg)}")
+
+    def run_db_loop(self) -> None:
+        """Run synchronously and handle all database reads/writes."""
+        while not self.stop:
+            if not self.is_alive():
+                log.error("Trigger runner thread has died! Exiting.")
                 break
             with Trace.start_span(span_name="triggerer_job_loop", 
component="TriggererJobRunner") as span:
-                # Clean out unused triggers
-                if span.is_recording():
-                    span.add_event(name="Trigger.clean_unused")
-                Trigger.clean_unused()
-                # Load/delete triggers
-                if span.is_recording():
-                    span.add_event(name="load_triggers")
+                # Wait for up to 1 second for activity
+                self._service_subprocess(1)
+
                 self.load_triggers()
-                # Handle events
-                if span.is_recording():
-                    span.add_event(name="handle_events")

Review Comment:
   This was pointless noise -- handle_events is already decorated with 
`@add_span` so we can almost nothing by having an event too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to