wzhero1 commented on code in PR #709:
URL: https://github.com/apache/flink-agents/pull/709#discussion_r3318031018
##########
runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/JavaResourceAdapter.java:
##########
@@ -180,6 +180,18 @@ public Object invokeJavaTool(
return response.getResult();
}
+ /** Invoke a Java static action method with positional arguments from
Python. */
+ public Object invokeJavaAction(
+ String className,
+ String methodName,
+ List<String> parameterTypes,
+ List<Object> arguments)
+ throws Exception {
+ Method method = resolveMethod(className, methodName, parameterTypes);
+ Object[] args = arguments == null ? new Object[0] :
arguments.toArray();
+ return method.invoke(null, args);
Review Comment:
`invoke(null, args)` assumes the resolved target is `static`, but
`Class.getMethod(...)` doesn't enforce that. A `JavaFunction` whose
`method_name` points at an instance method resolves fine here and then throws
`NullPointerException` from `Method.invoke` with a message that says nothing
about the static requirement — hard to diagnose from the user side.
Could we check `Modifier.isStatic(method.getModifiers())` at resolution time
and throw an exception whose message names the method (mirroring
`FunctionTool`'s static check), so the misconfiguration surfaces where the
user can act on it?
##########
api/src/test/java/org/apache/flink/agents/api/CrossLanguageEventSnapshotTest.java:
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.agents.OutputSchema;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.event.ChatRequestEvent;
+import org.apache.flink.agents.api.event.ChatResponseEvent;
+import org.apache.flink.agents.api.event.ContextRetrievalRequestEvent;
+import org.apache.flink.agents.api.event.ContextRetrievalResponseEvent;
+import org.apache.flink.agents.api.event.ToolRequestEvent;
+import org.apache.flink.agents.api.event.ToolResponseEvent;
+import org.apache.flink.agents.api.tools.ToolResponse;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/** Cross-language event SerDe snapshot tests. */
+class CrossLanguageEventSnapshotTest {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static final UUID FIXED_EVENT_ID =
+ UUID.fromString("00000000-0000-0000-0000-000000000001");
+ private static final UUID FIXED_REQUEST_ID =
+ UUID.fromString("00000000-0000-0000-0000-000000000002");
+ private static final String FIXED_TOOL_CALL_ID = "call_aaaa";
+ private static final long FIXED_TIMESTAMP = 1_700_000_000_000L;
+
+ private static Path snapshotDir;
+
+ @BeforeAll
+ static void resolveSnapshotDir() {
+ Path repoRoot = Paths.get(System.getProperty("user.dir")).getParent();
+ snapshotDir =
repoRoot.resolve("e2e-test/cross-language-event-snapshots");
+ }
+
+ // ── Helpers ────────────────────────────────────────────────────────────
+
+ private static boolean regenerateRequested() {
+ return Boolean.parseBoolean(System.getProperty("regenerate.snapshots",
"false"));
+ }
+
+ private static void writeJavaSnapshot(String fileName, Event event) throws
Exception {
+ String json =
MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(event);
+ Path target = snapshotDir.resolve("java/" + fileName);
+ Files.createDirectories(target.getParent());
+ Files.writeString(target, json + "\n");
+ }
+
+ private static void assertJavaSnapshotStable(String fileName, Event event)
throws Exception {
+ String actualJson = MAPPER.writeValueAsString(event);
+ JsonNode actual = MAPPER.readTree(actualJson);
+
+ Path committed = snapshotDir.resolve("java/" + fileName);
+ assertTrue(
+ Files.exists(committed),
+ "Java snapshot "
+ + fileName
+ + " missing from "
+ + committed
+ + ". If you added a new event, regenerate with
-Dregenerate.snapshots=true and commit alongside the test.");
+ JsonNode expected = MAPPER.readTree(Files.readString(committed));
+
+ assertEquals(
+ expected,
+ actual,
+ "Java serialization of "
+ + fileName
+ + " drifted from committed snapshot; if intentional,
regenerate.");
+ }
+
+ private static Event readPythonSnapshot(String fileName) throws Exception {
+ Path pythonSnapshot = snapshotDir.resolve("python/" + fileName);
+ assertTrue(
+ Files.exists(pythonSnapshot),
+ "Python snapshot "
+ + fileName
+ + " missing from "
+ + pythonSnapshot
+ + ". Regenerate the Python side with
REGENERATE_SNAPSHOTS=1 and commit alongside this test.");
+ return Event.fromJson(Files.readString(pythonSnapshot));
+ }
+
+ // ── InputEvent ─────────────────────────────────────────────────────────
+
+ private static InputEvent buildInputEvent() {
+ Map<String, Object> attrs = new HashMap<>();
+ attrs.put("input", "hello");
+ return new InputEvent(FIXED_EVENT_ID, attrs);
+ }
+
+ @Test
+ void regenerateInputEventJavaSnapshot() throws Exception {
+ assumeTrue(regenerateRequested(), "Set -Dregenerate.snapshots=true to
refresh.");
+ writeJavaSnapshot("input_event.json", buildInputEvent());
+ }
+
+ @Test
+ void inputEventJavaSnapshotIsStable() throws Exception {
+ assertJavaSnapshotStable("input_event.json", buildInputEvent());
+ }
+
+ @Test
+ void javaCanDeserializeInputEventFromPythonSnapshot() throws Exception {
+ Event base = readPythonSnapshot("input_event.json");
+ InputEvent typed = InputEvent.fromEvent(base);
+
+ assertEquals(
+ FIXED_EVENT_ID, typed.getId(), "ID lost when deserializing
Python InputEvent.");
+ assertEquals(InputEvent.EVENT_TYPE, typed.getType());
+ assertEquals("hello", typed.getInput(), "InputEvent.input mismatch.");
+ }
+
+ // ── OutputEvent ────────────────────────────────────────────────────────
+
+ private static OutputEvent buildOutputEvent() {
+ Map<String, Object> attrs = new HashMap<>();
+ attrs.put("output", "world");
+ return new OutputEvent(FIXED_EVENT_ID, attrs);
+ }
+
+ @Test
+ void regenerateOutputEventJavaSnapshot() throws Exception {
+ assumeTrue(regenerateRequested(), "Set -Dregenerate.snapshots=true to
refresh.");
+ writeJavaSnapshot("output_event.json", buildOutputEvent());
+ }
+
+ @Test
+ void outputEventJavaSnapshotIsStable() throws Exception {
+ assertJavaSnapshotStable("output_event.json", buildOutputEvent());
+ }
+
+ @Test
+ void javaCanDeserializeOutputEventFromPythonSnapshot() throws Exception {
+ Event base = readPythonSnapshot("output_event.json");
+ OutputEvent typed = OutputEvent.fromEvent(base);
+
+ assertEquals(
+ FIXED_EVENT_ID, typed.getId(), "ID lost when deserializing
Python OutputEvent.");
+ assertEquals(OutputEvent.EVENT_TYPE, typed.getType());
+ assertEquals("world", typed.getOutput(), "OutputEvent.output
mismatch.");
+ }
+
+ // ── ChatRequestEvent ───────────────────────────────────────────────────
+
+ private static ChatRequestEvent buildChatRequestEvent() {
+ Map<String, Object> attrs = new LinkedHashMap<>();
+ attrs.put("model", "test-model");
+ attrs.put("messages", List.of(new ChatMessage(MessageRole.USER, "hello
world")));
+ return new ChatRequestEvent(FIXED_EVENT_ID, attrs);
+ }
+
+ @Test
+ void regenerateChatRequestEventJavaSnapshot() throws Exception {
+ assumeTrue(regenerateRequested(), "Set -Dregenerate.snapshots=true to
refresh.");
+ writeJavaSnapshot("chat_request_event.json", buildChatRequestEvent());
+ }
+
+ @Test
+ void chatRequestEventJavaSnapshotIsStable() throws Exception {
+ assertJavaSnapshotStable("chat_request_event.json",
buildChatRequestEvent());
+ }
+
+ @Test
+ void javaCanDeserializeChatRequestEventFromPythonSnapshot() throws
Exception {
+ Event base = readPythonSnapshot("chat_request_event.json");
+ ChatRequestEvent typed = ChatRequestEvent.fromEvent(base);
+
+ assertEquals(FIXED_EVENT_ID, typed.getId());
+ assertEquals(ChatRequestEvent.EVENT_TYPE, typed.getType());
+ assertEquals("test-model", typed.getModel());
+ assertNotNull(typed.getMessages());
+ assertEquals(1, typed.getMessages().size(), "Expected one message.");
+ ChatMessage msg = typed.getMessages().get(0);
+ assertEquals(MessageRole.USER, msg.getRole(), "Role mismatch on
Python-produced message.");
+ assertEquals("hello world", msg.getContent());
+ }
+
+ @Test
+ void chatRequestOutputSchemaWireFormatIsJavaShaped() throws Exception {
+ OutputSchema schema =
+ new OutputSchema(
+ new RowTypeInfo(
+ new TypeInformation[]
{BasicTypeInfo.STRING_TYPE_INFO},
+ new String[] {"name"}));
+ ChatRequestEvent event =
+ new ChatRequestEvent(
+ "test-model", List.of(new
ChatMessage(MessageRole.USER, "hi")), schema);
+ String json = MAPPER.writeValueAsString(event);
+
+ assertTrue(json.contains("\"fieldNames\""), "Java wire format uses
`fieldNames`.");
+ assertFalse(json.contains("\"names\""), "Java wire format does not use
Python's `names`.");
+ }
+
+ // ── ChatResponseEvent ──────────────────────────────────────────────────
+
+ private static ChatResponseEvent buildChatResponseEvent() {
+ Map<String, Object> attrs = new LinkedHashMap<>();
+ attrs.put("request_id", FIXED_REQUEST_ID);
+ attrs.put("response", new ChatMessage(MessageRole.ASSISTANT, "hi
there"));
+ attrs.put("retry_count", 0);
+ attrs.put("total_retry_wait_sec", 0);
+ return new ChatResponseEvent(FIXED_EVENT_ID, attrs);
+ }
+
+ @Test
+ void regenerateChatResponseEventJavaSnapshot() throws Exception {
+ assumeTrue(regenerateRequested(), "Set -Dregenerate.snapshots=true to
refresh.");
+ writeJavaSnapshot("chat_response_event.json",
buildChatResponseEvent());
+ }
+
+ @Test
+ void chatResponseEventJavaSnapshotIsStable() throws Exception {
+ assertJavaSnapshotStable("chat_response_event.json",
buildChatResponseEvent());
+ }
+
+ @Test
+ void javaCanDeserializeChatResponseEventFromPythonSnapshot() throws
Exception {
+ Event base = readPythonSnapshot("chat_response_event.json");
+ ChatResponseEvent typed = ChatResponseEvent.fromEvent(base);
+
+ assertEquals(FIXED_EVENT_ID, typed.getId());
+ assertEquals(ChatResponseEvent.EVENT_TYPE, typed.getType());
+ assertEquals(FIXED_REQUEST_ID, typed.getRequestId(), "request_id
mismatch.");
+ ChatMessage response = typed.getResponse();
+ assertNotNull(response, "response field is null.");
+ assertEquals(MessageRole.ASSISTANT, response.getRole(), "Role mismatch
on response.");
+ assertEquals("hi there", response.getContent());
+ }
+
+ // ── ToolRequestEvent ───────────────────────────────────────────────────
+
+ private static ToolRequestEvent buildToolRequestEvent() {
+ Map<String, Object> toolCall = new LinkedHashMap<>();
+ toolCall.put("id", FIXED_TOOL_CALL_ID);
+ toolCall.put("name", "echo");
+ toolCall.put("arguments", Map.of("value", "ping"));
+
+ Map<String, Object> attrs = new LinkedHashMap<>();
+ attrs.put("model", "test-model");
+ attrs.put("tool_calls", List.of(toolCall));
+ return new ToolRequestEvent(FIXED_EVENT_ID, attrs);
+ }
+
+ @Test
+ void regenerateToolRequestEventJavaSnapshot() throws Exception {
+ assumeTrue(regenerateRequested(), "Set -Dregenerate.snapshots=true to
refresh.");
+ writeJavaSnapshot("tool_request_event.json", buildToolRequestEvent());
+ }
+
+ @Test
+ void toolRequestEventJavaSnapshotIsStable() throws Exception {
+ assertJavaSnapshotStable("tool_request_event.json",
buildToolRequestEvent());
+ }
+
+ @Test
+ void javaCanDeserializeToolRequestEventFromPythonSnapshot() throws
Exception {
+ Event base = readPythonSnapshot("tool_request_event.json");
+ ToolRequestEvent typed = ToolRequestEvent.fromEvent(base);
+
+ assertEquals(FIXED_EVENT_ID, typed.getId());
+ assertEquals(ToolRequestEvent.EVENT_TYPE, typed.getType());
+ assertEquals("test-model", typed.getModel());
+ List<Map<String, Object>> toolCalls = typed.getToolCalls();
+ assertNotNull(toolCalls);
+ assertEquals(1, toolCalls.size());
+ assertEquals(FIXED_TOOL_CALL_ID, toolCalls.get(0).get("id"));
+ }
+
+ // ── ToolResponseEvent ──────────────────────────────────────────────────
+
+ private static ToolResponseEvent buildToolResponseEvent() {
+ Map<String, Object> attrs = new LinkedHashMap<>();
+ attrs.put("request_id", FIXED_REQUEST_ID);
+ attrs.put("responses", Map.of(FIXED_TOOL_CALL_ID,
ToolResponse.success("pong")));
+ attrs.put("success", Map.of(FIXED_TOOL_CALL_ID, true));
+ attrs.put("error", new HashMap<String, String>());
+ attrs.put("external_ids", new HashMap<String, String>());
+ attrs.put("timestamp", FIXED_TIMESTAMP);
+ return new ToolResponseEvent(FIXED_EVENT_ID, attrs);
+ }
+
+ @Test
+ void regenerateToolResponseEventJavaSnapshot() throws Exception {
+ assumeTrue(regenerateRequested(), "Set -Dregenerate.snapshots=true to
refresh.");
+ writeJavaSnapshot("tool_response_event.json",
buildToolResponseEvent());
+ }
+
+ @Test
+ void toolResponseEventJavaSnapshotIsStable() throws Exception {
+ assertJavaSnapshotStable("tool_response_event.json",
buildToolResponseEvent());
+ }
+
+ @Test
+ void pythonToolResponseEventLosesDataWhenConsumedByJava() throws Exception
{
Review Comment:
This test pins the data loss as the expected contract:
`ToolResponseEvent.fromEvent` only handles `ToolResponse`/`Map` values (no
`String` branch), so a Python action emitting `responses={"call_id": "pong"}`
(string-valued, which Python does emit) yields an empty `responses` map on the
Java side with no exception — the core Python-action → Java-action path.
Because this asserts the loss, the suite stays green while the data is gone and
a fix has to break this test first.
Could we add a `String` branch in `fromEvent` so string responses survive,
or — if they're genuinely out of scope for 0.3 — rename this to make the gap
explicit (e.g. `..._knownGap_NotYetSupported`) and note it in the PR
description rather than encoding silent loss as the expected behavior?
##########
api/src/test/java/org/apache/flink/agents/api/CrossLanguageEventSnapshotTest.java:
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.agents.OutputSchema;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.event.ChatRequestEvent;
+import org.apache.flink.agents.api.event.ChatResponseEvent;
+import org.apache.flink.agents.api.event.ContextRetrievalRequestEvent;
+import org.apache.flink.agents.api.event.ContextRetrievalResponseEvent;
+import org.apache.flink.agents.api.event.ToolRequestEvent;
+import org.apache.flink.agents.api.event.ToolResponseEvent;
+import org.apache.flink.agents.api.tools.ToolResponse;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/** Cross-language event SerDe snapshot tests. */
+class CrossLanguageEventSnapshotTest {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static final UUID FIXED_EVENT_ID =
+ UUID.fromString("00000000-0000-0000-0000-000000000001");
+ private static final UUID FIXED_REQUEST_ID =
+ UUID.fromString("00000000-0000-0000-0000-000000000002");
+ private static final String FIXED_TOOL_CALL_ID = "call_aaaa";
+ private static final long FIXED_TIMESTAMP = 1_700_000_000_000L;
+
+ private static Path snapshotDir;
+
+ @BeforeAll
+ static void resolveSnapshotDir() {
+ Path repoRoot = Paths.get(System.getProperty("user.dir")).getParent();
+ snapshotDir =
repoRoot.resolve("e2e-test/cross-language-event-snapshots");
+ }
+
+ // ── Helpers ────────────────────────────────────────────────────────────
+
+ private static boolean regenerateRequested() {
+ return Boolean.parseBoolean(System.getProperty("regenerate.snapshots",
"false"));
+ }
+
+ private static void writeJavaSnapshot(String fileName, Event event) throws
Exception {
+ String json =
MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(event);
+ Path target = snapshotDir.resolve("java/" + fileName);
+ Files.createDirectories(target.getParent());
+ Files.writeString(target, json + "\n");
+ }
+
+ private static void assertJavaSnapshotStable(String fileName, Event event)
throws Exception {
+ String actualJson = MAPPER.writeValueAsString(event);
+ JsonNode actual = MAPPER.readTree(actualJson);
+
+ Path committed = snapshotDir.resolve("java/" + fileName);
+ assertTrue(
+ Files.exists(committed),
+ "Java snapshot "
+ + fileName
+ + " missing from "
+ + committed
+ + ". If you added a new event, regenerate with
-Dregenerate.snapshots=true and commit alongside the test.");
+ JsonNode expected = MAPPER.readTree(Files.readString(committed));
+
+ assertEquals(
+ expected,
+ actual,
+ "Java serialization of "
+ + fileName
+ + " drifted from committed snapshot; if intentional,
regenerate.");
+ }
+
+ private static Event readPythonSnapshot(String fileName) throws Exception {
+ Path pythonSnapshot = snapshotDir.resolve("python/" + fileName);
+ assertTrue(
+ Files.exists(pythonSnapshot),
+ "Python snapshot "
+ + fileName
+ + " missing from "
+ + pythonSnapshot
+ + ". Regenerate the Python side with
REGENERATE_SNAPSHOTS=1 and commit alongside this test.");
+ return Event.fromJson(Files.readString(pythonSnapshot));
+ }
+
+ // ── InputEvent ─────────────────────────────────────────────────────────
+
+ private static InputEvent buildInputEvent() {
+ Map<String, Object> attrs = new HashMap<>();
+ attrs.put("input", "hello");
+ return new InputEvent(FIXED_EVENT_ID, attrs);
+ }
+
+ @Test
+ void regenerateInputEventJavaSnapshot() throws Exception {
+ assumeTrue(regenerateRequested(), "Set -Dregenerate.snapshots=true to
refresh.");
+ writeJavaSnapshot("input_event.json", buildInputEvent());
+ }
+
+ @Test
+ void inputEventJavaSnapshotIsStable() throws Exception {
+ assertJavaSnapshotStable("input_event.json", buildInputEvent());
+ }
+
+ @Test
+ void javaCanDeserializeInputEventFromPythonSnapshot() throws Exception {
+ Event base = readPythonSnapshot("input_event.json");
+ InputEvent typed = InputEvent.fromEvent(base);
+
+ assertEquals(
+ FIXED_EVENT_ID, typed.getId(), "ID lost when deserializing
Python InputEvent.");
+ assertEquals(InputEvent.EVENT_TYPE, typed.getType());
+ assertEquals("hello", typed.getInput(), "InputEvent.input mismatch.");
+ }
+
+ // ── OutputEvent ────────────────────────────────────────────────────────
+
+ private static OutputEvent buildOutputEvent() {
+ Map<String, Object> attrs = new HashMap<>();
+ attrs.put("output", "world");
+ return new OutputEvent(FIXED_EVENT_ID, attrs);
+ }
+
+ @Test
+ void regenerateOutputEventJavaSnapshot() throws Exception {
+ assumeTrue(regenerateRequested(), "Set -Dregenerate.snapshots=true to
refresh.");
+ writeJavaSnapshot("output_event.json", buildOutputEvent());
+ }
+
+ @Test
+ void outputEventJavaSnapshotIsStable() throws Exception {
+ assertJavaSnapshotStable("output_event.json", buildOutputEvent());
+ }
+
+ @Test
+ void javaCanDeserializeOutputEventFromPythonSnapshot() throws Exception {
+ Event base = readPythonSnapshot("output_event.json");
+ OutputEvent typed = OutputEvent.fromEvent(base);
+
+ assertEquals(
+ FIXED_EVENT_ID, typed.getId(), "ID lost when deserializing
Python OutputEvent.");
+ assertEquals(OutputEvent.EVENT_TYPE, typed.getType());
+ assertEquals("world", typed.getOutput(), "OutputEvent.output
mismatch.");
+ }
+
+ // ── ChatRequestEvent ───────────────────────────────────────────────────
+
+ private static ChatRequestEvent buildChatRequestEvent() {
+ Map<String, Object> attrs = new LinkedHashMap<>();
+ attrs.put("model", "test-model");
+ attrs.put("messages", List.of(new ChatMessage(MessageRole.USER, "hello
world")));
+ return new ChatRequestEvent(FIXED_EVENT_ID, attrs);
+ }
+
+ @Test
+ void regenerateChatRequestEventJavaSnapshot() throws Exception {
+ assumeTrue(regenerateRequested(), "Set -Dregenerate.snapshots=true to
refresh.");
+ writeJavaSnapshot("chat_request_event.json", buildChatRequestEvent());
+ }
+
+ @Test
+ void chatRequestEventJavaSnapshotIsStable() throws Exception {
+ assertJavaSnapshotStable("chat_request_event.json",
buildChatRequestEvent());
+ }
+
+ @Test
+ void javaCanDeserializeChatRequestEventFromPythonSnapshot() throws
Exception {
+ Event base = readPythonSnapshot("chat_request_event.json");
+ ChatRequestEvent typed = ChatRequestEvent.fromEvent(base);
+
+ assertEquals(FIXED_EVENT_ID, typed.getId());
+ assertEquals(ChatRequestEvent.EVENT_TYPE, typed.getType());
+ assertEquals("test-model", typed.getModel());
+ assertNotNull(typed.getMessages());
+ assertEquals(1, typed.getMessages().size(), "Expected one message.");
+ ChatMessage msg = typed.getMessages().get(0);
+ assertEquals(MessageRole.USER, msg.getRole(), "Role mismatch on
Python-produced message.");
+ assertEquals("hello world", msg.getContent());
+ }
+
+ @Test
+ void chatRequestOutputSchemaWireFormatIsJavaShaped() throws Exception {
Review Comment:
This (and the Python mirror
`test_chat_request_output_schema_wire_format_is_python_shaped`) pins a wire
incompatibility rather than reconciling it: for a `RowTypeInfo`-typed
`output_schema`, Java emits `{"fieldNames": [...], "types": [<Class>]}` while
Python emits `{"names": [...], "types": [<BasicType int>]}` — both the keys and
the type encodings differ, so a `ChatRequestEvent` carrying a non-null
`RowTypeInfo` output_schema can't be deserialized on the other side. (The
`BaseModel` `module`/`class` path does look symmetric; this is specifically the
`RowTypeInfo` branch.)
Is structured `RowTypeInfo` output across the boundary in scope for 0.3? If
yes the two sides need a shared key/format; if no, same ask — make it an
explicit documented limitation instead of a green test asserting the mismatch.
##########
plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java:
##########
@@ -228,20 +228,39 @@ private void extractActionsFromAgent(Agent agent) throws
Exception {
// Scan the agent class for methods annotated with @Action
Class<?> agentClass = agent.getClass();
for (Method method : agentClass.getDeclaredMethods()) {
- if
(method.isAnnotationPresent(org.apache.flink.agents.api.annotation.Action.class))
{
- org.apache.flink.agents.api.annotation.Action actionAnnotation
=
-
method.getAnnotation(org.apache.flink.agents.api.annotation.Action.class);
-
- String[] listenEventTypeStrings =
-
Objects.requireNonNull(actionAnnotation).listenEventTypes();
-
- org.apache.flink.agents.plan.JavaFunction javaFunction =
+ if
(!method.isAnnotationPresent(org.apache.flink.agents.api.annotation.Action.class))
{
Review Comment:
`getDeclaredMethods()` returns only methods declared on the concrete class,
so `@Action` methods inherited from a parent agent class aren't registered.
Every agent today is a single-level `extends Agent` with `@Action` on the final
class, so nothing triggers this — but factoring shared actions into an
intermediate base class (a natural refactor) would make them silently stop
registering.
Note `getMethods()` only half-fixes it: it catches inherited `@Action` methods
that are `public` (the convention) but misses `protected`/package-private
ones. Walking the superclass chain and calling `getDeclaredMethods()` on each
class is the visibility-insensitive fix. Low urgency — worth a guard or a
documented constraint.
##########
api/src/test/java/org/apache/flink/agents/api/CrossLanguageEventSnapshotTest.java:
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.agents.OutputSchema;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.event.ChatRequestEvent;
+import org.apache.flink.agents.api.event.ChatResponseEvent;
+import org.apache.flink.agents.api.event.ContextRetrievalRequestEvent;
+import org.apache.flink.agents.api.event.ContextRetrievalResponseEvent;
+import org.apache.flink.agents.api.event.ToolRequestEvent;
+import org.apache.flink.agents.api.event.ToolResponseEvent;
+import org.apache.flink.agents.api.tools.ToolResponse;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/** Cross-language event SerDe snapshot tests. */
+class CrossLanguageEventSnapshotTest {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static final UUID FIXED_EVENT_ID =
Review Comment:
Forcing a fixed id here masks a real cross-language divergence: Java assigns
event ids with `UUID.randomUUID()` (`Event.java`) while Python derives a
deterministic content-based id (`event.py` `_generate_content_based_id`).
Because the two sides use different identity models, the committed snapshots
only line up if the id is forced — so they don't reflect what either side
naturally emits, and the same logical event gets a different id depending on
its producer, which breaks id-based dedup/replay/correlation across languages.
Is cross-language event-id stability a requirement? If so the two sides
should agree on one scheme (content-derived on both, most likely); if not,
worth documenting that ids are language-local and must not be used for
cross-boundary correlation.
##########
python/flink_agents/plan/agent_plan.py:
##########
@@ -242,21 +242,31 @@ def _get_actions(agent: Agent) -> List[Action]:
actions = []
for name, value in agent.__class__.__dict__.items():
if isinstance(value, staticmethod) and hasattr(value,
"_listen_events"):
+ exec_ = (
Review Comment:
`hasattr(value, "_listen_events")` is checked on the `staticmethod` wrapper,
which makes this order-sensitive. With `@action` outermost over `@staticmethod`
(the order the examples use), the attribute lands on the staticmethod object
and this branch hits. With the conventional `@staticmethod` outermost over
`@action`, the attribute is on `__func__`, so `hasattr(wrapper, ...)` is False
on Python 3.9, and the `elif callable(value)` below also misses (a
`staticmethod` object isn't callable before Python 3.10) — so the action is
silently dropped with no error.
Unwrapping first (`inner = value.__func__ if isinstance(value, staticmethod)
else value`, then check `inner`) would make both orders work. No example hits
the broken order today, so it's preventive — but it's an easy silent-drop trap.
##########
python/flink_agents/plan/function.py:
##########
@@ -272,17 +272,23 @@ def set_java_resource_adapter(self, adapter: Any) -> None:
def __call__(self, *args: Tuple[Any, ...], **kwargs: Dict[str, Any]) ->
Any:
"""Invoke the Java method via the JVM resource adapter.
- LLM tool calls always arrive as keyword arguments — positional
- ``*args`` are ignored because the Java side reorders parameters
- by name via reflection.
+ Positional args route to ``invokeJavaAction`` (action dispatch);
+ keyword args route to ``invokeJavaTool`` (LLM tool dispatch).
"""
if self._j_resource_adapter is None:
msg = (
"JavaFunction requires the JVM resource adapter; not set "
- "on this descriptor. The runtime should inject it via "
+ "on this descriptor. The runtime injects it via "
"set_java_resource_adapter before invocation."
)
raise RuntimeError(msg)
+ if args:
Review Comment:
Routing on `if args:` (positional → `invokeJavaAction`, else →
`invokeJavaTool`) leaves the "positional = action, keyword = tool" contract
enforced only by the docstring. A mixed call `func(event, x=1)` would go to
`invokeJavaAction` and silently drop `kwargs`; a no-arg call falls to the tool
path. Both real call sites are clean today (the action path passes exactly
`(event, context)` positionally, the tool path passes pure kwargs), so this
isn't a live bug — but it's an unenforced convention that a future caller or
refactor could misroute silently.
A cheap assertion (reject simultaneous `args` and `kwargs`, or dispatch on
an explicit flag) would make the contract enforced rather than assumed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]