zhipeng93 commented on code in PR #215:
URL: https://github.com/apache/flink-ml/pull/215#discussion_r1168127178


##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/broadcast/BroadcastUtils.java:
##########
@@ -152,41 +155,52 @@ private static <OUT> DataStream<OUT> cacheBroadcastVariables(
     }
 
     /**
-     * uses {@link DraftExecutionEnvironment} to execute the userDefinedFunction and returns the
+     * Uses {@link DraftExecutionEnvironment} to execute the userDefinedFunction and returns the
      * resultStream.
      *
-     * @param env execution environment.
-     * @param inputList non-broadcast input list.
-     * @param broadcastStreamNames names of the broadcast data streams.
-     * @param graphBuilder user-defined logic.
-     * @param <OUT> output type of the result stream.
-     * @return the result stream by applying user-defined logic on the input list.
+     * @param env Execution environment.
+     * @param inputList Non-broadcast input list.
+     * @param broadcastStreamNames Names of the broadcast data streams.
+     * @param graphBuilder User-defined logic.
+     * @param <OUT> Output type of the result stream.
+     * @return The result stream by applying user-defined logic on the input list.
      */
     private static <OUT> DataStream<OUT> getResultStream(
             StreamExecutionEnvironment env,
             List<DataStream<?>> inputList,
             String[] broadcastStreamNames,
             Function<List<DataStream<?>>, DataStream<OUT>> graphBuilder) {
-        TypeInformation<?>[] inTypes = new TypeInformation[inputList.size()];
-        for (int i = 0; i < inputList.size(); i++) {
-            inTypes[i] = inputList.get(i).getType();
-        }
-        // do not block all non-broadcast input edges by default.
-        boolean[] isBlocked = new boolean[inputList.size()];
-        Arrays.fill(isBlocked, false);
+
+        // Executes the graph builder and gets real non-broadcast inputs.
         DraftExecutionEnvironment draftEnv =
-                new DraftExecutionEnvironment(
-                        env, new BroadcastWrapper<>(broadcastStreamNames, inTypes, isBlocked));
+                new DraftExecutionEnvironment(env, new DefaultWrapper<>());
 
         List<DataStream<?>> draftSources = new ArrayList<>();
         for (DataStream<?> dataStream : inputList) {
             draftSources.add(draftEnv.addDraftSource(dataStream, dataStream.getType()));
         }
         DataStream<OUT> draftOutStream = graphBuilder.apply(draftSources);
-        Preconditions.checkState(
-                draftEnv.getStreamGraph(false).getStreamNodes().size() == 1 + inputList.size(),
-                "cannot add more than one operator in withBroadcastStream's lambda function.");
-        draftEnv.copyToActualEnvironment();
-        return draftEnv.getActualStream(draftOutStream.getId());
+
+        List<Transformation<?>> realNonBroadcastInputs =
+                draftOutStream.getTransformation().getInputs();

Review Comment:
   The `realNonBroadcastInputs` is not equivalent to the `inputList` here. 
   
   When processing the non-broadcast streams, we need to block until all the 
broadcast streams are consumed. However, the streams in `inputList` are not 
always the real non-broadcast streams.
   
   For example, when calling `withBroadcastStreams()` with a join as the UDF 
(see [1]), the `inputList` contains `source1` and `source2`, but the real 
non-broadcast input is a keyed stream generated at [2].
   
   [1] https://github.com/apache/flink-ml/pull/215/files#diff-92d87f6ba02ee90f721294e1d720d7a56b6b911ee8b5c2e00e0f0ce39fb21359R134
   [2] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java#L388
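
To make the join case concrete, here is a minimal plain-Java sketch. It is a toy model, not Flink's API: `Node` and `joinLikeUdf` are hypothetical names, and the chain of steps (tag, union, key, join) is only an approximation of what a join does internally. It shows that the direct inputs of the output node differ from `inputList` once the UDF inserts intermediate nodes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of a transformation graph. None of these types are Flink classes. */
public class RealInputsSketch {

    /** Hypothetical stand-in for a transformation: a name plus its direct upstream inputs. */
    static final class Node {
        final String name;
        final List<Node> inputs;

        Node(String name, Node... inputs) {
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /**
     * Assumed shape of a join-like UDF: tag each side, union the tagged sides,
     * key the union, then apply the join operator on the keyed node.
     */
    static Node joinLikeUdf(List<Node> inputList) {
        Node tagged1 = new Node("map(tag-1)", inputList.get(0));
        Node tagged2 = new Node("map(tag-2)", inputList.get(1));
        Node union = new Node("union", tagged1, tagged2);
        Node keyed = new Node("keyBy", union);
        return new Node("join", keyed);
    }

    /** Collects node names for printing. */
    static List<String> names(List<Node> nodes) {
        List<String> result = new ArrayList<>();
        for (Node node : nodes) {
            result.add(node.name);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Node> inputList = Arrays.asList(new Node("source1"), new Node("source2"));
        Node out = joinLikeUdf(inputList);

        // The sources handed to the UDF.
        System.out.println("inputList: " + names(inputList));
        // The direct inputs of the final operator, which is what must be blocked.
        System.out.println("direct inputs of output: " + names(out.inputs));
    }
}
```

In this model the output's only direct input is the `keyBy` node, so blocking `source1` and `source2` would not block the edge that actually feeds the operator receiving the broadcast variables.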



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
