weiqingy commented on code in PR #709:
URL: https://github.com/apache/flink-agents/pull/709#discussion_r3315347855


##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java:
##########
@@ -186,31 +186,33 @@ Event wrapToInputEvent(IN input, PythonActionExecutor pythonActionExecutor) thro
     /**
      * Extracts the downstream output payload from an output {@link Event}.
      *
-     * <p>For a Java {@link OutputEvent}, returns the payload directly. For Python-side output
-     * events (cross-language unified {@link Event} with output type), the event is JSON-serialized
-     * and handed to {@link PythonActionExecutor#getOutputFromOutputEvent(String)} for extraction.
+     * <p>Dispatch is by pipeline wire format, not action language:
+     *
+     * <ul>
+     *   <li>Java pipelines ({@code inputIsJava}) emit the raw payload directly.
+     *   <li>Python pipelines re-encode through {@link
+     *       PythonActionExecutor#getOutputFromOutputEvent(String)} so the downstream Python sink
+     *       receives cloudpickle bytes.
+     * </ul>
      *
      * @param event the output event (must satisfy {@link EventUtil#isOutputEvent(Event)}).
-     * @param pythonActionExecutor the Python action executor (used only for Python output events).
+     * @param pythonActionExecutor used only on Python pipelines.
      * @return the typed output payload.
      */
     @SuppressWarnings("unchecked")
     OUT getOutputFromOutputEvent(Event event, PythonActionExecutor pythonActionExecutor) {
         checkState(EventUtil.isOutputEvent(event));
-        if (event instanceof OutputEvent) {
-            return (OUT) ((OutputEvent) event).getOutput();
-        } else {
-            // Python output events arrive as unified Event with type "_output_event".
-            // Pass the JSON representation to Python for extraction.
-            try {
-                String eventJson = new ObjectMapper().writeValueAsString(event);
-                Object outputFromOutputEvent =
-                        pythonActionExecutor.getOutputFromOutputEvent(eventJson);
-                return (OUT) outputFromOutputEvent;
-            } catch (Exception e) {
-                throw new IllegalStateException(
-                        "Failed to extract output from event: " + event.getType(), e);
-            }
+        OutputEvent typedEvent =
+                (event instanceof OutputEvent) ? (OutputEvent) event : OutputEvent.fromEvent(event);
+        if (inputIsJava) {
+            return (OUT) typedEvent.getOutput();

Review Comment:
   For the `inputIsJava == true && event is unified Event` case (e.g. a Python
   action emitting `_output_event` in a Java pipeline), this now calls
   `OutputEvent.fromEvent(event).getOutput()` and bypasses `PythonActionExecutor`
   entirely. The old branch routed everything that wasn't `instanceof OutputEvent`
   through Python, so this is a deliberate behavior change to a hot path, not
   just a refactor. The method sits on the path of every output event in every
   pipeline, not just cross-language ones.
   
   Two asks:
   - Could this be called out in the PR description so reviewers (and the 0.3
     release notes) don't miss it?
   - Is there a test under `runtime/` that specifically pins the Java-pipeline
     + Python-action output path, so a future Pemja change doesn't silently
     regress payload reconstruction? `CrossLanguageActionRuntimeTest` exercises
     dispatch; please point me at the test covering the recovered output
     type/value if I missed it.
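
To make the behavior change concrete, here is a minimal, self-contained sketch of the two dispatch rules side by side. Everything in it is a hypothetical stand-in, not the real flink-agents code: `Event`, `OutputEvent`, and `FakePythonExecutor` are toy classes, the executor takes the event directly instead of a JSON string, and the non-Java branch of `newDispatch` is assumed from the new Javadoc because the hunk above cuts off before it.

```java
import java.util.Map;

/** Hypothetical stand-ins for illustration only; not the real flink-agents classes. */
public class OutputDispatchSketch {

    /** Toy version of the unified cross-language event. */
    static class Event {
        final String type;
        final Map<String, Object> attributes;

        Event(String type, Map<String, Object> attributes) {
            this.type = type;
            this.attributes = attributes;
        }
    }

    /** Toy version of the typed Java output event. */
    static class OutputEvent extends Event {
        OutputEvent(Object output) {
            super("_output_event", Map.<String, Object>of("output", output));
        }

        Object getOutput() {
            return attributes.get("output");
        }

        /** Rebuilds a typed event from a unified one (assumed behavior). */
        static OutputEvent fromEvent(Event event) {
            return new OutputEvent(event.attributes.get("output"));
        }
    }

    /** Fake executor that counts calls and tags the payload it returns. */
    static class FakePythonExecutor {
        int calls = 0;

        Object getOutputFromOutputEvent(Event event) {
            calls++;
            return "pickled:" + event.attributes.get("output");
        }
    }

    /** Old rule: dispatch on the event's Java type. */
    static Object oldDispatch(Event event, FakePythonExecutor py) {
        if (event instanceof OutputEvent) {
            return ((OutputEvent) event).getOutput();
        }
        return py.getOutputFromOutputEvent(event);
    }

    /** New rule: dispatch on the pipeline wire format (Python branch is assumed). */
    static Object newDispatch(Event event, boolean inputIsJava, FakePythonExecutor py) {
        OutputEvent typed =
                (event instanceof OutputEvent) ? (OutputEvent) event : OutputEvent.fromEvent(event);
        if (inputIsJava) {
            return typed.getOutput();
        }
        return py.getOutputFromOutputEvent(typed);
    }

    public static void main(String[] args) {
        // A unified event, as a Python action would emit it in a Java pipeline.
        Event unified = new Event("_output_event", Map.<String, Object>of("output", 42));

        FakePythonExecutor oldPy = new FakePythonExecutor();
        System.out.println(oldDispatch(unified, oldPy) + " calls=" + oldPy.calls);

        FakePythonExecutor newPy = new FakePythonExecutor();
        System.out.println(newDispatch(unified, true, newPy) + " calls=" + newPy.calls);
    }
}
```

In this toy model the old rule sends the unified event through the executor, while the new rule with `inputIsJava == true` returns the reconstructed payload without touching the executor at all. A regression test for the real path would presumably assert the same two things: the recovered value and type, and that no Python call happens.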



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.