zhipeng93 commented on code in PR #215:
URL: https://github.com/apache/flink-ml/pull/215#discussion_r1168127178


##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/broadcast/BroadcastUtils.java:
##########
@@ -152,41 +155,52 @@ private static <OUT> DataStream<OUT> cacheBroadcastVariables(
     }
 
     /**
-     * uses {@link DraftExecutionEnvironment} to execute the userDefinedFunction and returns the
+     * Uses {@link DraftExecutionEnvironment} to execute the userDefinedFunction and returns the
      * resultStream.
      *
-     * @param env execution environment.
-     * @param inputList non-broadcast input list.
-     * @param broadcastStreamNames names of the broadcast data streams.
-     * @param graphBuilder user-defined logic.
-     * @param <OUT> output type of the result stream.
-     * @return the result stream by applying user-defined logic on the input list.
+     * @param env Execution environment.
+     * @param inputList Non-broadcast input list.
+     * @param broadcastStreamNames Names of the broadcast data streams.
+     * @param graphBuilder User-defined logic.
+     * @param <OUT> Output type of the result stream.
+     * @return The result stream by applying user-defined logic on the input list.
      */
     private static <OUT> DataStream<OUT> getResultStream(
             StreamExecutionEnvironment env,
             List<DataStream<?>> inputList,
             String[] broadcastStreamNames,
             Function<List<DataStream<?>>, DataStream<OUT>> graphBuilder) {
-        TypeInformation<?>[] inTypes = new TypeInformation[inputList.size()];
-        for (int i = 0; i < inputList.size(); i++) {
-            inTypes[i] = inputList.get(i).getType();
-        }
-        // do not block all non-broadcast input edges by default.
-        boolean[] isBlocked = new boolean[inputList.size()];
-        Arrays.fill(isBlocked, false);
+
+        // Executes the graph builder and gets real non-broadcast inputs.
         DraftExecutionEnvironment draftEnv =
-                new DraftExecutionEnvironment(
-                        env, new BroadcastWrapper<>(broadcastStreamNames, inTypes, isBlocked));
+                new DraftExecutionEnvironment(env, new DefaultWrapper<>());
 
         List<DataStream<?>> draftSources = new ArrayList<>();
         for (DataStream<?> dataStream : inputList) {
             draftSources.add(draftEnv.addDraftSource(dataStream, dataStream.getType()));
         }
         DataStream<OUT> draftOutStream = graphBuilder.apply(draftSources);
-        Preconditions.checkState(
-                draftEnv.getStreamGraph(false).getStreamNodes().size() == 1 + inputList.size(),
-                "cannot add more than one operator in withBroadcastStream's lambda function.");
-        draftEnv.copyToActualEnvironment();
-        return draftEnv.getActualStream(draftOutStream.getId());
+
+        List<Transformation<?>> realNonBroadcastInputs =
+                draftOutStream.getTransformation().getInputs();

Review Comment:
   `realNonBroadcastInputs` is not equivalent to `inputList` here.
   
   For example, when `withBroadcastStream()` is called with a join as the UDF (see [1]), `inputList` contains `source1` and `source2`, but the real non-broadcast input is a keyed stream generated as shown in [2].
   
   I have also updated the Javadoc of `withBroadcastStream` to explain that only the result stream can access the broadcast variables.
   
   [1] https://github.com/apache/flink-ml/pull/215/files#diff-92d87f6ba02ee90f721294e1d720d7a56b6b911ee8b5c2e00e0f0ce39fb21359R134
   [2] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java#L388
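
   To illustrate the difference between `inputList` and the direct inputs of the result, here is a minimal toy model. It does not use Flink: `Node`, `joinLikeUdf`, and `directInputNames` are hypothetical names made up for this sketch, and `Node` only stands in for a stream transformation. The assumption is simply that a join-like UDF inserts an intermediate keyed node between the sources and the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Toy model only: none of these classes are Flink or Flink ML classes. */
class JoinInputsSketch {

    /** Hypothetical graph node: a name plus the upstream nodes it reads from. */
    static final class Node {
        final String name;
        final List<Node> inputs;

        Node(String name, List<Node> inputs) {
            this.name = name;
            this.inputs = inputs;
        }
    }

    /** Mimics a join-like UDF: it adds an intermediate keyed node before the result node. */
    static Node joinLikeUdf(List<Node> sources) {
        Node keyedUnion = new Node("keyedUnion", sources);
        return new Node("joinResult", Collections.singletonList(keyedUnion));
    }

    /** Applies the graph builder and returns the names of the result node's direct inputs. */
    static List<String> directInputNames(
            List<Node> inputList, Function<List<Node>, Node> graphBuilder) {
        Node out = graphBuilder.apply(inputList);
        List<String> names = new ArrayList<>();
        for (Node in : out.inputs) {
            names.add(in.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Node> inputList =
                Arrays.asList(
                        new Node("source1", Collections.emptyList()),
                        new Node("source2", Collections.emptyList()));
        // The result has one direct input, although two sources were passed in.
        System.out.println(directInputNames(inputList, JoinInputsSketch::joinLikeUdf));
        // prints [keyedUnion]
    }
}
```

   In this toy graph the result node has a single direct input (`keyedUnion`) even though `inputList` holds two sources, which is the same mismatch described above for the join case.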


