This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 34d9bd14205 Add preliminary coordinator and Java SDK documentation
(#67699)
34d9bd14205 is described below
commit 34d9bd14205e1c01793df78f01876cd83d0eeb9c
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Tue Jun 2 12:15:57 2026 +0800
Add preliminary coordinator and Java SDK documentation (#67699)
Co-authored-by: Kaxil Naik <[email protected]>
---
.../docs/authoring-and-scheduling/index.rst | 1 +
.../language-sdks/index.rst | 162 ++++++++++
.../language-sdks/java.rst | 360 +++++++++++++++++++++
docs/spelling_wordlist.txt | 1 +
task-sdk/docs/api.rst | 10 +
5 files changed, 534 insertions(+)
diff --git a/airflow-core/docs/authoring-and-scheduling/index.rst
b/airflow-core/docs/authoring-and-scheduling/index.rst
index cbcac076063..fa7867ebf44 100644
--- a/airflow-core/docs/authoring-and-scheduling/index.rst
+++ b/airflow-core/docs/authoring-and-scheduling/index.rst
@@ -33,6 +33,7 @@ It's recommended that you first review the pages in
:doc:`core concepts </core-c
connections
dynamic-task-mapping
assets
+ Non-Python Task SDKs <language-sdks/index>
.. _scheduling-section:
diff --git a/airflow-core/docs/authoring-and-scheduling/language-sdks/index.rst
b/airflow-core/docs/authoring-and-scheduling/language-sdks/index.rst
new file mode 100644
index 00000000000..ec7a4d88cc1
--- /dev/null
+++ b/airflow-core/docs/authoring-and-scheduling/language-sdks/index.rst
@@ -0,0 +1,162 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. _language-sdks:
+
+Non-Python Task SDKs
+====================
+
+|experimental|
+
+Airflow Dags are always defined in Python, but individual task *implementations* can be written in other
+languages. When a task runs, Airflow's worker calls out to the target language's runtime to execute the task logic,
+and the runtime communicates the result back. The Dag author uses a lightweight Python *stub* to declare where the
+task lives; everything else, including the actual business logic, Airflow API calls, and any library dependencies,
+lives in the non-Python implementation.
+
+.. list-table:: Available language SDKs
+ :header-rows: 1
+ :widths: 15 40 15 30
+
+ * - Language
+ - Coordinator class
+ - Min. runtime
+ - Guide
+ * - JVM languages (e.g. Java)
+ - :class:`task-sdk:airflow.sdk.coordinators.java.JavaCoordinator`
+ - JRE 17
+ - :doc:`java`
+
+.. toctree::
+ :hidden:
+
+ java
+
+How it works
+------------
+
+The execution model has three moving parts.
+
+**Stub tasks in the Dag**
+ The Dag file declares tasks using :func:`@task.stub
<airflow.sdk.task.stub>`. A stub is a normal Airflow
+ task from the scheduler's perspective. It participates in dependencies,
retries, pools, and all other
+ task-level features exactly like other ``@task``-decorated Python
functions. The only difference is that
+ the worker does not execute the Python code inside the function definition;
instead it delegates execution
+ to a *coordinator*.
+
+**Coordinators**
+ A coordinator is a Python object registered in the ``[sdk] coordinators`` configuration. It is considered
+ part of the Airflow worker. When the worker picks up a stub task, it looks up the coordinator mapped to
+ that task's ``queue`` and uses it to execute the task. The coordinator is responsible for managing the
+ target language's runtime, forwarding messages from Airflow to that runtime, and relaying results back
+ to Airflow. All coordinators extend
+ :class:`task-sdk:airflow.sdk.execution_time.coordinator.BaseCoordinator`.
+
+**Language runtime**
+ The coordinator launches one short-lived runtime per task instance. In most cases, this is a
+ subprocess running an executable implemented in a non-Python language. The runtime receives messages from the
+ worker that identify the workload, executes the task, and communicates back to the worker process
+ with the coordinator acting as a proxy.
+
+.. _language-sdks/stub-tasks:
+
+Stub tasks
+----------
+
+A stub task is declared with the :func:`@task.stub <airflow.sdk.task.stub>`
decorator. Since it is still a
+Python task declaration, every parameter available on a normal Dag or task
applies. Task dependencies are also
+defined in the Python Dag file. The scheduler treats a stub like any other
task.
+
+.. code-block:: python
+
+    import datetime
+
+    from airflow.sdk import dag, task
+
+
+    @task
+    def fetch_data():  # normal Python task
+        return {"rows": 100}  # placeholder payload, for illustration only
+
+
+    @dag
+    def my_pipeline():
+        raw = fetch_data()
+
+        @task.stub(
+            queue="java",  # routes to the JavaCoordinator
+            retries=3,
+            retry_delay=datetime.timedelta(minutes=5),
+            execution_timeout=datetime.timedelta(hours=1),
+            pool="heavy_tasks",
+        )
+        def process(raw_value): ...  # implemented in Java
+
+        @task.stub(queue="java")
+        def export(processed_value): ...
+
+        export(process(raw))
+
+
+    my_pipeline()
+
+The ``queue`` parameter determines which coordinator handles the task. Any
other ``@task`` keyword argument is
+stored on the task instance and honored by Airflow's scheduler and worker as
usual.
+
+XCom values produced by a stub task are visible to downstream Python tasks, and vice versa. However, although
+XCom references are declared in the Python Dag (they define task dependencies), the language implementation
+must still read the values explicitly. See the specific language SDK documentation for how to do this correctly.
+
+.. _language-sdks/coordinator-config:
+
+Coordinator configuration
+-------------------------
+
+Coordinators are registered in ``airflow.cfg`` (or via environment variables)
under ``[sdk]``.
+
+``coordinators``
+ A JSON object mapping a logical coordinator name to its class and keyword
arguments:
+
+ .. code-block:: ini
+
+ [sdk]
+ coordinators = {
+ "my-coordinator": {
+ "classpath": "path.to.CoordinatorClass",
+ "kwargs": {}
+ }
+ }
+
+ The ``classpath`` value must be importable by the worker. The ``kwargs``
are passed directly
+ to the coordinator's constructor. See the language-specific guide for the
accepted kwargs
+ of each coordinator (e.g. :ref:`java-sdk/coordinator-config` for
+ :class:`~airflow.sdk.coordinators.java.JavaCoordinator`).
+
+``queue_to_coordinator``
+ A JSON object mapping Celery queue names to coordinator names:
+
+ .. code-block:: ini
+
+ [sdk]
+ queue_to_coordinator = {"jdk17": "my-coordinator"}
+
+ Tasks with ``queue="jdk17"`` on their stub will be dispatched to the
coordinator named
+ ``"my-coordinator"``. A single coordinator can serve multiple queues; a
queue can only
+ map to one coordinator.
+
+Both settings can be supplied as environment variables using the standard
Airflow convention:
+
+.. code-block:: bash
+
+ AIRFLOW__SDK__COORDINATORS='{"my-coordinator": {...}}'
+ AIRFLOW__SDK__QUEUE_TO_COORDINATOR='{"jdk17": "my-coordinator"}'
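
The relationship between the two settings can be sketched in plain Python. This is only an illustration of the mapping described above, not Airflow's internal lookup code: ``resolve_coordinator`` is a hypothetical helper, and the ``kwargs`` shown are example values.

```python
import json

# Example values mirroring the two settings above (kwargs are illustrative).
COORDINATORS = json.dumps(
    {
        "my-coordinator": {
            "classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
            "kwargs": {"jars_root": ["/opt/airflow/jars"]},
        }
    }
)
QUEUE_TO_COORDINATOR = json.dumps({"jdk17": "my-coordinator"})


def resolve_coordinator(queue, coordinators_json, queue_map_json):
    """Hypothetical helper: return (classpath, kwargs) for a queue, or None."""
    coordinators = json.loads(coordinators_json)
    queue_map = json.loads(queue_map_json)
    name = queue_map.get(queue)
    if name is None:
        return None  # no coordinator is mapped to this queue
    entry = coordinators[name]
    return entry["classpath"], entry.get("kwargs", {})


# The same JSON strings are what the environment variables would carry.
env = {
    "AIRFLOW__SDK__COORDINATORS": COORDINATORS,
    "AIRFLOW__SDK__QUEUE_TO_COORDINATOR": QUEUE_TO_COORDINATOR,
}
print(resolve_coordinator("jdk17", COORDINATORS, QUEUE_TO_COORDINATOR))
```

Generating the values with ``json.dumps`` rather than writing them by hand avoids quoting mistakes when they are exported as environment variables.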
diff --git a/airflow-core/docs/authoring-and-scheduling/language-sdks/java.rst
b/airflow-core/docs/authoring-and-scheduling/language-sdks/java.rst
new file mode 100644
index 00000000000..a752bee8017
--- /dev/null
+++ b/airflow-core/docs/authoring-and-scheduling/language-sdks/java.rst
@@ -0,0 +1,360 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. _java-sdk:
+
+Java SDK
+========
+
+|experimental|
+
+The Java SDK lets you implement Airflow task logic in Java, Kotlin, or any
other JVM language. The Dag and its
+scheduling remain in Python; individual tasks delegate to a JVM subprocess
that is spawned by
+:class:`~airflow.sdk.coordinators.java.JavaCoordinator` for each task instance.
+
+.. contents:: Contents
+ :local:
+ :depth: 2
+
+Prerequisites
+-------------
+
+* JRE 17 or later must be available on the Airflow worker nodes.
+* The compiled task JAR(s) and JVM dependencies must be accessible from the
worker.
+* The ``apache-airflow-task-sdk`` package (installed with Airflow) provides
the coordinator;
+ no additional Python packages are needed.
+
+Quick start
+-----------
+
+The following example shows the minimal moving parts: a Python Dag with two
stub tasks, and a Java
+implementation of those tasks.
+
+Python Dag (the scheduling side)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. code-block:: python
+
+    from airflow.sdk import dag, task
+
+
+    @dag
+    def sales_pipeline():
+        @task.stub(queue="java")
+        def extract(): ...
+
+        @task.stub(queue="java")
+        def transform(extracted): ...
+
+        @task()
+        def load(transformed):
+            print(f"Loaded: {transformed}")
+
+        load(transform(extract()))
+
+
+    sales_pipeline()
+
+Java implementation
+~~~~~~~~~~~~~~~~~~~
+
+.. code-block:: java
+
+ import org.apache.airflow.sdk.*;
+
+ @Builder.Dag(id = "sales_pipeline")
+ public class SalesPipeline {
+
+ @Builder.Task(id = "extract")
+ public long extract(Client client) {
+ var conn = client.getConnection("sales_db");
+ // ... fetch data using conn.host, conn.login, conn.password ...
+ return recordCount;
+ }
+
+ @Builder.Task(id = "transform")
+ public long transform(
+ Client client,
+ @Builder.XCom(task = "extract") long recordCount
+ ) {
+ var threshold = (String) client.getVariable("transform_threshold");
+ // ... process data ...
+ return transformedCount;
+ }
+ }
+
+.. note::
+
+ Both the Python and the Java ``transform`` need an argument for the upstream XCom. The
+ Python argument declares the dependency; the Java argument actually retrieves the value.
+
+Java entry point
+~~~~~~~~~~~~~~~~
+
+.. code-block:: java
+
+    import java.util.List;
+
+    import org.apache.airflow.sdk.*;
+
+    public class Main implements BundleBuilder {
+        @Override
+        public Iterable<Dag> getDags() {
+            // SalesPipelineBuilder is generated at compile time.
+            return List.of(SalesPipelineBuilder.build());
+        }
+
+        public static void main(String[] args) {
+            Server.create(args).serve(new Main().build());
+        }
+    }
+
+Coordinator configuration
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. code-block:: ini
+
+ [sdk]
+ coordinators = {
+ "java-jdk17": {
+ "classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
+ "kwargs": {"jars_root": ["/opt/airflow/jars"]}
+ }
+ }
+ queue_to_coordinator = {"java": "java-jdk17"}
+
+See :ref:`java-sdk/coordinator-config` for the full list of accepted
``kwargs``.
+
+Writing tasks
+-------------
+
+The Java SDK offers two APIs for implementing tasks. Both produce the same
runtime behavior; the choice is a
+matter of style.
+
+.. _java-sdk/annotation-api:
+
+Annotation-based API
+~~~~~~~~~~~~~~~~~~~~
+
+Annotate a plain Java class and let the SDK generate the boilerplate at
compile time.
+
+.. list-table::
+ :header-rows: 1
+ :widths: 30 70
+
+ * - Annotation
+ - Purpose
+ * - ``@Builder.Dag(id = "...")``
+ - Marks the class as a task container. The ``id`` must match the
``dag_id`` in the Python Dag.
+ * - ``@Builder.Task(id = "...")``
+ - Marks a method as a task implementation. The ``id`` must match the
``@task.stub`` function
+ name in the Python Dag. If ``id`` is omitted the method name is used.
+ * - ``@Builder.XCom(task = "...")``
+ - Injects the ``return_value`` XCom from the named upstream task as a
method parameter.
+ The parameter type must be compatible with the stored value (see
:ref:`java-sdk/types`).
+
+The annotation processor generates a ``<ClassName>Builder`` class that wires
up the task
+registry and handles XCom injection automatically.
+
+.. code-block:: java
+
+ @Builder.Dag(id = "my_dag")
+ public class MyDag {
+
+ @Builder.Task(id = "fetch")
+ public String fetch(Client client) throws Exception {
+ var conn = client.getConnection("my_api");
+ // implement task logic
+ return result;
+ }
+
+ @Builder.Task(id = "process")
+ public long process(
+ Client client,
+ @Builder.XCom(task = "fetch") String fetched
+ ) {
+ var threshold = (String) client.getVariable("process_threshold");
+ // implement task logic
+ return count;
+ }
+ }
+
+A task method may declare ``throws Exception``; any uncaught exception causes
the task instance to be marked
+as failed in Airflow (triggering retries if configured on the stub).
+
+.. _java-sdk/interface-api:
+
+Interface-based API
+~~~~~~~~~~~~~~~~~~~
+
+Implement the ``Task`` interface directly for full control over how tasks are
registered and how XComs are
+read.
+
+.. code-block:: java
+
+ import org.apache.airflow.sdk.*;
+
+ public class FetchTask implements Task {
+ @Override
+ public void execute(Context context, Client client) throws Exception {
+ var conn = client.getConnection("my_api");
+ // implement task logic
+ client.setXCom(result);
+ }
+ }
+
+Register tasks manually in a ``BundleBuilder``:
+
+.. code-block:: java
+
+ public class MyBundle implements BundleBuilder {
+ @Override
+ public Iterable<Dag> getDags() {
+ var dag = new Dag("my_dag");
+ dag.addTask("fetch", FetchTask.class);
+ dag.addTask("process", ProcessTask.class);
+ return List.of(dag);
+ }
+ }
+
+See the Java SDK's published JavaDoc for more details.
+
+.. TODO: (AIP-108) Put a link here once we publish the JavaDoc.
+
+.. _java-sdk/types:
+
+XCom type mapping
+-----------------
+
+XCom values are stored as JSON in Airflow's metadata database. The table
below shows how JSON types are
+represented as Java objects when read back via ``getXCom``.
+
+.. list-table::
+ :header-rows: 1
+ :widths: 30 35 35
+
+ * - Python type
+ - JSON
+ - Java type (from ``getXCom``)
+ * - ``int``
+ - number (integer)
+ - ``Long`` (for values that fit; ``BigInteger`` otherwise)
+ * - ``float``
+ - number (decimal)
+ - ``Double``
+ * - ``str``
+ - string
+ - ``String``
+ * - ``bool``
+ - boolean
+ - ``Boolean``
+ * - ``None``
+ - null
+ - ``null``
+ * - ``list``
+ - array
+ - ``List<Object>``
+ * - ``dict``
+ - object
+ - ``Map<String, Object>``
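
The Python-to-JSON half of the table can be checked with a short round trip. This sketch uses only the standard-library ``json`` module as a stand-in; it assumes nothing about Airflow's own XCom serializer, which may handle additional types.

```python
import json

# One value per row of the table above.
value = {
    "count": 3,        # int   -> number (integer)
    "ratio": 0.5,      # float -> number (decimal)
    "name": "sales",   # str   -> string
    "ok": True,        # bool  -> boolean
    "missing": None,   # None  -> null
    "ids": [1, 2],     # list  -> array
}

encoded = json.dumps(value, sort_keys=True)
decoded = json.loads(encoded)

print(encoded)
# A tuple has no JSON equivalent and is written as an array,
# so it comes back as a list after the round trip.
print(json.loads(json.dumps((1, 2))))
```

Values that do not survive such a round trip unchanged, like the tuple above, are worth converting explicitly before returning them from a Python task that feeds a Java task.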
+
+.. _java-sdk/build:
+
+Building and packaging
+-----------------------
+
+The Java SDK is distributed as a JAR. Use any build tool; Gradle is shown here.
+
+**Gradle setup**
+
+Add the SDK dependency to your ``build.gradle.kts``:
+
+.. code-block:: kotlin
+
+ dependencies {
+ implementation("org.apache.airflow:airflow-java-sdk:<version>")
+ annotationProcessor("org.apache.airflow:airflow-java-sdk:<version>")
+ }
+
+ tasks.withType<Jar> {
+ manifest {
+ attributes("Main-Class" to "com.example.Main")
+ }
+ }
+
+.. note::
+
+ You only need the ``annotationProcessor`` entry if you use the
annotation-based API. It is not needed for
+ the interface-based API.
+
+.. note::
+
+ The coordinator uses the ``Main-Class`` manifest value to determine how to run the JAR. Alternatively,
+ set the entry point on the coordinator itself with the ``main_class`` kwarg in the coordinator configuration.
+
+Building a distribution
+~~~~~~~~~~~~~~~~~~~~~~~
+
+.. code-block:: bash
+
+ ./gradlew :myproject:installDist
+
+The ``lib/`` directory of the resulting distribution contains all required
JARs. Copy or mount it into the
+directory pointed to by ``jars_root`` in the coordinator configuration.
+:class:`~airflow.sdk.coordinators.java.JavaCoordinator` scans ``jars_root``
+recursively and builds the classpath automatically.
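
To check what a recursive scan of ``jars_root`` would pick up before deploying, something like the following can be run on the worker. It is a rough approximation under stated assumptions, not the coordinator's actual implementation: ``build_classpath`` is a hypothetical helper, and the sorted ordering is a choice made here for reproducibility.

```python
import os
import tempfile
from pathlib import Path


def build_classpath(jars_root):
    """Hypothetical helper: join every .jar under the given root(s) into a classpath string."""
    # Accept a single string/path or a list of them, as the jars_root kwarg does.
    roots = [jars_root] if isinstance(jars_root, (str, os.PathLike)) else list(jars_root)
    jars = sorted(str(p) for root in roots for p in Path(root).rglob("*.jar"))
    return os.pathsep.join(jars)


# Demo on a throwaway directory tree.
with tempfile.TemporaryDirectory() as tmp:
    lib = Path(tmp, "lib", "nested")
    lib.mkdir(parents=True)
    Path(tmp, "lib", "app.jar").touch()
    (lib / "dep.jar").touch()
    Path(tmp, "README.txt").touch()  # ignored: not a .jar
    print(build_classpath(tmp))
```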
+
+.. _java-sdk/coordinator-config:
+
+:class:`~airflow.sdk.coordinators.java.JavaCoordinator` configuration
+-------------------------------------------------------------------------------
+
+All ``kwargs`` in the ``coordinators`` config entry are passed to the
+:class:`~airflow.sdk.coordinators.java.JavaCoordinator` constructor:
+
+.. list-table::
+ :header-rows: 1
+ :widths: 30 15 55
+
+ * - Parameter
+ - Default
+ - Description
+ * - ``jars_root``
+ - *(required)*
+ - One or more directories scanned recursively for ``.jar`` files. Accepts
a string,
+ a path, or a list of strings/paths.
+ * - ``java_executable``
+ - ``"java"``
+ - Path to the ``java`` binary. Defaults to ``java`` on ``$PATH``.
+ * - ``jvm_args``
+ - ``[]``
+ - Extra JVM arguments such as ``["-Xmx1g", "-Dsome.property=value"]``.
+ * - ``main_class``
+ - *(auto-detect)*
+ - Explicit entry-point class. If omitted,
+ :class:`~airflow.sdk.coordinators.java.JavaCoordinator` scans
``jars_root`` for a
+ JAR whose manifest sets ``Main-Class``. If multiple executable JARs are
found the
+ result is non-deterministic; set ``main_class`` explicitly in that case.
+ * - ``task_startup_timeout``
+ - ``10.0``
+ - Seconds to wait for the JVM subprocess to connect after launch.
Increase this if your
+ JVM startup is slow (e.g. on constrained hardware or with a large
classpath).
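
Because auto-detection is non-deterministic when several executable JARs are present, it can help to list which JARs declare a ``Main-Class`` before relying on it. The sketch below is a simplified check and not the coordinator's detection logic: ``find_main_classes`` is a hypothetical helper, and it ignores manifest line continuations for long attribute values.

```python
import tempfile
import zipfile
from pathlib import Path


def find_main_classes(jars_root):
    """Hypothetical helper: map each executable JAR under jars_root to its Main-Class."""
    found = {}
    for jar in sorted(Path(jars_root).rglob("*.jar")):
        with zipfile.ZipFile(jar) as zf:
            try:
                manifest = zf.read("META-INF/MANIFEST.MF").decode("utf-8")
            except KeyError:
                continue  # JAR without a manifest
        for line in manifest.splitlines():
            key, _, value = line.partition(":")
            if key.strip() == "Main-Class":
                found[str(jar)] = value.strip()
    return found


# Demo: one executable JAR and one plain library JAR.
with tempfile.TemporaryDirectory() as tmp:
    with zipfile.ZipFile(Path(tmp, "app.jar"), "w") as zf:
        zf.writestr("META-INF/MANIFEST.MF", "Manifest-Version: 1.0\nMain-Class: com.example.Main\n")
    with zipfile.ZipFile(Path(tmp, "lib.jar"), "w") as zf:
        zf.writestr("META-INF/MANIFEST.MF", "Manifest-Version: 1.0\n")
    print(find_main_classes(tmp))
```

If the result has more than one entry, setting ``main_class`` explicitly avoids the ambiguity.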
+
+.. _java-sdk/limitations:
+
+Limitations
+-----------
+
+* **One JVM subprocess per task instance.** Each task instance spawns a fresh
JVM. Tasks that need to share
+ in-process state between instances should use XCom or an external store
instead.
+* **Limited support for assets, deferral, and other Airflow features.** They
may be implemented in the future
+ based on user feedback and demand.
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 14a611c0f96..5d5b82c27bb 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -725,6 +725,7 @@ googleapis
GoogleDisplayVideo
gpu
gpus
+Gradle
Grafana
graphviz
Groq
diff --git a/task-sdk/docs/api.rst b/task-sdk/docs/api.rst
index c2bb0a19fbc..5be912803b3 100644
--- a/task-sdk/docs/api.rst
+++ b/task-sdk/docs/api.rst
@@ -254,6 +254,16 @@ collector interfaces used to capture and retrieve asset
lineage metadata during
.. autoapimodule:: airflow.sdk.lineage
+Coordinators
+------------
+
+Coordinators bridge the Airflow worker and a non-Python language runtime.
+See :doc:`airflow:authoring-and-scheduling/language-sdks/index` for a
conceptual overview.
+
+.. rubric:: JVM
+
+.. autoapiclass:: airflow.sdk.coordinators.java.JavaCoordinator
+
Execution Time Components
-------------------------
.. rubric:: Context