potiuk commented on code in PR #30308:
URL: https://github.com/apache/airflow/pull/30308#discussion_r1162140887
##########
airflow/jobs/job.py:
##########
@@ -285,12 +273,59 @@ def most_recent_job(job_type: str, session: Session = NEW_SESSION) -> BaseJob |
:param session: Database session
"""
return (
- session.query(BaseJob)
- .filter(BaseJob.job_type == job_type)
+ session.query(Job)
+ .filter(Job.job_type == job_type)
.order_by(
# Put "running" jobs at the front.
- case({State.RUNNING: 0}, value=BaseJob.state, else_=1),
- BaseJob.latest_heartbeat.desc(),
+ case({State.RUNNING: 0}, value=Job.state, else_=1),
+ Job.latest_heartbeat.desc(),
)
.first()
)
+
+
+@provide_session
+def prepare_for_execution(job: Job | JobPydantic, session=NEW_SESSION):
+ """
+ Prepare execution of a job.
+
+ :param job: Job to prepare execution for
+ :param session: Database session
+ """
+ if isinstance(job, Job):
+ job.prepare_for_execution()
+ else:
+ Job.get(job.id, session=session).prepare_for_execution()
+
+
+def execute_job(job: Job) -> int | None:
+ """Executes the job - does not matter if there is a session or not"""
+ ret = None
+ try:
+ ret = job.job_runner._execute()
+ # In case of max runs or max duration
+ job.state = State.SUCCESS
+ except SystemExit:
+ # In case of ^C or SIGTERM
+ job.state = State.SUCCESS
+ except Exception:
+ job.state = State.FAILED
+ raise
+ return ret
+
+
+@provide_session
+def complete_execution(job: Job | JobPydantic, session=NEW_SESSION):
+ if isinstance(job, Job):
+ job.complete_execution()
+ else:
+ Job.get(job.id, session=session).complete_execution()
+
+
+def run_job(job: Job) -> int | None:
+ """Runs the job."""
+ prepare_for_execution(job)
Review Comment:
Not sure if I answered the question (maybe I did not understand the
suggestion, @jedcunningham), but the whole idea is to enable calling
"prepare_for_execution" and "complete_execution" via the internal API (AIP-44),
while "execute_job" runs locally (without the DB at all). Also, in between,
"execute_job" will run heartbeats over the AIP-44 internal API to update the
Job state.
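The heartbeat part of that idea could be sketched roughly like this (all names
here are hypothetical stand-ins for illustration, not code from this PR; in
the real flow the heartbeat callable would go through the AIP-44 internal API):

```python
from typing import Callable, Iterable


def execute_with_heartbeats(
    steps: Iterable[Callable[[], None]],
    heartbeat: Callable[[], None],
) -> None:
    """Run the job's work step by step, firing a heartbeat after each step.

    ``heartbeat`` stands in for an internal-API call that refreshes
    Job.latest_heartbeat server-side, so the execution itself stays DB-free.
    """
    for step in steps:
        step()
        heartbeat()
```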
For that we need a few things (and this split merely enables them to be
used in the future). Imagine LocalTaskJobRunner, but it will be the same for
TriggererJobRunner and DagFileProcessorJobRunner.
The (a) normal / (b) AIP-44 flow will look as follows (I am planning to
document it visually with sequence diagrams, BTW):
1) create a job first: (a) this will be just Job(); (b) this will be an AIP-44
remote call.
2) prepare_for_execution: (a) this will be run on the Job object; (b) this needs
to be done over the internal API, storing the start date, assigning the
TaskInstance etc. in the Job.
I might actually join 1) and 2) into a single method that can be decorated
with @internal_api and return a Job (which the internal API will return as a
JobPydantic).
3) execute_job: here we need the *Pydantic objects retrieved from 1)/2)
(read-only); we will call periodic heartbeats via the AIP-44 internal API and
"do" the job in general.
4) complete_execution: here we update the session state: (a) directly by
merge/commit + firing the listener; (b) by remote-calling such a
"complete_execution" method, so when we pass a JobPydantic, the server should
create and use a Job ORM object corresponding to it.
That's about it. Not everything is complete yet to achieve all of this, and we
still need to implement AIP-44-compliant behaviours, but the refactor and
"separation of concerns" should make it way easier.
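To make the 1)-4) steps above concrete, here is a minimal sketch of the (b)
AIP-44 side of the flow. Everything here is a hypothetical stand-in (the
InternalApiClient, its method names, and the toy state strings are my
assumptions for illustration, not the PR's actual API):

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class JobPydantic:
    """Read-only serialized view of a Job, as returned over the internal API."""

    id: int
    state: str = "running"


class InternalApiClient:
    """Stand-in for the AIP-44 internal API channel (not the real client)."""

    def __init__(self) -> None:
        self._jobs: dict[int, str] = {}  # toy server-side "DB": job id -> state

    def create_and_prepare_job(self) -> JobPydantic:
        # Steps 1) + 2): the server creates and prepares the ORM Job,
        # then returns a read-only JobPydantic to the worker.
        job_id = len(self._jobs) + 1
        self._jobs[job_id] = "running"
        return JobPydantic(id=job_id)

    def heartbeat(self, job: JobPydantic) -> None:
        # Step 3): periodic heartbeats keep the Job fresh server-side.
        self._jobs[job.id] = "running"

    def complete_execution(self, job: JobPydantic, state: str) -> None:
        # Step 4): the server re-creates the ORM Job from the JobPydantic
        # and finalizes its state (merge/commit + listener in the real flow).
        self._jobs[job.id] = state


def run_job_over_internal_api(api: InternalApiClient, do_work) -> str:
    """Drive the whole (b) flow: no direct DB access on the worker side."""
    job = api.create_and_prepare_job()          # 1) + 2)
    try:
        do_work(lambda: api.heartbeat(job))     # 3) DB-free execution
        state = "success"
    except Exception:
        state = "failed"
        raise
    finally:
        api.complete_execution(job, state)      # 4)
    return state
```

The point of the sketch is the separation: only the client-side functions
touch the (simulated) server state, while the work itself only receives a
heartbeat callback.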
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]