potiuk commented on code in PR #30308:
URL: https://github.com/apache/airflow/pull/30308#discussion_r1162140887


##########
airflow/jobs/job.py:
##########
@@ -285,12 +273,59 @@ def most_recent_job(job_type: str, session: Session = NEW_SESSION) -> BaseJob |
     :param session: Database session
     """
     return (
-        session.query(BaseJob)
-        .filter(BaseJob.job_type == job_type)
+        session.query(Job)
+        .filter(Job.job_type == job_type)
         .order_by(
             # Put "running" jobs at the front.
-            case({State.RUNNING: 0}, value=BaseJob.state, else_=1),
-            BaseJob.latest_heartbeat.desc(),
+            case({State.RUNNING: 0}, value=Job.state, else_=1),
+            Job.latest_heartbeat.desc(),
         )
         .first()
     )
+
+
+@provide_session
+def prepare_for_execution(job: Job | JobPydantic, session=NEW_SESSION):
+    """
+    Prepare execution of a job.
+
+    :param job: Job to prepare execution for
+    :param session: Database session
+    """
+    if isinstance(job, Job):
+        job.prepare_for_execution()
+    else:
+        Job.get(job.id, session=session).prepare_for_execution()
+
+
+def execute_job(job: Job) -> int | None:
+    """Executes the job - does not matter if there is a session or not"""
+    ret = None
+    try:
+        ret = job.job_runner._execute()
+        # In case of max runs or max duration
+        job.state = State.SUCCESS
+    except SystemExit:
+        # In case of ^C or SIGTERM
+        job.state = State.SUCCESS
+    except Exception:
+        job.state = State.FAILED
+        raise
+    return ret
+
+
+@provide_session
+def complete_execution(job: Job | JobPydantic, session=NEW_SESSION):
+    if isinstance(job, Job):
+        job.complete_execution()
+    else:
+        Job.get(job.id, session=session).complete_execution()
+
+
+def run_job(job: Job) -> int | None:
+    """Runs the job."""
+    prepare_for_execution(job)

Review Comment:
   Not sure if I answered the question - maybe I did not understand the suggestion @jedcunningham - but the whole idea is to enable calling "prepare_for_execution" and "complete_execution" via the internal API (AIP-44) while running "execute_job" locally (without the DB at all). Also, in between, "execute_job" will run heartbeats over the AIP-44 internal API to update the Job state.
   
   For that we need a few things (and this split merely enables them to be used in the future). Imagine LocalTaskJobRunner, but it will be the same for TriggererJobRunner and DagFileProcessorJobRunner.
   
   The (a) normal / (b) AIP-44 flow will look as follows (BTW, I am planning to document it visually with sequence diagrams):
   
   1) Create a job first: (a) this will be just Job(); (b) this will be an AIP-44 remote call.
   
   2) prepare_for_execution: (a) this will run on the Job object; (b) this needs to be done over the internal API, storing state data, assigning the TaskInstance, etc. in the Job.
   
   I might actually join 1) and 2) into a single method that can be decorated with @internal_api and returns a Job (which the internal API will return as a JobPydantic).
   
   3) execute_job - here we need the *Pydantic objects (retrieved from 1)/2), read-only); we will call periodic heartbeats via the AIP-44 internal API and "do" the job in general.
   
   4) complete_execution - here we update the session state: (a) directly by merge/commit + firing the listener; (b) by remotely calling such a "complete_execution" method - so when we pass a JobPydantic, the server should create and use the Job ORM object corresponding to it.
   
   That's about it - not everything is complete yet to achieve all of this, and we need to implement AIP-44-compliant behaviours, but the refactor and "separation of concerns" should make it way easier.
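   To make the dual dispatch in the two flows above concrete, here is a minimal, self-contained sketch. The `Job`/`JobPydantic` classes below are illustrative stand-ins (a dict plays the role of the database and of the AIP-44 server-side lookup), not the real Airflow ORM/serialization classes:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class JobPydantic:
    """Read-only serialized view of a Job, as it would travel over AIP-44."""

    id: int


class Job:
    """Stand-in for the ORM Job; a class-level dict plays the database."""

    _db: dict[int, "Job"] = {}

    def __init__(self, job_id: int) -> None:
        self.id = job_id
        self.state: str | None = None
        Job._db[job_id] = self

    @classmethod
    def get(cls, job_id: int) -> "Job":
        return cls._db[job_id]

    def prepare_for_execution(self) -> None:
        self.state = "running"

    def complete_execution(self) -> None:
        self.state = "success"


def prepare_for_execution(job: Job | JobPydantic) -> None:
    # (a) local flow: we already hold the ORM object
    # (b) AIP-44 flow: only the id crosses the wire; the server resolves the ORM Job
    target = job if isinstance(job, Job) else Job.get(job.id)
    target.prepare_for_execution()


def complete_execution(job: Job | JobPydantic) -> None:
    target = job if isinstance(job, Job) else Job.get(job.id)
    target.complete_execution()


# (a) Local flow: the ORM object is used end to end.
local_job = Job(1)
prepare_for_execution(local_job)
complete_execution(local_job)

# (b) AIP-44 flow: only the read-only Pydantic view crosses the boundary.
Job(2)
remote_view = JobPydantic(id=2)
prepare_for_execution(remote_view)
complete_execution(remote_view)

print(Job.get(1).state, Job.get(2).state)  # success success
```

   The point of the sketch is the isinstance branch: the caller never needs to know which flow it is in, and execute_job can sit between the two calls without touching the DB at all.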



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
