potiuk commented on code in PR #30308:
URL: https://github.com/apache/airflow/pull/30308#discussion_r1163276448
##########
airflow/jobs/job.py:
##########
@@ -292,3 +264,79 @@ def most_recent_job(job_type: str, session: Session = NEW_SESSION) -> Job | None
         )
         .first()
     )
+
+
+@provide_session
+def run_job(job: Job | JobPydantic, session: Session = NEW_SESSION) -> int | None:
+    """
+    Runs the job. The Job is always an ORM object and setting the state is happening within the
+    same DB session and the session is kept open throughout the whole execution.
+
+    :meta private:
+
+    TODO: Maybe we should not keep the session during job execution?
+    """
+    # The below assert is a temporary one, to make MyPy happy with partial AIP-44 work - we will remove it
+    # once final AIP-44 changes are completed.
+    assert isinstance(job, Job), "Job should be ORM object not Pydantic one here (AIP-44 WIP)"
+    job.prepare_for_execution(session=session)
+    try:
+        return execute_job(job)
+    finally:
+        job.complete_execution(session=session)
+
+
+def execute_job(job: Job | JobPydantic) -> int | None:
+    """
+    Executes the job.
+
+    Job execution requires no session as generally executing a job does not require an
+    active database connection. The session might be temporarily acquired and used if the job
+    runs heartbeat during execution, but this connection is only acquired for the time of heartbeat,
+    and in case of the AIP-44 implementation it happens over the Internal API rather than directly via
+    the database.
+
+    After the job is completed, the state of the Job is updated and it should be updated in the database,
+    which happens in the "complete_execution" step (which again can be executed locally in case of
+    database operations or over the Internal API call).
+
+    :param job: Job to execute - it can be either a DB job or its Pydantic serialized version. It does
+        not really matter, because except for running the heartbeat and state setting,
+        the runner should not modify the job state.
+
+    :meta private:
+
+    """
+    ret = None
+    try:
+        # This job_runner reference and type-ignore will be removed by a further refactoring step
+        ret = job.job_runner._execute()  # type:ignore[union-attr]
+        # In case of max runs or max duration
+        job.state = State.SUCCESS
+    except SystemExit:
+        # In case of ^C or SIGTERM
+        job.state = State.SUCCESS
+    except Exception:
+        job.state = State.FAILED
+        raise
+    return ret
+
+
+def perform_heartbeat(job: Job | JobPydantic, only_if_necessary: bool) -> None:
+    """
+    Performs heartbeat for the Job passed to it, optionally checking if it is necessary.
+
+    :param job: job to perform heartbeat for
+    :param only_if_necessary: only heartbeat if it is necessary (i.e. if there are things to run for
+        triggerer for example)
+    """
+    # The below assert is a temporary one, to make MyPy happy with partial AIP-44 work - we will remove it
+    # once final AIP-44 changes are completed.
+    assert isinstance(job, Job), "Job should be ORM object not Pydantic one here (AIP-44 WIP)"
+    seconds_remaining: float = 0.0
+    if job.latest_heartbeat and job.heartrate:
+        seconds_remaining = job.heartrate - (timezone.utcnow() - job.latest_heartbeat).total_seconds()
+    if seconds_remaining > 0 and only_if_necessary:
+        return
+    with create_session() as session:
+        job.heartbeat(session=session)
Review Comment:
This is an excellent question! And I have some likely good answers after becoming intimately familiar with the Job code now :). It was something of a surprise for me to learn how things currently work, so I am happy to share my findings (I wanted to do it later, but since you asked, there you go :):
This is how it was done originally (and I also added a note about it in the diagrams that it leads to multiple sessions being open at the same time). The problem is (and this is even in the current code) that at the place where the heartbeat is executed, there is no session available from the upper stack. I wanted to avoid any "logic" change in this series of refactors.
See the code here (this is before any of my refactorings):
https://github.com/apache/airflow/blob/1a854465b5b67e29082bb4c3e3e198cfbd67c845/airflow/jobs/base_job.py#L233
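To make the "multiple sessions at once" situation concrete, here is a minimal, self-contained sketch of the pattern (simplified stand-ins, not the actual Airflow code - in reality these are SQLAlchemy sessions backed by DB connections):

```python
from contextlib import contextmanager

@contextmanager
def create_session():
    # Stand-in for airflow.utils.session.create_session - imagine a real
    # SQLAlchemy session (and DB connection) being opened here.
    session = object()
    print(f"opened session {id(session)}")
    try:
        yield session
    finally:
        print(f"closed session {id(session)}")

def heartbeat():
    # No session parameter is available from the upper stack, so a brand-new
    # session is created just for the heartbeat write.
    with create_session() as session:
        print(f"heartbeat writes using session {id(session)}")

def run_job_old_style():
    with create_session() as outer_session:
        # The outer session stays open for the whole execution...
        print(f"executing job, session {id(outer_session)} still open")
        # ...while the heartbeat opens a SECOND session concurrently.
        heartbeat()

run_job_old_style()
```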
That this is happening was also quite a surprise to me - and actually this is one of the cool things about this refactoring: such things become plainly obvious once we decouple the database code from the actual logic.
So as a result of this refactor I left two TODOs in the code (the heartbeat one might have been lost in the changes/rebases I've done):
1) Maybe we should NOT keep the session open during the whole "execute_job"? We currently keep a session open, and Connection, Variable and XCom retrieval could possibly make use of it (but likely they do not, as the session is not passed down during execute):
https://github.com/apache/airflow/blob/1a854465b5b67e29082bb4c3e3e198cfbd67c845/airflow/jobs/base_job.py#L254
I am not 100% sure whether the session could be picked up from the current thread (thread-local?) in our usage, but I guess it must be passed down explicitly. In that case it seems we keep the session open while the task is running even though we are NOT USING it. This is quite a waste of a session: a TCP/IP connection stays open, and for Postgres there is a process running on the other end (pgbouncer somewhat mitigates it) - yet we hold that unusable session open even while an hour-long task is running!
But I deliberately wanted to keep the original behaviour, to make sure that my refactors are just that - refactors. I am painfully aware that mixing refactors with functionality changes is a very bad idea. However, once we cut off the 2.6 branch I am planning to explore this and possibly optimise the use of sessions there. The decoupling I've done should be rather helpful in doing it "cleanly", I think. And maybe we will find that we can vastly optimize the session/DB usage here - maybe we won't even need pgbouncer any more if we complete it? I certainly want to explore the consequences of changing the session allocation here. Things might look a bit different in SchedulerJob, so any changes there should be done carefully. Roughly, the direction I want to explore is sketched below.
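Purely as an illustration of the idea (the function name is made up, and details would surely differ - this is NOT what the PR does):

```python
from airflow.utils.session import create_session

def run_job_without_long_lived_session(job):
    # Hypothetical alternative to run_job: a short-lived session just to
    # flip the job into the running state...
    with create_session() as session:
        job.prepare_for_execution(session=session)
    try:
        # ...then NO session (no TCP connection, no Postgres backend
        # process) is held while the - possibly hour-long - job executes.
        return execute_job(job)
    finally:
        # A second short-lived session just to record the final state.
        with create_session() as session:
            job.complete_execution(session=session)
```

Whether prepare/complete can safely live in separate transactions is exactly the kind of consequence I want to verify before changing anything.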
2) Then, the heartbeat session is connected to this. Heartbeat might simply have no other way to get a session, because the execute() method has no session to pass down. So we create a new session for the heartbeat with `with create_session()`:
https://github.com/apache/airflow/blob/1a854465b5b67e29082bb4c3e3e198cfbd67c845/airflow/jobs/base_job.py#L233
My `guess` is that the reason for doing it without @provide_session was that the first part of the heartbeat check ("only_if_necessary") can be done without opening a session, so `with create_session()` is there to create a session only when it is actually needed. In my AIP-44 variant of heartbeat (now deleted) I split the heartbeat method in two to achieve the same with @provide_session, and possibly we could do something similar again in the future, but (YAGNI) maybe not - so I deleted it without looking back.
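Reconstructed from memory (the names here are made up for illustration, this is not the deleted code verbatim), the split looked roughly like this:

```python
from airflow.utils import timezone
from airflow.utils.session import NEW_SESSION, provide_session

def heartbeat_is_necessary(job) -> bool:
    # Pure in-memory check - no session needed, so callers passing
    # only_if_necessary=True never touch the DB when no heartbeat is due.
    if not (job.latest_heartbeat and job.heartrate):
        return True
    elapsed = (timezone.utcnow() - job.latest_heartbeat).total_seconds()
    return job.heartrate - elapsed <= 0

@provide_session
def do_heartbeat(job, session=NEW_SESSION) -> None:
    # Only this half acquires a session (via @provide_session), and it is
    # called only when the check above says a heartbeat is actually due.
    job.heartbeat(session=session)

def perform_heartbeat_split(job, only_if_necessary: bool) -> None:
    if only_if_necessary and not heartbeat_is_necessary(job):
        return
    do_heartbeat(job)
```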
I will need to confirm these findings with others (@ashb? @uranusjr? - maybe I missed some session-sharing mechanism here), but I tried to carefully replicate in the refactor what was originally in the code.
I hope the answer is not too long :D
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]