This is an automated email from the ASF dual-hosted git repository.
wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new cf86ed2c [fix] Fix ReActAgent failure when output schema is null (#837)
cf86ed2c is described below
commit cf86ed2ca78ff7325c1089a14876f73e09e7c9d2
Author: Wenjin Xie <[email protected]>
AuthorDate: Thu Jun 11 19:00:12 2026 +0800
[fix] Fix ReActAgent failure when output schema is null (#837)
Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
.../apache/flink/agents/api/agents/ReActAgent.java | 5 +-
.../agents/integration/test/ReActAgentTest.java | 105 +++++++++++++++++----
python/flink_agents/api/agents/react_agent.py | 2 +-
.../e2e_tests_integration/react_agent_test.py | 85 +++++++++++++++++
4 files changed, 176 insertions(+), 21 deletions(-)
diff --git
a/api/src/main/java/org/apache/flink/agents/api/agents/ReActAgent.java
b/api/src/main/java/org/apache/flink/agents/api/agents/ReActAgent.java
index 0b394baa..088f3efd 100644
--- a/api/src/main/java/org/apache/flink/agents/api/agents/ReActAgent.java
+++ b/api/src/main/java/org/apache/flink/agents/api/agents/ReActAgent.java
@@ -60,6 +60,7 @@ public class ReActAgent extends Agent {
public ReActAgent(
ResourceDescriptor descriptor, @Nullable Prompt prompt, @Nullable
Object outputSchema) {
this.addResource(DEFAULT_CHAT_MODEL, ResourceType.CHAT_MODEL,
descriptor);
+ Map<String, Object> actionConfig = new HashMap<>();
if (outputSchema != null) {
String jsonSchema;
@@ -82,15 +83,13 @@ public class ReActAgent extends Agent {
"The final response should be json format,
and match the schema %s",
jsonSchema));
this.addResource(DEFAULT_SCHEMA_PROMPT, ResourceType.PROMPT,
schemaPrompt);
+ actionConfig.put("output_schema", outputSchema);
}
if (prompt != null) {
this.addResource(DEFAULT_USER_PROMPT, ResourceType.PROMPT, prompt);
}
- Map<String, Object> actionConfig = new HashMap<>();
- actionConfig.put("output_schema", outputSchema);
-
try {
Method method =
this.getClass().getMethod("startAction", Event.class,
RunnerContext.class);
diff --git
a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ReActAgentTest.java
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ReActAgentTest.java
index 1baa40d1..7bff4550 100644
---
a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ReActAgentTest.java
+++
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ReActAgentTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
@@ -46,6 +47,7 @@ import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import static
org.apache.flink.agents.api.agents.AgentExecutionOptions.ERROR_HANDLING_STRATEGY;
@@ -114,7 +116,7 @@ public class ReActAgentTest {
agentsEnv.getConfig().set(MAX_RETRIES, 3);
// Declare the ReAct agent.
- Agent agent = getAgent();
+ Agent agent = getAgent(true);
// Create input table from sample data
Table inputTable =
@@ -152,8 +154,74 @@ public class ReActAgentTest {
checkResult(results);
}
- // create ReAct agent.
- private static Agent getAgent() {
+ @Test
+ public void testReActAgentNoOutputSchema() throws Exception {
+ Assumptions.assumeTrue(ollamaReady, String.format("%s is not ready",
OLLAMA_MODEL));
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ // Create the table environment
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+ tableEnv.getConfig().set("table.exec.result.display.max-column-width",
"100");
+
+ // Create agents execution environment
+ AgentsExecutionEnvironment agentsEnv =
+ AgentsExecutionEnvironment.getExecutionEnvironment(env,
tableEnv);
+
+ // register resource to agents execution environment.
+ agentsEnv
+ .addResource(
+ "ollama",
+ ResourceType.CHAT_MODEL_CONNECTION,
+ ResourceDescriptor.Builder.newBuilder(
+
ResourceName.ChatModel.OLLAMA_CONNECTION)
+ .addInitialArgument("endpoint",
"http://localhost:11434")
+ .addInitialArgument("requestTimeout", 240)
+ .build())
+ .addResource(
+ "add",
+ ResourceType.TOOL,
+ Tool.fromMethod(
+ ReActAgentTest.class.getMethod("add",
Double.class, Double.class)))
+ .addResource(
+ "multiply",
+ ResourceType.TOOL,
+ Tool.fromMethod(
+ ReActAgentTest.class.getMethod(
+ "multiply", Double.class,
Double.class)));
+
+ agentsEnv.getConfig().set(ERROR_HANDLING_STRATEGY,
ReActAgent.ErrorHandlingStrategy.RETRY);
+ agentsEnv.getConfig().set(MAX_RETRIES, 3);
+
+ // Declare the ReAct agent without an output schema.
+ Agent agent = getAgent(false);
+
+ // Create input table from sample data
+ Table inputTable =
+ tableEnv.fromValues(
+ DataTypes.ROW(
+ DataTypes.FIELD("a", DataTypes.DOUBLE()),
+ DataTypes.FIELD("b", DataTypes.DOUBLE()),
+ DataTypes.FIELD("c", DataTypes.DOUBLE())),
+ Row.of(2131, 29847, 3));
+
+ // Apply agent to the Table; without an output schema the result is a
string.
+ DataStream<Object> out =
+ agentsEnv
+ .fromTable(
+ inputTable,
+ (KeySelector<Object, Double>)
+ value -> (Double) ((Row)
value).getField("a"))
+ .apply(agent)
+ .toDataStream();
+
+ out.print();
+
+ env.execute();
+ }
+
+ // create ReAct agent; pass false to skip the output schema.
+ private static Agent getAgent(boolean withSchema) {
ResourceDescriptor chatModelDescriptor =
ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OLLAMA_SETUP)
.addInitialArgument("connection", "ollama")
@@ -162,21 +230,24 @@ public class ReActAgentTest {
.addInitialArgument("extract_reasoning", true)
.build();
- Prompt prompt =
- Prompt.fromMessages(
- List.of(
- new ChatMessage(
- MessageRole.SYSTEM,
- "Must call function tool to do the
calculate."),
- new ChatMessage(
- MessageRole.SYSTEM,
- "An example of output is {\"result\":
30.32}"),
- new ChatMessage(MessageRole.USER, "What is
({a} + {b}) * {c}.")));
+ List<ChatMessage> messages = new ArrayList<>();
+ messages.add(
+ new ChatMessage(
+ MessageRole.SYSTEM, "Must call function tool to do the
calculate."));
+ if (withSchema) {
+ messages.add(
+ new ChatMessage(
+ MessageRole.SYSTEM, "An example of output is
{\"result\": 30.32}"));
+ }
+ messages.add(new ChatMessage(MessageRole.USER, "What is ({a} + {b}) *
{c}."));
+
RowTypeInfo outputTypeInfo =
- new RowTypeInfo(
- new TypeInformation[] {BasicTypeInfo.DOUBLE_TYPE_INFO},
- new String[] {"result"});
- return new ReActAgent(chatModelDescriptor, prompt, outputTypeInfo);
+ withSchema
+ ? new RowTypeInfo(
+ new TypeInformation[]
{BasicTypeInfo.DOUBLE_TYPE_INFO},
+ new String[] {"result"})
+ : null;
+ return new ReActAgent(chatModelDescriptor,
Prompt.fromMessages(messages), outputTypeInfo);
}
private void checkResult(CloseableIterator<?> results) {
diff --git a/python/flink_agents/api/agents/react_agent.py
b/python/flink_agents/api/agents/react_agent.py
index cef651a1..c898d119 100644
--- a/python/flink_agents/api/agents/react_agent.py
+++ b/python/flink_agents/api/agents/react_agent.py
@@ -143,7 +143,7 @@ class ReActAgent(Agent):
name="start_action",
events=[InputEvent.EVENT_TYPE],
func=self.start_action,
- output_schema=OutputSchema(output_schema=output_schema),
+ output_schema=OutputSchema(output_schema=output_schema) if
output_schema else None,
)
@staticmethod
diff --git
a/python/flink_agents/e2e_tests/e2e_tests_integration/react_agent_test.py
b/python/flink_agents/e2e_tests/e2e_tests_integration/react_agent_test.py
index f0c77178..ad689d05 100644
--- a/python/flink_agents/e2e_tests/e2e_tests_integration/react_agent_test.py
+++ b/python/flink_agents/e2e_tests/e2e_tests_integration/react_agent_test.py
@@ -267,3 +267,88 @@ def test_react_agent_on_remote_runner(
# through the event-log capture path.
invocations = collect_tool_invocations(log_dir)
assert_tool_invoked(invocations, "multiply", {"a": 4444, "b": 312})
+
+
[email protected](
+ client is None, reason="Ollama client is not available or test model is
missing"
+)
+def test_react_agent_no_output_schema_on_remote_runner(
+ tmp_path: Path, monkeypatch: pytest.MonkeyPatch
+) -> None:
+ """ReAct agent without an output_schema should emit a plain string
result."""
+ monkeypatch.setenv("OLLAMA_CHAT_MODEL", OLLAMA_MODEL)
+ stream_env = StreamExecutionEnvironment.get_execution_environment()
+
+ stream_env.set_parallelism(1)
+
+ t_env =
StreamTableEnvironment.create(stream_execution_environment=stream_env)
+
+ table = t_env.from_elements(
+ elements=[(2123, 2321, 312)],
+ schema=DataTypes.ROW(
+ [
+ DataTypes.FIELD("a", DataTypes.INT()),
+ DataTypes.FIELD("b", DataTypes.INT()),
+ DataTypes.FIELD("c", DataTypes.INT()),
+ ]
+ ),
+ )
+
+ env = AgentsExecutionEnvironment.get_execution_environment(
+ env=stream_env, t_env=t_env
+ )
+
+ env.get_config().set(
+ AgentExecutionOptions.ERROR_HANDLING_STRATEGY,
ErrorHandlingStrategy.RETRY
+ )
+
+ env.get_config().set(AgentExecutionOptions.MAX_RETRIES, 3)
+
+ log_dir = tmp_path / "event_logs"
+ log_dir.mkdir(parents=True, exist_ok=True)
+ env.get_config().set_str("baseLogDir", str(log_dir))
+
+ # register resource to execution environment
+ (
+ env.add_resource(
+ "ollama",
+ ResourceType.CHAT_MODEL_CONNECTION,
+ ResourceDescriptor(
+ clazz=ResourceName.ChatModel.OLLAMA_CONNECTION,
request_timeout=240.0
+ ),
+ )
+ .add_resource("add", ResourceType.TOOL, Tool.from_callable(add))
+ .add_resource("multiply", ResourceType.TOOL,
Tool.from_callable(multiply))
+ )
+
+ # prepare prompt
+ prompt = Prompt.from_messages(
+ messages=[
+ ChatMessage(role=MessageRole.USER, content="What is ({a} + {b}) *
{c}"),
+ ],
+ )
+
+ # create ReAct agent without an output schema; result is emitted as a
string.
+ agent = ReActAgent(
+ chat_model=ResourceDescriptor(
+ clazz=ResourceName.ChatModel.OLLAMA_SETUP,
+ connection="ollama",
+ model=OLLAMA_MODEL,
+ tools=["add", "multiply"],
+ ),
+ prompt=prompt,
+ )
+
+ output_stream = (
+ env.from_table(input=table, key_selector=MyKeySelector())
+ .apply(agent)
+ .to_datastream()
+ )
+ output_stream.print()
+
+ env.execute()
+
+ # multiply's first arg (4444 = 2123 + 2321) proves the addition was
computed
+ # correctly and threaded into multiply, even without an output schema.
+ invocations = collect_tool_invocations(log_dir)
+ assert_tool_invoked(invocations, "multiply", {"a": 4444, "b": 312})