This is an automated email from the ASF dual-hosted git repository.
jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 07a7a5297d6 Java-SDK: support UP_FOR_RETRY in coordinator mode with e2e coverage (#68555)
07a7a5297d6 is described below
commit 07a7a5297d696c8aaaa0c929b9004b5f03621e75
Author: Jason(Zhe-You) Liu <[email protected]>
AuthorDate: Tue Jun 16 07:42:46 2026 +0800
Java-SDK: support UP_FOR_RETRY in coordinator mode with e2e coverage (#68555)
* Fix Java SDK not retrying failed tasks
* Verify Java SDK UP_FOR_RETRY end-to-end in e2e bundle
Make the Java example bundle's 'load' task fail on its first attempt and
succeed on the retry (retries=1), wire it into the annotation example Dag,
and assert end-to-end that the task passes through UP_FOR_RETRY and ends
success on try_number 2. This exercises the RetryTask path the Java
coordinator now returns.
* Add RetryTask reference in contributing-docs/30_new_language_sdk.rst
* Split load and python_task_2 dep syntax
* Simplify failed-or-retry conditional
* Fix CI error
---------
Co-authored-by: Arnav <[email protected]>
Co-authored-by: Tzu-ping Chung <[email protected]>
---
.../java_sdk_tests/test_java_sdk_dag.py | 44 +++++++++++++++++++++-
contributing-docs/30_new_language_sdk.rst | 4 ++
.../apache/airflow/example/AnnotationExample.java | 12 +++++-
.../example/src/resources/dags/java_examples.py | 7 ++++
.../org/apache/airflow/sdk/execution/Task.kt | 15 +++++++-
.../org/apache/airflow/sdk/execution/TaskTest.kt | 11 ++++++
6 files changed, 88 insertions(+), 5 deletions(-)
diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py b/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py
index 0e99a399f2d..b1e04fb2dde 100644
--- a/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py
@@ -26,7 +26,7 @@ What is verified
The test triggers the ``java_annotation_example`` Dag, which has this task
graph::
- python_task_1 >> extract >> transform >> python_task_2
+ python_task_1 >> extract >> transform >> [load, python_task_2]
* ``extract`` and ``transform`` are ``@task.stub(queue="java")`` stubs whose
implementations live in ``AnnotationExample.java``. Both run via
@@ -35,8 +35,11 @@ graph::
connection, and returns a timestamp (long).
* ``transform`` reads the XCom from ``extract``, fetches the ``my_variable``
Airflow variable, and returns a timestamp (long).
+* ``load`` (``retries=1``) reads the XCom from ``transform``, throws on its
+ first attempt and returns normally on the retry, exercising the UP_FOR_RETRY
+ path through the Java coordinator.
-The test asserts that both Java task instances reach state ``success``, which
+The test asserts that the Java task instances reach state ``success``, which
confirms:
1. ``JavaCoordinator`` correctly discovers and launches the JVM JAR.
@@ -44,6 +47,9 @@ confirms:
``StartupDetails`` and the task result (``SucceedTask``/``TaskState``).
3. XCom reads and API calls (getXCom, getConnection, getVariable) work
end-to-end through the Task Execution API.
+4. A Java task that throws with retries left returns ``RetryTask`` rather than a
+ terminal ``FAILED``, so the supervisor marks it UP_FOR_RETRY and re-runs it;
+ ``load`` therefore ends ``success`` on its second attempt (try_number 2).
"""
from __future__ import annotations
@@ -126,3 +132,37 @@ class TestJavaSDKAnnotationExample:
assert value > 0, (
f"Expected 'transform' XCom to be a positive integer (millisecond timestamp), got {value!r}"
)
+
+ def test_load_retried_then_succeeded(self):
+ """``load`` fails once (UP_FOR_RETRY) then succeeds on the second attempt.
+
+ The Java coordinator must return ``RetryTask`` (not terminal ``FAILED``)
+ when the task throws with retries left, so the supervisor re-runs it. The
+ end state is ``success`` reached on ``try_number`` 2.
+ """
+ resp = self.airflow_client.trigger_dag(
+ "java_annotation_example",
+ json={"logical_date": datetime.now(timezone.utc).isoformat()},
+ )
+ run_id = resp["dag_run_id"]
+
+ dag_state = self.airflow_client.wait_for_dag_run(
+ dag_id="java_annotation_example",
+ run_id=run_id,
+ timeout=_JAVA_TASK_TIMEOUT,
+ )
+
+ ti_resp = self.airflow_client.get_task_instances(dag_id="java_annotation_example", run_id=run_id)
+ ti_map = {ti["task_id"]: ti for ti in ti_resp.get("task_instances", [])}
+ load_ti = ti_map.get("load", {})
+
+ assert load_ti.get("state") == "success", (
+ f"Java 'load' task should succeed on retry.\n"
+ f" task state : {load_ti.get('state')!r}\n"
+ f" dag state : {dag_state!r}\n"
+ f" all tasks : { {k: v.get('state') for k, v in ti_map.items()} }"
+ )
+ assert load_ti.get("try_number") == 2, (
+ f"Java 'load' task should have run twice (fail then retry); "
+ f"try_number={load_ti.get('try_number')!r}, ti: {load_ti}"
+ )
diff --git a/contributing-docs/30_new_language_sdk.rst b/contributing-docs/30_new_language_sdk.rst
index 603231ef883..e83194b6e45 100644
--- a/contributing-docs/30_new_language_sdk.rst
+++ b/contributing-docs/30_new_language_sdk.rst
@@ -242,6 +242,10 @@ Error handling
* If the task raises an unhandled error, the SDK MUST send a ``TaskState``
message with ``"state": "failed"`` before closing the comm socket.
+* If the task fails but still has retries left, the SDK MUST send a
+ ``RetryTask`` message instead so the supervisor moves the task to
+ ``up_for_retry``. Field names MUST match the Supervisor Schema exactly — the
+ failure detail key is ``retry_reason``, not ``reason``.
* If the process exits without sending a terminal message, the supervisor marks
the task instance ``failed`` based on the abnormal exit, but the task log may
be incomplete.
diff --git a/java-sdk/example/src/java/org/apache/airflow/example/AnnotationExample.java b/java-sdk/example/src/java/org/apache/airflow/example/AnnotationExample.java
index 9c8fc0b8696..3915d8e5c38 100644
--- a/java-sdk/example/src/java/org/apache/airflow/example/AnnotationExample.java
+++ b/java-sdk/example/src/java/org/apache/airflow/example/AnnotationExample.java
@@ -59,9 +59,17 @@ public class AnnotationExample {
return new Date().getTime();
}
+ // load fails on its first attempt and succeeds on the retry. With retries
+ // configured on the stub task, the first failure makes the supervisor mark
+ // the task UP_FOR_RETRY -- which only works because the Java SDK now returns
+ // RetryTask (instead of a terminal FAILED) when ti_context.should_retry is
+ // set. The retry then runs this task again and it returns normally.
@Builder.Task
- public void load(@Builder.XCom(task = "transform") long transformed) {
+ public void load(Context context, @Builder.XCom(task = "transform") long transformed) {
logger.info("Got XCom from 'transform' {}", transformed);
- throw new RuntimeException("I failed");
+ if (context.ti.tryNumber == 1) {
+ throw new RuntimeException("I failed");
+ }
+ logger.info("Recovered on retry, try number {}", context.ti.tryNumber);
}
}
diff --git a/java-sdk/example/src/resources/dags/java_examples.py b/java-sdk/example/src/resources/dags/java_examples.py
index a85bbce3056..e3d217e8eaa 100644
--- a/java-sdk/example/src/resources/dags/java_examples.py
+++ b/java-sdk/example/src/resources/dags/java_examples.py
@@ -17,6 +17,8 @@
from __future__ import annotations
+from datetime import timedelta
+
from airflow.sdk import dag, task
@@ -35,6 +37,10 @@ def extract(): ...
def transform(): ...
+@task.stub(queue="java", retries=1, retry_delay=timedelta(seconds=5))
+def load(): ...
+
+
@task()
def python_task_2(transformed):
print("python_task_2")
@@ -54,6 +60,7 @@ def java_annotation_example():
transformed = transform()
python_task_1() >> extract() >> transformed
python_task_2(transformed)
+ transformed >> load()
java_interface_example()
diff --git a/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Task.kt b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Task.kt
index 70e093a288f..8e26af77fee 100644
--- a/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Task.kt
+++ b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Task.kt
@@ -23,6 +23,7 @@ import org.apache.airflow.sdk.Bundle
import org.apache.airflow.sdk.Client
import org.apache.airflow.sdk.Context
import org.apache.airflow.sdk.execution.comm.AssetProfile
+import org.apache.airflow.sdk.execution.comm.RetryTask
import org.apache.airflow.sdk.execution.comm.StartupDetails
import org.apache.airflow.sdk.execution.comm.SucceedTask
import org.apache.airflow.sdk.execution.comm.TaskState
@@ -42,6 +43,14 @@ internal object TaskResult {
it.renderedMapIndex = renderedMapIndex
}
+ fun retry(
+ endDate: OffsetDateTime = OffsetDateTime.now(),
+ renderedMapIndex: String? = null,
+ ) = RetryTask().also {
+ it.endDate = endDate
+ it.renderedMapIndex = renderedMapIndex
+ }
+
fun of(
state: TaskState.State,
endDate: OffsetDateTime = OffsetDateTime.now(),
@@ -68,7 +77,11 @@ internal object TaskRunner {
} catch (e: Exception) {
logger.error("Error executing task", mapOf("ti" to request.ti, "error" to e, "trace" to e.stackTraceToString()))
e.printStackTrace()
- TaskResult.of(TaskState.State.FAILED)
+ if (request.tiContext.shouldRetry) {
+ TaskResult.retry()
+ } else {
+ TaskResult.of(TaskState.State.FAILED)
+ }
}
}
}
diff --git a/java-sdk/sdk/src/test/kotlin/org/apache/airflow/sdk/execution/TaskTest.kt b/java-sdk/sdk/src/test/kotlin/org/apache/airflow/sdk/execution/TaskTest.kt
index 0542516228d..27d618e3c67 100644
--- a/java-sdk/sdk/src/test/kotlin/org/apache/airflow/sdk/execution/TaskTest.kt
+++ b/java-sdk/sdk/src/test/kotlin/org/apache/airflow/sdk/execution/TaskTest.kt
@@ -26,6 +26,7 @@ import org.apache.airflow.sdk.Dag
import org.apache.airflow.sdk.Task
import org.apache.airflow.sdk.execution.comm.BundleInfo
import org.apache.airflow.sdk.execution.comm.DagRun
+import org.apache.airflow.sdk.execution.comm.RetryTask
import org.apache.airflow.sdk.execution.comm.StartupDetails
import org.apache.airflow.sdk.execution.comm.SucceedTask
import org.apache.airflow.sdk.execution.comm.TIRunContext
@@ -64,6 +65,16 @@ class TaskTest {
Assertions.assertEquals(TaskState.State.FAILED, (result as TaskState).state)
}
+ @Test
+ @DisplayName("Should return retry when task throws and should_retry is true")
+ fun shouldReturnRetryWhenTaskThrowsAndShouldRetryIsTrue() {
+ val details = startupDetails(taskId = "failing")
+ details.tiContext?.shouldRetry = true
+ val result = runTask(bundleWith("failing", FailingTask::class.java), details, noOpClient())
+
+ Assertions.assertInstanceOf(RetryTask::class.java, result)
+ }
+
private fun bundleWith(
taskId: String,
taskClass: Class<out Task>,