jedcunningham commented on code in PR #30308:
URL: https://github.com/apache/airflow/pull/30308#discussion_r1163240521
##########
airflow/jobs/JOB_LIFECYCLE.md:
##########
@@ -0,0 +1,158 @@
+<!-- ... Apache License, Version 2.0 header ... -->
+
+Those sequence diagrams explain what is the lifecycle of the Job with relation to the database

Review Comment:
```suggestion
These sequence diagrams explain the lifecycle of a Job with relation to the database
```



##########
airflow/jobs/JOB_LIFECYCLE.md:
##########
@@ -0,0 +1,158 @@
+<!-- ... Apache License, Version 2.0 header ... -->
+
+Those sequence diagrams explain what is the lifecycle of the Job with relation to the database
+operation in the context of the internal API of Airflow.
+
+As part of AIP-44 implementation we separated the ORM Job instance from the code the job runs,
+introducing a concept of Job Runners. The Job Runner is a class that is responsible for running
+the code and it might execute either in-process when direct database is used, or remotely when
+the job is run remotely and communicates via internal API (this part is a work-in-progress and we
+will keep on updating the state.
+
+Those chart apply to all kinds of CLI components Airflow runs (Scheduler, DagFileProcessor, Triggerer,

Review Comment:
```suggestion
This applies to all of the CLI components Airflow runs (Scheduler, DagFileProcessor, Triggerer,
```
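For readers not following the AIP-44 work closely, a minimal sketch of the split the document describes may help: the ORM Job row only carries bookkeeping (state, heartbeat timestamps), while a separate runner class owns the code that actually executes. The names below (`ToyJob`, `ToyTriggererRunner`) are invented for this illustration and are not the real Airflow classes.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class ToyJob:
    """Stand-in for the ORM Job row: pure bookkeeping, no business logic."""

    job_type: str
    state: str = "running"
    latest_heartbeat: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


class ToyTriggererRunner:
    """Stand-in for a Job Runner: it owns the code and only holds a reference to its job."""

    def __init__(self, job: ToyJob) -> None:
        self.job = job

    def _execute(self) -> int | None:
        # A real runner would loop over triggers / dag files / task instances here.
        print(f"doing the work for {self.job.job_type}")
        return None


job = ToyJob(job_type="TriggererJob")
ToyTriggererRunner(job)._execute()  # the job row itself never contains execution logic
```

Keeping the runner free of direct database access is what allows the same runner code to execute either in-process against a database session or remotely behind the Internal API, as the document describes.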
##########
airflow/jobs/JOB_LIFECYCLE.md:
##########
@@ -0,0 +1,158 @@
+<!-- ... Apache License, Version 2.0 header ... -->
+
+Those sequence diagrams explain what is the lifecycle of the Job with relation to the database
+operation in the context of the internal API of Airflow.
+
+As part of AIP-44 implementation we separated the ORM Job instance from the code the job runs,
+introducing a concept of Job Runners. The Job Runner is a class that is responsible for running
+the code and it might execute either in-process when direct database is used, or remotely when
+the job is run remotely and communicates via internal API (this part is a work-in-progress and we
+will keep on updating the state.
+
+Those chart apply to all kinds of CLI components Airflow runs (Scheduler, DagFileProcessor, Triggerer,
+Worker) that can run the job. The AIP-44 implementation is not yet complete, but when complete it will

Review Comment:
```suggestion
Worker) that run a job. The AIP-44 implementation is not yet complete, but when complete it will
```



##########
airflow/jobs/job.py:
##########
@@ -292,3 +264,79 @@ def most_recent_job(job_type: str, session: Session = NEW_SESSION) -> Job | None
         )
         .first()
     )
+
+
+@provide_session
+def run_job(job: Job | JobPydantic, session: Session = NEW_SESSION) -> int | None:
+    """
+    Runs the job. The Job is always an ORM object and setting the state is happening within the
+    same DB session and the session is kept open throughout the whole execution
+
+    :meta private:
+
+    TODO: Maybe we should not keep the session during job execution ?.
+    """
+    # The below assert is a temporary one, to make MyPy happy with partial AIP-44 work - we will remove it
+    # once final AIP-44 changes are completed.
+    assert isinstance(job, Job), "Job should be ORM object not Pydantic one here (AIP-44 WIP)"
+    job.prepare_for_execution(session=session)
+    try:
+        return execute_job(job)
+    finally:
+        job.complete_execution(session=session)
+
+
+def execute_job(job: Job | JobPydantic) -> int | None:
+    """
+    Executes the job.
+
+    Job execution requires no session as generally executing session does not require an
+    active database connection. The session might be temporary acquired and used if the job
+    runs heartbeat during execution, but this connection is only acquired for the time of heartbeat
+    and in case of AIP-44 implementation it happens over the Internal API rather than directly via
+    the database.
+
+    After the job is completed, state of the Job is updated and it should be updated in the database,
+    which happens in the "complete_execution" step (which again can be executed locally in case of
+    database operations or over the Internal API call.
+
+    :param job: Job to execute - it can be either DB job or it's Pydantic serialized version. It does
+        not really matter, because except of running the heartbeat and state setting,
+        the runner should not modify the job state.
+
+    :meta private:
+

Review Comment:
```suggestion
```

nit
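The `run_job` hunk above is essentially a prepare/execute/complete bracket. The sketch below shows the same shape with made-up stand-ins (`FakeJob`, `fake_run_job`) rather than the real Airflow objects; the point is only that the completion step lives in a `finally:` block, so the job record is closed out even when the work raises.

```python
class FakeJob:
    """Stand-in for the ORM Job; it only records and persists state."""

    def __init__(self) -> None:
        self.state = None

    def prepare_for_execution(self) -> None:
        self.state = "running"

    def complete_execution(self) -> None:
        print(f"persisting final state: {self.state}")


def fake_run_job(job: FakeJob, work) -> None:
    job.prepare_for_execution()
    try:
        work()
        job.state = "success"
    except Exception:
        job.state = "failed"
        raise
    finally:
        job.complete_execution()  # always runs, mirroring run_job's finally block


fake_run_job(FakeJob(), work=lambda: print("doing the actual work"))
```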
##########
airflow/jobs/JOB_LIFECYCLE.md:
##########
@@ -0,0 +1,158 @@
+<!-- ... Apache License, Version 2.0 header ... -->
+
+Those sequence diagrams explain what is the lifecycle of the Job with relation to the database
+operation in the context of the internal API of Airflow.
+
+As part of AIP-44 implementation we separated the ORM Job instance from the code the job runs,
+introducing a concept of Job Runners. The Job Runner is a class that is responsible for running
+the code and it might execute either in-process when direct database is used, or remotely when
+the job is run remotely and communicates via internal API (this part is a work-in-progress and we
+will keep on updating the state.

Review Comment:
```suggestion
will keep on updating these lifecycle diagrams).
```
##########
airflow/jobs/job.py:
##########
@@ -292,3 +264,79 @@ def most_recent_job(job_type: str, session: Session = NEW_SESSION) -> Job | None
         )
         .first()
     )
+
+
+@provide_session
+def run_job(job: Job | JobPydantic, session: Session = NEW_SESSION) -> int | None:
+    """
+    Runs the job. The Job is always an ORM object and setting the state is happening within the
+    same DB session and the session is kept open throughout the whole execution
+
+    :meta private:
+
+    TODO: Maybe we should not keep the session during job execution ?.
+    """
+    # The below assert is a temporary one, to make MyPy happy with partial AIP-44 work - we will remove it
+    # once final AIP-44 changes are completed.
+    assert isinstance(job, Job), "Job should be ORM object not Pydantic one here (AIP-44 WIP)"
+    job.prepare_for_execution(session=session)
+    try:
+        return execute_job(job)
+    finally:
+        job.complete_execution(session=session)
+
+
+def execute_job(job: Job | JobPydantic) -> int | None:
+    """
+    Executes the job.
+
+    Job execution requires no session as generally executing session does not require an
+    active database connection. The session might be temporary acquired and used if the job
+    runs heartbeat during execution, but this connection is only acquired for the time of heartbeat
+    and in case of AIP-44 implementation it happens over the Internal API rather than directly via
+    the database.
+
+    After the job is completed, state of the Job is updated and it should be updated in the database,
+    which happens in the "complete_execution" step (which again can be executed locally in case of
+    database operations or over the Internal API call.
+
+    :param job: Job to execute - it can be either DB job or it's Pydantic serialized version. It does
+        not really matter, because except of running the heartbeat and state setting,
+        the runner should not modify the job state.
+
+    :meta private:
+
+    """
+    ret = None
+    try:
+        # This job_runner reference and type-ignore will be removed by further refactoring step
+        ret = job.job_runner._execute()  # type:ignore[union-attr]
+        # In case of max runs or max duration
+        job.state = State.SUCCESS
+    except SystemExit:
+        # In case of ^C or SIGTERM
+        job.state = State.SUCCESS
+    except Exception:
+        job.state = State.FAILED
+        raise
+    return ret
+
+
+def perform_heartbeat(job: Job | JobPydantic, only_if_necessary: bool) -> None:
+    """
+    Performs heartbeat for the Job passed to it,optionally checking if it is necessary.
+
+    :param job: job to perform heartbeat for
+    :param only_if_necessary: only heartbeat if it is necessary (i.e. if there are things to run for
+        triggerer for example)
+    """
+    # The below assert is a temporary one, to make MyPy happy with partial AIP-44 work - we will remove it
+    # once final AIP-44 changes are completed.
+    assert isinstance(job, Job), "Job should be ORM object not Pydantic one here (AIP-44 WIP)"
+    seconds_remaining: float = 0.0
+    if job.latest_heartbeat and job.heartrate:
+        seconds_remaining = job.heartrate - (timezone.utcnow() - job.latest_heartbeat).total_seconds()
+    if seconds_remaining > 0 and only_if_necessary:
+        return
+    with create_session() as session:
+        job.heartbeat(session=session)

Review Comment:
Do we need to create a session explicitly? Can't we just not pass one (and maybe comment that that is intentional)?
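For context on the question above, here is a toy version of the pattern being discussed. `toy_provide_session` and `toy_create_session` are simplified stand-ins written for this note, not Airflow's real `provide_session`/`create_session` utilities: when no session is passed, the decorated function opens and closes one itself, which is what "just not pass one" would rely on, assuming the heartbeat method is (or becomes) decorated this way.

```python
import contextlib
import functools


@contextlib.contextmanager
def toy_create_session():
    session = object()  # stand-in for a SQLAlchemy session
    print("session opened")
    try:
        yield session
    finally:
        print("session closed")


def toy_provide_session(func):
    @functools.wraps(func)
    def wrapper(*args, session=None, **kwargs):
        if session is not None:
            return func(*args, session=session, **kwargs)
        with toy_create_session() as new_session:
            return func(*args, session=new_session, **kwargs)

    return wrapper


@toy_provide_session
def heartbeat(session=None):
    print(f"heartbeat recorded with {session!r}")


heartbeat()                      # no session passed: the decorator opens and closes one
with toy_create_session() as s:  # an explicit session is still honored
    heartbeat(session=s)
```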
