jedcunningham commented on code in PR #30308:
URL: https://github.com/apache/airflow/pull/30308#discussion_r1161842643


##########
airflow/jobs/job.py:
##########
@@ -222,59 +222,47 @@ def heartbeat(self, only_if_necessary: bool = False) -> None:
             # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
             self.latest_heartbeat = previous_heartbeat
 
-    def run(self) -> int | None:
-        """Starts the job."""
+    @provide_session
+    def prepare_for_execution(self, session=NEW_SESSION):

Review Comment:
   ```suggestion
       def prepare_for_execution(self, session: Session = NEW_SESSION):
   ```



##########
airflow/jobs/job.py:
##########
@@ -222,59 +222,47 @@ def heartbeat(self, only_if_necessary: bool = False) -> None:
             # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
             self.latest_heartbeat = previous_heartbeat
 
-    def run(self) -> int | None:
-        """Starts the job."""
+    @provide_session
+    def prepare_for_execution(self, session=NEW_SESSION):
+        """Prepares the job for execution."""
         Stats.incr(self.__class__.__name__.lower() + "_start", 1, 1)
-        # Adding an entry in the DB
-        ret = None
-        with create_session() as session:
-            self.state = State.RUNNING
-            session.add(self)
-            session.commit()
-            make_transient(self)
-            try:
-                ret = self.job_runner._execute()
-                # In case of max runs or max duration
-                self.state = State.SUCCESS
-            except SystemExit:
-                # In case of ^C or SIGTERM
-                self.state = State.SUCCESS
-            except Exception:
-                self.state = State.FAILED
-                raise
-            finally:
-                get_listener_manager().hook.before_stopping(component=self)
-                self.end_date = timezone.utcnow()
-                session.merge(self)
-                session.commit()
+        self.state = State.RUNNING
+        self.start_date = timezone.utcnow()
+        self.state = State.RUNNING
+        session.add(self)
+        session.commit()
+        make_transient(self)
 
+    @provide_session
+    def complete_execution(self, session=NEW_SESSION):

Review Comment:
   ```suggestion
       def complete_execution(self, session: Session = NEW_SESSION):
   ```



##########
airflow/jobs/job.py:
##########
@@ -285,12 +273,59 @@ def most_recent_job(job_type: str, session: Session = NEW_SESSION) -> BaseJob |
     :param session: Database session
     """
     return (
-        session.query(BaseJob)
-        .filter(BaseJob.job_type == job_type)
+        session.query(Job)
+        .filter(Job.job_type == job_type)
         .order_by(
             # Put "running" jobs at the front.
-            case({State.RUNNING: 0}, value=BaseJob.state, else_=1),
-            BaseJob.latest_heartbeat.desc(),
+            case({State.RUNNING: 0}, value=Job.state, else_=1),
+            Job.latest_heartbeat.desc(),
         )
         .first()
     )
+
+
+@provide_session
+def prepare_for_execution(job: Job | JobPydantic, session=NEW_SESSION):
+    """
+    Prepare execution of a job.
+
+    :param job: Job to prepare execution for
+    :param session: Database session
+    """
+    if isinstance(job, Job):
+        job.prepare_for_execution()
+    else:
+        Job.get(job.id, session=session).prepare_for_execution()
+
+
+def execute_job(job: Job) -> int | None:
+    """Executes the job - does not matter if there is a session or not"""
+    ret = None
+    try:
+        ret = job.job_runner._execute()
+        # In case of max runs or max duration
+        job.state = State.SUCCESS
+    except SystemExit:
+        # In case of ^C or SIGTERM
+        job.state = State.SUCCESS
+    except Exception:
+        job.state = State.FAILED
+        raise
+    return ret
+
+
+@provide_session
+def complete_execution(job: Job | JobPydantic, session=NEW_SESSION):
+    if isinstance(job, Job):
+        job.complete_execution()
+    else:
+        Job.get(job.id, session=session).complete_execution()
+
+
+def run_job(job: Job) -> int | None:
+    """Runs the job."""
+    prepare_for_execution(job)

Review Comment:
   It might be worth just doing these 3 steps inline here. Then you only need to check `Job` vs `JobPydantic` once, and these prepare/complete functions are just wrappers that call the `Job` methods anyway.



##########
airflow/jobs/job.py:
##########
@@ -285,12 +273,59 @@ def most_recent_job(job_type: str, session: Session = NEW_SESSION) -> BaseJob |
     :param session: Database session
     """
     return (
-        session.query(BaseJob)
-        .filter(BaseJob.job_type == job_type)
+        session.query(Job)
+        .filter(Job.job_type == job_type)
         .order_by(
             # Put "running" jobs at the front.
-            case({State.RUNNING: 0}, value=BaseJob.state, else_=1),
-            BaseJob.latest_heartbeat.desc(),
+            case({State.RUNNING: 0}, value=Job.state, else_=1),
+            Job.latest_heartbeat.desc(),
         )
         .first()
     )
