addu390 commented on code in PR #709:
URL: https://github.com/apache/flink-agents/pull/709#discussion_r3315600549
##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java:
##########
@@ -186,31 +186,33 @@ Event wrapToInputEvent(IN input, PythonActionExecutor
pythonActionExecutor) thro
/**
* Extracts the downstream output payload from an output {@link Event}.
*
- * <p>For a Java {@link OutputEvent}, returns the payload directly. For
Python-side output
- * events (cross-language unified {@link Event} with output type), the
event is JSON-serialized
- * and handed to {@link
PythonActionExecutor#getOutputFromOutputEvent(String)} for extraction.
+ * <p>Dispatch is by pipeline wire format, not action language:
+ *
+ * <ul>
+ * <li>Java pipelines ({@code inputIsJava}) emit the raw payload
directly.
+ * <li>Python pipelines re-encode through {@link
+ * PythonActionExecutor#getOutputFromOutputEvent(String)} so the
downstream Python sink
+ * receives cloudpickle bytes.
+ * </ul>
*
* @param event the output event (must satisfy {@link
EventUtil#isOutputEvent(Event)}).
- * @param pythonActionExecutor the Python action executor (used only for
Python output events).
+ * @param pythonActionExecutor used only on Python pipelines.
* @return the typed output payload.
*/
@SuppressWarnings("unchecked")
OUT getOutputFromOutputEvent(Event event, PythonActionExecutor
pythonActionExecutor) {
checkState(EventUtil.isOutputEvent(event));
- if (event instanceof OutputEvent) {
- return (OUT) ((OutputEvent) event).getOutput();
- } else {
- // Python output events arrive as unified Event with type
"_output_event".
- // Pass the JSON representation to Python for extraction.
- try {
- String eventJson = new
ObjectMapper().writeValueAsString(event);
- Object outputFromOutputEvent =
-
pythonActionExecutor.getOutputFromOutputEvent(eventJson);
- return (OUT) outputFromOutputEvent;
- } catch (Exception e) {
- throw new IllegalStateException(
- "Failed to extract output from event: " +
event.getType(), e);
- }
+ OutputEvent typedEvent =
+ (event instanceof OutputEvent) ? (OutputEvent) event :
OutputEvent.fromEvent(event);
+ if (inputIsJava) {
+ return (OUT) typedEvent.getOutput();
Review Comment:
I missed the tests earlier. I have now added three unit tests in
`EventRouterTest`; `JavaAgentWithPythonActionTest` would be the Pemja
regression guard. I also added a `call-outs` section to the PR description.
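
For readers following the hunk above, here is a minimal sketch of the dispatch the new Javadoc describes: the branch is chosen by the pipeline's wire format (`inputIsJava`), not by the language of the action that produced the event. Everything in it is a stand-in for illustration and is not the project's code: `DispatchSketch`, the nested `OutputEvent`, and the `PythonReencoder` interface are hypothetical, and the fake re-encoder just prefixes a string instead of producing cloudpickle bytes.

```java
import java.nio.charset.StandardCharsets;

public class DispatchSketch {

    /** Hypothetical stand-in for the typed output event; not the real OutputEvent. */
    static final class OutputEvent {
        private final Object output;

        OutputEvent(Object output) {
            this.output = output;
        }

        Object getOutput() {
            return output;
        }
    }

    /** Hypothetical stand-in for the Python-side re-encoding step. */
    interface PythonReencoder {
        Object reencode(OutputEvent event);
    }

    /**
     * Chooses the extraction path by pipeline wire format.
     * Java pipelines get the raw payload; Python pipelines get re-encoded bytes.
     */
    static Object extract(OutputEvent event, boolean inputIsJava, PythonReencoder reencoder) {
        if (inputIsJava) {
            // Java pipeline: emit the raw payload, regardless of which language produced it.
            return event.getOutput();
        }
        // Python pipeline: hand the event to the re-encoder so the sink receives bytes.
        return reencoder.reencode(event);
    }

    public static void main(String[] args) {
        OutputEvent event = new OutputEvent("hello");
        // Fake re-encoder (assumption): real code would return cloudpickle bytes.
        PythonReencoder fake =
                e -> ("pickled:" + e.getOutput()).getBytes(StandardCharsets.UTF_8);

        System.out.println(extract(event, true, fake));
        byte[] bytes = (byte[]) extract(event, false, fake);
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

A unit test along these lines would assert both branches against the same event, which is roughly the property a regression guard for the Java pipeline with a Python action needs to pin down.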