xintongsong commented on code in PR #451:
URL: https://github.com/apache/flink-agents/pull/451#discussion_r2704263653


##########
docs/content/docs/development/workflow_agent.md:
##########
@@ -277,40 +277,123 @@ public static void processInput(InputEvent event, 
RunnerContext ctx) throws Exce
 
 {{< /tabs >}}
 
-### Async Execution
-
-{{< hint warning >}}
-Async Execution is only supported in Python currently.
-{{< /hint >}}
+### Durable Execution
 
-When an action needs to perform time-consuming I/O operations (such as calling 
external APIs, database queries, or network requests), you can use 
`ctx.execute_async()` to execute these operations asynchronously. This allows 
Flink to efficiently manage resources and avoid blocking the main processing 
thread.
+Use durable execution when you wrap a time-consuming or side-effecting 
operation. The framework persists the result and replays it on recovery when 
the same call is encountered, so the function will not be called again and side 
effects are avoided.
 
-To use async execution, define your action as an `async def` function and use 
`await` with `ctx.execute_async()`:
+**Constraints:**
+- The function must be deterministic and called in the same order on recovery.
+- Access to Memory and `send_event` is prohibited inside the function/callable.
+- Arguments and results must be serializable.
 
+{{< tabs "Durable Execution" >}}
+{{< tab "Python" >}}
+Python actions can call `ctx.durable_execute(...)` to run a synchronous 
durable code block.
 ```python
 @action(InputEvent)
 @staticmethod
-async def process_with_async(event: InputEvent, ctx: RunnerContext) -> None:
+def process_input(event: InputEvent, ctx: RunnerContext) -> None:
     def slow_external_call(data: str) -> str:
-        # Simulate a slow external API call
         time.sleep(2)
         return f"Processed: {data}"
-    
-    # Execute the slow operation asynchronously
-    result = await ctx.execute_async(slow_external_call, event.input)
-    
+
+    # Synchronous durable execution
+    result = ctx.durable_execute(slow_external_call, event.input)
     ctx.send_event(OutputEvent(output=result))
 ```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+Java actions use `DurableCallable<T>` with `ctx.durableExecute(...)`, where 
`getId()` must be stable and `getResultClass()` supports recovery 
deserialization.
+```java
+@Action(listenEvents = {InputEvent.class})
+public static void processInput(InputEvent event, RunnerContext ctx) throws 
Exception {
+    DurableCallable<String> call = new DurableCallable<>() {
+        @Override
+        public String getId() {
+            // Stable, deterministic ID for this call
+            return "slow_external_call";
+        }
+
+        @Override
+        public Class<String> getResultClass() {
+            return String.class;
+        }
+
+        @Override
+        public String call() throws Exception {
+            Thread.sleep(2000);
+            return "Processed: " + event.getInput();
+        }
+    };
+
+    String result = ctx.durableExecute(call);
+    ctx.sendEvent(new OutputEvent(result));
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Async Execution
+
+Async execution uses the same durable semantics but yields while waiting for a 
thread-pool task. This is useful for high-latency I/O.
 
-**Key points:**
-- Use `async def` to define the action function
-- Use `await ctx.execute_async(func, *args, **kwargs)` to execute slow 
operations
-- The function passed to `execute_async` will be submitted to a thread pool
-- Access to memory is prohibited within the function passed to `execute_async`
+{{< tabs "Async Execution" >}}
+{{< tab "Python" >}}
+Define an `async def` action and `await ctx.durable_execute_async(...)`.
+```python
+@action(InputEvent)
+@staticmethod
+async def process_with_async(event: InputEvent, ctx: RunnerContext) -> None:
+    def slow_external_call(data: str) -> str:
+        time.sleep(2)
+        return f"Processed: {data}"
 
+    result = await ctx.durable_execute_async(slow_external_call, event.input)
+    ctx.send_event(OutputEvent(output=result))
+```
 {{< hint info >}}
-Only `await ctx.execute_async(...)` is supported. Standard asyncio functions 
like `asyncio.gather`, `asyncio.wait`, `asyncio.create_task`, and 
`asyncio.sleep` are **NOT** supported because there is no asyncio event loop 
running.
+Python async actions only support `await ctx.durable_execute_async(...)`. 
Standard asyncio
+functions like `asyncio.gather`, `asyncio.wait`, `asyncio.create_task`, and
+`asyncio.sleep` are **NOT** supported because there is no asyncio event loop.
 {{< /hint >}}
+{{< /tab >}}
+
+{{< tab "Java" >}}
+Use `ctx.durableExecuteAsync(DurableCallable)`; on **JDK 21+** it yields using 
Continuation,
+and on **JDK < 21** it falls back to synchronous execution.
+```java
+@Action(listenEvents = {InputEvent.class})
+public static void processInput(InputEvent event, RunnerContext ctx) throws 
Exception {
+    DurableCallable<String> call = new DurableCallable<>() {
+        @Override
+        public String getId() {
+            return "slow_external_call";
+        }
+
+        @Override
+        public Class<String> getResultClass() {
+            return String.class;
+        }
+
+        @Override
+        public String call() throws Exception {
+            Thread.sleep(2000);
+            return "Processed: " + event.getInput();
+        }
+    };
+
+    String result = ctx.durableExecuteAsync(call);
+    ctx.sendEvent(new OutputEvent(result));
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### API Summary
+
+- `durable_execute`: synchronous, durable, blocks the action until completion.
+- `durable_execute_async`: asynchronous, durable, yields while waiting; Java 
async is effective on JDK 21+ and falls back to sync on JDK < 21.

Review Comment:
   This feels like a sub-content of the Async Execution section.



##########
docs/content/docs/development/workflow_agent.md:
##########
@@ -277,40 +277,123 @@ public static void processInput(InputEvent event, 
RunnerContext ctx) throws Exce
 
 {{< /tabs >}}
 
-### Async Execution
-
-{{< hint warning >}}
-Async Execution is only supported in Python currently.
-{{< /hint >}}
+### Durable Execution

Review Comment:
   There are several things worth mentioning.
   1. The feature requires an external action state backend. What are the 
supported options? How to set up? What happens if the backend is not set?
   2. It avoids repeating calls with best efforts, rather than strict 
guarantees. In which cases would it fail to reuse the previous call results?
   3. The action codes (outside durable_execute) are always re-executed during 
recovery.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to