rapsealk opened a new issue, #67033:
URL: https://github.com/apache/airflow/issues/67033

   > **Type:** Epic / new provider package
   > **Proposed package:** `apache-airflow-providers-backend-ai`
   > **Status:** Draft for discussion
   > **Author:** [email protected]
   
   ### Decisions locked in
   
   - **Distribution: community provider.** This will be contributed to
     `apache/airflow` under `providers/backend_ai/` and go through the ASF
     acceptance process (`[DISCUSSION]` → `[VOTE]`/lazy consensus). It is *not* a
     third-party / out-of-tree provider.
   - **Retry: Airflow task retries for v1.** The operator does not implement its
     own retry/backoff. Failures raise and rely on the task's `retries` /
     `retry_delay`. Backend.AI native session retry (BEP-1053) is a later
     enhancement, not v1 scope.
   
   ---
   
   ## 1. Summary
   
   Add an Apache Airflow provider package for [Backend.AI](https://www.backend.ai/).
   It lets a Dag task run a command or script inside a Backend.AI batch compute
   session, with CPU/memory/GPU allocation and virtual-folder storage, and waits
   for it to finish before returning the result and logs to Airflow.
   
   The deliverable is a new directory `providers/backend_ai/` that ships:
   
   - a hook wrapping the official `backend.ai-client` SDK,
   - a typed Airflow connection for Backend.AI keypair credentials,
   - operators that create a batch session, run a workload, and report results,
   - a sensor for polling long-running sessions,
   - a deferrable operator and trigger so long jobs do not pin a worker slot,
   - unit tests, a runnable system-test Dag, and how-to documentation.
   
   The Backend.AI repository has no Airflow integration today, so this is a
   greenfield effort.
   
   ---
   
   ## 2. Motivation
   
   Backend.AI is a GPU/accelerator workload-orchestration platform: users request
   *compute sessions* on container images with declared resource slots
   (`cpu`, `mem`, `cuda.devices`, …), mount *virtual folders* (vfolders) for
   input/output data, and run interactive or **batch** jobs.
   
   Airflow is widely used to orchestrate ML and data pipelines, but today an
   Airflow user who wants a pipeline step to run on a Backend.AI cluster must
   hand-roll a `PythonOperator` that imports the Backend.AI client, manages the
   session lifecycle, polls status, and cleans up. That is error-prone (leaked
   sessions on failure, no deferral, no templating, no typed connection).
   
   A dedicated provider replaces the hand-rolled script with a declarative
   operator, a managed connection, deferrable execution, and tested lifecycle
   handling, consistent with how the `docker` and `cncf.kubernetes` providers wrap
   their backends.
   
   ---
   
   ## 3. Background
   
   ### 3.1 Airflow provider anatomy (from `providers/amazon`, `providers/docker`)
   
   A provider package lives under `providers/<name>/` with:
   
   ```
   providers/backend_ai/
   ├── provider.yaml            # metadata: operators, hooks, sensors, connection-types
   ├── pyproject.toml           # build config + dependency on backend.ai-client
   ├── README.rst
   ├── src/airflow/providers/backend_ai/
   │   ├── __init__.py
   │   ├── get_provider_info.py # AUTO-GENERATED from provider.yaml
   │   ├── hooks/
   │   ├── operators/
   │   ├── sensors/
   │   └── triggers/
   ├── tests/
   │   ├── unit/backend_ai/{hooks,operators,sensors,triggers}/
   │   └── system/backend_ai/example_backend_ai.py
   └── docs/{operators,connections}/
   ```
   
   Key mechanics learned from the codebase:
   
   - **Discovery** is via the `pyproject.toml` entry point
     `[project.entry-points."apache_airflow_provider"] provider_info = "...get_provider_info:get_provider_info"`.
   - `get_provider_info.py` is **auto-generated** from `provider.yaml` via a Breeze
     Jinja2 template (`dev/breeze/.../templates/`) — never hand-edit it.
   - **Operator** = subclass of `BaseOperator`, declares `template_fields`,
     implements `execute(context)`; deferrable operators additionally call
     `self.defer(trigger=..., method_name="execute_complete")` and implement
     `execute_complete(context, event)`.
   - **Hook** = subclass of `BaseHook`, resolves an Airflow connection by id and
     builds the upstream client.
   - **Connection type** = registered in `provider.yaml` under `connection-types`
     with `ui-field-behaviour` (field relabeling/hiding/placeholders).
   - **Trigger** = subclass of `BaseTrigger`, implements `serialize()` →
     `(class_path, params_dict)` and an async `run()` generator that yields a
     `TriggerEvent`.
   - New providers must satisfy `providers/ACCEPTING_PROVIDERS.rst` (see §9).
   
   ### 3.2 Backend.AI client SDK (from `backend.ai-dev`)
   
   - **pip package:** `backend.ai-client` — **import path:** `ai.backend.client`.
   - **Current platform version:** `26.4.4rc4`. **License:** LGPL-3.0. The provider
     declares `backend.ai-client` as a runtime dependency and does not vendor or
     modify it. ASF has specific rules for weak-copyleft dependencies, so final
     license compatibility must be confirmed with the PMC (see §10).
   - **Auth:** API keypair — access key, secret key, and endpoint URL. Env vars:
     `BACKEND_ENDPOINT`, `BACKEND_ACCESS_KEY`, `BACKEND_SECRET_KEY`,
     `BACKEND_ENDPOINT_TYPE` (`api` keypair vs `session` cookie), `BACKEND_DOMAIN`,
     `BACKEND_GROUP`. Configured programmatically via `APIConfig(...)`.
   - **Both sync and async clients exist:** `Session` (sync, runs an event loop on
     a worker thread) and `AsyncSession` (native async/await), both in
     `ai.backend.client.session`. The session entity `ComputeSession` lives in
     `ai.backend.client.func.session`.
   - **Session types** (`ai.backend.common.types.SessionTypes`, a `CIStrEnum`):
     `INTERACTIVE`, `BATCH` (the one we want), `INFERENCE`, `SYSTEM`. `get_or_create`
     takes the string value `"batch"` (the enum member also works, since `CIStrEnum`
     subclasses `str`).
   - **Lifecycle states:** `PENDING → SCHEDULED → PREPARING → PULLING → PREPARED →
     CREATING → RUNNING → TERMINATING → TERMINATED`; terminal: `TERMINATED`,
     `CANCELLED`, `ERROR`.
   - **Batch-session execution model.** A `BATCH`-type session runs the
     `startup_command` passed to `get_or_create` and then terminates on its own;
     `batch_timeout` bounds its runtime. This is distinct from
     `ComputeSession.execute(mode="batch", ...)`, which is the *interactive*
     run-loop API (build/clean/exec `opts`, `run_id` continuation) used against an
     already-running interactive session — the provider does **not** use it.
   - **Key client calls:**
     - `ComputeSession.get_or_create(image, *, name, type_, startup_command,
       batch_timeout, mounts, mount_map, envs, resources, resource_opts,
       cluster_size, cluster_mode, bootstrap_script, domain_name, group_name,
       max_wait, enqueue_only, …)` — creates the session; for `type_="batch"` the
       `startup_command` is the workload. `cluster_mode` is a `ClusterMode` enum
       (`single-node` / `multi-node`); the operator accepts a string and coerces it.
       `batch_timeout` accepts `str | int`.
     - `.get_info()` → brief status dict; `.detail(fields=...)` → full metadata.
       Completion is read from the `result` field (`SessionResult`: `undefined` /
       `success` / `failure`) once the session reaches a terminal status.
     - `.get_logs()` → dict containing the console log of the session container;
       the operator extracts the log field from it.
     - `.upload(files, ...)` / `.download(files, basedir, ...)`.
     - `.destroy(forced=..., recursive=...)`.
     - VFolder API: `VFolder.create/list/upload/download`.
   - Backend.AI is **adding native session-level retry** (BEP-1053, batch sessions
     first). The provider should not duplicate this; see Open Questions §10.
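The lifecycle and `result` rules above can be folded into one pure function that the operator, sensor, and trigger could all share. This is a minimal sketch. It assumes the status and result arrive as the plain strings listed above. `SessionOutcome` and `classify_session` are hypothetical names and are not part of the SDK.

```python
from __future__ import annotations

from enum import Enum

# Terminal statuses, as listed in the lifecycle above.
TERMINAL_STATUSES = frozenset({"TERMINATED", "CANCELLED", "ERROR"})


class SessionOutcome(Enum):
    """Hypothetical provider-side summary of a session's state."""

    RUNNING = "running"  # not terminal yet: keep polling
    SUCCESS = "success"
    FAILURE = "failure"


def classify_session(status: str, result: str | None) -> SessionOutcome:
    """Map a (status, result) pair to an outcome.

    `result` only counts once the status is terminal, so it is ignored before then.
    """
    status = status.upper()
    if status not in TERMINAL_STATUSES:
        return SessionOutcome.RUNNING  # includes TERMINATING, which is not terminal
    if status in ("CANCELLED", "ERROR"):
        return SessionOutcome.FAILURE
    # TERMINATED: the workload's exit is recorded in `result`.
    if (result or "").lower() == "success":
        return SessionOutcome.SUCCESS
    # Assumption: "failure" and "undefined" are both treated as failures.
    return SessionOutcome.FAILURE
```

Treating an `undefined` result on a terminated session as a failure is a conservative choice. The alternative would let a task succeed without any positive evidence that the workload did.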
   
   ---
   
   ## 4. Goals and non-goals
   
   ### Goals
   
   - Ship `apache-airflow-providers-backend-ai` with hook, connection, operators,
     sensors, and a deferrable operator + trigger.
   - Cover the full batch-session lifecycle: create → (optional upload) → run →
     poll → collect result → always destroy (no leaked sessions on failure).
   - Support resource slots (`cpu`, `mem`, `cuda.devices`/accelerators), vfolder
     mounts, environment variables, container image selection, and cluster size.
   - Make session-affecting parameters `template_fields` so they participate in
     Jinja templating / XCom.
   - Deferrable execution so long GPU jobs do not occupy a worker slot.
   - Unit tests (mocked SDK), one runnable system-test Dag, how-to docs.
   
   ### Non-goals (initial release)
   
   - Interactive / inference / system session types — batch only for v1.
   - A Backend.AI **executor** or **triggerer transport** — operator-level only.
   - Re-implementing retry/backoff inside the operator (defer to Airflow task
     retries and/or Backend.AI BEP-1053 native retry).
   - Bundling the Backend.AI client — it is a normal declared dependency.
   - vfolder *management* operators (create/delete vfolder) beyond mounting
     existing vfolders — possible follow-up.
   
   ---
   
   ## 5. Proposed architecture
   
   ### 5.1 Connection type — `backend_ai`
   
   Registered in `provider.yaml` under `connection-types`, backed by the hook.
   
   | Airflow connection field | Backend.AI meaning            |
   |--------------------------|-------------------------------|
   | `host`                   | endpoint URL (`BACKEND_ENDPOINT`) |
   | `login`                  | access key                    |
   | `password`               | secret key                    |
   | `extra`                  | JSON: `endpoint_type`, `domain`, `group`, `skip_sslcert_validation` |
   
   `ui-field-behaviour` hides `schema`/`port`, relabels `login`→"Access Key",
   `password`→"Secret Key", `host`→"Endpoint URL", with placeholders.
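The field mapping in the table translates into a small function inside the hook. This is a minimal sketch. It assumes `APIConfig` accepts the keyword arguments `endpoint`, `endpoint_type`, `access_key`, `secret_key`, `domain`, `group`, and `skip_sslcert_validation`; check these against the pinned `backend.ai-client`. `ConnectionFields` is a hypothetical stand-in for the Airflow `Connection` attributes, so the example runs without Airflow installed.

```python
from __future__ import annotations

import json
from dataclasses import dataclass
from typing import Any


@dataclass
class ConnectionFields:
    """Hypothetical stand-in for the Airflow Connection attributes the hook reads."""

    host: str | None
    login: str | None
    password: str | None
    extra: str | None = None  # JSON string, as Airflow stores `extra`


def _as_bool(value: Any) -> bool:
    # Extras typed into a form may arrive as strings such as "false".
    if isinstance(value, str):
        return value.strip().lower() in {"1", "true", "yes", "on"}
    return bool(value)


def to_api_config_kwargs(conn: ConnectionFields) -> dict[str, Any]:
    """Map connection fields to (assumed) APIConfig keyword arguments."""
    if not conn.host:
        raise ValueError("Backend.AI connection needs the endpoint URL in 'host'")
    if not conn.login or not conn.password:
        raise ValueError("Backend.AI connection needs an access key and a secret key")
    extra = json.loads(conn.extra) if conn.extra else {}
    kwargs: dict[str, Any] = {
        "endpoint": conn.host,
        "endpoint_type": extra.get("endpoint_type", "api"),
        "access_key": conn.login,
        "secret_key": conn.password,
        "skip_sslcert_validation": _as_bool(extra.get("skip_sslcert_validation", False)),
    }
    # Pass domain/group only when set, so the client's own defaults apply otherwise.
    for key in ("domain", "group"):
        if extra.get(key):
            kwargs[key] = extra[key]
    return kwargs
```

Inside the hook, the result would feed something like `APIConfig(**to_api_config_kwargs(...))`. Failing early on a missing endpoint or keypair gives the user a clearer error than a later authentication failure from the server.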
   
   ### 5.2 Hook — `BackendAIHook`
   
   - `src/airflow/providers/backend_ai/hooks/backend_ai.py`
   - Subclass `BaseHook`; `conn_name_attr = "backend_ai_conn_id"`,
     `default_conn_name = "backend_ai_default"`, `conn_type = "backend_ai"`.
   - Resolves the connection and builds an `APIConfig`.
   - Provides `get_conn()` returning a configured `Session`, plus thin helpers:
     `create_batch_session(...)`, `run_batch(...)`, `get_session_info(...)`,
     `destroy_session(...)`, `get_session_logs(...)`.
   - Provides `get_async_conn()` returning an `AsyncSession` for the trigger.
   - Implements `get_ui_field_behaviour()` / `get_connection_form_widgets()` and a
     `test_connection()` method (a lightweight `ComputeSession.hello()` ping).
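A hook's `test_connection()` in Airflow reports its outcome as a `(bool, message)` tuple. The sketch below shows that shape. `check_connection` is a hypothetical helper, and `ping` stands in for the lightweight `ComputeSession.hello()` call described above, injected so the example runs without a cluster.

```python
from __future__ import annotations

from collections.abc import Callable


def check_connection(ping: Callable[[], object]) -> tuple[bool, str]:
    """Run a lightweight ping and report it in the (bool, str) shape test_connection() returns."""
    try:
        response = ping()
    except Exception as exc:  # surface any client or network error to the UI
        return False, str(exc)
    return True, f"Connection successfully tested: {response}"
```

Catching broadly is deliberate here. The connection test button should show the user the underlying error message rather than crash.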
   
   ### 5.3 Operators
   
   `BackendAIRunBatchOperator` (`operators/backend_ai.py`) is the primary operator.
   It creates a `BATCH` compute session whose `startup_command` is the workload,
   waits for the session to reach a terminal status, reads the `result` field,
   fetches the container logs, returns output via XCom, and destroys the session in
   a `finally` block (configurable `on_finish_action`: `destroy` or `keep`).
   
   Constructor (sketch):
   
   ```python
   from __future__ import annotations

   from collections.abc import Sequence

   from airflow.configuration import conf
   from airflow.models import BaseOperator


   class BackendAIRunBatchOperator(BaseOperator):
       template_fields: Sequence[str] = (
           "image", "command", "bootstrap_script", "session_name",
           "environment", "resources", "vfolder_mounts",
       )

       def __init__(
           self,
           *,
           image: str,
           command: str,                           # the batch session's startup_command
           bootstrap_script: str | None = None,    # optional setup run at container bootstrap
           backend_ai_conn_id: str = "backend_ai_default",
           session_name: str | None = None,        # default: f"airflow-{task_id}-{run_id}"
           resources: dict | None = None,          # {"cpu": "4", "mem": "8g", "cuda.devices": "1"}
           vfolder_mounts: list[str] | None = None,
           mount_map: dict[str, str] | None = None,
           environment: dict[str, str] | None = None,
           cluster_size: int = 1,
           cluster_mode: str = "single-node",      # "single-node" | "multi-node"
           domain_name: str | None = None,
           group_name: str | None = None,
           startup_timeout: int = 600,             # max_wait for the session to start
           batch_timeout: int | None = None,       # bounds the workload's runtime
           poll_interval: int = 15,                # status poll cadence (seconds)
           on_finish_action: str = "destroy",      # "destroy" | "keep"
           deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
           **kwargs,
       ): ...
   ```
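The default `session_name` embeds the Airflow run id. Scheduled run ids such as `scheduled__2024-01-01T00:00:00+00:00` contain `:` and `+`. This proposal does not state which characters Backend.AI accepts in a session name or how long a name may be. The sketch below therefore assumes a conservative `[A-Za-z0-9_-]` alphabet and a 64-character cap. Both are placeholders to check against the manager's validation rules. `default_session_name` is a hypothetical helper. It appends a short hash so that names stay distinct after sanitizing or truncation.

```python
from __future__ import annotations

import hashlib
import re

# Placeholder limits: confirm against the Backend.AI manager's session-name validation.
MAX_NAME_LEN = 64
_DISALLOWED = re.compile(r"[^A-Za-z0-9_-]+")


def default_session_name(task_id: str, run_id: str, max_len: int = MAX_NAME_LEN) -> str:
    """Build f"airflow-{task_id}-{run_id}" with unsafe characters replaced by '-'.

    An 8-character hash of the raw name keeps two run ids distinct even when
    they sanitize to the same string or are truncated.
    """
    raw = f"airflow-{task_id}-{run_id}"
    digest = hashlib.sha1(raw.encode()).hexdigest()[:8]
    cleaned = _DISALLOWED.sub("-", raw).strip("-")
    # Reserve room for the "-" separator plus the 8-character digest.
    cleaned = cleaned[: max_len - 9].rstrip("-")
    return f"{cleaned}-{digest}"
```

Because the name is derived deterministically from `task_id` and `run_id`, a task retry produces the same name. That may matter if `get_or_create` reuses an existing session with that name, which is worth confirming before relying on it.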
   
   `execute()` synchronous path: call `get_or_create(type_="batch",
   startup_command=command, batch_timeout=...)`, then poll `get_info()` on
   `poll_interval` until the session reaches a terminal status. A `failure` result
   (or a `CANCELLED` / `ERROR` status) raises a dedicated `BackendAIJobFailedError`;
   a `success` result fetches `get_logs()` and returns the output. The
   `deferrable=True` path enqueues the session, then calls
   `self.defer(trigger=BackendAISessionTrigger(...), method_name="execute_complete")`.
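The synchronous path amounts to a poll loop. This is a minimal sketch under two assumptions. First, the polled dict is assumed to expose `status` and `result` keys; whether both come from `get_info()` or from `detail(fields=...)` should be confirmed against the SDK. Second, `get_info` and `sleep` are injected so the loop runs without a cluster. `wait_for_batch` is a hypothetical name. The exception class mirrors the `BackendAIJobFailedError` named above.

```python
from __future__ import annotations

import time
from collections.abc import Callable, Mapping
from typing import Any

TERMINAL_STATUSES = frozenset({"TERMINATED", "CANCELLED", "ERROR"})


class BackendAIJobFailedError(Exception):
    """Raised when a batch session ends without a `success` result."""


def wait_for_batch(
    get_info: Callable[[], Mapping[str, Any]],
    *,
    poll_interval: float = 15,
    sleep: Callable[[float], None] = time.sleep,
) -> Mapping[str, Any]:
    """Poll until the session reaches a terminal status; return the final info on success."""
    while True:
        info = get_info()
        status = str(info.get("status", "")).upper()
        if status in TERMINAL_STATUSES:
            result = str(info.get("result") or "undefined").lower()
            if status == "TERMINATED" and result == "success":
                return info
            raise BackendAIJobFailedError(f"session ended with status={status}, result={result}")
        # No client-side deadline here: batch_timeout bounds the workload on the server.
        sleep(poll_interval)
```

Injecting `sleep` also lets unit tests assert the poll cadence without waiting in real time.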
   
   Supporting operators (small, optional for v1 but cheap):
   
   - **`BackendAICreateSessionOperator`** — create a session, return its id (for
     Dags that want create / sense / destroy as separate tasks).
   - **`BackendAIDestroySessionOperator`** — terminate a session by id (cleanup
     task, `trigger_rule="all_done"`).
   
   ### 5.4 Sensor — `BackendAISessionSensor`
   
   `sensors/backend_ai.py`, subclass `BaseSensorOperator`. `poke()` calls
   `get_info()`; returns `True` on `TERMINATED`, raises `AirflowFailException` on
   `ERROR`/`CANCELLED`, else `False`. Supports `deferrable=True` reusing the
   trigger below.
   
   ### 5.5 Trigger — `BackendAISessionTrigger`
   
   `triggers/backend_ai.py`, subclass `BaseTrigger`.
   
   - `serialize()` → `(class_path, {session_id, backend_ai_conn_id, poll_interval,
     ...})`.
   - `async run()` → opens `AsyncSession` via `hook.get_async_conn()`, polls
     `get_info()` on `poll_interval`, yields
     `TriggerEvent({"status": "success"|"error", "session_id": ..., "message": ...})`
     on a terminal state.
   - The operator's `execute_complete(context, event)` raises on
     `event["status"] == "error"`, otherwise fetches final logs/output and returns.
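The trigger contract (a `serialize()` that round-trips through the constructor, plus an async `run()` that yields one terminal event) can be sketched without Airflow installed. `SessionPollTrigger` is a hypothetical stand-in. It drops the `BaseTrigger` base class and yields plain dicts where the real trigger would yield `TriggerEvent(...)`. `fetch_info()` is a placeholder for the `AsyncSession` query made through `hook.get_async_conn()`.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from typing import Any

TERMINAL_STATUSES = frozenset({"TERMINATED", "CANCELLED", "ERROR"})
TRIGGER_PATH = "airflow.providers.backend_ai.triggers.backend_ai.BackendAISessionTrigger"


class SessionPollTrigger:
    """Hypothetical stand-in for BackendAISessionTrigger, minus the BaseTrigger base class."""

    def __init__(self, session_id: str, backend_ai_conn_id: str, poll_interval: float = 15) -> None:
        self.session_id = session_id
        self.backend_ai_conn_id = backend_ai_conn_id
        self.poll_interval = poll_interval

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # The triggerer rebuilds the trigger as cls(**kwargs), so every constructor
        # argument must round-trip here as a JSON-friendly value.
        return TRIGGER_PATH, {
            "session_id": self.session_id,
            "backend_ai_conn_id": self.backend_ai_conn_id,
            "poll_interval": self.poll_interval,
        }

    async def fetch_info(self) -> dict[str, Any]:
        # Placeholder: the real trigger would query the session via hook.get_async_conn().
        raise NotImplementedError

    async def run(self) -> AsyncIterator[dict[str, Any]]:
        # Yields a single event payload once the session is terminal.
        while True:
            info = await self.fetch_info()
            status = str(info.get("status", "")).upper()
            if status in TERMINAL_STATUSES:
                ok = status == "TERMINATED" and str(info.get("result", "")).lower() == "success"
                yield {
                    "status": "success" if ok else "error",
                    "session_id": self.session_id,
                    "message": f"session {self.session_id} ended with status {status}",
                }
                return
            # asyncio.sleep yields control, so one triggerer can poll many sessions.
            await asyncio.sleep(self.poll_interval)
```

Only JSON-friendly state (ids and numbers) is serialized. Anything else, such as an open client session, has to be re-created inside `run()`.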
   
   ### 5.6 Lifecycle / failure handling
   
   - Session creation and the poll loop are wrapped so a failure (startup timeout,
     workload failure, or an Airflow task kill via `on_kill()`) still calls
     `destroy()` when `on_finish_action="destroy"`.
   - `on_kill()` is implemented to terminate the remote session if the Airflow task
     is killed.
   - Deferrable path: the trigger owns polling; `execute_complete` (or a dedicated
     cleanup) handles teardown.
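The teardown rules above have two callers: `execute()`'s `finally` block and `on_kill()`. Teardown should honour `on_finish_action` and should never run twice. This is a minimal sketch in which `SessionTeardown` and `run_with_teardown` are hypothetical helpers and `destroy` stands in for the SDK's `destroy(...)` call. It uses a plain flag rather than a lock. The reason is that `on_kill()` may be invoked from a SIGTERM handler, and taking a lock that the interrupted code already holds would deadlock.

```python
from __future__ import annotations

from collections.abc import Callable


class SessionTeardown:
    """Hypothetical helper: destroy the remote session at most once, per on_finish_action."""

    def __init__(self, destroy: Callable[[], None], on_finish_action: str = "destroy") -> None:
        if on_finish_action not in ("destroy", "keep"):
            raise ValueError(f"on_finish_action must be 'destroy' or 'keep', got {on_finish_action!r}")
        self._destroy = destroy
        self._on_finish_action = on_finish_action
        self._done = False

    def __call__(self) -> bool:
        """Run teardown; return True only if this call invoked destroy()."""
        if self._on_finish_action != "destroy" or self._done:
            return False
        # Mark first so the second caller (finally vs. on_kill) never destroys twice.
        self._done = True
        self._destroy()
        return True


def run_with_teardown(workload: Callable[[], str], teardown: SessionTeardown) -> str:
    """Run the poll/collect step and tear down whether it succeeds or raises."""
    try:
        return workload()
    finally:
        teardown()
```

Marking the flag before calling `destroy()` means a destroy that itself raises is not retried by the other caller. This trade-off favours never double-destroying over guaranteed cleanup.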
   
   ---
   
   ## 6. Work breakdown (sub-issues / checklist)
   
   Each item below should become a tracked sub-issue of this epic.
   
   ### Milestone 1 — Provider scaffold
   - [ ] Create `providers/backend_ai/` skeleton (`provider.yaml`, `pyproject.toml`,
         `README.rst`, `src/` tree, `__init__.py`, generated `get_provider_info.py`).
   - [ ] Declare dependencies: `apache-airflow>=2.11.0`,
         `apache-airflow-providers-common-compat`, and `backend.ai-client` (pin a
         concrete lower bound — see §10).
   - [ ] Wire the `apache_airflow_provider` entry point; confirm `airflow providers
         list` discovers it.
   - [ ] Register the package in the monorepo workspace / provider lists.
   
   ### Milestone 2 — Connection + Hook
   - [ ] Implement `BackendAIHook` (sync + async conn, lifecycle helpers).
   - [ ] Register the `backend_ai` connection type with `ui-field-behaviour`.
   - [ ] Implement `test_connection()`.
   - [ ] Unit tests for the hook (mocked `Session`/`AsyncSession`).
   
   ### Milestone 3 — Core operator
   - [ ] Implement `BackendAIRunBatchOperator` (synchronous path).
   - [ ] Implement `on_kill()` + guaranteed session teardown.
   - [ ] Define `BackendAIJobFailedError` / related exceptions.
   - [ ] Unit tests: success, workload failure, startup timeout, batch-timeout,
         teardown-on-error, `on_finish_action="keep"`.
   
   ### Milestone 4 — Deferrable execution
   - [ ] Implement `BackendAISessionTrigger` (`serialize()` + async `run()`).
   - [ ] Add the `deferrable` path + `execute_complete()` to the operator.
   - [ ] Unit tests for the trigger (success / error events, serialization
         round-trip).
   
   ### Milestone 5 — Sensors + auxiliary operators
   - [ ] `BackendAISessionSensor` (sync + deferrable).
   - [ ] `BackendAICreateSessionOperator` / `BackendAIDestroySessionOperator`.
   - [ ] Unit tests.
   
   ### Milestone 6 — Docs, examples, release readiness
   - [ ] How-to docs under `docs/operators/` and `docs/connections/`.
   - [ ] Runnable system-test Dag `tests/system/backend_ai/example_backend_ai.py`.
   - [ ] `README.rst`, changelog/versioning per provider conventions.
   - [ ] Pass `prek` static checks, mypy, ruff; run selective-checks test subset.
   
   ### Milestone 7 — Community acceptance
   - [ ] `[DISCUSSION]` thread on `[email protected]` per
         `providers/ACCEPTING_PROVIDERS.rst`.
   - [ ] Secure committer sponsorship + ≥2 maintainers (see §9).
   - [ ] `[VOTE]` / lazy consensus; land as an incubating provider.
   
   ---
   
   ## 7. Example target Dag
   
   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.providers.backend_ai.operators.backend_ai import BackendAIRunBatchOperator

   with DAG(
       "backend_ai_training",
       start_date=datetime(2025, 1, 1),  # required for scheduled Dags on Airflow 2.x
       schedule="@daily",
       catchup=False,
   ) as dag:
       train = BackendAIRunBatchOperator(
           task_id="train_model",
           backend_ai_conn_id="backend_ai_default",
           image="cr.backend.ai/stable/python-pytorch:2.3-py310-cuda12",
           bootstrap_script="pip install -r /home/work/training-data/requirements.txt",
           command="python /home/work/training-data/train.py --epochs 10",
           resources={"cpu": "8", "mem": "32g", "cuda.devices": "1"},
           vfolder_mounts=["training-data"],
           environment={"WANDB_MODE": "offline"},
           startup_timeout=900,
           on_finish_action="destroy",
           deferrable=True,
       )
   ```
   
   ---
   
   ## 8. Testing strategy
   
   - **Unit tests** (`tests/unit/backend_ai/...`): mock the Backend.AI client with
     `autospec`; no network. Cover success, workload failure, startup timeout,
     teardown-on-error, `on_kill()`, trigger serialization, and connection
     parsing. Mirror source layout per project testing standards; use pytest with
     `@pytest.mark.parametrize`, and `time_machine` for any time logic.
   - **System test** (`tests/system/backend_ai/example_backend_ai.py`): a real Dag
     exercising the full lifecycle against a live cluster, gated on credentials,
     for release validation. It follows the Airflow system-test convention
     (`get_test_run` at module end, a `watcher()` task to propagate final state).
   - **Static checks:** `prek` (ruff, ruff-format, mypy), provider yaml/metadata
     consistency checks, selective-checks-driven test subset.
   
   ---
   
   ## 9. Community / governance considerations
   
   **Decision: this ships as a community provider in `apache/airflow`.** Per
   `providers/ACCEPTING_PROVIDERS.rst`, that requires:
   
   - A working, functional codebase (this epic delivers it).
   - A **stewardship commitment** — at least 2 individuals to maintain it.
   - **Committer sponsorship** — at least one existing Airflow committer.
   - Quality bar — tests + documentation.
   - Process: `[DISCUSSION]` → `[VOTE]`/lazy consensus on `[email protected]`;
     provider enters the **incubation** lifecycle stage first.
   
   **Action items before / alongside Milestone 1:**
   
   1. Identify the 2+ maintainers who will commit to stewardship (Lablup /
      Backend.AI side is the natural source).
   2. Line up at least one Airflow committer willing to sponsor and shepherd the
      `[DISCUSSION]`/`[VOTE]` thread.
   3. Open the `[DISCUSSION]` thread on `[email protected]` early — code can
      be developed in parallel, but the provider cannot be merged until the
      community process completes. Because Backend.AI is an established FOSS
      project, lazy consensus (with comprehensive tests + docs in place) is a
      viable path.
   
   The third-party / out-of-tree route (`providers/THIRD_PARTY_PROVIDERS.rst`) has
   been **explicitly declined** in favour of the community path.
   
   ---
   
   ## 10. Resolved decisions and open questions
   
   **Resolved** (see "Decisions locked in" at the top):
   
   - Distribution — community provider in `apache/airflow`.
   - Retry ownership — Airflow task-level retries for v1; BEP-1053 native session
     retry is a later enhancement.
   
   **Still open:**
   
   1. **Sync vs. async client in the hook.** Use the sync `Session` on the worker
      and `AsyncSession` only in the trigger (recommended), or async throughout?
   2. **Log delivery.** v1 fetches the container log via `get_logs()` once the
      session finishes; live log tailing into the Airflow task log is a follow-up.
   3. **`backend.ai-client` version floor.** Pin a concrete lower bound against a
      published PyPI release before scaffolding (the platform dev version is
      `26.4.4rc4`; the client ships on the same calendar versioning). The
      `apache-airflow` floor is set to `>=2.11.0`, matching the `docker` provider.
   4. **License compatibility.** Confirm with the PMC that an LGPL-3.0 runtime
      dependency is acceptable for a community provider, and under what conditions.
   5. **Image / registry conventions.** Should the operator validate image
      references against the connection's registry, or pass them through verbatim?
   6. **vfolder management.** Out of scope for v1 (mount existing vfolders only) —
      confirm, or pull a `create vfolder` operator into scope.
   7. **Multi-node batch.** Expose `cluster_size` / `cluster_mode="multi-node"` in
      v1, or defer multi-node sessions to a follow-up?
   
   ---
   
   ## 11. References
   
   - Airflow provider acceptance: `providers/ACCEPTING_PROVIDERS.rst`,
     `providers/THIRD_PARTY_PROVIDERS.rst`, `providers/PROVIDER_GOVERNANCE.rst`
   - Reference providers: `providers/amazon/`, `providers/docker/`,
     `providers/databricks/`
   - Provider scaffolding templates: `dev/breeze/src/airflow_breeze/templates/`
   - Backend.AI project: <https://www.backend.ai/> (product/docs),
     <https://github.com/lablup/backend.ai> (source, license, releases)
   - Backend.AI client SDK: `ai.backend.client.session` (`Session`,
     `AsyncSession`), `ai.backend.client.func.session.ComputeSession`,
     `ai.backend.client.config.APIConfig`, `ai.backend.common.types`
     (`SessionTypes`, `SessionResult`, `ClusterMode`)
   - Backend.AI native retry proposal: BEP-1053 (agent batch retry)
   
   ---
   
   Drafted-by: Claude Code (Opus 4.7); reviewed by @rapsealk before posting
   

