weiqingy commented on code in PR #709:
URL: https://github.com/apache/flink-agents/pull/709#discussion_r3315347855
##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java:
##########
@@ -186,31 +186,33 @@ Event wrapToInputEvent(IN input, PythonActionExecutor pythonActionExecutor) thro
/**
* Extracts the downstream output payload from an output {@link Event}.
*
- * <p>For a Java {@link OutputEvent}, returns the payload directly. For Python-side output
- * events (cross-language unified {@link Event} with output type), the event is JSON-serialized
- * and handed to {@link PythonActionExecutor#getOutputFromOutputEvent(String)} for extraction.
+ * <p>Dispatch is by pipeline wire format, not action language:
+ *
+ * <ul>
+ * <li>Java pipelines ({@code inputIsJava}) emit the raw payload directly.
+ * <li>Python pipelines re-encode through {@link
+ * PythonActionExecutor#getOutputFromOutputEvent(String)} so the downstream Python sink
+ * receives cloudpickle bytes.
+ * </ul>
*
* @param event the output event (must satisfy {@link EventUtil#isOutputEvent(Event)}).
- * @param pythonActionExecutor the Python action executor (used only for Python output events).
+ * @param pythonActionExecutor used only on Python pipelines.
* @return the typed output payload.
*/
@SuppressWarnings("unchecked")
OUT getOutputFromOutputEvent(Event event, PythonActionExecutor pythonActionExecutor) {
checkState(EventUtil.isOutputEvent(event));
- if (event instanceof OutputEvent) {
- return (OUT) ((OutputEvent) event).getOutput();
- } else {
- // Python output events arrive as unified Event with type "_output_event".
- // Pass the JSON representation to Python for extraction.
- try {
- String eventJson = new ObjectMapper().writeValueAsString(event);
- Object outputFromOutputEvent =
- pythonActionExecutor.getOutputFromOutputEvent(eventJson);
- return (OUT) outputFromOutputEvent;
- } catch (Exception e) {
- throw new IllegalStateException(
- "Failed to extract output from event: " + event.getType(), e);
- }
+ OutputEvent typedEvent =
+ (event instanceof OutputEvent) ? (OutputEvent) event : OutputEvent.fromEvent(event);
+ if (inputIsJava) {
+ return (OUT) typedEvent.getOutput();
Review Comment:
For the `inputIsJava == true && event is unified Event` case (e.g. a Python
action emitting `_output_event` in a Java pipeline), this now calls
`OutputEvent.fromEvent(event).getOutput()` and bypasses `PythonActionExecutor`
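entirely. The old branch routed everything that wasn't `instanceof OutputEvent`
through Python, so this is a deliberate behavior change to a hot path, not just
a refactor. It cuts both ways: on Python pipelines, a Java `OutputEvent` that
used to be returned directly is now re-encoded through `PythonActionExecutor`.
So it affects every output event in every pipeline, not just cross-language
ones.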
Two asks:
- Could this be called out in the PR description so reviewers (and the 0.3
release notes) don't miss it?
- Is there a test under `runtime/` that specifically pins the output path for a
Java pipeline running a Python action, so a future Pemja change doesn't silently
regress payload reconstruction? `CrossLanguageActionRuntimeTest` exercises
dispatch, but I didn't spot one that asserts on the recovered output type/value
here.
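A minimal sketch of the dispatch change, for illustration only: `UnifiedEvent`, `TypedOutputEvent`, `oldDispatch`, `newDispatch` and the `pythonBridge` function are hypothetical stand-ins, not the flink-agents classes, and the real code goes through `OutputEvent.fromEvent(event)` rather than reading a field. It shows the case in question (a Python-emitted `_output_event` in a Java pipeline) and the kind of type/value assertion a pinning test could make.

```java
import java.util.function.Function;

// Hypothetical stand-ins for illustration; these are NOT the flink-agents runtime classes.
class OutputDispatchSketch {

    /** Stand-in for a unified cross-language event, e.g. a Python-emitted "_output_event". */
    static class UnifiedEvent {
        final String type;
        final Object output;

        UnifiedEvent(String type, Object output) {
            this.type = type;
            this.output = output;
        }
    }

    /** Stand-in for the Java-typed OutputEvent. */
    static class TypedOutputEvent extends UnifiedEvent {
        TypedOutputEvent(Object output) {
            super("_output_event", output);
        }
    }

    /** Old rule: only a Java-typed event skips the Python bridge. */
    static Object oldDispatch(UnifiedEvent event, Function<UnifiedEvent, Object> pythonBridge) {
        if (event instanceof TypedOutputEvent) {
            return event.output;
        }
        return pythonBridge.apply(event);
    }

    /** New rule: the pipeline wire format decides; the event's class no longer matters. */
    static Object newDispatch(
            UnifiedEvent event, boolean inputIsJava, Function<UnifiedEvent, Object> pythonBridge) {
        if (inputIsJava) {
            return event.output; // the Python bridge is never consulted on Java pipelines
        }
        return pythonBridge.apply(event);
    }

    public static void main(String[] args) {
        // Pretend bridge: the real one re-encodes via PythonActionExecutor.
        Function<UnifiedEvent, Object> bridge = e -> "re-encoded by Python";
        UnifiedEvent fromPythonAction = new UnifiedEvent("_output_event", 42);

        System.out.println(oldDispatch(fromPythonAction, bridge)); // prints "re-encoded by Python"
        System.out.println(newDispatch(fromPythonAction, true, bridge)); // prints "42"
    }
}
```

Against the real runtime, the equivalent test would push an `_output_event` produced by a Python action through a Java pipeline and assert on the class and value of the payload that comes out.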
##########
plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java:
##########
@@ -228,20 +228,30 @@ private void extractActionsFromAgent(Agent agent) throws Exception {
// Scan the agent class for methods annotated with @Action
Class<?> agentClass = agent.getClass();
for (Method method : agentClass.getDeclaredMethods()) {
- if (method.isAnnotationPresent(org.apache.flink.agents.api.annotation.Action.class)) {
- org.apache.flink.agents.api.annotation.Action actionAnnotation =
- method.getAnnotation(org.apache.flink.agents.api.annotation.Action.class);
-
- String[] listenEventTypeStrings =
- Objects.requireNonNull(actionAnnotation).listenEventTypes();
-
- org.apache.flink.agents.plan.JavaFunction javaFunction =
+ if (!method.isAnnotationPresent(org.apache.flink.agents.api.annotation.Action.class)) {
+ continue;
+ }
+ org.apache.flink.agents.api.annotation.Action actionAnnotation =
+ Objects.requireNonNull(
+ method.getAnnotation(
+ org.apache.flink.agents.api.annotation.Action.class));
+ String[] listenEventTypeStrings = actionAnnotation.listenEventTypes();
+ org.apache.flink.agents.api.annotation.PythonFunction target =
+ actionAnnotation.target();
+
+ org.apache.flink.agents.plan.Function execFunction;
+ if (target.module().isEmpty()) {
Review Comment:
The `module().isEmpty()` branch picks Java vs Python, but `qualname()` is
read unconditionally on the Python path without a non-empty check.
`@Action(target = @PythonFunction(module = "pkg"))` (no qualname) would build a
`PythonFunction("pkg", "")` and fail much later inside the interpreter with an
obscure error.
A small validation here would fail at the layer where the user can actually
correct it, something like:
- both empty → Java;
- exactly one set → reject with a message naming the action;
- both set → Python.
The mirror-image concern applies on the Python side: does `@action(target=...)`
reject a `JavaFunction` with an empty `method_name` or `parameter_types`?
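A minimal sketch of that three-way check, assuming it runs where `execFunction` is chosen and receives the action name plus the annotation's `module()` and `qualname()` strings. `resolveTarget` and `ActionTarget` are hypothetical names, not existing flink-agents API.

```java
// Hypothetical helper; resolveTarget and ActionTarget are illustrative, not flink-agents API.
class ActionTargetValidation {

    enum ActionTarget {
        JAVA,
        PYTHON
    }

    /** Decides Java vs Python from the @PythonFunction target, rejecting half-specified targets. */
    static ActionTarget resolveTarget(String actionName, String module, String qualname) {
        boolean hasModule = module != null && !module.isEmpty();
        boolean hasQualname = qualname != null && !qualname.isEmpty();
        if (!hasModule && !hasQualname) {
            return ActionTarget.JAVA; // no target: the annotated Java method is the action
        }
        if (hasModule && hasQualname) {
            return ActionTarget.PYTHON; // fully specified cross-language target
        }
        // Exactly one field set: fail at plan-build time with a message naming the action.
        throw new IllegalArgumentException(
                String.format(
                        "Action '%s' has an incomplete Python target (module='%s', qualname='%s');"
                                + " set both, or neither for a Java action.",
                        actionName, module, qualname));
    }

    public static void main(String[] args) {
        System.out.println(resolveTarget("processInput", "", "")); // prints JAVA
        System.out.println(resolveTarget("processInput", "pkg", "handler")); // prints PYTHON
        try {
            resolveTarget("processInput", "pkg", "");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Java annotation values can never be `null`, so the `null` checks above are only defensive.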
##########
plan/src/main/java/org/apache/flink/agents/plan/serializer/ActionJsonDeserializer.java:
##########
@@ -172,8 +172,16 @@ private Object deserializePythonConfig(JsonNode node) {
List<Object> list = new ArrayList<>();
node.forEach(element -> list.add(deserializePythonConfig(element)));
return list;
- } else if (node.isValueNode()) {
- return node.asText();
+ } else if (node.isNull()) {
Review Comment:
Nice — splitting `node.asText()` into typed branches
(`booleanValue`/`intValue`/`longValue`/`doubleValue`/`textValue`) restores the
type information that Python-side config carries through the JSON. Pre-change,
a Python config of `{"max_iters": 10}` would have come back as the string
`"10"` on the Java side. Worth a callout in the PR description as an incidental
correctness fix shipped along with the cross-language work.
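To make the type loss concrete without pulling in Jackson, here is a JDK-only analogue that classifies a raw JSON scalar literal, mirroring the typed branches named above (null, boolean, int, long, double, text). `typedScalar` and `asTextOnly` are hypothetical helpers; the real deserializer uses Jackson's `JsonNode` predicates and accessors. This sketch assumes a valid JSON literal, skips string-escape decoding, and routes integers wider than `long` to `Double`.

```java
// JDK-only analogue for illustration; the real code works on Jackson JsonNode, not raw literals.
class TypedScalarSketch {

    /** Hypothetical: maps a raw JSON scalar literal to a typed Java value. */
    static Object typedScalar(String literal) {
        String s = literal.trim();
        if (s.equals("null")) {
            return null;
        }
        if (s.equals("true") || s.equals("false")) {
            return Boolean.valueOf(s);
        }
        if (s.length() >= 2 && s.startsWith("\"") && s.endsWith("\"")) {
            return s.substring(1, s.length() - 1); // text: keep as String (escapes not decoded)
        }
        if (s.matches("-?\\d+")) {
            try {
                return Integer.parseInt(s); // fits in int
            } catch (NumberFormatException notInt) {
                try {
                    return Long.parseLong(s); // fits in long
                } catch (NumberFormatException notLong) {
                    // wider than long: falls through to Double (simplification)
                }
            }
        }
        return Double.parseDouble(s); // remaining numbers, e.g. 2.5 or 1e3
    }

    /** Hypothetical: what a single asText()-style branch yields, always a String. */
    static Object asTextOnly(String literal) {
        String s = literal.trim();
        boolean quoted = s.length() >= 2 && s.startsWith("\"") && s.endsWith("\"");
        return quoted ? s.substring(1, s.length() - 1) : s;
    }

    public static void main(String[] args) {
        System.out.println(typedScalar("10").getClass().getSimpleName()); // prints Integer
        System.out.println(asTextOnly("10").getClass().getSimpleName()); // prints String
    }
}
```

For the `{"max_iters": 10}` example, the value `10` comes back as an `Integer` here, while the old single branch would hand back the `String` `"10"`.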
##########
python/flink_agents/api/tests/test_cross_language_event_snapshots.py:
##########
@@ -0,0 +1,455 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+"""Cross-language event SerDe snapshot tests."""
+
+import json
+import os
+from pathlib import Path
+from typing import ClassVar
+from uuid import UUID
+
+import pytest
+
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.events.chat_event import ChatRequestEvent, ChatResponseEvent
+from flink_agents.api.events.context_retrieval_event import (
+ ContextRetrievalRequestEvent,
+ ContextRetrievalResponseEvent,
+)
+from flink_agents.api.events.event import Event, InputEvent, OutputEvent
+from flink_agents.api.events.tool_event import ToolRequestEvent, ToolResponseEvent
+from flink_agents.api.vector_stores.vector_store import Document
+
+_REPO_ROOT = Path(__file__).resolve().parents[4]
+_SNAPSHOT_DIR = _REPO_ROOT / "e2e-test" / "cross-language-event-snapshots"
+
+_FIXED_EVENT_ID = UUID("00000000-0000-0000-0000-000000000001")
+_FIXED_REQUEST_ID = UUID("00000000-0000-0000-0000-000000000002")
+_FIXED_TOOL_CALL_ID = "call_aaaa"
+
+
+def _regenerate_enabled() -> bool:
+ return os.environ.get("REGENERATE_SNAPSHOTS", "").lower() in {"1", "true", "yes"}
+
+
+def _force_id(event: Event, fixed_id: UUID) -> Event:
+ object.__setattr__(event, "id", fixed_id)
+ return event
+
+
+def _write_python_snapshot(name: str, event: Event) -> None:
+ target = _SNAPSHOT_DIR / "python" / name
+ target.parent.mkdir(parents=True, exist_ok=True)
+ target.write_text(event.model_dump_json(indent=2) + "\n")
+
+
+def _assert_python_snapshot_stable(name: str, event: Event) -> None:
+ actual = json.loads(event.model_dump_json())
+ committed_path = _SNAPSHOT_DIR / "python" / name
+ if not committed_path.exists():
Review Comment:
Mirror of the same concern on the Java side: `pytest.skip` when the
committed snapshot is missing turns a real wire-format regression into a silent
skip instead of a failure. The `REGENERATE_SNAPSHOTS=1` skips on the regenerate tests are
legitimate (the env var is the gate), but the stability / round-trip helpers
(lines 64 and 76) would be stronger as `pytest.fail(...)` or a plain `assert
committed_path.exists()` — the snapshot suite is the only thing pinning the
cross-language wire format, so a missing file should be loud, not quiet.
##########
api/src/test/java/org/apache/flink/agents/api/CrossLanguageEventSnapshotTest.java:
##########
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.agents.OutputSchema;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.event.ChatRequestEvent;
+import org.apache.flink.agents.api.event.ChatResponseEvent;
+import org.apache.flink.agents.api.event.ContextRetrievalRequestEvent;
+import org.apache.flink.agents.api.event.ContextRetrievalResponseEvent;
+import org.apache.flink.agents.api.event.ToolRequestEvent;
+import org.apache.flink.agents.api.event.ToolResponseEvent;
+import org.apache.flink.agents.api.tools.ToolResponse;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/** Cross-language event SerDe snapshot tests. */
+class CrossLanguageEventSnapshotTest {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private static final UUID FIXED_EVENT_ID =
+ UUID.fromString("00000000-0000-0000-0000-000000000001");
+ private static final UUID FIXED_REQUEST_ID =
+ UUID.fromString("00000000-0000-0000-0000-000000000002");
+ private static final String FIXED_TOOL_CALL_ID = "call_aaaa";
+ private static final long FIXED_TIMESTAMP = 1_700_000_000_000L;
+
+ private static Path snapshotDir;
+
+ @BeforeAll
+ static void resolveSnapshotDir() {
+ Path repoRoot = Paths.get(System.getProperty("user.dir")).getParent();
+ snapshotDir = repoRoot.resolve("e2e-test/cross-language-event-snapshots");
+ }
+
+ // ── Helpers ────────────────────────────────────────────────────────────
+
+ private static boolean regenerateRequested() {
+ return Boolean.parseBoolean(System.getProperty("regenerate.snapshots", "false"));
+ }
+
+ private static void writeJavaSnapshot(String fileName, Event event) throws Exception {
+ String json = MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(event);
+ Path target = snapshotDir.resolve("java/" + fileName);
+ Files.createDirectories(target.getParent());
+ Files.writeString(target, json + "\n");
+ }
+
+ private static void assertJavaSnapshotStable(String fileName, Event event) throws Exception {
+ String actualJson = MAPPER.writeValueAsString(event);
+ JsonNode actual = MAPPER.readTree(actualJson);
+
+ Path committed = snapshotDir.resolve("java/" + fileName);
+ assumeTrue(
Review Comment:
`assumeTrue` here means the test is reported as skipped rather than failed if
`chat_request_event.json` (or any other committed snapshot) gets renamed or
deleted. The whole point of the snapshot suite is to catch silent wire-format
drift, so a silently skipped "stable" test undercuts that guarantee.
The `regenerate*` tests genuinely need conditional skipping (the system
property is the gate), but the `*JavaSnapshotIsStable` and
`javaCanDeserialize*FromPythonSnapshot` cases would be stronger as hard
assertions — committed snapshot missing means the test must fail. Worth doing
the same on `readPythonSnapshot` at line 111.
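A minimal sketch of the hard-fail variant, assuming the helper receives the same `snapshotDir` the test resolves in `@BeforeAll`. `readCommittedSnapshot` is a hypothetical name. It throws a plain `AssertionError`, which JUnit 5 reports as a failure (its own `AssertionFailedError` extends it), whereas a failed `assumeTrue` aborts the test and shows up as skipped.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper; not part of the existing CrossLanguageEventSnapshotTest.
class SnapshotGuard {

    /** Reads a committed snapshot, failing (not skipping) when the file is missing. */
    static String readCommittedSnapshot(Path snapshotDir, String relativePath) throws IOException {
        Path committed = snapshotDir.resolve(relativePath);
        if (!Files.exists(committed)) {
            // AssertionError is reported as a test failure; assumeTrue would abort and report a skip.
            throw new AssertionError(
                    "Committed snapshot missing: "
                            + committed
                            + " (regenerate via the regenerate.snapshots property and commit it)");
        }
        return Files.readString(committed);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("snapshots");
        try {
            readCommittedSnapshot(dir, "java/chat_request_event.json");
        } catch (AssertionError e) {
            System.out.println("failed as expected: " + e.getMessage());
        }
    }
}
```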