This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 34d9bd14205 Add preliminary coordinator and Java SDK documentation 
(#67699)
34d9bd14205 is described below

commit 34d9bd14205e1c01793df78f01876cd83d0eeb9c
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Tue Jun 2 12:15:57 2026 +0800

    Add preliminary coordinator and Java SDK documentation (#67699)
    
    Co-authored-by: Kaxil Naik <[email protected]>
---
 .../docs/authoring-and-scheduling/index.rst        |   1 +
 .../language-sdks/index.rst                        | 162 ++++++++++
 .../language-sdks/java.rst                         | 360 +++++++++++++++++++++
 docs/spelling_wordlist.txt                         |   1 +
 task-sdk/docs/api.rst                              |  10 +
 5 files changed, 534 insertions(+)

diff --git a/airflow-core/docs/authoring-and-scheduling/index.rst 
b/airflow-core/docs/authoring-and-scheduling/index.rst
index cbcac076063..fa7867ebf44 100644
--- a/airflow-core/docs/authoring-and-scheduling/index.rst
+++ b/airflow-core/docs/authoring-and-scheduling/index.rst
@@ -33,6 +33,7 @@ It's recommended that you first review the pages in 
:doc:`core concepts </core-c
     connections
     dynamic-task-mapping
     assets
+    Non-Python Task SDKs <language-sdks/index>
 
 .. _scheduling-section:
 
diff --git a/airflow-core/docs/authoring-and-scheduling/language-sdks/index.rst 
b/airflow-core/docs/authoring-and-scheduling/language-sdks/index.rst
new file mode 100644
index 00000000000..ec7a4d88cc1
--- /dev/null
+++ b/airflow-core/docs/authoring-and-scheduling/language-sdks/index.rst
@@ -0,0 +1,162 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _language-sdks:
+
+Non-Python Task SDKs
+====================
+
+|experimental|
+
+Airflow Dags are always defined in Python, but individual task *implementations* can be written in other
+languages. When a task runs, Airflow's worker calls out to the target language's runtime to execute the task
+logic, and the runtime communicates the result back. The Dag author uses a lightweight Python *stub* to
+declare where the task lives; everything else, including the actual business logic, Airflow API calls, and
+any library dependencies, lives in the non-Python implementation.
+
+.. list-table:: Available language SDKs
+   :header-rows: 1
+   :widths: 15 40 15 30
+
+   * - Language
+     - Coordinator class
+     - Min. runtime
+     - Guide
+   * - JVM languages (e.g. Java)
+     - :class:`task-sdk:airflow.sdk.coordinators.java.JavaCoordinator`
+     - JRE 17
+     - :doc:`java`
+
+.. toctree::
+   :hidden:
+
+   java
+
+How it works
+------------
+
+The execution model has three moving parts.
+
+**Stub tasks in the Dag**
+   The Dag file declares tasks using :func:`@task.stub 
<airflow.sdk.task.stub>`. A stub is a normal Airflow
+   task from the scheduler's perspective. It participates in dependencies, 
retries, pools, and all other
+   task-level features exactly like other ``@task``-decorated Python 
functions. The only difference is that
+   the worker does not execute the Python code inside the function definition; 
instead it delegates execution
+   to a *coordinator*.
+
+**Coordinators**
+   A coordinator is a Python object registered in the ``[sdk] coordinators`` configuration. It is considered
+   part of the Airflow worker. When the worker picks up a stub task, it looks up the coordinator mapped to
+   that task's ``queue`` and uses it to execute the task. The coordinator is responsible for managing the
+   target language's runtime, forwarding messages from the worker to that runtime, and relaying results
+   back to Airflow. All coordinators extend
+   :class:`task-sdk:airflow.sdk.execution_time.coordinator.BaseCoordinator`.
+
+**Language runtime**
+   The coordinator starts one short-lived runtime per task instance. In most cases, this is a subprocess
+   running an executable implemented in a non-Python language. The runtime receives messages from the
+   worker that identify the workload, executes the task, and communicates back to the worker process
+   through the coordinator, which acts as a proxy.
+
+.. _language-sdks/stub-tasks:
+
+Stub tasks
+----------
+
+A stub task is declared with the :func:`@task.stub <airflow.sdk.task.stub>` 
decorator. Since it is still a
+Python task declaration, every parameter available on a normal Dag or task 
applies. Task dependencies are also
+defined in the Python Dag file. The scheduler treats a stub like any other 
task.
+
+.. code-block:: python
+
+    import datetime
+
+    from airflow.sdk import dag, task
+
+
+    @dag
+    def my_pipeline():
+        raw = fetch_data()  # normal Python task
+
+        @task.stub(
+            queue="java",  # routes to the JavaCoordinator
+            retries=3,
+            retry_delay=datetime.timedelta(minutes=5),
+            execution_timeout=datetime.timedelta(hours=1),
+            pool="heavy_tasks",
+        )
+        def process(raw_value): ...  # implemented in Java
+
+        @task.stub(queue="java")
+        def export(processed_value): ...
+
+        export(process(raw))
+
+
+    my_pipeline()
+
+The ``queue`` parameter determines which coordinator handles the task. Any 
other ``@task`` keyword argument is
+stored on the task instance and honored by Airflow's scheduler and worker as 
usual.
+
+XCom values produced by a stub task are visible to downstream Python tasks, and vice versa. XCom references
+are still declared in the Python Dag, because they define task dependencies, but declaring them there does
+not pass the values to the task: the language implementation must read them explicitly. See the
+language-specific SDK documentation for how to do this correctly.
+
+.. _language-sdks/coordinator-config:
+
+Coordinator configuration
+-------------------------
+
+Coordinators are registered in ``airflow.cfg`` (or via environment variables) 
under ``[sdk]``.
+
+``coordinators``
+    A JSON object mapping a logical coordinator name to its class and keyword 
arguments:
+
+    .. code-block:: ini
+
+        [sdk]
+        coordinators = {
+            "my-coordinator": {
+                "classpath": "path.to.CoordinatorClass",
+                "kwargs": {}
+            }
+        }
+
+    The ``classpath`` value must be importable by the worker.  The ``kwargs`` 
are passed directly
+    to the coordinator's constructor.  See the language-specific guide for the 
accepted kwargs
+    of each coordinator (e.g. :ref:`java-sdk/coordinator-config` for
+    :class:`~airflow.sdk.coordinators.java.JavaCoordinator`).
+
+``queue_to_coordinator``
+    A JSON object mapping Celery queue names to coordinator names:
+
+    .. code-block:: ini
+
+        [sdk]
+        queue_to_coordinator = {"jdk17": "my-coordinator"}
+
+    Tasks with ``queue="jdk17"`` on their stub will be dispatched to the 
coordinator named
+    ``"my-coordinator"``.  A single coordinator can serve multiple queues; a 
queue can only
+    map to one coordinator.
+
+Both settings can be supplied as environment variables using the standard 
Airflow convention:
+
+.. code-block:: bash
+
+    AIRFLOW__SDK__COORDINATORS='{"my-coordinator": {...}}'
+    AIRFLOW__SDK__QUEUE_TO_COORDINATOR='{"jdk17": "my-coordinator"}'
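The way the two settings fit together can be sketched in plain Python. This is only an illustration of the lookup described above, not Airflow's own code: ``resolve_coordinator`` is a hypothetical helper, and it reads the settings from a dictionary standing in for the environment.

```python
import json
import os


# Hypothetical helper for illustration only; this is not Airflow's lookup code.
def resolve_coordinator(queue, environ=os.environ):
    """Return (name, classpath, kwargs) for the coordinator serving ``queue``."""
    coordinators = json.loads(environ.get("AIRFLOW__SDK__COORDINATORS", "{}"))
    queue_map = json.loads(environ.get("AIRFLOW__SDK__QUEUE_TO_COORDINATOR", "{}"))
    if queue not in queue_map:
        raise KeyError(f"no coordinator is mapped to queue {queue!r}")
    name = queue_map[queue]
    if name not in coordinators:
        raise KeyError(f"queue {queue!r} maps to unknown coordinator {name!r}")
    entry = coordinators[name]
    return name, entry["classpath"], entry.get("kwargs", {})


# A stand-in environment that mirrors the example values used above.
example_env = {
    "AIRFLOW__SDK__COORDINATORS": json.dumps(
        {"my-coordinator": {"classpath": "path.to.CoordinatorClass", "kwargs": {}}}
    ),
    "AIRFLOW__SDK__QUEUE_TO_COORDINATOR": json.dumps({"jdk17": "my-coordinator"}),
}

resolved = resolve_coordinator("jdk17", example_env)
print(resolved)
```

Because a queue maps to exactly one coordinator name, the lookup is a two-step dictionary access; several queues may point at the same name.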
diff --git a/airflow-core/docs/authoring-and-scheduling/language-sdks/java.rst 
b/airflow-core/docs/authoring-and-scheduling/language-sdks/java.rst
new file mode 100644
index 00000000000..a752bee8017
--- /dev/null
+++ b/airflow-core/docs/authoring-and-scheduling/language-sdks/java.rst
@@ -0,0 +1,360 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _java-sdk:
+
+Java SDK
+========
+
+|experimental|
+
+The Java SDK lets you implement Airflow task logic in Java, Kotlin, or any 
other JVM language. The Dag and its
+scheduling remain in Python; individual tasks delegate to a JVM subprocess 
that is spawned by
+:class:`~airflow.sdk.coordinators.java.JavaCoordinator` for each task instance.
+
+.. contents:: Contents
+   :local:
+   :depth: 2
+
+Prerequisites
+-------------
+
+* JRE 17 or later must be available on the Airflow worker nodes.
+* The compiled task JAR(s) and JVM dependencies must be accessible from the 
worker.
+* The ``apache-airflow-task-sdk`` package (installed with Airflow) provides 
the coordinator;
+  no additional Python packages are needed.
+
+Quick start
+-----------
+
+The following example shows the minimal moving parts: a Python Dag with two 
stub tasks, and a Java
+implementation of those tasks.
+
+Python Dag (the scheduling side)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. code-block:: python
+
+    from airflow.sdk import dag, task
+
+
+    @dag
+    def sales_pipeline():
+        @task.stub(queue="java")
+        def extract(): ...
+
+        @task.stub(queue="java")
+        def transform(extracted): ...
+
+        @task()
+        def load(transformed):
+            print(f"Loaded: {transformed}")
+
+        load(transform(extract()))
+
+
+    sales_pipeline()
+
+Java implementation
+~~~~~~~~~~~~~~~~~~~
+
+.. code-block:: java
+
+    import org.apache.airflow.sdk.*;
+
+    @Builder.Dag(id = "sales_pipeline")
+    public class SalesPipeline {
+
+      @Builder.Task(id = "extract")
+      public long extract(Client client) {
+        var conn = client.getConnection("sales_db");
+        // ... fetch data using conn.host, conn.login, conn.password ...
+        return recordCount;
+      }
+
+      @Builder.Task(id = "transform")
+      public long transform(
+        Client client,
+        @Builder.XCom(task = "extract") long recordCount
+      ) {
+        var threshold = (String) client.getVariable("transform_threshold");
+        // ... process data ...
+        return transformedCount;
+      }
+    }
+
+.. note::
+
+  Note that ``transform`` takes an argument for the upstream XCom in both Python and Java. The Python
+  argument declares the dependency; the Java argument actually retrieves the value.
+
+Java entry point
+~~~~~~~~~~~~~~~~
+
+.. code-block:: java
+
+    import java.util.List;
+
+    import org.apache.airflow.sdk.*;
+
+    public class Main implements BundleBuilder {
+      @Override
+      public Iterable<Dag> getDags() {
+        // SalesPipelineBuilder is generated at compile time.
+        return List.of(SalesPipelineBuilder.build());
+      }
+
+      public static void main(String[] args) {
+        Server.create(args).serve(new Main().build());
+      }
+    }
+
+Coordinator configuration
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. code-block:: ini
+
+    [sdk]
+    coordinators = {
+      "java-jdk17": {
+        "classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
+        "kwargs": {"jars_root": ["/opt/airflow/jars"]}
+      }
+    }
+    queue_to_coordinator = {"java": "java-jdk17"}
+
+See :ref:`java-sdk/coordinator-config` for the full list of accepted 
``kwargs``.
+
+Writing tasks
+-------------
+
+The Java SDK offers two APIs for implementing tasks. Both produce the same 
runtime behavior; the choice is a
+matter of style.
+
+.. _java-sdk/annotation-api:
+
+Annotation-based API
+~~~~~~~~~~~~~~~~~~~~
+
+Annotate a plain Java class and let the SDK generate the boilerplate at 
compile time.
+
+.. list-table::
+   :header-rows: 1
+   :widths: 30 70
+
+   * - Annotation
+     - Purpose
+   * - ``@Builder.Dag(id = "...")``
+     - Marks the class as a task container.  The ``id`` must match the 
``dag_id`` in the Python Dag.
+   * - ``@Builder.Task(id = "...")``
+     - Marks a method as a task implementation.  The ``id`` must match the ``@task.stub`` function
+       name in the Python Dag.  If ``id`` is omitted, the method name is used.
+   * - ``@Builder.XCom(task = "...")``
+     - Injects the ``return_value`` XCom from the named upstream task as a 
method parameter.
+       The parameter type must be compatible with the stored value (see 
:ref:`java-sdk/types`).
+
+The annotation processor generates a ``<ClassName>Builder`` class that wires 
up the task
+registry and handles XCom injection automatically.
+
+.. code-block:: java
+
+    @Builder.Dag(id = "my_dag")
+    public class MyDag {
+
+      @Builder.Task(id = "fetch")
+      public String fetch(Client client) throws Exception {
+        var conn = client.getConnection("my_api");
+        // implement task logic
+        return result;
+      }
+
+      @Builder.Task(id = "process")
+      public long process(
+        Client client,
+        @Builder.XCom(task = "fetch") String fetched
+      ) {
+        var threshold = (String) client.getVariable("process_threshold");
+        // implement task logic
+        return count;
+      }
+    }
+
+A task method may declare ``throws Exception``; any uncaught exception causes 
the task instance to be marked
+as failed in Airflow (triggering retries if configured on the stub).
+
+.. _java-sdk/interface-api:
+
+Interface-based API
+~~~~~~~~~~~~~~~~~~~
+
+Implement the ``Task`` interface directly for full control over how tasks are 
registered and how XComs are
+read.
+
+.. code-block:: java
+
+    import org.apache.airflow.sdk.*;
+
+    public class FetchTask implements Task {
+      @Override
+      public void execute(Context context, Client client) throws Exception {
+        var conn = client.getConnection("my_api");
+        // implement task logic
+        client.setXCom(result);
+      }
+    }
+
+Register tasks manually in a ``BundleBuilder``:
+
+.. code-block:: java
+
+    public class MyBundle implements BundleBuilder {
+      @Override
+      public Iterable<Dag> getDags() {
+        var dag = new Dag("my_dag");
+        dag.addTask("fetch", FetchTask.class);
+        dag.addTask("process", ProcessTask.class);
+        return List.of(dag);
+      }
+    }
+
+See the Java SDK's published JavaDoc for more details.
+
+.. TODO: (AIP-108) Put a link here once we publish the JavaDoc.
+
+.. _java-sdk/types:
+
+XCom type mapping
+-----------------
+
+XCom values are stored as JSON in Airflow's metadata database.  The table 
below shows how JSON types are
+represented as Java objects when read back via ``getXCom``.
+
+.. list-table::
+   :header-rows: 1
+   :widths: 30 35 35
+
+   * - Python type
+     - JSON
+     - Java type (from ``getXCom``)
+   * - ``int``
+     - number (integer)
+     - ``Long`` (for values that fit; ``BigInteger`` otherwise)
+   * - ``float``
+     - number (decimal)
+     - ``Double``
+   * - ``str``
+     - string
+     - ``String``
+   * - ``bool``
+     - boolean
+     - ``Boolean``
+   * - ``None``
+     - null
+     - ``null``
+   * - ``list``
+     - array
+     - ``List<Object>``
+   * - ``dict``
+     - object
+     - ``Map<String, Object>``
+
+.. _java-sdk/build:
+
+Building and packaging
+-----------------------
+
+The Java SDK is distributed as a JAR. Use any build tool; Gradle is shown here.
+
+**Gradle setup**
+
+Add the SDK dependency to your ``build.gradle.kts``:
+
+.. code-block:: kotlin
+
+    dependencies {
+      implementation("org.apache.airflow:airflow-java-sdk:<version>")
+      annotationProcessor("org.apache.airflow:airflow-java-sdk:<version>")
+    }
+
+    tasks.withType<Jar> {
+      manifest {
+        attributes("Main-Class" to "com.example.Main")
+      }
+    }
+
+.. note::
+
+  You only need the ``annotationProcessor`` entry if you use the 
annotation-based API. It is not needed for
+  the interface-based API.
+
+.. note::
+
+  The coordinator uses the ``Main-Class`` manifest value to determine how to run the JAR. Alternatively,
+  you can set the entry point *on the coordinator itself* by adding the ``main_class`` kwarg to the
+  coordinator configuration.
+
+Building a distribution
+~~~~~~~~~~~~~~~~~~~~~~~
+
+.. code-block:: bash
+
+    ./gradlew :myproject:installDist
+
+The ``lib/`` directory of the resulting distribution contains all required 
JARs. Copy or mount it into the
+directory pointed to by ``jars_root`` in the coordinator configuration.
+:class:`~airflow.sdk.coordinators.java.JavaCoordinator` scans ``jars_root``
+recursively and builds the classpath automatically.
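To make "scans recursively and builds the classpath" concrete, here is a minimal Python sketch of that idea. It is an assumption about the general technique, not the coordinator's actual implementation; ``build_classpath`` is a hypothetical name, and the demo directory layout is invented.

```python
import os
import tempfile
from pathlib import Path


# Hypothetical helper; illustrates a recursive JAR scan, not JavaCoordinator itself.
def build_classpath(jars_root):
    """Join every .jar found under one or more root directories into a classpath."""
    if isinstance(jars_root, (str, os.PathLike)):
        roots = [jars_root]
    else:
        roots = list(jars_root)
    jars = []
    for root in roots:
        # rglob walks subdirectories; sorting keeps the order stable between runs.
        jars.extend(sorted(str(path) for path in Path(root).rglob("*.jar")))
    return os.pathsep.join(jars)


# Demo with a throwaway directory that mimics an installDist ``lib/`` layout.
with tempfile.TemporaryDirectory() as tmp:
    lib = Path(tmp) / "lib"
    (lib / "sub").mkdir(parents=True)
    (lib / "a.jar").touch()
    (lib / "sub" / "b.jar").touch()
    (lib / "notes.txt").touch()  # ignored: not a JAR
    classpath = build_classpath(tmp)

print(classpath)
```

The resulting string uses the platform's path separator (``:`` on Linux and macOS, ``;`` on Windows), which is the form the ``java -cp`` option expects.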
+
+.. _java-sdk/coordinator-config:
+
+:class:`~airflow.sdk.coordinators.java.JavaCoordinator` configuration
+-------------------------------------------------------------------------------
+
+All ``kwargs`` in the ``coordinators`` config entry are passed to the
+:class:`~airflow.sdk.coordinators.java.JavaCoordinator` constructor:
+
+.. list-table::
+   :header-rows: 1
+   :widths: 30 15 55
+
+   * - Parameter
+     - Default
+     - Description
+   * - ``jars_root``
+     - *(required)*
+     - One or more directories scanned recursively for ``.jar`` files. Accepts 
a string,
+       a path, or a list of strings/paths.
+   * - ``java_executable``
+     - ``"java"``
+     - Path to the ``java`` binary.  Defaults to ``java`` on ``$PATH``.
+   * - ``jvm_args``
+     - ``[]``
+     - Extra JVM arguments such as ``["-Xmx1g", "-Dsome.property=value"]``.
+   * - ``main_class``
+     - *(auto-detect)*
+     - Explicit entry-point class. If omitted,
+       :class:`~airflow.sdk.coordinators.java.JavaCoordinator` scans ``jars_root`` for a
+       JAR whose manifest sets ``Main-Class``. If multiple executable JARs are found, the
+       result is non-deterministic; set ``main_class`` explicitly in that case.
+   * - ``task_startup_timeout``
+     - ``10.0``
+     - Seconds to wait for the JVM subprocess to connect after launch.  
Increase this if your
+       JVM startup is slow (e.g. on constrained hardware or with a large 
classpath).
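Before relying on ``main_class`` auto-detection, it can help to check which JARs under ``jars_root`` declare a ``Main-Class``. The following Python sketch does that with the standard ``zipfile`` module. The helper names are hypothetical and the logic is an assumption about how such a scan could work, not the coordinator's code.

```python
import tempfile
import zipfile
from pathlib import Path


# Hypothetical helpers for inspecting JARs; not part of the Airflow API.
def read_main_class(jar_path):
    """Return the Main-Class manifest attribute of a JAR, or None if absent."""
    with zipfile.ZipFile(jar_path) as jar:
        try:
            manifest = jar.read("META-INF/MANIFEST.MF").decode("utf-8")
        except KeyError:  # the archive has no manifest entry
            return None
    # Manifest continuation lines start with one space; unfold them first.
    unfolded = manifest.replace("\r\n", "\n").replace("\n ", "")
    for line in unfolded.splitlines():
        name, sep, value = line.partition(":")
        if sep and name.strip().lower() == "main-class":
            return value.strip()
    return None


def find_main_classes(jars_root):
    """Map each executable JAR under ``jars_root`` to its Main-Class."""
    found = {}
    for jar_path in sorted(Path(jars_root).rglob("*.jar")):
        main_class = read_main_class(jar_path)
        if main_class:
            found[str(jar_path)] = main_class
    return found


# Demo: one executable JAR and one plain library JAR in a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    with zipfile.ZipFile(Path(tmp) / "app.jar", "w") as jar:
        jar.writestr(
            "META-INF/MANIFEST.MF",
            "Manifest-Version: 1.0\r\nMain-Class: com.example.Main\r\n",
        )
    with zipfile.ZipFile(Path(tmp) / "dep.jar", "w") as jar:
        jar.writestr("META-INF/MANIFEST.MF", "Manifest-Version: 1.0\r\n")
    main_classes = list(find_main_classes(tmp).values())

print(main_classes)
```

If this reports more than one entry for a real ``jars_root``, setting ``main_class`` explicitly avoids the non-deterministic choice described in the table.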
+
+.. _java-sdk/limitations:
+
+Limitations
+-----------
+
+* **One JVM subprocess per task instance.**  Each task instance spawns a fresh 
JVM. Tasks that need to share
+  in-process state between instances should use XCom or an external store 
instead.
+* **Limited support for assets, deferral, and other Airflow features.** They 
may be implemented in the future
+  based on user feedback and demand.
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 14a611c0f96..5d5b82c27bb 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -725,6 +725,7 @@ googleapis
 GoogleDisplayVideo
 gpu
 gpus
+Gradle
 Grafana
 graphviz
 Groq
diff --git a/task-sdk/docs/api.rst b/task-sdk/docs/api.rst
index c2bb0a19fbc..5be912803b3 100644
--- a/task-sdk/docs/api.rst
+++ b/task-sdk/docs/api.rst
@@ -254,6 +254,16 @@ collector interfaces used to capture and retrieve asset 
lineage metadata during
 
 .. autoapimodule:: airflow.sdk.lineage
 
+Coordinators
+------------
+
+Coordinators bridge the Airflow worker and a non-Python language runtime.
+See :doc:`airflow:authoring-and-scheduling/language-sdks/index` for a 
conceptual overview.
+
+.. rubric:: JVM
+
+.. autoapiclass:: airflow.sdk.coordinators.java.JavaCoordinator
+
 Execution Time Components
 -------------------------
 .. rubric:: Context
