uranusjr commented on code in PR #30255: URL: https://github.com/apache/airflow/pull/30255#discussion_r1147098734
########## airflow/jobs/job_runner.py: ########## @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from sqlalchemy import case + +from airflow.utils.session import provide_session +from airflow.utils.state import State + +if TYPE_CHECKING: + from airflow.jobs.base_job import BaseJob + from airflow.jobs.pydantic.base_job import BaseJobPydantic + + +class BaseJobRunner: + """Abstract class for job runners to derive from.""" + + job: BaseJob | BaseJobPydantic + + def execute(self) -> int | None: + """ + Executes the logic connected to the runner. This method should be + overridden by subclasses. + + :return: return code if available, otherwise None + """ + raise NotImplementedError() + + @staticmethod + def get_job_type() -> str: + """ + Returns the job type. This method should be overridden by subclasses. + and return the job type that then is stored in the BaseJob-mapped ORM table as job_type. + It is used to query for all entities of a specific job type. + :return: job type string + """ + raise NotImplementedError() + + @classmethod + @provide_session + def most_recent_job(cls, session=None) -> BaseJob | None: + """ + Return the most recent job of this type, if any, based on last heartbeat received. + + Jobs in "running" state take precedence over others to make sure alive + job is returned if it is available. + This method should be called on a subclass (i.e. on SchedulerJob) to + return jobs of that type. + + :param session: Database session + """ + from airflow.jobs.base_job import BaseJob + + return ( + session.query(BaseJob) + .filter(BaseJob.job_type == cls.get_job_type()) + .order_by( + # Put "running" jobs at the front. + case({State.RUNNING: 0}, value=BaseJob.state, else_=1), + BaseJob.latest_heartbeat.desc(), + ) + .limit(1) Review Comment: Nit, this `limit` is not needed (implied by `first`). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
