xintongsong commented on code in PR #451:
URL: https://github.com/apache/flink-agents/pull/451#discussion_r2704263653
##########
docs/content/docs/development/workflow_agent.md:
##########
@@ -277,40 +277,123 @@ public static void processInput(InputEvent event,
RunnerContext ctx) throws Exce
{{< /tabs >}}
-### Async Execution
-
-{{< hint warning >}}
-Async Execution is only supported in Python currently.
-{{< /hint >}}
+### Durable Execution
-When an action needs to perform time-consuming I/O operations (such as calling
external APIs, database queries, or network requests), you can use
`ctx.execute_async()` to execute these operations asynchronously. This allows
Flink to efficiently manage resources and avoid blocking the main processing
thread.
+Use durable execution when you wrap a time-consuming or side-effecting
operation. The framework persists the result and replays it on recovery when
the same call is encountered, so the function will not be called again and side
effects are avoided.
-To use async execution, define your action as an `async def` function and use
`await` with `ctx.execute_async()`:
+**Constraints:**
+- The function must be deterministic and called in the same order on recovery.
+- Access to Memory and `send_event` is prohibited inside the function/callable.
+- Arguments and results must be serializable.
+{{< tabs "Durable Execution" >}}
+{{< tab "Python" >}}
+Python actions can call `ctx.durable_execute(...)` to run a synchronous
durable code block.
```python
@action(InputEvent)
@staticmethod
-async def process_with_async(event: InputEvent, ctx: RunnerContext) -> None:
+def process_input(event: InputEvent, ctx: RunnerContext) -> None:
def slow_external_call(data: str) -> str:
- # Simulate a slow external API call
time.sleep(2)
return f"Processed: {data}"
-
- # Execute the slow operation asynchronously
- result = await ctx.execute_async(slow_external_call, event.input)
-
+
+ # Synchronous durable execution
+ result = ctx.durable_execute(slow_external_call, event.input)
ctx.send_event(OutputEvent(output=result))
```
+{{< /tab >}}
+
+{{< tab "Java" >}}
+Java actions use `DurableCallable<T>` with `ctx.durableExecute(...)`, where
`getId()` must be stable and `getResultClass()` supports recovery
deserialization.
+```java
+@Action(listenEvents = {InputEvent.class})
+public static void processInput(InputEvent event, RunnerContext ctx) throws
Exception {
+ DurableCallable<String> call = new DurableCallable<>() {
+ @Override
+ public String getId() {
+ // Stable, deterministic ID for this call
+ return "slow_external_call";
+ }
+
+ @Override
+ public Class<String> getResultClass() {
+ return String.class;
+ }
+
+ @Override
+ public String call() throws Exception {
+ Thread.sleep(2000);
+ return "Processed: " + event.getInput();
+ }
+ };
+
+ String result = ctx.durableExecute(call);
+ ctx.sendEvent(new OutputEvent(result));
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Async Execution
+
+Async execution uses the same durable semantics but yields while waiting for a
thread-pool task. This is useful for high-latency I/O.
-**Key points:**
-- Use `async def` to define the action function
-- Use `await ctx.execute_async(func, *args, **kwargs)` to execute slow
operations
-- The function passed to `execute_async` will be submitted to a thread pool
-- Access to memory is prohibited within the function passed to `execute_async`
+{{< tabs "Async Execution" >}}
+{{< tab "Python" >}}
+Define an `async def` action and `await ctx.durable_execute_async(...)`.
+```python
+@action(InputEvent)
+@staticmethod
+async def process_with_async(event: InputEvent, ctx: RunnerContext) -> None:
+ def slow_external_call(data: str) -> str:
+ time.sleep(2)
+ return f"Processed: {data}"
+ result = await ctx.durable_execute_async(slow_external_call, event.input)
+ ctx.send_event(OutputEvent(output=result))
+```
{{< hint info >}}
-Only `await ctx.execute_async(...)` is supported. Standard asyncio functions
like `asyncio.gather`, `asyncio.wait`, `asyncio.create_task`, and
`asyncio.sleep` are **NOT** supported because there is no asyncio event loop
running.
+Python async actions only support `await ctx.durable_execute_async(...)`.
Standard asyncio
+functions like `asyncio.gather`, `asyncio.wait`, `asyncio.create_task`, and
+`asyncio.sleep` are **NOT** supported because there is no asyncio event loop.
{{< /hint >}}
+{{< /tab >}}
+
+{{< tab "Java" >}}
+Use `ctx.durableExecuteAsync(DurableCallable)`; on **JDK 21+** it yields using
Continuation,
+and on **JDK < 21** it falls back to synchronous execution.
+```java
+@Action(listenEvents = {InputEvent.class})
+public static void processInput(InputEvent event, RunnerContext ctx) throws
Exception {
+ DurableCallable<String> call = new DurableCallable<>() {
+ @Override
+ public String getId() {
+ return "slow_external_call";
+ }
+
+ @Override
+ public Class<String> getResultClass() {
+ return String.class;
+ }
+
+ @Override
+ public String call() throws Exception {
+ Thread.sleep(2000);
+ return "Processed: " + event.getInput();
+ }
+ };
+
+ String result = ctx.durableExecuteAsync(call);
+ ctx.sendEvent(new OutputEvent(result));
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### API Summary
+
+- `durable_execute`: synchronous, durable, blocks the action until completion.
+- `durable_execute_async`: asynchronous, durable, yields while waiting; Java
async is effective on JDK 21+ and falls back to sync on JDK < 21.
Review Comment:
This feels like a sub-content of the Async Execution section.
##########
docs/content/docs/development/workflow_agent.md:
##########
@@ -277,40 +277,123 @@ public static void processInput(InputEvent event,
RunnerContext ctx) throws Exce
{{< /tabs >}}
-### Async Execution
-
-{{< hint warning >}}
-Async Execution is only supported in Python currently.
-{{< /hint >}}
+### Durable Execution
Review Comment:
There are several things worth mentioning.
1. The feature requires an external action state backend. What are the
supported options? How to set up? What happens if the backend is not set?
2. It avoids repeating calls with best efforts, rather than strict
guarantees. In which cases would it fail to reuse the previous call results?
3. The action codes (outside durable_execute) are always re-executed during
recovery.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]