+
+
+@provide_session
+def prepare_for_execution(job: Job | JobPydantic, session=NEW_SESSION):
+    """
+    Prepare execution of a job.
+
+    :param job: Job to prepare execution for
+    :param session: Database session
+    """
+    if isinstance(job, Job):
+        job.prepare_for_execution()

Review Comment:
   Should we pass `session` through to `job.prepare_for_execution()` too?



##########
airflow/jobs/job.py:
##########
@@ -285,12 +273,59 @@ def most_recent_job(job_type: str, session: Session = NEW_SESSION) -> BaseJob |
     :param session: Database session
     """
     return (
-        session.query(BaseJob)
-        .filter(BaseJob.job_type == job_type)
+        session.query(Job)
+        .filter(Job.job_type == job_type)
         .order_by(
             # Put "running" jobs at the front.
-            case({State.RUNNING: 0}, value=BaseJob.state, else_=1),
-            BaseJob.latest_heartbeat.desc(),
+            case({State.RUNNING: 0}, value=Job.state, else_=1),
+            Job.latest_heartbeat.desc(),
         )
         .first()
     )
+
+
+@provide_session
+def prepare_for_execution(job: Job | JobPydantic, session=NEW_SESSION):

Review Comment:
   ```suggestion
   def prepare_for_execution(job: Job | JobPydantic, session: Session = NEW_SESSION):
   ```



##########
airflow/jobs/job.py:
##########
@@ -285,12 +273,59 @@ def most_recent_job(job_type: str, session: Session = NEW_SESSION) -> BaseJob |
     :param session: Database session
     """
     return (
-        session.query(BaseJob)
-        .filter(BaseJob.job_type == job_type)
+        session.query(Job)
+        .filter(Job.job_type == job_type)
         .order_by(
             # Put "running" jobs at the front.
-            case({State.RUNNING: 0}, value=BaseJob.state, else_=1),
-            BaseJob.latest_heartbeat.desc(),
+            case({State.RUNNING: 0}, value=Job.state, else_=1),
+            Job.latest_heartbeat.desc(),
         )
         .first()
     )
+
+
+@provide_session
+def prepare_for_execution(job: Job | JobPydantic, session=NEW_SESSION):
+    """
+    Prepare execution of a job.
+
+    :param job: Job to prepare execution for
+    :param session: Database session
+    """
+    if isinstance(job, Job):
+        job.prepare_for_execution()
+    else:
+        Job.get(job.id, session=session).prepare_for_execution()
+
+
+def execute_job(job: Job) -> int | None:
+    """Executes the job - does not matter if there is a session or not"""
+    ret = None
+    try:
+        ret = job.job_runner._execute()
+        # In case of max runs or max duration
+        job.state = State.SUCCESS
+    except SystemExit:
+        # In case of ^C or SIGTERM
+        job.state = State.SUCCESS
+    except Exception:
+        job.state = State.FAILED
+        raise
+    return ret
+
+
+@provide_session
+def complete_execution(job: Job | JobPydantic, session=NEW_SESSION):

Review Comment:
   ```suggestion
   def complete_execution(job: Job | JobPydantic, session: Session = NEW_SESSION):
   ```



##########
airflow/jobs/job.py:
##########
@@ -222,59 +222,47 @@ def heartbeat(self, only_if_necessary: bool = False) -> None:
             # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
             self.latest_heartbeat = previous_heartbeat
 
-    def run(self) -> int | None:
-        """Starts the job."""
+    @provide_session
+    def prepare_for_execution(self, session=NEW_SESSION):
+        """Prepares the job for execution."""
         Stats.incr(self.__class__.__name__.lower() + "_start", 1, 1)
-        # Adding an entry in the DB
-        ret = None
-        with create_session() as session:
-            self.state = State.RUNNING
-            session.add(self)
-            session.commit()
-            make_transient(self)
-            try:
-                ret = self.job_runner._execute()
-                # In case of max runs or max duration
-                self.state = State.SUCCESS
-            except SystemExit:
-                # In case of ^C or SIGTERM
-                self.state = State.SUCCESS
-            except Exception:
-                self.state = State.FAILED
-                raise
-            finally:
-                get_listener_manager().hook.before_stopping(component=self)
-                self.end_date = timezone.utcnow()
-                session.merge(self)
-                session.commit()
+        self.state = State.RUNNING
+        self.start_date = timezone.utcnow()
+        self.state = State.RUNNING

Review Comment:
   ```suggestion
   ```
   
   We don't need to set `self.state = State.RUNNING` twice, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to