jedcunningham commented on a change in pull request #17846:
URL: https://github.com/apache/airflow/pull/17846#discussion_r697732194



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1131,6 +1134,44 @@ def _emit_pool_metrics(self, session: Session = None) -> None:
     @provide_session
     def heartbeat_callback(self, session: Session = None) -> None:
         Stats.incr('scheduler_heartbeat', 1, 1)
+        if self.terminating:
+            return
+        if not self.is_alive():
+            self.on_kill()
+
+    @provide_session
+    def on_kill(self, session=None) -> None:
+        self.log.error("SchedulerJob failed. Job ID: %s", self.id)
+        self.state = State.FAILED

Review comment:
       Don't we only get here when the state is already a terminating state? Is there another path?

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1131,6 +1134,44 @@ def _emit_pool_metrics(self, session: Session = None) -> None:
     @provide_session
     def heartbeat_callback(self, session: Session = None) -> None:
         Stats.incr('scheduler_heartbeat', 1, 1)
+        if self.terminating:
+            return
+        if not self.is_alive():
+            self.on_kill()
+
+    @provide_session
+    def on_kill(self, session=None) -> None:
+        self.log.error("SchedulerJob failed. Job ID: %s", self.id)
+        self.state = State.FAILED
+        self.end_date = timezone.utcnow()
+        self.terminating = True

Review comment:
       Assuming we end up here because of:
   
https://github.com/apache/airflow/blob/0abbd2d918ad9027948fd8a33ebb42487e4aa000/airflow/jobs/base_job.py#L205-L206
   
   Then won't we stop before the next `heartbeat_callback`, meaning the `terminating` check you're adding is never used? Am I missing something?
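A minimal, hypothetical Python model of the control flow this comment asks about. It is not Airflow code: it assumes, as the linked `base_job.py` lines suggest, that `heartbeat()` calls `kill()` once the job's state is terminating and that `kill()` raises to stop the job. `FakeJob`, `run_loop`, `JobShutDown`, `TERMINATING_STATES` and the counters are illustrative names, not Airflow APIs.

```python
# Hypothetical, simplified model (not Airflow code). It assumes heartbeat()
# calls kill() when the state is terminating and that kill() raises.
TERMINATING_STATES = {"shutdown"}  # illustrative, not Airflow's actual set


class JobShutDown(Exception):
    """Stand-in for the exception kill() is assumed to raise."""


class FakeJob:
    def __init__(self) -> None:
        self.state = "running"
        self.terminating = False
        self.callback_calls = 0
        self.guard_hits = 0  # times the `if self.terminating: return` guard fired

    def on_kill(self) -> None:
        # Mirrors the PR's on_kill: mark the job failed and set the flag.
        self.state = "failed"
        self.terminating = True

    def kill(self) -> None:
        self.on_kill()
        raise JobShutDown("Job shut down externally.")

    def heartbeat_callback(self) -> None:
        self.callback_calls += 1
        if self.terminating:
            self.guard_hits += 1
            return

    def heartbeat(self) -> None:
        if self.state in TERMINATING_STATES:
            self.kill()  # raises, so heartbeat_callback() below never runs
        self.heartbeat_callback()


def run_loop(job: FakeJob, beats: int) -> None:
    for beat in range(beats):
        if beat == 2:
            job.state = "shutdown"  # simulate an external shutdown request
        try:
            job.heartbeat()
        except JobShutDown:
            break  # the job stops here; no further heartbeat_callback() calls
```

In this model, `run_loop(FakeJob(), 5)` leaves `callback_calls == 2` and `guard_hits == 0`: the `terminating` flag is set, but no later `heartbeat_callback` ever reads it.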

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1131,6 +1134,44 @@ def _emit_pool_metrics(self, session: Session = None) -> None:
     @provide_session
     def heartbeat_callback(self, session: Session = None) -> None:
         Stats.incr('scheduler_heartbeat', 1, 1)
+        if self.terminating:
+            return
+        if not self.is_alive():
+            self.on_kill()

Review comment:
       ```suggestion
               self.on_kill()
   ```
   
   Should this call `.kill()` instead?
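A similar hypothetical sketch for the `.kill()` question. It assumes that `kill()` roughly sets `end_date`, calls `on_kill()` (logging rather than propagating its errors), and then raises, as the `base_job.py` lines linked in this thread suggest. Under that assumption, calling `kill()` from `heartbeat_callback` would also make the `end_date` assignment inside `on_kill` redundant. `FakeJob`, `call_kill`, `JobShutDown` and `on_kill_calls` are illustrative names, not Airflow APIs.

```python
# Hypothetical, simplified model contrasting on_kill() with kill(); not Airflow code.
import datetime
import logging

log = logging.getLogger(__name__)


class JobShutDown(Exception):
    """Stand-in for the exception kill() is assumed to raise."""


class FakeJob:
    def __init__(self, call_kill: bool) -> None:
        self.call_kill = call_kill  # which method heartbeat_callback() uses
        self.state = "running"
        self.end_date = None
        self.on_kill_calls = 0

    def is_alive(self) -> bool:
        return False  # pretend the liveness check has failed

    def on_kill(self) -> None:
        # Job-specific cleanup only; it does not touch end_date here.
        self.on_kill_calls += 1
        self.state = "failed"

    def kill(self) -> None:
        # Assumed shape of kill(): set end_date, run on_kill(), then stop the job.
        self.end_date = datetime.datetime.now(datetime.timezone.utc)
        try:
            self.on_kill()
        except Exception as exc:
            log.error("on_kill() method failed: %s", exc)
        raise JobShutDown("Job shut down externally.")

    def heartbeat_callback(self) -> None:
        if not self.is_alive():
            if self.call_kill:
                self.kill()
            else:
                self.on_kill()
```

With `call_kill=False` the job is marked failed, but `end_date` stays `None` and nothing stops it; with `call_kill=True`, `end_date` is set and `JobShutDown` propagates to the caller.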

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1131,6 +1134,44 @@ def _emit_pool_metrics(self, session: Session = None) -> None:
     @provide_session
     def heartbeat_callback(self, session: Session = None) -> None:
         Stats.incr('scheduler_heartbeat', 1, 1)
+        if self.terminating:
+            return
+        if not self.is_alive():
+            self.on_kill()
+
+    @provide_session
+    def on_kill(self, session=None) -> None:
+        self.log.error("SchedulerJob failed. Job ID: %s", self.id)
+        self.state = State.FAILED
+        self.end_date = timezone.utcnow()

Review comment:
       Isn't this already done in:
https://github.com/apache/airflow/blob/0abbd2d918ad9027948fd8a33ebb42487e4aa000/airflow/jobs/base_job.py#L152
