vincbeck commented on code in PR #67919: URL: https://github.com/apache/airflow/pull/67919#discussion_r3352143075
########## airflow-core/src/airflow/api_fastapi/execution_api/routes/jobs.py: ########## @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Execution API routes for ``Job`` registration and liveness (AIP-92). + +A DB-free triggerer has no metadata-DB access of its own, but trigger assignment in +``Trigger.assign_unassigned`` keys off a live ``Job`` row (it skips triggerers whose +heartbeat is stale). These endpoints let such a triggerer register a ``Job`` over HTTP +and keep it alive, so its triggers are not reassigned to other triggerers. 
+""" + +from __future__ import annotations + +import logging + +from cadwyn import VersionedAPIRouter +from fastapi import HTTPException, status + +from airflow._shared.timezones import timezone +from airflow.api_fastapi.common.db.common import SessionDep +from airflow.api_fastapi.execution_api.datamodels.job import JobRegisterBody, JobRegisterResponse +from airflow.jobs.job import Job, JobState + +router = VersionedAPIRouter() + +log = logging.getLogger(__name__) + + [email protected]("", status_code=status.HTTP_201_CREATED) +def register_job(body: JobRegisterBody, session: SessionDep) -> JobRegisterResponse: + """Create a started + heartbeated ``Job`` row and return its id.""" + # ``Job.__init__`` already stamps hostname/start_date/latest_heartbeat/unixname; we only + # need to mark it RUNNING (mirrors ``Job.prepare_for_execution``) so it counts as alive. + job = Job(job_type=body.job_type, state=JobState.RUNNING) + # ``Job.__init__`` records the api-server's hostname; overwrite it with the registering + # process's hostname so log retrieval (FileTaskHandler) reaches the right host. 
+ job.hostname = body.hostname + session.add(job) + session.flush() + return JobRegisterResponse(job_id=job.id) + + [email protected]("/{job_id}/heartbeat", status_code=status.HTTP_204_NO_CONTENT) +def heartbeat_job(job_id: int, session: SessionDep) -> None: + """Record a fresh heartbeat for the given ``Job`` so it stays alive.""" + job = session.get(Job, job_id) + if job is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Job {job_id} not found") + job.latest_heartbeat = timezone.utcnow() + job.end_date = None + + [email protected]("/{job_id}/complete", status_code=status.HTTP_204_NO_CONTENT) Review Comment: nit: technically this is not a creation but an update, it should be `PUT` or `PATCH` ########## airflow-core/src/airflow/jobs/triggerer_job_runner.py: ########## @@ -240,16 +275,19 @@ def _execute(self) -> int | None: export_legacy_names=conf.getboolean("metrics", "legacy_names_on"), ) try: - # Kick off runner sub-process without DB access + # AIP-92: the triggerer orchestrates through the Execution API with no metadata-DB + # access. Kick off the runner sub-process without DB access. ``job=None``: the + # supervisor registers and heartbeats its liveness Job through the Execution API, + # not a metadata-DB row. self.trigger_runner = TriggerRunnerSupervisor.start( - job=self.job, + job=None, Review Comment: Do we need to keep `job` as a parameter of `TriggerRunnerSupervisor.start`? ########## airflow-core/src/airflow/api_fastapi/execution_api/routes/triggers.py: ########## @@ -0,0 +1,108 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Execution API routes for the triggerer (AIP-92). + +These endpoints let a DB-free triggerer orchestrate over HTTP instead of accessing the +metadata database directly: claim triggers, fetch the corresponding runnable workloads, +and report events/failures/cleanup back to the database. +""" + +from __future__ import annotations + +import logging + +from cadwyn import VersionedAPIRouter +from fastapi import status + +from airflow.api_fastapi.common.dagbag import DagBagDep +from airflow.api_fastapi.common.db.common import SessionDep +from airflow.api_fastapi.execution_api.datamodels.trigger import ( + TriggerEventBody, + TriggerFailureBody, + TriggerIdsResponse, + TriggerLoadBody, + TriggerWorkloadsBody, + TriggerWorkloadsResponse, +) +from airflow.jobs.triggerer_workloads import build_run_trigger_workloads +from airflow.models.trigger import Trigger +from airflow.utils.helpers import log_filename_template_renderer + +router = VersionedAPIRouter() + +log = logging.getLogger(__name__) + + [email protected]("/load", status_code=status.HTTP_200_OK) +def load_triggers(body: TriggerLoadBody, session: SessionDep) -> TriggerIdsResponse: + """Assign unassigned triggers to the triggerer and return its assigned trigger IDs.""" + queues = set(body.queues) if body.queues else None + Trigger.assign_unassigned( + body.triggerer_id, + body.capacity, + body.health_check_threshold, + queues=queues, + team_name=body.team_name, + session=session, + ) + trigger_ids = Trigger.ids_for_triggerer( + body.triggerer_id, + queues=queues, + team_name=body.team_name, + 
session=session, + ) + return TriggerIdsResponse(trigger_ids=trigger_ids) + + [email protected]("/workloads", status_code=status.HTTP_200_OK) Review Comment: ```suggestion @router.get("/workloads", status_code=status.HTTP_200_OK) ``` ########## airflow-core/src/airflow/api_fastapi/execution_api/datamodels/trigger.py: ########## @@ -0,0 +1,79 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from typing import Any + +from pydantic import JsonValue + +from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel + + +class TriggerLoadBody(StrictBaseModel): + """Body for requesting triggers to be assigned to and loaded for a triggerer.""" + + triggerer_id: int + """Id of the requesting triggerer's Job row.""" + capacity: int + """Maximum number of triggers the triggerer can run concurrently.""" + health_check_threshold: float + """Seconds since last heartbeat after which a triggerer is considered dead.""" + queues: list[str] | None = None + """Optional set of trigger queues this triggerer serves.""" + team_name: str | None = None + """Optional team this triggerer serves (multi-team setups).""" + + +class TriggerIdsResponse(BaseModel): + """Response listing the trigger IDs currently assigned to a triggerer.""" + + trigger_ids: list[int] + + +class TriggerWorkloadsBody(StrictBaseModel): + """Body for requesting runnable workloads for a set of trigger IDs.""" + + trigger_ids: list[int] + + +class TriggerWorkloadsResponse(BaseModel): + """Response carrying serialized ``RunTrigger`` workloads.""" + + workloads: list[dict[str, Any]] + """Each entry is a ``RunTrigger.model_dump(mode="json")``.""" + + +class TriggerEventBody(StrictBaseModel): + """Body for submitting a trigger event payload.""" + + payload: JsonValue = None + """ + The event payload to resume a deferred task instance with. Review Comment: nit: triggers no longer have a single goal (resuming a task). They can also send updates to assets (event-driven scheduling). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
