potiuk commented on PR #34026:
URL: https://github.com/apache/airflow/pull/34026#issuecomment-1731483154
@mhenc -> one more thing we have to remember as a follow-up for AIP-44: we need to implement the heartbeat callback.
In the current implementation of AIP-44, the callback is still called on the client side (i.e. in the local task job) and it performs some DB operations (refreshing the task instance and DAG run from the DB), so it is … But it also does some "local" process management: terminating the running task if the state changed, and handling the "impersonation" case, where an additional parent process is created when switching to the impersonated user. So this method should likely be split into "retrieving state from the DB" (via an internal API call) and "reacting to the state change".
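A minimal sketch of that split, assuming hypothetical names (`TaskStateSnapshot`, `Action`, `react_to_state`, and a `fetch_state` callable standing in for the internal API call) that are not part of Airflow. Plain strings stand in for `TaskInstanceState`, and the PID/impersonation checks are omitted for brevity:

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional


@dataclass(frozen=True)
class TaskStateSnapshot:
    """Hypothetical payload that a DB-side (internal API) call could return."""

    state: str
    hostname: Optional[str]
    pid: Optional[int]


class Action(Enum):
    CONTINUE = "continue"
    TERMINATE = "terminate"
    FAIL = "fail"  # corresponds to raising AirflowException in the real callback


def react_to_state(
    snapshot: TaskStateSnapshot,
    local_hostname: str,
    process_still_running: bool,
    state_change_checks: int,
) -> Action:
    """Purely local decision: uses only the snapshot and local process facts, no DB."""
    if snapshot.state == "running":
        if snapshot.hostname != local_hostname:
            return Action.FAIL
        return Action.CONTINUE
    if process_still_running and state_change_checks >= 1:
        # State was already seen as changed externally on an earlier heartbeat.
        return Action.TERMINATE
    return Action.CONTINUE


def heartbeat_callback(
    fetch_state: Callable[[], TaskStateSnapshot],
    local_hostname: str,
    process_still_running: bool,
    state_change_checks: int,
) -> Action:
    # Step 1: the only DB-touching part, which could become an internal API call.
    snapshot = fetch_state()
    # Step 2: local process management driven by the returned state.
    return react_to_state(snapshot, local_hostname, process_still_running, state_change_checks)
```

Keeping `react_to_state` free of DB access means it can stay on the client side unchanged, while only `fetch_state` needs an AIP-44 implementation.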
Alternatively, we could refactor the heartbeat_callback approach and add a "dedicated" local task job heartbeat that, in a single Internal API call, both updates the heartbeat status AND retrieves the task state and returns it, so that the client can kill processes in reaction to an external DB state change.
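Roughly, such a combined call could look like this. It is only a sketch: `FakeMetadataDB` and `heartbeat_and_get_state` are hypothetical stand-ins for the metadata DB and an internal API endpoint, not existing Airflow APIs:

```python
import time
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class FakeMetadataDB:
    """Hypothetical stand-in for the metadata DB behind an internal API server."""

    job_heartbeats: Dict[int, float] = field(default_factory=dict)
    ti_states: Dict[str, str] = field(default_factory=dict)
    calls: int = 0  # counts round trips, to show one call per heartbeat

    def heartbeat_and_get_state(self, job_id: int, ti_key: str) -> Optional[str]:
        """Hypothetical single RPC: record the job heartbeat AND return the TI state."""
        self.calls += 1
        self.job_heartbeats[job_id] = time.time()
        return self.ti_states.get(ti_key)


def local_task_heartbeat(db: FakeMetadataDB, job_id: int, ti_key: str) -> bool:
    """Return True if the local task process should be terminated.

    Simplified: a real version would also apply the throttling and PID checks.
    """
    state = db.heartbeat_and_get_state(job_id, ti_key)
    return state != "running"
```

The design point is that the client no longer needs a separate "refresh from DB" round trip: the state comes back as the response to the heartbeat itself.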
cc: @bjankie1 - I think it would be great if you two thought about it and proposed an approach that also leaves room for potential optimisation of the heartbeat in the future.
Currently the callback is as follows:
```
import psutil
from sqlalchemy.orm import Session

from airflow.exceptions import AirflowException
from airflow.utils import timezone
from airflow.utils.net import get_hostname
from airflow.utils.platform import IS_WINDOWS
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import TaskInstanceState


# Method of the local task job runner class, shown here outside its class.
@provide_session
def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
    """Self destruct task if state has been moved away from running externally."""
    if self.terminating:
        # ensure termination if processes are created later
        self.task_runner.terminate()
        return

    self.task_instance.refresh_from_db()
    ti = self.task_instance

    if ti.state == TaskInstanceState.RUNNING:
        fqdn = get_hostname()
        same_hostname = fqdn == ti.hostname
        if not same_hostname:
            self.log.error(
                "The recorded hostname %s does not match this instance's hostname %s",
                ti.hostname,
                fqdn,
            )
            raise AirflowException("Hostname of job runner does not match")
        current_pid = self.task_runner.get_process_pid()
        recorded_pid = ti.pid
        same_process = recorded_pid == current_pid

        if recorded_pid is not None and (ti.run_as_user or self.task_runner.run_as_user):
            # when running as another user, compare the task runner pid to the parent of
            # the recorded pid because user delegation becomes an extra process level.
            # However, if recorded_pid is None, pass that through as it signals the task
            # runner process has already completed and been cleared out. `psutil.Process`
            # uses the current process if the parameter is None, which is not what is intended
            # for comparison.
            recorded_pid = psutil.Process(ti.pid).ppid()
            same_process = recorded_pid == current_pid

        if recorded_pid is not None and not same_process and not IS_WINDOWS:
            self.log.warning(
                "Recorded pid %s does not match the current pid %s", recorded_pid, current_pid
            )
            raise AirflowException("PID of job runner does not match")
    elif self.task_runner.return_code() is None and hasattr(self.task_runner, "process"):
        if ti.state == TaskInstanceState.SKIPPED:
            # A DagRun timeout will cause tasks to be externally marked as skipped.
            dagrun = ti.get_dagrun(session=session)
            execution_time = (dagrun.end_date or timezone.utcnow()) - dagrun.start_date
            if ti.task.dag is not None:
                dagrun_timeout = ti.task.dag.dagrun_timeout
            else:
                dagrun_timeout = None
            if dagrun_timeout and execution_time > dagrun_timeout:
                self.log.warning("DagRun timed out after %s.", execution_time)

        # potential race condition, the _run_raw_task commits `success` or other state
        # but task_runner does not exit right away due to slow process shutdown or any other reasons
        # let's do a throttle here, if the above case is true, the handle_task_exit will handle it
        if self._state_change_checks >= 1:  # defer to next round of heartbeat
            self.log.warning(
                "State of this instance has been externally set to %s. Terminating instance.", ti.state
            )
            self.terminating = True
        self._state_change_checks += 1
```
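The subtlest part of the "reacting" side is the PID check under impersonation. Below is a hypothetical pure-function version of it (`pid_matches` is not an Airflow function), with the parent-PID lookup injected so it can be tested without real processes; the Windows exemption is left out:

```python
from typing import Callable, Optional


def pid_matches(
    recorded_pid: Optional[int],
    current_pid: Optional[int],
    impersonated: bool,
    get_parent_pid: Callable[[int], int],
) -> bool:
    """Mirror of the PID comparison in heartbeat_callback above.

    With run_as_user, the recorded pid belongs to the process started as the
    impersonated user, one level below the task runner process, so the task
    runner pid is compared with the *parent* of the recorded pid.
    """
    if recorded_pid is None:
        # The task process already finished and its pid was cleared: nothing to compare.
        return True
    if impersonated:
        recorded_pid = get_parent_pid(recorded_pid)
    return recorded_pid == current_pid
```

In the real callback the injected lookup would correspond to `lambda pid: psutil.Process(pid).ppid()`, which is exactly the part that only makes sense on the machine running the task, so it belongs on the client side of any split.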