This is an automated email from the ASF dual-hosted git repository.

jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 07a7a5297d6 Java-SDK: support UP_FOR_RETRY in coordinator mode with e2e coverage (#68555)
07a7a5297d6 is described below

commit 07a7a5297d696c8aaaa0c929b9004b5f03621e75
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Tue Jun 16 07:42:46 2026 +0800

    Java-SDK: support UP_FOR_RETRY in coordinator mode with e2e coverage (#68555)
    
    * Fix Java SDK not retrying failed tasks
    
    * Verify Java SDK UP_FOR_RETRY end-to-end in e2e bundle
    
    Make the Java example bundle's 'load' task fail on its first attempt and
    succeed on the retry (retries=1), wire it into the annotation example Dag,
    and assert end-to-end that the task passes through UP_FOR_RETRY and ends
    success on try_number 2. This exercises the RetryTask path the Java
    coordinator now returns.
    
    * Add RetryTask reference in contributing-docs/30_new_language_sdk.rst
    
    * Split load and python_task_2 dep syntax
    
    * Simplify failed-or-retry conditional
    
    * Fix CI error
    
    ---------
    
    Co-authored-by: Arnav <[email protected]>
    Co-authored-by: Tzu-ping Chung <[email protected]>
---
 .../java_sdk_tests/test_java_sdk_dag.py            | 44 +++++++++++++++++++++-
 contributing-docs/30_new_language_sdk.rst          |  4 ++
 .../apache/airflow/example/AnnotationExample.java  | 12 +++++-
 .../example/src/resources/dags/java_examples.py    |  7 ++++
 .../org/apache/airflow/sdk/execution/Task.kt       | 15 +++++++-
 .../org/apache/airflow/sdk/execution/TaskTest.kt   | 11 ++++++
 6 files changed, 88 insertions(+), 5 deletions(-)

diff --git 
a/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py 
b/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py
index 0e99a399f2d..b1e04fb2dde 100644
--- 
a/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py
+++ 
b/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py
@@ -26,7 +26,7 @@ What is verified
 The test triggers the ``java_annotation_example`` Dag, which has this task
 graph::
 
-    python_task_1 >> extract >> transform >> python_task_2
+    python_task_1 >> extract >> transform >> [load, python_task_2]
 
 * ``extract`` and ``transform`` are ``@task.stub(queue="java")`` stubs whose
   implementations live in ``AnnotationExample.java``.  Both run via
@@ -35,8 +35,11 @@ graph::
   connection, and returns a timestamp (long).
 * ``transform`` reads the XCom from ``extract``, fetches the ``my_variable``
   Airflow variable, and returns a timestamp (long).
+* ``load`` (``retries=1``) reads the XCom from ``transform``, throws on its
+  first attempt and returns normally on the retry, exercising the UP_FOR_RETRY
+  path through the Java coordinator.
 
-The test asserts that both Java task instances reach state ``success``, which
+The test asserts that the Java task instances reach state ``success``, which
 confirms:
 
 1. ``JavaCoordinator`` correctly discovers and launches the JVM JAR.
@@ -44,6 +47,9 @@ confirms:
    ``StartupDetails`` and the task result (``SucceedTask``/``TaskState``).
 3. XCom reads and API calls (getXCom, getConnection, getVariable) work
    end-to-end through the Task Execution API.
+4. A Java task that throws with retries left returns ``RetryTask`` rather than a
+   terminal ``FAILED``, so the supervisor marks it UP_FOR_RETRY and re-runs it;
+   ``load`` therefore ends ``success`` on its second attempt (try_number 2).
 """
 
 from __future__ import annotations
@@ -126,3 +132,37 @@ class TestJavaSDKAnnotationExample:
         assert value > 0, (
             f"Expected 'transform' XCom to be a positive integer (millisecond 
timestamp), got {value!r}"
         )
+
+    def test_load_retried_then_succeeded(self):
+        """``load`` fails once (UP_FOR_RETRY) then succeeds on the second 
attempt.
+
+        The Java coordinator must return ``RetryTask`` (not terminal ``FAILED``)
+        when the task throws with retries left, so the supervisor re-runs it. The
+        end state is ``success`` reached on ``try_number`` 2.
+        """
+        resp = self.airflow_client.trigger_dag(
+            "java_annotation_example",
+            json={"logical_date": datetime.now(timezone.utc).isoformat()},
+        )
+        run_id = resp["dag_run_id"]
+
+        dag_state = self.airflow_client.wait_for_dag_run(
+            dag_id="java_annotation_example",
+            run_id=run_id,
+            timeout=_JAVA_TASK_TIMEOUT,
+        )
+
+        ti_resp = 
self.airflow_client.get_task_instances(dag_id="java_annotation_example", 
run_id=run_id)
+        ti_map = {ti["task_id"]: ti for ti in ti_resp.get("task_instances", 
[])}
+        load_ti = ti_map.get("load", {})
+
+        assert load_ti.get("state") == "success", (
+            f"Java 'load' task should succeed on retry.\n"
+            f"  task state : {load_ti.get('state')!r}\n"
+            f"  dag state  : {dag_state!r}\n"
+            f"  all tasks  : { {k: v.get('state') for k, v in ti_map.items()} 
}"
+        )
+        assert load_ti.get("try_number") == 2, (
+            f"Java 'load' task should have run twice (fail then retry); "
+            f"try_number={load_ti.get('try_number')!r}, ti: {load_ti}"
+        )
diff --git a/contributing-docs/30_new_language_sdk.rst 
b/contributing-docs/30_new_language_sdk.rst
index 603231ef883..e83194b6e45 100644
--- a/contributing-docs/30_new_language_sdk.rst
+++ b/contributing-docs/30_new_language_sdk.rst
@@ -242,6 +242,10 @@ Error handling
 
 * If the task raises an unhandled error, the SDK MUST send a ``TaskState``
   message with ``"state": "failed"`` before closing the comm socket.
+* If the task fails but still has retries left, the SDK MUST send a
+  ``RetryTask`` message instead so the supervisor moves the task to
+  ``up_for_retry``. Field names MUST match the Supervisor Schema exactly — the
+  failure detail key is ``retry_reason``, not ``reason``.
 * If the process exits without sending a terminal message, the supervisor marks
   the task instance ``failed`` based on the abnormal exit, but the task log may
   be incomplete.
diff --git 
a/java-sdk/example/src/java/org/apache/airflow/example/AnnotationExample.java 
b/java-sdk/example/src/java/org/apache/airflow/example/AnnotationExample.java
index 9c8fc0b8696..3915d8e5c38 100644
--- 
a/java-sdk/example/src/java/org/apache/airflow/example/AnnotationExample.java
+++ 
b/java-sdk/example/src/java/org/apache/airflow/example/AnnotationExample.java
@@ -59,9 +59,17 @@ public class AnnotationExample {
     return new Date().getTime();
   }
 
+  // load fails on its first attempt and succeeds on the retry. With retries
+  // configured on the stub task, the first failure makes the supervisor mark
+  // the task UP_FOR_RETRY -- which only works because the Java SDK now returns
+  // RetryTask (instead of a terminal FAILED) when ti_context.should_retry is
+  // set. The retry then runs this task again and it returns normally.
   @Builder.Task
-  public void load(@Builder.XCom(task = "transform") long transformed) {
+  public void load(Context context, @Builder.XCom(task = "transform") long 
transformed) {
     logger.info("Got XCom from 'transform' {}", transformed);
-    throw new RuntimeException("I failed");
+    if (context.ti.tryNumber == 1) {
+      throw new RuntimeException("I failed");
+    }
+    logger.info("Recovered on retry, try number {}", context.ti.tryNumber);
   }
 }
diff --git a/java-sdk/example/src/resources/dags/java_examples.py 
b/java-sdk/example/src/resources/dags/java_examples.py
index a85bbce3056..e3d217e8eaa 100644
--- a/java-sdk/example/src/resources/dags/java_examples.py
+++ b/java-sdk/example/src/resources/dags/java_examples.py
@@ -17,6 +17,8 @@
 
 from __future__ import annotations
 
+from datetime import timedelta
+
 from airflow.sdk import dag, task
 
 
@@ -35,6 +37,10 @@ def extract(): ...
 def transform(): ...
 
 
+@task.stub(queue="java", retries=1, retry_delay=timedelta(seconds=5))
+def load(): ...
+
+
 @task()
 def python_task_2(transformed):
     print("python_task_2")
@@ -54,6 +60,7 @@ def java_annotation_example():
     transformed = transform()
     python_task_1() >> extract() >> transformed
     python_task_2(transformed)
+    transformed >> load()
 
 
 java_interface_example()
diff --git 
a/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Task.kt 
b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Task.kt
index 70e093a288f..8e26af77fee 100644
--- a/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Task.kt
+++ b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Task.kt
@@ -23,6 +23,7 @@ import org.apache.airflow.sdk.Bundle
 import org.apache.airflow.sdk.Client
 import org.apache.airflow.sdk.Context
 import org.apache.airflow.sdk.execution.comm.AssetProfile
+import org.apache.airflow.sdk.execution.comm.RetryTask
 import org.apache.airflow.sdk.execution.comm.StartupDetails
 import org.apache.airflow.sdk.execution.comm.SucceedTask
 import org.apache.airflow.sdk.execution.comm.TaskState
@@ -42,6 +43,14 @@ internal object TaskResult {
     it.renderedMapIndex = renderedMapIndex
   }
 
+  fun retry(
+    endDate: OffsetDateTime = OffsetDateTime.now(),
+    renderedMapIndex: String? = null,
+  ) = RetryTask().also {
+    it.endDate = endDate
+    it.renderedMapIndex = renderedMapIndex
+  }
+
   fun of(
     state: TaskState.State,
     endDate: OffsetDateTime = OffsetDateTime.now(),
@@ -68,7 +77,11 @@ internal object TaskRunner {
     } catch (e: Exception) {
       logger.error("Error executing task", mapOf("ti" to request.ti, "error" 
to e, "trace" to e.stackTraceToString()))
       e.printStackTrace()
-      TaskResult.of(TaskState.State.FAILED)
+      if (request.tiContext.shouldRetry) {
+        TaskResult.retry()
+      } else {
+        TaskResult.of(TaskState.State.FAILED)
+      }
     }
   }
 }
diff --git 
a/java-sdk/sdk/src/test/kotlin/org/apache/airflow/sdk/execution/TaskTest.kt 
b/java-sdk/sdk/src/test/kotlin/org/apache/airflow/sdk/execution/TaskTest.kt
index 0542516228d..27d618e3c67 100644
--- a/java-sdk/sdk/src/test/kotlin/org/apache/airflow/sdk/execution/TaskTest.kt
+++ b/java-sdk/sdk/src/test/kotlin/org/apache/airflow/sdk/execution/TaskTest.kt
@@ -26,6 +26,7 @@ import org.apache.airflow.sdk.Dag
 import org.apache.airflow.sdk.Task
 import org.apache.airflow.sdk.execution.comm.BundleInfo
 import org.apache.airflow.sdk.execution.comm.DagRun
+import org.apache.airflow.sdk.execution.comm.RetryTask
 import org.apache.airflow.sdk.execution.comm.StartupDetails
 import org.apache.airflow.sdk.execution.comm.SucceedTask
 import org.apache.airflow.sdk.execution.comm.TIRunContext
@@ -64,6 +65,16 @@ class TaskTest {
     Assertions.assertEquals(TaskState.State.FAILED, (result as 
TaskState).state)
   }
 
+  @Test
+  @DisplayName("Should return retry when task throws and should_retry is true")
+  fun shouldReturnRetryWhenTaskThrowsAndShouldRetryIsTrue() {
+    val details = startupDetails(taskId = "failing")
+    details.tiContext?.shouldRetry = true
+    val result = runTask(bundleWith("failing", FailingTask::class.java), 
details, noOpClient())
+
+    Assertions.assertInstanceOf(RetryTask::class.java, result)
+  }
+
   private fun bundleWith(
     taskId: String,
     taskClass: Class<out Task>,

Reply via email to