[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905138#comment-15905138 ]
ASF GitHub Bot commented on FLINK-4460:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3484#discussion_r105398836

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---
    @@ -300,6 +303,36 @@ private StreamGraph generateInternal(List<StreamTransformation<?>> transformatio
         }

         /**
    +     * Transforms a {@code SideOutputTransformation}.
    +     *
    +     * <p>
    +     * For this we create a virtual node in the {@code StreamGraph} that holds the side-output
    +     * {@link org.apache.flink.util.OutputTag}.
    +     *
    +     * @see org.apache.flink.streaming.api.graph.StreamGraphGenerator
    +     */
    +    private <T> Collection<Integer> transformSideOutput(SideOutputTransformation<T> sideOutput) {
    +        StreamTransformation<T> input = sideOutput.getInput();
    +        Collection<Integer> resultIds = transform(input);
    +
    +
    +        // the recursive transform might have already transformed this
    +        if (alreadyTransformed.containsKey(sideOutput)) {
    +            return alreadyTransformed.get(sideOutput);
    +        }
    +
    +        List<Integer> virtualResultIds = new ArrayList<>();
    +
    +        for (int inputId : resultIds) {
    +            int virtualId = StreamTransformation.getNewNodeId();
    +            streamGraph.addVirtualSideOutputNode(inputId, virtualId, sideOutput.getOutputTag());
    +            virtualResultIds.add(virtualId);
    +        }
    +        return virtualResultIds;
    +    }
    +
    +
    --- End diff --

    Leave only one empty line.

> Side Outputs in Flink
> ---------------------
>
>                 Key: FLINK-4460
>                 URL: https://issues.apache.org/jira/browse/FLINK-4460
>             Project: Flink
>          Issue Type: New Feature
>          Components: Core, DataStream API
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Chen Qin
>            Assignee: Chen Qin
>              Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)