addu390 commented on code in PR #709:
URL: https://github.com/apache/flink-agents/pull/709#discussion_r3315600549


##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java:
##########
@@ -186,31 +186,33 @@ Event wrapToInputEvent(IN input, PythonActionExecutor 
pythonActionExecutor) thro
     /**
      * Extracts the downstream output payload from an output {@link Event}.
      *
-     * <p>For a Java {@link OutputEvent}, returns the payload directly. For 
Python-side output
-     * events (cross-language unified {@link Event} with output type), the 
event is JSON-serialized
-     * and handed to {@link 
PythonActionExecutor#getOutputFromOutputEvent(String)} for extraction.
+     * <p>Dispatch is by pipeline wire format, not action language:
+     *
+     * <ul>
+     *   <li>Java pipelines ({@code inputIsJava}) emit the raw payload 
directly.
+     *   <li>Python pipelines re-encode through {@link
+     *       PythonActionExecutor#getOutputFromOutputEvent(String)} so the 
downstream Python sink
+     *       receives cloudpickle bytes.
+     * </ul>
      *
      * @param event the output event (must satisfy {@link 
EventUtil#isOutputEvent(Event)}).
-     * @param pythonActionExecutor the Python action executor (used only for 
Python output events).
+     * @param pythonActionExecutor used only on Python pipelines.
      * @return the typed output payload.
      */
     @SuppressWarnings("unchecked")
     OUT getOutputFromOutputEvent(Event event, PythonActionExecutor 
pythonActionExecutor) {
         checkState(EventUtil.isOutputEvent(event));
-        if (event instanceof OutputEvent) {
-            return (OUT) ((OutputEvent) event).getOutput();
-        } else {
-            // Python output events arrive as unified Event with type 
"_output_event".
-            // Pass the JSON representation to Python for extraction.
-            try {
-                String eventJson = new 
ObjectMapper().writeValueAsString(event);
-                Object outputFromOutputEvent =
-                        
pythonActionExecutor.getOutputFromOutputEvent(eventJson);
-                return (OUT) outputFromOutputEvent;
-            } catch (Exception e) {
-                throw new IllegalStateException(
-                        "Failed to extract output from event: " + 
event.getType(), e);
-            }
+        OutputEvent typedEvent =
+                (event instanceof OutputEvent) ? (OutputEvent) event : 
OutputEvent.fromEvent(event);
+        if (inputIsJava) {
+            return (OUT) typedEvent.getOutput();

Review Comment:
   Missed the tests, added 3 new unit tests in `EventRouterTest`, 
`JavaAgentWithPythonActionTest` would be the Pemja regression guard and also 
added a `call-outs` section in the PR description.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to