potiuk commented on code in PR #30308:
URL: https://github.com/apache/airflow/pull/30308#discussion_r1163276448


##########
airflow/jobs/job.py:
##########
@@ -292,3 +264,79 @@ def most_recent_job(job_type: str, session: Session = NEW_SESSION) -> Job | None:
         )
         .first()
     )
+
+
+@provide_session
+def run_job(job: Job | JobPydantic, session: Session = NEW_SESSION) -> int | None:
+    """
+    Runs the job. The Job is always an ORM object and setting the state is 
happening within the
+    same DB session and the session is kept open throughout the whole execution
+
+    :meta private:
+
+    TODO: Maybe we should not keep the session during job execution ?.
+    """
+    # The below assert is a temporary one, to make MyPy happy with partial AIP-44 work - we will remove it
+    # once final AIP-44 changes are completed.
+    assert isinstance(job, Job), "Job should be ORM object not Pydantic one here (AIP-44 WIP)"
+    job.prepare_for_execution(session=session)
+    try:
+        return execute_job(job)
+    finally:
+        job.complete_execution(session=session)
+
+
+def execute_job(job: Job | JobPydantic) -> int | None:
+    """
+    Executes the job.
+
+    Job execution requires no session as generally executing session does not 
require an
+    active database connection. The session might be temporary acquired and 
used if the job
+    runs heartbeat during execution, but this connection is only acquired for 
the time of heartbeat
+    and in case of AIP-44 implementation it happens over the Internal API 
rather than directly via
+    the database.
+
+    After the job is completed, state of the Job is updated and it should be 
updated in the database,
+    which happens in the "complete_execution" step (which again can be 
executed locally in case of
+    database operations or over the Internal API call.
+
+    :param job: Job to execute - it can be either DB job or it's Pydantic 
serialized version. It does
+       not really matter, because except of running the heartbeat and state 
setting,
+       the runner should not modify the job state.
+
+    :meta private:
+
+    """
+    ret = None
+    try:
+        # This job_runner reference and type-ignore will be removed by a further refactoring step
+        ret = job.job_runner._execute()  # type:ignore[union-attr]
+        # In case of max runs or max duration
+        job.state = State.SUCCESS
+    except SystemExit:
+        # In case of ^C or SIGTERM
+        job.state = State.SUCCESS
+    except Exception:
+        job.state = State.FAILED
+        raise
+    return ret
+
+
+def perform_heartbeat(job: Job | JobPydantic, only_if_necessary: bool) -> None:
+    """
+    Performs heartbeat for the Job passed to it, optionally checking if it is necessary.
+
+    :param job: job to perform heartbeat for
+    :param only_if_necessary: only heartbeat if it is necessary (i.e. if there are
+        things to run, for the triggerer for example)
+    """
+    # The below assert is a temporary one, to make MyPy happy with partial AIP-44 work - we will remove it
+    # once final AIP-44 changes are completed.
+    assert isinstance(job, Job), "Job should be ORM object not Pydantic one here (AIP-44 WIP)"
+    seconds_remaining: float = 0.0
+    if job.latest_heartbeat and job.heartrate:
+        seconds_remaining = job.heartrate - (timezone.utcnow() - job.latest_heartbeat).total_seconds()
+    if seconds_remaining > 0 and only_if_necessary:
+        return
+    with create_session() as session:
+        job.heartbeat(session=session)

Review Comment:
   This is an excellent question! And I have some likely good answers after becoming intimately familiar with the Job code :). It was somewhat of a surprise to me to learn how things currently work, so I am happy to share my findings (I wanted to do it later, but since you asked, here you go :):
   
   
   This is how it was done originally (and I also added a note about it in the diagrams: it leads to multiple sessions being open at the same time). The problem is (and this is the case even in the current code) that at the place where the heartbeat is executed, there is no session available from the upper stack. I wanted to avoid any "logic" change in this series of refactors.
   
   See the code here (this is before any of my refactorings): 
https://github.com/apache/airflow/blob/1a854465b5b67e29082bb4c3e3e198cfbd67c845/airflow/jobs/base_job.py#L233
   
   It was also quite a surprise to me that this is happening, and actually this is one of the cool things about this refactoring: those things become plainly obvious as we decouple the database code from the actual logic.
   
   So as a result of this refactor I left two TODOs in the code (the heartbeat one might have been lost in the changes/rebases I've done):
   
   1) Maybe we should NOT keep the session open during the whole `execute_job`? We currently keep a session open, and possibly Connection, Variable, and XCom retrieval could make use of it (but likely they do not, as the session is not passed down during execute):
   
   
https://github.com/apache/airflow/blob/1a854465b5b67e29082bb4c3e3e198cfbd67c845/airflow/jobs/base_job.py#L254
   
   I am not 100% sure whether the session can be shared via the current thread (thread-local?) in our use of it, but I guess it must be passed down explicitly. In that case it seems we keep the session open while the task is running, even though we are NOT USING it. This is quite a waste of the session: a TCP/IP connection is held open, and for Postgres there is a process running on the other end (pgbouncer somewhat mitigates it), yet we keep the session open for the whole duration of even an hour-long task while we cannot use it!
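   
   (For the record, the kind of thread-local session sharing I mean is what SQLAlchemy's `scoped_session` provides - a rough, generic sketch of the mechanism, not a claim about how our `@provide_session` machinery is actually wired up:)
   
   ```python
   from sqlalchemy import create_engine
   from sqlalchemy.orm import scoped_session, sessionmaker
   
   engine = create_engine("sqlite://")  # placeholder engine, purely for illustration
   Session = scoped_session(sessionmaker(bind=engine))
   
   def deep_in_the_call_stack():
       # scoped_session returns the same session for the same thread, so in
       # principle nothing would need to be passed down explicitly.
       session = Session()
       return session
   ```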
   
   But I deliberately wanted to keep the original behaviour, to make sure that my refactors are just that - refactors. I am painfully aware that mixing refactors and functionality changes is a very bad idea. However, once we cut off the 2.6 branch I am planning to explore that and possibly optimise the use of sessions there (see the sketch below). The decoupling I've done will be rather helpful in doing it "cleanly", I think. And maybe we will find that we can vastly optimise the session/DB usage here, and maybe we won't need pgbouncer any more if we complete it? I certainly want to explore the consequences of changing the session allocation here. I think things might look a bit different in SchedulerJob, so any changes there should be done carefully.
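   
   To make it concrete, the kind of change I want to explore would be roughly this (a purely hypothetical sketch - this PR deliberately keeps the current behaviour):
   
   ```python
   def run_job(job: Job | JobPydantic) -> int | None:
       # Hypothetical variant: open short-lived sessions only around the
       # prepare/complete steps, instead of holding one open for the whole run.
       with create_session() as session:
           job.prepare_for_execution(session=session)
       try:
           # No database session is held while the (potentially hour-long) job runs.
           return execute_job(job)
       finally:
           with create_session() as session:
               job.complete_execution(session=session)
   ```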
   
   2) Then, the heartbeat session is kind of connected to this. The heartbeat might simply not be able to get the session any other way, because the execute() method has no session to pass down. So we create a new session for the heartbeat with `with create_session()`: https://github.com/apache/airflow/blob/1a854465b5b67e29082bb4c3e3e198cfbd67c845/airflow/jobs/base_job.py#L233 - and I `guess` the idea behind doing it without @provide_session was that the first part of the heartbeat check (the "only_if_necessary" part) can be done without opening a session, so `with create_session` is there to optimise session usage. In my AIP-44 variant of heartbeat (now deleted) I split the heartbeat method into two to achieve the same with @provide_session; possibly we can do it in a similar way in the future, but (YAGNI) maybe not, so I deleted it without looking back.
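   
   That split looked roughly like this (reconstructed from memory since the code is deleted - the helper name is made up):
   
   ```python
   def perform_heartbeat(job: Job | JobPydantic, only_if_necessary: bool) -> None:
       # The cheap "is it necessary?" check runs without opening any session.
       seconds_remaining = 0.0
       if job.latest_heartbeat and job.heartrate:
           seconds_remaining = job.heartrate - (timezone.utcnow() - job.latest_heartbeat).total_seconds()
       if seconds_remaining > 0 and only_if_necessary:
           return
       _heartbeat_with_session(job)
   
   
   @provide_session
   def _heartbeat_with_session(job: Job | JobPydantic, session: Session = NEW_SESSION) -> None:
       # Only this part ever needs the database (or the Internal API in AIP-44).
       job.heartbeat(session=session)
   ```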
   
   I will need to confirm these findings with others (@ashb? @uranusjr? - maybe I missed some session-sharing mechanisms here), but in the refactor I tried to carefully replicate what was originally in the code.
   
   I hope the answer is not too long :D 
   


