dstandish commented on code in PR #46677:
URL: https://github.com/apache/airflow/pull/46677#discussion_r1953555955


##########
airflow/jobs/triggerer_job_runner.py:
##########
@@ -314,95 +116,222 @@ def on_kill(self):
 
         Called when there is an external kill command (via the heartbeat 
mechanism, for example).
         """
+        # TODO: signal instead.
         self.trigger_runner.stop = True
 
-    def _kill_listener(self):
-        if self.listener:
-            for h in self.listener.handlers:
-                h.close()
-            self.listener.stop()
-
     def _exit_gracefully(self, signum, frame) -> None:
         # The first time, try to exit nicely
-        if not self.trigger_runner.stop:
+        if self.trigger_runner and not self.trigger_runner.stop:
             self.log.info("Exiting gracefully upon receiving signal %s", 
signum)
             self.trigger_runner.stop = True
-            self._kill_listener()
         else:
             self.log.warning("Forcing exit due to second exit signal %s", 
signum)
+
+            self.trigger_runner.kill(signal.SIGKILL)
             sys.exit(os.EX_SOFTWARE)
 
     def _execute(self) -> int | None:
         self.log.info("Starting the triggerer")
         try:
-            # set job_id so that it can be used in log file names
-            self.trigger_runner.job_id = self.job.id
+            # Kick off runner sub-process without DB access
+            self.trigger_runner = TriggerRunnerMonitor.start(
+                job=self.job, capacity=self.capacity, 
logger=structlog.get_logger(logger_name="triggerer2")
+            )
 
-            # Kick off runner thread
-            self.trigger_runner.start()
-            # Start our own DB loop in the main thread
-            self._run_trigger_loop()
+            # Run the main DB comms loop in this process
+            self.trigger_runner.run_db_loop()
         except Exception:
-            self.log.exception("Exception when executing 
TriggererJobRunner._run_trigger_loop")
+            self.log.exception("Exception when executing 
TriggerRunnerMonitor.run_db_loop")
             raise
         finally:
             self.log.info("Waiting for triggers to clean up")
-            # Tell the subthread to stop and then wait for it.
+            # Tell the subtproc to stop and then wait for it.
             # If the user interrupts/terms again, _graceful_exit will allow 
them
             # to force-kill here.
-            self.trigger_runner.stop = True
-            self.trigger_runner.join(30)
+            # self.trigger_runner.stop = True

Review Comment:
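   Commented-out code: `# self.trigger_runner.stop = True` is left behind here. Please remove it (or restore it if it is still needed) rather than leaving it commented out.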



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to