weiqingy commented on code in PR #709:
URL: https://github.com/apache/flink-agents/pull/709#discussion_r3315347855
##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java:
##########
@@ -186,31 +186,33 @@ Event wrapToInputEvent(IN input, PythonActionExecutor pythonActionExecutor) thro
/**
* Extracts the downstream output payload from an output {@link Event}.
*
- * <p>For a Java {@link OutputEvent}, returns the payload directly. For Python-side output
- * events (cross-language unified {@link Event} with output type), the event is JSON-serialized
- * and handed to {@link PythonActionExecutor#getOutputFromOutputEvent(String)} for extraction.
+ * <p>Dispatch is by pipeline wire format, not action language:
+ *
+ * <ul>
+ * <li>Java pipelines ({@code inputIsJava}) emit the raw payload directly.
+ * <li>Python pipelines re-encode through {@link
+ * PythonActionExecutor#getOutputFromOutputEvent(String)} so the downstream Python sink
+ * receives cloudpickle bytes.
+ * </ul>
*
* @param event the output event (must satisfy {@link EventUtil#isOutputEvent(Event)}).
- * @param pythonActionExecutor the Python action executor (used only for Python output events).
+ * @param pythonActionExecutor used only on Python pipelines.
* @return the typed output payload.
*/
@SuppressWarnings("unchecked")
OUT getOutputFromOutputEvent(Event event, PythonActionExecutor pythonActionExecutor) {
checkState(EventUtil.isOutputEvent(event));
- if (event instanceof OutputEvent) {
- return (OUT) ((OutputEvent) event).getOutput();
- } else {
- // Python output events arrive as unified Event with type "_output_event".
- // Pass the JSON representation to Python for extraction.
- try {
- String eventJson = new ObjectMapper().writeValueAsString(event);
- Object outputFromOutputEvent =
- pythonActionExecutor.getOutputFromOutputEvent(eventJson);
- return (OUT) outputFromOutputEvent;
- } catch (Exception e) {
- throw new IllegalStateException(
- "Failed to extract output from event: " + event.getType(), e);
- }
+ OutputEvent typedEvent =
+ (event instanceof OutputEvent) ? (OutputEvent) event : OutputEvent.fromEvent(event);
+ if (inputIsJava) {
+ return (OUT) typedEvent.getOutput();
Review Comment:
For the `inputIsJava == true && event is unified Event` case (e.g. a Python
action emitting `_output_event` in a Java pipeline), this now calls
`OutputEvent.fromEvent(event).getOutput()` and bypasses `PythonActionExecutor`
entirely. The old branch routed everything that wasn't `instanceof OutputEvent`
through Python — so this is a deliberate behavior change to a hot path, not
just a refactor, and it affects every output event in every pipeline, not just
cross-language ones.

Two asks:
- Could this be called out in the PR description so reviewers (and the 0.3
release notes) don't miss it?
- Is there a test under `runtime/` that specifically pins the Java-pipeline +
Python-action output path so a future Pemja change doesn't silently regress
payload reconstruction? `CrossLanguageActionRuntimeTest` exercises dispatch;
point me at the one covering the recovered output type/value if I missed it.
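
To make the second ask concrete, here is a rough, self-contained sketch of the property I would like pinned. It does not use the real flink-agents classes: `Event`, `OutputEvent`, `RecordingPythonExecutor`, and `getOutput` below are hypothetical stand-ins that only mirror the branch shown in the diff, and the Python-pipeline branch is an assumption based on the new Javadoc. A real test would go through `EventRouter` and the actual executor.

```java
import java.util.Map;

class OutputDispatchSketch {
    /** Hypothetical stand-in for the unified cross-language event. */
    static class Event {
        final String type;
        final Map<String, Object> attributes;

        Event(String type, Map<String, Object> attributes) {
            this.type = type;
            this.attributes = attributes;
        }
    }

    /** Hypothetical stand-in for the typed Java output event. */
    static class OutputEvent extends Event {
        static final String TYPE = "_output_event";

        OutputEvent(Object output) {
            super(TYPE, Map.of("output", output));
        }

        Object getOutput() {
            return attributes.get("output");
        }

        /** Rebuilds a typed event from a unified one (assumed attribute name). */
        static OutputEvent fromEvent(Event event) {
            return new OutputEvent(event.attributes.get("output"));
        }
    }

    /** Fake executor that only counts how often Python would be called. */
    static class RecordingPythonExecutor {
        int calls = 0;

        Object getOutputFromOutputEvent(String eventJson) {
            calls++;
            return "re-encoded:" + eventJson;
        }
    }

    /** Mirrors the new dispatch: by pipeline wire format, not event class. */
    static Object getOutput(Event event, boolean inputIsJava, RecordingPythonExecutor py) {
        OutputEvent typed =
                (event instanceof OutputEvent) ? (OutputEvent) event : OutputEvent.fromEvent(event);
        if (inputIsJava) {
            return typed.getOutput();
        }
        // Assumption: Python pipelines always go through the executor.
        return py.getOutputFromOutputEvent(String.valueOf(typed.getOutput()));
    }

    public static void main(String[] args) {
        // A Python action emitting "_output_event" inside a Java pipeline.
        Event unified = new Event(OutputEvent.TYPE, Map.of("output", 42));
        RecordingPythonExecutor py = new RecordingPythonExecutor();

        Object out = getOutput(unified, true, py);

        // Pin both the recovered type/value and the fact that Python is bypassed.
        if (!Integer.valueOf(42).equals(out)) {
            throw new AssertionError("payload type/value not preserved: " + out);
        }
        if (py.calls != 0) {
            throw new AssertionError("Java pipeline should not call the Python executor");
        }
        System.out.println("ok");
    }
}
```

The part that matters is the pair of checks at the end: one on the recovered payload's type and value, one on the executor not being invoked when `inputIsJava` is true.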