rapsealk opened a new issue, #67033: URL: https://github.com/apache/airflow/issues/67033
> **Type:** Epic / new provider package
> **Proposed package:** `apache-airflow-providers-backend-ai`
> **Status:** Draft for discussion
> **Author:** [email protected]

### Decisions locked in

- **Distribution: community provider.** This will be contributed to `apache/airflow` under `providers/backend_ai/` and go through the ASF acceptance process (`[DISCUSSION]` → `[VOTE]`/lazy consensus). It is *not* a third-party / out-of-tree provider.
- **Retry: Airflow task retries for v1.** The operator does not implement its own retry/backoff. Failures raise and rely on the task's `retries` / `retry_delay`. Backend.AI native session retry (BEP-1053) is a later enhancement, not v1 scope.

---

## 1. Summary

Add an Apache Airflow provider package for [Backend.AI](https://www.backend.ai/). It lets a Dag task run a command or script inside a Backend.AI batch compute session, with CPU/memory/GPU allocation and virtual-folder storage, and waits for it to finish before returning the result and logs to Airflow.

The deliverable is a new directory `providers/backend_ai/` that ships:

- a hook wrapping the official `backend.ai-client` SDK,
- a typed Airflow connection for Backend.AI keypair credentials,
- operators that create a batch session, run a workload, and report results,
- a sensor for polling long-running sessions,
- a deferrable operator and trigger so long jobs do not pin a worker slot,
- unit tests, a runnable system-test Dag, and how-to documentation.

The Backend.AI repository has no Airflow integration today, so this is a greenfield effort.

---

## 2. Motivation

Backend.AI is a GPU/accelerator workload-orchestration platform: users request *compute sessions* on container images with declared resource slots (`cpu`, `mem`, `cuda.devices`, …), mount *virtual folders* (vfolders) for input/output data, and run interactive or **batch** jobs.

Airflow is widely used to orchestrate ML and data pipelines, but today an Airflow user who wants a pipeline step to run on a Backend.AI cluster must hand-roll a `PythonOperator` that imports the Backend.AI client, manages the session lifecycle, polls status, and cleans up. That is error-prone (leaked sessions on failure, no deferral, no templating, no typed connection).

A dedicated provider replaces the hand-rolled script with a declarative operator, a managed connection, deferrable execution, and tested lifecycle handling, consistent with how the `docker` and `cncf.kubernetes` providers wrap their backends.

---

## 3. Background

### 3.1 Airflow provider anatomy (from `providers/amazon`, `providers/docker`)

A provider package lives under `providers/<name>/` with:

```
providers/backend_ai/
├── provider.yaml                  # metadata: operators, hooks, sensors, connection-types
├── pyproject.toml                 # build config + dependency on backend.ai-client
├── README.rst
├── src/airflow/providers/backend_ai/
│   ├── __init__.py
│   ├── get_provider_info.py       # AUTO-GENERATED from provider.yaml
│   ├── hooks/
│   ├── operators/
│   ├── sensors/
│   └── triggers/
├── tests/
│   ├── unit/backend_ai/{hooks,operators,sensors,triggers}/
│   └── system/backend_ai/example_backend_ai.py
└── docs/{operators,connections}/
```

Key mechanics learned from the codebase:

- **Discovery** is via the `pyproject.toml` entry point `[project.entry-points."apache_airflow_provider"] provider_info = "...get_provider_info:get_provider_info"`.
- `get_provider_info.py` is **auto-generated** from `provider.yaml` via a Breeze Jinja2 template (`dev/breeze/.../templates/`); never hand-edit it.
- **Operator** = subclass of `BaseOperator`, declares `template_fields`, implements `execute(context)`; deferrable operators additionally call `self.defer(trigger=..., method_name="execute_complete")` and implement `execute_complete(context, event)`.
- **Hook** = subclass of `BaseHook`, resolves an Airflow connection by id and builds the upstream client.
- **Connection type** = registered in `provider.yaml` under `connection-types` with `ui-field-behaviour` (field relabeling/hiding/placeholders).
- **Trigger** = subclass of `BaseTrigger`, implements `serialize()` → `(class_path, params_dict)` and an async `run()` generator that yields a `TriggerEvent`.
- New providers must satisfy `providers/ACCEPTING_PROVIDERS.rst` (see §9).

### 3.2 Backend.AI client SDK (from `backend.ai-dev`)

- **pip package:** `backend.ai-client`; **import path:** `ai.backend.client`.
- **Current platform version:** `26.4.4rc4`. **License:** LGPL-3.0. The provider declares `backend.ai-client` as a runtime dependency and does not vendor or modify it. ASF has specific rules for weak-copyleft dependencies, so final license compatibility must be confirmed with the PMC (see §10).
- **Auth:** API keypair: access key, secret key, and endpoint URL. Env vars: `BACKEND_ENDPOINT`, `BACKEND_ACCESS_KEY`, `BACKEND_SECRET_KEY`, `BACKEND_ENDPOINT_TYPE` (`api` keypair vs `session` cookie), `BACKEND_DOMAIN`, `BACKEND_GROUP`. Configured programmatically via `APIConfig(...)`.
- **Both sync and async clients exist:** `Session` (sync, runs an event loop on a worker thread) and `AsyncSession` (native async/await), both in `ai.backend.client.session`. The session entity `ComputeSession` lives in `ai.backend.client.func.session`.
- **Session types** (`ai.backend.common.types.SessionTypes`, a `CIStrEnum`): `INTERACTIVE`, `BATCH` (the one we want), `INFERENCE`, `SYSTEM`. `get_or_create` takes the string value `"batch"` (the enum member also works, since `CIStrEnum` subclasses `str`).
- **Lifecycle states:** `PENDING → SCHEDULED → PREPARING → PULLING → PREPARED → CREATING → RUNNING → TERMINATING → TERMINATED`; terminal: `TERMINATED`, `CANCELLED`, `ERROR`.
- **Batch-session execution model.** A `BATCH`-type session runs the `startup_command` passed to `get_or_create` and then terminates on its own; `batch_timeout` bounds its runtime. This is distinct from `ComputeSession.execute(mode="batch", ...)`, which is the *interactive* run-loop API (build/clean/exec `opts`, `run_id` continuation) used against an already-running interactive session; the provider does **not** use it.
- **Key client calls:**
  - `ComputeSession.get_or_create(image, *, name, type_, startup_command, batch_timeout, mounts, mount_map, envs, resources, resource_opts, cluster_size, cluster_mode, bootstrap_script, domain_name, group_name, max_wait, enqueue_only, …)`: creates the session; for `type_="batch"` the `startup_command` is the workload. `cluster_mode` is a `ClusterMode` enum (`single-node` / `multi-node`); the operator accepts a string and coerces it. `batch_timeout` accepts `str | int`.
  - `.get_info()` → brief status dict; `.detail(fields=...)` → full metadata. Completion is read from the `result` field (`SessionResult`: `undefined` / `success` / `failure`) once the session reaches a terminal status.
  - `.get_logs()` → dict containing the console log of the session container; the operator extracts the log field from it.
  - `.upload(files, ...)` / `.download(files, basedir, ...)`.
  - `.destroy(forced=..., recursive=...)`.
  - VFolder API: `VFolder.create/list/upload/download`.
- Backend.AI is **adding native session-level retry** (BEP-1053, batch sessions first). The provider should not duplicate this; see Open Questions §10.

---

## 4. Goals and non-goals

### Goals

- Ship `apache-airflow-providers-backend-ai` with hook, connection, operators, sensors, and a deferrable operator + trigger.
- Cover the full batch-session lifecycle: create → (optional upload) → run → poll → collect result → always destroy (no leaked sessions on failure).
- Support resource slots (`cpu`, `mem`, `cuda.devices`/accelerators), vfolder mounts, environment variables, container image selection, and cluster size.
- Make session-affecting parameters `template_fields` so they participate in Jinja templating / XCom.
- Deferrable execution so long GPU jobs do not occupy a worker slot.
- Unit tests (mocked SDK), one runnable system-test Dag, how-to docs.

### Non-goals (initial release)

- Interactive / inference / system session types: batch only for v1.
- A Backend.AI **executor** or **triggerer transport**: operator-level only.
- Re-implementing retry/backoff inside the operator (defer to Airflow task retries and/or Backend.AI BEP-1053 native retry).
- Bundling the Backend.AI client; it is a normal declared dependency.
- vfolder *management* operators (create/delete vfolder) beyond mounting existing vfolders (possible follow-up).

---

## 5. Proposed architecture

### 5.1 Connection type: `backend_ai`

Registered in `provider.yaml` under `connection-types`, backed by the hook.

| Airflow connection field | Backend.AI meaning |
|--------------------------|--------------------|
| `host` | endpoint URL (`BACKEND_ENDPOINT`) |
| `login` | access key |
| `password` | secret key |
| `extra` | JSON: `endpoint_type`, `domain`, `group`, `skip_sslcert_validation` |

`ui-field-behaviour` hides `schema`/`port`, relabels `login`→"Access Key", `password`→"Secret Key", `host`→"Endpoint URL", with placeholders.

### 5.2 Hook: `BackendAIHook`

- `src/airflow/providers/backend_ai/hooks/backend_ai.py`
- Subclass `BaseHook`; `conn_name_attr = "backend_ai_conn_id"`, `default_conn_name = "backend_ai_default"`, `conn_type = "backend_ai"`.
- Resolves the connection and builds an `APIConfig`.
- Provides `get_conn()` returning a configured `Session`, plus thin helpers: `create_batch_session(...)`, `run_batch(...)`, `get_session_info(...)`, `destroy_session(...)`, `get_session_logs(...)`.
- Provides `get_async_conn()` returning an `AsyncSession` for the trigger.
- Implements `get_ui_field_behaviour()` / `get_connection_form_widgets()` and a `test_connection()` method (a lightweight `ComputeSession.hello()` ping).

### 5.3 Operators

`BackendAIRunBatchOperator` (`operators/backend_ai.py`) is the primary operator. It creates a `BATCH` compute session whose `startup_command` is the workload, waits for the session to reach a terminal status, reads the `result` field, fetches the container logs, returns output via XCom, and destroys the session in a `finally` block (configurable `on_finish_action`: `destroy` or `keep`).

Constructor (sketch):

```python
# Import paths valid on Airflow 2.11; newer releases may prefer other locations.
from collections.abc import Sequence

from airflow.configuration import conf
from airflow.models import BaseOperator


class BackendAIRunBatchOperator(BaseOperator):
    template_fields: Sequence[str] = (
        "image", "command", "bootstrap_script", "session_name",
        "environment", "resources", "vfolder_mounts",
    )

    def __init__(
        self,
        *,
        image: str,
        command: str,                          # the batch session's startup_command
        bootstrap_script: str | None = None,   # optional setup run at container bootstrap
        backend_ai_conn_id: str = "backend_ai_default",
        session_name: str | None = None,       # default: f"airflow-{task_id}-{run_id}"
        resources: dict | None = None,         # {"cpu": "4", "mem": "8g", "cuda.devices": "1"}
        vfolder_mounts: list[str] | None = None,
        mount_map: dict[str, str] | None = None,
        environment: dict[str, str] | None = None,
        cluster_size: int = 1,
        cluster_mode: str = "single-node",     # "single-node" | "multi-node"
        domain_name: str | None = None,
        group_name: str | None = None,
        startup_timeout: int = 600,            # max_wait for the session to start
        batch_timeout: int | None = None,      # bounds the workload's runtime
        poll_interval: int = 15,               # status poll cadence (seconds)
        on_finish_action: str = "destroy",     # "destroy" | "keep"
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        **kwargs,
    ):
        ...
```

`execute()` synchronous path: call `get_or_create(type_="batch", startup_command=command, batch_timeout=...)`, then poll `get_info()` on `poll_interval` until the session reaches a terminal status. A `failure` result (or a `CANCELLED` / `ERROR` status) raises a dedicated `BackendAIJobFailedError`; a `success` result fetches `get_logs()` and returns the output.

The `deferrable=True` path enqueues the session, then calls `self.defer(trigger=BackendAISessionTrigger(...), method_name="execute_complete")`.

Supporting operators (small, optional for v1 but cheap):

- **`BackendAICreateSessionOperator`**: create a session, return its id (for Dags that want create / sense / destroy as separate tasks).
- **`BackendAIDestroySessionOperator`**: terminate a session by id (cleanup task, `trigger_rule="all_done"`).

### 5.4 Sensor: `BackendAISessionSensor`

`sensors/backend_ai.py`, subclass `BaseSensorOperator`. `poke()` calls `get_info()`; returns `True` on `TERMINATED`, raises `AirflowFailException` on `ERROR`/`CANCELLED`, else `False`. Supports `deferrable=True` reusing the trigger below.

### 5.5 Trigger: `BackendAISessionTrigger`

`triggers/backend_ai.py`, subclass `BaseTrigger`.

- `serialize()` → `(class_path, {session_id, backend_ai_conn_id, poll_interval, ...})`.
- `async run()` → opens `AsyncSession` via `hook.get_async_conn()`, polls `get_info()` on `poll_interval`, yields `TriggerEvent({"status": "success"|"error", "session_id": ..., "message": ...})` on a terminal state.
- The operator's `execute_complete(context, event)` raises on `event["status"] == "error"`, otherwise fetches final logs/output and returns.

### 5.6 Lifecycle / failure handling

- Session creation and the poll loop are wrapped so a failure (startup timeout, workload failure, or an Airflow task kill via `on_kill()`) still calls `destroy()` when `on_finish_action="destroy"`.
- `on_kill()` is implemented to terminate the remote session if the Airflow task is killed.
- Deferrable path: the trigger owns polling; `execute_complete` (or a dedicated cleanup) handles teardown.

---

## 6. Work breakdown (sub-issues / checklist)

Each item below should become a tracked sub-issue of this epic.

### Milestone 1: Provider scaffold

- [ ] Create `providers/backend_ai/` skeleton (`provider.yaml`, `pyproject.toml`, `README.rst`, `src/` tree, `__init__.py`, generated `get_provider_info.py`).
- [ ] Declare dependencies: `apache-airflow>=2.11.0`, `apache-airflow-providers-common-compat`, and `backend.ai-client` (pin a concrete lower bound; see §10).
- [ ] Wire the `apache_airflow_provider` entry point; confirm `airflow providers list` discovers it.
- [ ] Register the package in the monorepo workspace / provider lists.

### Milestone 2: Connection + Hook

- [ ] Implement `BackendAIHook` (sync + async conn, lifecycle helpers).
- [ ] Register the `backend_ai` connection type with `ui-field-behaviour`.
- [ ] Implement `test_connection()`.
- [ ] Unit tests for the hook (mocked `Session`/`AsyncSession`).

### Milestone 3: Core operator

- [ ] Implement `BackendAIRunBatchOperator` (synchronous path).
- [ ] Implement `on_kill()` + guaranteed session teardown.
- [ ] Define `BackendAIJobFailedError` / related exceptions.
- [ ] Unit tests: success, workload failure, startup timeout, batch-timeout, teardown-on-error, `on_finish_action="keep"`.

### Milestone 4: Deferrable execution

- [ ] Implement `BackendAISessionTrigger` (`serialize()` + async `run()`).
- [ ] Add the `deferrable` path + `execute_complete()` to the operator.
- [ ] Unit tests for the trigger (success / error events, serialization round-trip).

### Milestone 5: Sensors + auxiliary operators

- [ ] `BackendAISessionSensor` (sync + deferrable).
- [ ] `BackendAICreateSessionOperator` / `BackendAIDestroySessionOperator`.
- [ ] Unit tests.

### Milestone 6: Docs, examples, release readiness

- [ ] How-to docs under `docs/operators/` and `docs/connections/`.
- [ ] Runnable system-test Dag `tests/system/backend_ai/example_backend_ai.py`.
- [ ] `README.rst`, changelog/versioning per provider conventions.
- [ ] Pass `prek` static checks, mypy, ruff; run selective-checks test subset.

### Milestone 7: Community acceptance

- [ ] `[DISCUSSION]` thread on `[email protected]` per `providers/ACCEPTING_PROVIDERS.rst`.
- [ ] Secure committer sponsorship + ≥2 maintainers (see §9).
- [ ] `[VOTE]` / lazy consensus; land as an incubating provider.

---

## 7. Example target Dag

```python
from airflow import DAG
from airflow.providers.backend_ai.operators.backend_ai import BackendAIRunBatchOperator

with DAG("backend_ai_training", schedule="@daily", catchup=False) as dag:
    train = BackendAIRunBatchOperator(
        task_id="train_model",
        backend_ai_conn_id="backend_ai_default",
        image="cr.backend.ai/stable/python-pytorch:2.3-py310-cuda12",
        bootstrap_script="pip install -r /home/work/training-data/requirements.txt",
        command="python /home/work/training-data/train.py --epochs 10",
        resources={"cpu": "8", "mem": "32g", "cuda.devices": "1"},
        vfolder_mounts=["training-data"],
        environment={"WANDB_MODE": "offline"},
        startup_timeout=900,
        on_finish_action="destroy",
        deferrable=True,
    )
```

---

## 8. Testing strategy

- **Unit tests** (`tests/unit/backend_ai/...`): mock the Backend.AI client with `autospec`; no network. Cover success, workload failure, startup timeout, teardown-on-error, `on_kill()`, trigger serialization, and connection parsing. Mirror source layout per project testing standards; use pytest with `@pytest.mark.parametrize`, and `time_machine` for any time logic.
- **System test** (`tests/system/backend_ai/example_backend_ai.py`): a real Dag exercising the full lifecycle against a live cluster, gated on credentials, for release validation. It follows the Airflow system-test convention (`get_test_run` at module end, a `watcher()` task to propagate final state).
- **Static checks:** `prek` (ruff, ruff-format, mypy), provider yaml/metadata consistency checks, selective-checks-driven test subset.

---

## 9. Community / governance considerations

**Decision: this ships as a community provider in `apache/airflow`.** Per `providers/ACCEPTING_PROVIDERS.rst`, that requires:

- A working, functional codebase (this epic delivers it).
- A **stewardship commitment**: at least 2 individuals to maintain it.
- **Committer sponsorship**: at least one existing Airflow committer.
- Quality bar: tests + documentation.
- Process: `[DISCUSSION]` → `[VOTE]`/lazy consensus on `[email protected]`; provider enters the **incubation** lifecycle stage first.

**Action items before / alongside Milestone 1:**

1. Identify the 2+ maintainers who will commit to stewardship (Lablup / Backend.AI side is the natural source).
2. Line up at least one Airflow committer willing to sponsor and shepherd the `[DISCUSSION]`/`[VOTE]` thread.
3. Open the `[DISCUSSION]` thread on `[email protected]` early: code can be developed in parallel, but the provider cannot be merged until the community process completes.

Because Backend.AI is an established FOSS project, lazy consensus (with comprehensive tests + docs in place) is a viable path. The third-party / out-of-tree route (`providers/THIRD_PARTY_PROVIDERS.rst`) has been **explicitly declined** in favour of the community path.

---

## 10. Resolved decisions and open questions

**Resolved** (see "Decisions locked in" at the top):

- Distribution: community provider in `apache/airflow`.
- Retry ownership: Airflow task-level retries for v1; BEP-1053 native session retry is a later enhancement.

**Still open:**

1. **Sync vs. async client in the hook.** Use the sync `Session` on the worker and `AsyncSession` only in the trigger (recommended), or async throughout?
2. **Log delivery.** v1 fetches the container log via `get_logs()` once the session finishes; live log tailing into the Airflow task log is a follow-up.
3. **`backend.ai-client` version floor.** Pin a concrete lower bound against a published PyPI release before scaffolding (the platform dev version is `26.4.4rc4`; the client ships on the same calendar versioning). The `apache-airflow` floor is set to `>=2.11.0`, matching the `docker` provider.
4. **License compatibility.** Confirm with the PMC that an LGPL-3.0 runtime dependency is acceptable for a community provider, and under what conditions.
5. **Image / registry conventions.** Should the operator validate image references against the connection's registry, or pass them through verbatim?
6. **vfolder management.** Out of scope for v1 (mount existing vfolders only); confirm, or pull a `create vfolder` operator into scope.
7. **Multi-node batch.** Expose `cluster_size` / `cluster_mode="multi-node"` in v1, or defer multi-node sessions to a follow-up?

---

## 11. References

- Airflow provider acceptance: `providers/ACCEPTING_PROVIDERS.rst`, `providers/THIRD_PARTY_PROVIDERS.rst`, `providers/PROVIDER_GOVERNANCE.rst`
- Reference providers: `providers/amazon/`, `providers/docker/`, `providers/databricks/`
- Provider scaffolding templates: `dev/breeze/src/airflow_breeze/templates/`
- Backend.AI project: <https://www.backend.ai/> (product/docs), <https://github.com/lablup/backend.ai> (source, license, releases)
- Backend.AI client SDK: `ai.backend.client.session` (`Session`, `AsyncSession`), `ai.backend.client.func.session.ComputeSession`, `ai.backend.client.config.APIConfig`, `ai.backend.common.types` (`SessionTypes`, `SessionResult`, `ClusterMode`)
- Backend.AI native retry proposal: BEP-1053 (agent batch retry)

---

Drafted-by: Claude Code (Opus 4.7); reviewed by @rapsealk before posting
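
Illustrative note on §3.2 and §5.3: the terminal-status handling described there could be sketched roughly as below. This is only a sketch under stated assumptions, not the provider's implementation. The names `Outcome`, `TERMINAL_STATUSES`, and `classify_session` are hypothetical, and treating a `TERMINATED` session whose `result` is not `success` as a failure is an assumption the proposal does not spell out. The function takes plain strings so it does not depend on the SDK.

```python
from enum import Enum


class Outcome(Enum):
    """Hypothetical three-way outcome used by the poll loop."""

    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


# Terminal statuses as listed in section 3.2.
TERMINAL_STATUSES = frozenset({"TERMINATED", "CANCELLED", "ERROR"})


def classify_session(status: str, result: str) -> Outcome:
    """Map a session's (status, result) pair to an outcome.

    Assumption: anything terminal that is not TERMINATED with a
    "success" result counts as a failure.
    """
    normalized = status.upper()
    if normalized not in TERMINAL_STATUSES:
        return Outcome.RUNNING  # keep polling
    if normalized == "TERMINATED" and result.lower() == "success":
        return Outcome.SUCCESS
    return Outcome.FAILED
```

A poll loop or trigger could call `classify_session` on each `get_info()` response and raise `BackendAIJobFailedError` when it returns `Outcome.FAILED`; keeping the decision in one pure function also makes it easy to cover with `@pytest.mark.parametrize`.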
