weiqingy commented on code in PR #709:
URL: https://github.com/apache/flink-agents/pull/709#discussion_r3315347855


##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java:
##########
@@ -186,31 +186,33 @@ Event wrapToInputEvent(IN input, PythonActionExecutor pythonActionExecutor) thro
     /**
      * Extracts the downstream output payload from an output {@link Event}.
      *
-     * <p>For a Java {@link OutputEvent}, returns the payload directly. For Python-side output
-     * events (cross-language unified {@link Event} with output type), the event is JSON-serialized
-     * and handed to {@link PythonActionExecutor#getOutputFromOutputEvent(String)} for extraction.
+     * <p>Dispatch is by pipeline wire format, not action language:
+     *
+     * <ul>
+     *   <li>Java pipelines ({@code inputIsJava}) emit the raw payload directly.
+     *   <li>Python pipelines re-encode through {@link
+     *       PythonActionExecutor#getOutputFromOutputEvent(String)} so the downstream Python sink
+     *       receives cloudpickle bytes.
+     * </ul>
      *
      * @param event the output event (must satisfy {@link EventUtil#isOutputEvent(Event)}).
-     * @param pythonActionExecutor the Python action executor (used only for Python output events).
+     * @param pythonActionExecutor used only on Python pipelines.
      * @return the typed output payload.
      */
     @SuppressWarnings("unchecked")
     OUT getOutputFromOutputEvent(Event event, PythonActionExecutor pythonActionExecutor) {
         checkState(EventUtil.isOutputEvent(event));
-        if (event instanceof OutputEvent) {
-            return (OUT) ((OutputEvent) event).getOutput();
-        } else {
-            // Python output events arrive as unified Event with type "_output_event".
-            // Pass the JSON representation to Python for extraction.
-            try {
-                String eventJson = new ObjectMapper().writeValueAsString(event);
-                Object outputFromOutputEvent =
-                        pythonActionExecutor.getOutputFromOutputEvent(eventJson);
-                return (OUT) outputFromOutputEvent;
-            } catch (Exception e) {
-                throw new IllegalStateException(
-                        "Failed to extract output from event: " + event.getType(), e);
-            }
+        OutputEvent typedEvent =
+                (event instanceof OutputEvent) ? (OutputEvent) event : OutputEvent.fromEvent(event);
+        if (inputIsJava) {
+            return (OUT) typedEvent.getOutput();

Review Comment:
   For the `inputIsJava == true && event is unified Event` case (e.g. a Python action emitting `_output_event` in a Java pipeline), this now calls `OutputEvent.fromEvent(event).getOutput()` and bypasses `PythonActionExecutor` entirely. The old branch routed everything that wasn't `instanceof OutputEvent` through Python, so this is a deliberate behavior change to a hot path, not just a refactor. It also affects every output event in every pipeline, not just cross-language ones.
   
   Two asks:
   - Could this be called out in the PR description so reviewers (and the 0.3 release notes) don't miss it?
   - Is there a test under `runtime/` that specifically pins the Java-pipeline + Python-action output path, so that a future Pemja change can't silently regress payload reconstruction? `CrossLanguageActionRuntimeTest` exercises dispatch, but I didn't spot a test that asserts on the recovered output type/value here.
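The behavior change is easiest to see side by side. Below is a minimal Python model of both dispatch rules, plus the kind of assertion the requested test could make. Everything in it is hypothetical: `Event`, `OutputEvent.from_event`, `RecordingExecutor`, `old_dispatch`, and `new_dispatch` are stand-ins for the Java classes in the diff, and the fake executor's `pickle.dumps` only marks that the Python path ran; it does not reproduce what Pemja or cloudpickle actually return.

```python
import pickle
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Event:
    """Hypothetical stand-in for the unified cross-language Event."""

    type: str
    attributes: Dict[str, Any] = field(default_factory=dict)


@dataclass
class OutputEvent(Event):
    """Hypothetical stand-in for the typed Java OutputEvent."""

    output: Any = None

    @classmethod
    def from_event(cls, event: Event) -> "OutputEvent":
        # Assumption: a unified "_output_event" keeps its payload under "output".
        return cls(type=event.type, attributes=event.attributes,
                   output=event.attributes.get("output"))


class RecordingExecutor:
    """Hypothetical fake for PythonActionExecutor that counts invocations."""

    def __init__(self) -> None:
        self.calls = 0

    def get_output_from_output_event(self, event: Event) -> bytes:
        self.calls += 1
        typed = event if isinstance(event, OutputEvent) else OutputEvent.from_event(event)
        # Simulated re-encoding for a Python sink (the real path uses cloudpickle).
        return pickle.dumps(typed.output)


def old_dispatch(event: Event, executor: RecordingExecutor) -> Any:
    # Old rule: keyed on the event's class; anything untyped crossed into Python.
    if isinstance(event, OutputEvent):
        return event.output
    return executor.get_output_from_output_event(event)


def new_dispatch(event: Event, input_is_java: bool, executor: RecordingExecutor) -> Any:
    # New rule: normalize to a typed event first, then key on the pipeline wire format.
    typed = event if isinstance(event, OutputEvent) else OutputEvent.from_event(event)
    if input_is_java:
        return typed.output
    return executor.get_output_from_output_event(typed)


# Java pipeline + Python action emitting a unified "_output_event".
unified = Event(type="_output_event", attributes={"output": {"answer": 42}})
old_exec, new_exec = RecordingExecutor(), RecordingExecutor()
old_dispatch(unified, old_exec)
result = new_dispatch(unified, input_is_java=True, executor=new_exec)

assert old_exec.calls == 1  # old path went through the executor
assert new_exec.calls == 0  # new path bypasses it entirely
assert isinstance(result, dict) and result == {"answer": 42}
```

A real version of the test would presumably live under `runtime/` in Java and assert on the recovered type as well as the value, since type reconstruction is the part a Pemja change is most likely to break.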



##########
plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java:
##########
@@ -228,20 +228,30 @@ private void extractActionsFromAgent(Agent agent) throws Exception {
         // Scan the agent class for methods annotated with @Action
         Class<?> agentClass = agent.getClass();
         for (Method method : agentClass.getDeclaredMethods()) {
-            if (method.isAnnotationPresent(org.apache.flink.agents.api.annotation.Action.class)) {
-                org.apache.flink.agents.api.annotation.Action actionAnnotation =
-                        method.getAnnotation(org.apache.flink.agents.api.annotation.Action.class);
-
-                String[] listenEventTypeStrings =
-                        Objects.requireNonNull(actionAnnotation).listenEventTypes();
-
-                org.apache.flink.agents.plan.JavaFunction javaFunction =
+            if (!method.isAnnotationPresent(org.apache.flink.agents.api.annotation.Action.class)) {
+                continue;
+            }
+            org.apache.flink.agents.api.annotation.Action actionAnnotation =
+                    Objects.requireNonNull(
+                            method.getAnnotation(
+                                    org.apache.flink.agents.api.annotation.Action.class));
+            String[] listenEventTypeStrings = actionAnnotation.listenEventTypes();
+            org.apache.flink.agents.api.annotation.PythonFunction target =
+                    actionAnnotation.target();
+
+            org.apache.flink.agents.plan.Function execFunction;
+            if (target.module().isEmpty()) {

Review Comment:
   The `module().isEmpty()` branch picks Java vs. Python, but `qualname()` is read unconditionally on the Python path without a non-empty check. `@Action(target = @PythonFunction(module = "pkg"))` (no qualname) would build a `PythonFunction("pkg", "")` and fail much later inside the interpreter with an obscure error.
   
   A small validation here would fail at the layer where the user can actually correct the mistake. Something like: both empty → Java; exactly one set → reject with a message naming the action; both set → Python. (The same concern applies in mirror on the Python side: does `@action(target=...)` reject a `JavaFunction` with an empty `method_name` or `parameter_types`?)
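The three-way rule above can be sketched in a few lines. This is a Python sketch for illustration only: `resolve_action_target` and `ActionTargetError` are hypothetical names, and it assumes the annotation's `module`/`qualname` values arrive as plain strings where "unset" means `""`, mirroring the `isEmpty()` check in the diff. A Java check in `extractActionsFromAgent`, or the Python-side `@action(target=...)` mirror, could follow the same shape.

```python
class ActionTargetError(ValueError):
    """Hypothetical error for a half-specified @Action(target=...) annotation."""


def resolve_action_target(action_name: str, module: str, qualname: str) -> str:
    """Hypothetical classifier for the suggested rule.

    both empty -> "java", both set -> "python", exactly one set -> error.
    """
    has_module, has_qualname = module != "", qualname != ""
    if not has_module and not has_qualname:
        # No Python target: execute the annotated Java method itself.
        return "java"
    if has_module and has_qualname:
        return "python"
    # Exactly one field set: fail while building the plan, naming the action,
    # instead of constructing PythonFunction("pkg", "") and failing in the interpreter.
    missing = "qualname" if has_module else "module"
    raise ActionTargetError(
        f"Action '{action_name}': @PythonFunction target must set both module and "
        f"qualname or neither (missing {missing})"
    )


print(resolve_action_target("handle", "", ""))                # java
print(resolve_action_target("handle", "pkg", "agents.handle"))  # python
try:
    resolve_action_target("handle", "pkg", "")
except ActionTargetError as err:
    print(err)
```

Rejecting the half-set case, rather than defaulting it to Java, keeps a typo in the annotation from silently running the wrong implementation.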



##########
plan/src/main/java/org/apache/flink/agents/plan/serializer/ActionJsonDeserializer.java:
##########
@@ -172,8 +172,16 @@ private Object deserializePythonConfig(JsonNode node) {
             List<Object> list = new ArrayList<>();
             node.forEach(element -> list.add(deserializePythonConfig(element)));
             return list;
-        } else if (node.isValueNode()) {
-            return node.asText();
+        } else if (node.isNull()) {

Review Comment:
   Nice: splitting `node.asText()` into typed branches (`booleanValue`/`intValue`/`longValue`/`doubleValue`/`textValue`) restores the type information that Python-side config carries through the JSON. Pre-change, a Python config of `{"max_iters": 10}` would have come back as the string `"10"` on the Java side. Worth a callout in the PR description as an incidental correctness fix shipped along with the cross-language work.
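To make the type loss concrete, here is a small Python model. It is an assumption-labeled sketch, not the deserializer itself: the hypothetical `as_text_everywhere` imitates the old `node.asText()` branch by turning every scalar leaf into its JSON text, while plain `json.loads` stands in for the new typed branches. Python's `json` module plays Jackson's role here, and the Java int/long split has no Python equivalent.

```python
import json
from typing import Any


def as_text_everywhere(value: Any) -> Any:
    """Hypothetical model of the pre-change path: every scalar leaf becomes text."""
    if isinstance(value, dict):
        return {key: as_text_everywhere(item) for key, item in value.items()}
    if isinstance(value, list):
        return [as_text_everywhere(item) for item in value]
    # Strings pass through; numbers, booleans and null turn into their JSON text.
    return value if isinstance(value, str) else json.dumps(value)


raw = '{"max_iters": 10, "temperature": 0.2, "verbose": true, "tags": ["a", 1]}'
typed = json.loads(raw)               # post-change: scalar types preserved
stringly = as_text_everywhere(typed)  # pre-change: scalar types lost

print(type(typed["max_iters"]).__name__, type(stringly["max_iters"]).__name__)  # int str
```

Any consumer doing `config["max_iters"] + 1` or a boolean check on `"verbose"` would have misbehaved on the stringly-typed version, which is why the fix is worth noting separately.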



##########
python/flink_agents/api/tests/test_cross_language_event_snapshots.py:
##########
@@ -0,0 +1,455 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+"""Cross-language event SerDe snapshot tests."""
+
+import json
+import os
+from pathlib import Path
+from typing import ClassVar
+from uuid import UUID
+
+import pytest
+
+from flink_agents.api.chat_message import ChatMessage, MessageRole
+from flink_agents.api.events.chat_event import ChatRequestEvent, ChatResponseEvent
+from flink_agents.api.events.context_retrieval_event import (
+    ContextRetrievalRequestEvent,
+    ContextRetrievalResponseEvent,
+)
+from flink_agents.api.events.event import Event, InputEvent, OutputEvent
+from flink_agents.api.events.tool_event import ToolRequestEvent, ToolResponseEvent
+from flink_agents.api.vector_stores.vector_store import Document
+
+_REPO_ROOT = Path(__file__).resolve().parents[4]
+_SNAPSHOT_DIR = _REPO_ROOT / "e2e-test" / "cross-language-event-snapshots"
+
+_FIXED_EVENT_ID = UUID("00000000-0000-0000-0000-000000000001")
+_FIXED_REQUEST_ID = UUID("00000000-0000-0000-0000-000000000002")
+_FIXED_TOOL_CALL_ID = "call_aaaa"
+
+
+def _regenerate_enabled() -> bool:
+    return os.environ.get("REGENERATE_SNAPSHOTS", "").lower() in {"1", "true", "yes"}
+
+
+def _force_id(event: Event, fixed_id: UUID) -> Event:
+    object.__setattr__(event, "id", fixed_id)
+    return event
+
+
+def _write_python_snapshot(name: str, event: Event) -> None:
+    target = _SNAPSHOT_DIR / "python" / name
+    target.parent.mkdir(parents=True, exist_ok=True)
+    target.write_text(event.model_dump_json(indent=2) + "\n")
+
+
+def _assert_python_snapshot_stable(name: str, event: Event) -> None:
+    actual = json.loads(event.model_dump_json())
+    committed_path = _SNAPSHOT_DIR / "python" / name
+    if not committed_path.exists():

Review Comment:
   Mirror of the same concern on the Java side: `pytest.skip` when the committed snapshot is missing turns a real wire-format regression into a silent pass-as-skipped. The `REGENERATE_SNAPSHOTS=1` skips on the regenerate tests are legitimate (the env var is the gate), but the stability / round-trip helpers (lines 64 and 76) would be stronger as `pytest.fail(...)` or a plain `assert committed_path.exists()`. The snapshot suite is the only thing pinning the cross-language wire format, so a missing file should be loud, not quiet.
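A minimal sketch of the suggested shape, assuming the helper receives the committed path and the event already parsed into a dict (in the PR that dict would come from `json.loads(event.model_dump_json())`). `assert_snapshot_stable` and the `event.json` file name are hypothetical. A failed `assert` raises `AssertionError`, which pytest reports as a failure, so `pytest.skip` stays reserved for the `REGENERATE_SNAPSHOTS` gate. The Java side could take the same structure with `assertTrue` in place of `assumeTrue`.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict


def assert_snapshot_stable(committed_path: Path, actual: Dict[str, Any]) -> None:
    """Compare a freshly serialized event with its committed snapshot; never skip."""
    # A missing snapshot means the wire format is no longer pinned: fail, don't skip.
    assert committed_path.exists(), (
        f"Committed snapshot {committed_path} is missing; regenerate it with "
        "REGENERATE_SNAPSHOTS=1 and commit the result"
    )
    expected = json.loads(committed_path.read_text())
    assert actual == expected, f"Snapshot drift in {committed_path.name}"


with tempfile.TemporaryDirectory() as tmp:
    snapshot = Path(tmp) / "python" / "event.json"
    snapshot.parent.mkdir(parents=True)
    payload = {"id": "00000000-0000-0000-0000-000000000001"}
    snapshot.write_text(json.dumps(payload, indent=2) + "\n")
    assert_snapshot_stable(snapshot, payload)  # present and equal: passes

    snapshot.unlink()
    try:
        assert_snapshot_stable(snapshot, payload)
    except AssertionError as err:
        print("fails loudly:", err)
```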



##########
api/src/test/java/org/apache/flink/agents/api/CrossLanguageEventSnapshotTest.java:
##########
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.agents.OutputSchema;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.event.ChatRequestEvent;
+import org.apache.flink.agents.api.event.ChatResponseEvent;
+import org.apache.flink.agents.api.event.ContextRetrievalRequestEvent;
+import org.apache.flink.agents.api.event.ContextRetrievalResponseEvent;
+import org.apache.flink.agents.api.event.ToolRequestEvent;
+import org.apache.flink.agents.api.event.ToolResponseEvent;
+import org.apache.flink.agents.api.tools.ToolResponse;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+/** Cross-language event SerDe snapshot tests. */
+class CrossLanguageEventSnapshotTest {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    private static final UUID FIXED_EVENT_ID =
+            UUID.fromString("00000000-0000-0000-0000-000000000001");
+    private static final UUID FIXED_REQUEST_ID =
+            UUID.fromString("00000000-0000-0000-0000-000000000002");
+    private static final String FIXED_TOOL_CALL_ID = "call_aaaa";
+    private static final long FIXED_TIMESTAMP = 1_700_000_000_000L;
+
+    private static Path snapshotDir;
+
+    @BeforeAll
+    static void resolveSnapshotDir() {
+        Path repoRoot = Paths.get(System.getProperty("user.dir")).getParent();
+        snapshotDir = repoRoot.resolve("e2e-test/cross-language-event-snapshots");
+    }
+
+    // ── Helpers ────────────────────────────────────────────────────────────
+
+    private static boolean regenerateRequested() {
+        return Boolean.parseBoolean(System.getProperty("regenerate.snapshots", "false"));
+    }
+
+    private static void writeJavaSnapshot(String fileName, Event event) throws Exception {
+        String json = MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(event);
+        Path target = snapshotDir.resolve("java/" + fileName);
+        Files.createDirectories(target.getParent());
+        Files.writeString(target, json + "\n");
+    }
+
+    private static void assertJavaSnapshotStable(String fileName, Event event) throws Exception {
+        String actualJson = MAPPER.writeValueAsString(event);
+        JsonNode actual = MAPPER.readTree(actualJson);
+
+        Path committed = snapshotDir.resolve("java/" + fileName);
+        assumeTrue(

Review Comment:
   `assumeTrue` here means the test reports as "skipped" rather than "failed" if `chat_request_event.json` (or any other committed snapshot) gets renamed or deleted. The whole point of the snapshot suite is to catch silent wire-format drift, so a silently skipped "stable" test undercuts that guarantee.
   
   The `regenerate*` tests genuinely need conditional skipping (the system property is the gate), but the `*JavaSnapshotIsStable` and `javaCanDeserialize*FromPythonSnapshot` cases would be stronger as hard assertions: a missing committed snapshot means the test must fail. The same change is worth making in `readPythonSnapshot` at line 111.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
