potiuk commented on PR #34026:
URL: https://github.com/apache/airflow/pull/34026#issuecomment-1731483154

   @mhenc -> one thing to remember as a follow-up for AIP-44: we also need to
implement the heartbeat callback.
   
   In the current implementation of AIP-44 the callback is still called on the
client side (i.e. in the local task job). It performs some DB operations
(refreshing the task instance and the dag run from the DB), so it still
accesses the DB directly. But it also does some "local" process management: it
terminates the running task if the state changed, and it handles the
"impersonation" case, where an additional parent process is created when
switching to the impersonated user. So this method should likely be split into
"retrieving state from DB" (with an internal API call) and "reacting to state
change".
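
To make the proposed split concrete, here is a minimal sketch, not the actual Airflow code. `TiSnapshot`, `retrieve_state` and `react_to_state` are hypothetical names, `fetch` stands in for whatever would perform the internal API call, and the sketch leaves out the throttling, the Windows check and the DagRun timeout logging that the real callback has.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class TiSnapshot:
    """Hypothetical plain-data view of a task instance; not an Airflow class."""

    state: str
    hostname: str
    pid: int | None
    run_as_user: str | None = None


def retrieve_state(fetch: Callable[[], TiSnapshot]) -> TiSnapshot:
    """Step 1: the only part that needs the DB.

    `fetch` is an assumption: it stands in for the internal API call.
    """
    return fetch()


def react_to_state(
    snapshot: TiSnapshot,
    *,
    local_hostname: str,
    runner_pid: int,
    parent_pid_of: Callable[[int], int],
    runner_run_as_user: str | None = None,
) -> str:
    """Step 2: purely local decision, no DB access.

    Returns "ok", "hostname_mismatch", "pid_mismatch" or "state_changed".
    """
    if snapshot.state != "running":
        return "state_changed"
    if snapshot.hostname != local_hostname:
        return "hostname_mismatch"
    recorded_pid = snapshot.pid
    if recorded_pid is not None and (snapshot.run_as_user or runner_run_as_user):
        # Impersonation adds one process level, so compare against the parent.
        recorded_pid = parent_pid_of(recorded_pid)
    if recorded_pid is not None and recorded_pid != runner_pid:
        return "pid_mismatch"
    return "ok"
```

With this shape only `retrieve_state` would have to go through the internal API; `react_to_state` can be unit-tested with a fake `parent_pid_of` and never touches a session.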
   
   Or maybe we could refactor the heartbeat_callback approach and make a
"dedicated" local task job heartbeat that, in a single Internal API call, both
updates the heartbeat status AND retrieves the state, returning it so that we
can kill processes in reaction to an external DB state change.
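
A rough sketch of that single-call variant, under stated assumptions: `FakeInternalApi` is an in-memory stand-in for the server side and `heartbeat_and_get_state` is a hypothetical method name, not an existing Internal API endpoint. The client keeps the same "defer to next round" throttle the current callback uses.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class FakeInternalApi:
    """In-memory stand-in for the server side; hypothetical, not Airflow's API."""

    ti_state: str = "running"
    latest_heartbeat: datetime | None = None

    def heartbeat_and_get_state(self) -> str:
        # One round trip: record the heartbeat AND return the current TI state.
        self.latest_heartbeat = datetime.now(timezone.utc)
        return self.ti_state


@dataclass
class LocalHeartbeat:
    """Client side: no DB access, it only reacts to the returned state."""

    api: FakeInternalApi
    terminating: bool = False
    _state_change_checks: int = 0

    def beat(self) -> None:
        state = self.api.heartbeat_and_get_state()
        if state == "running":
            return
        # Same throttle as the current callback: act on the second observation,
        # so a task that is just finishing gets one heartbeat of grace.
        if self._state_change_checks >= 1:
            self.terminating = True
        self._state_change_checks += 1
```

The local job would then call `beat()` on every heartbeat and terminate the task runner once `terminating` is set, so the heartbeat update and the state refresh cost one call instead of two.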
   
   cc: @bjankie1 - I think it would be great if you two thought about it and
proposed an approach that would also work well for potential optimisation of
the heartbeat in the future.
   
   Currently the callback is as follows.
   
   ```
       @provide_session
       def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
           """Self destruct task if state has been moved away from running externally."""
           if self.terminating:
               # ensure termination if processes are created later
               self.task_runner.terminate()
               return

           self.task_instance.refresh_from_db()
           ti = self.task_instance

           if ti.state == TaskInstanceState.RUNNING:
               fqdn = get_hostname()
               same_hostname = fqdn == ti.hostname
               if not same_hostname:
                   self.log.error(
                       "The recorded hostname %s does not match this instance's hostname %s",
                       ti.hostname,
                       fqdn,
                   )
                   raise AirflowException("Hostname of job runner does not match")
               current_pid = self.task_runner.get_process_pid()
               recorded_pid = ti.pid
               same_process = recorded_pid == current_pid

               if recorded_pid is not None and (ti.run_as_user or self.task_runner.run_as_user):
                   # when running as another user, compare the task runner pid to the parent of
                   # the recorded pid because user delegation becomes an extra process level.
                   # However, if recorded_pid is None, pass that through as it signals the task
                   # runner process has already completed and been cleared out. `psutil.Process`
                   # uses the current process if the parameter is None, which is not what is intended
                   # for comparison.
                   recorded_pid = psutil.Process(ti.pid).ppid()
                   same_process = recorded_pid == current_pid

               if recorded_pid is not None and not same_process and not IS_WINDOWS:
                   self.log.warning(
                       "Recorded pid %s does not match the current pid %s", recorded_pid, current_pid
                   )
                   raise AirflowException("PID of job runner does not match")
           elif self.task_runner.return_code() is None and hasattr(self.task_runner, "process"):
               if ti.state == TaskInstanceState.SKIPPED:
                   # A DagRun timeout will cause tasks to be externally marked as skipped.
                   dagrun = ti.get_dagrun(session=session)
                   execution_time = (dagrun.end_date or timezone.utcnow()) - dagrun.start_date
                   if ti.task.dag is not None:
                       dagrun_timeout = ti.task.dag.dagrun_timeout
                   else:
                       dagrun_timeout = None
                   if dagrun_timeout and execution_time > dagrun_timeout:
                       self.log.warning("DagRun timed out after %s.", execution_time)

               # potential race condition, the _run_raw_task commits `success` or other state
               # but task_runner does not exit right away due to slow process shutdown or any other reasons
               # let's do a throttle here, if the above case is true, the handle_task_exit will handle it
               if self._state_change_checks >= 1:  # defer to next round of heartbeat
                   self.log.warning(
                       "State of this instance has been externally set to %s. Terminating instance.", ti.state
                   )
                   self.terminating = True
               self._state_change_checks += 1
   ```

