kl0u commented on a change in pull request #14312:
URL: https://github.com/apache/flink/pull/14312#discussion_r545134296



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
##########
@@ -205,26 +212,46 @@ public StreamExecutionEnvironment 
getExecutionEnvironment() {
                        final TypeInformation<OUT> outTypeInfo) {
 
                Preconditions.checkNotNull(function);
-               Preconditions.checkArgument(!(nonBroadcastStream instanceof 
KeyedStream),
+               Preconditions.checkArgument(
+                               !(nonBroadcastStream instanceof KeyedStream),
                                "A BroadcastProcessFunction can only be used on 
a non-keyed stream.");
 
-               TwoInputStreamOperator<IN1, IN2, OUT> operator =
-                               new 
CoBroadcastWithNonKeyedOperator<>(clean(function), broadcastStateDescriptors);
-               return transform("Co-Process-Broadcast", outTypeInfo, operator);
+               return transform("Co-Process-Broadcast", function, outTypeInfo);
        }
 
        @Internal
        private <OUT> SingleOutputStreamOperator<OUT> transform(
                        final String functionName,
-                       final TypeInformation<OUT> outTypeInfo,
-                       final TwoInputStreamOperator<IN1, IN2, OUT> operator) {
+                       BroadcastProcessFunction<IN1, IN2, OUT> userFunction,

Review comment:
       Add `final` for uniformity.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
##########
@@ -205,26 +212,46 @@ public StreamExecutionEnvironment 
getExecutionEnvironment() {
                        final TypeInformation<OUT> outTypeInfo) {
 
                Preconditions.checkNotNull(function);
-               Preconditions.checkArgument(!(nonBroadcastStream instanceof 
KeyedStream),
+               Preconditions.checkArgument(
+                               !(nonBroadcastStream instanceof KeyedStream),
                                "A BroadcastProcessFunction can only be used on 
a non-keyed stream.");
 
-               TwoInputStreamOperator<IN1, IN2, OUT> operator =
-                               new 
CoBroadcastWithNonKeyedOperator<>(clean(function), broadcastStateDescriptors);
-               return transform("Co-Process-Broadcast", outTypeInfo, operator);
+               return transform("Co-Process-Broadcast", function, outTypeInfo);
        }
 
        @Internal
        private <OUT> SingleOutputStreamOperator<OUT> transform(
                        final String functionName,
-                       final TypeInformation<OUT> outTypeInfo,
-                       final TwoInputStreamOperator<IN1, IN2, OUT> operator) {
+                       BroadcastProcessFunction<IN1, IN2, OUT> userFunction,
+                       final TypeInformation<OUT> outTypeInfo) {
 
                // read the output type of the input Transforms to coax out 
errors about MissingTypeInfo
                nonBroadcastStream.getType();
                broadcastStream.getType();
 
                final BroadcastStateTransformation<IN1, IN2, OUT> 
transformation =
-                               getBroadcastStateTransformation(functionName, 
outTypeInfo, operator);
+                               getBroadcastStateTransformation(functionName, 
clean(userFunction), outTypeInfo);
+
+               @SuppressWarnings({"unchecked", "rawtypes"})
+               final SingleOutputStreamOperator<OUT> returnStream =
+                               new SingleOutputStreamOperator(environment, 
transformation);
+
+               getExecutionEnvironment().addOperator(transformation);
+               return returnStream;
+       }
+
+       @Internal
+       private <OUT> SingleOutputStreamOperator<OUT> transform(
+                       final String functionName,
+                       KeyedBroadcastProcessFunction<?, IN1, IN2, OUT> 
userFunction,

Review comment:
       Add `final` for uniformity.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
##########
@@ -236,26 +263,32 @@ public StreamExecutionEnvironment 
getExecutionEnvironment() {
 
        private <OUT> BroadcastStateTransformation<IN1, IN2, OUT> 
getBroadcastStateTransformation(
                        final String functionName,
-                       final TypeInformation<OUT> outTypeInfo,
-                       final TwoInputStreamOperator<IN1, IN2, OUT> operator) {
-
-               if (nonBroadcastStream instanceof KeyedStream) {
-                       return BroadcastStateTransformation.forKeyedStream(
-                                       functionName,
-                                       (KeyedStream<IN1, ?>) 
nonBroadcastStream,
-                                       broadcastStream,
-                                       SimpleOperatorFactory.of(operator),
-                                       outTypeInfo,
-                                       environment.getParallelism());
-               } else {
-                       return BroadcastStateTransformation.forNonKeyedStream(
-                                       functionName,
-                                       nonBroadcastStream,
-                                       broadcastStream,
-                                       SimpleOperatorFactory.of(operator),
-                                       outTypeInfo,
-                                       environment.getParallelism());
-               }
+                       BroadcastProcessFunction<IN1, IN2, OUT> userFunction,
+                       final TypeInformation<OUT> outTypeInfo) {
+
+               return BroadcastStateTransformation.forStream(
+                               functionName,
+                               nonBroadcastStream,
+                               broadcastStream,
+                               userFunction,
+                               broadcastStateDescriptors,
+                               outTypeInfo,
+                               environment.getParallelism());
+       }
+
+       private <KS, OUT> KeyedBroadcastStateTransformation<KS, IN1, IN2, OUT> 
getBroadcastStateTransformation(
+                       final String functionName,
+                       KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> 
userFunction,

Review comment:
       Add `final` for uniformity.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/BroadcastStateTransformation.java
##########
@@ -53,111 +50,91 @@
 @Internal
 public class BroadcastStateTransformation<IN1, IN2, OUT> extends 
PhysicalTransformation<OUT> {

Review comment:
       The two BroadcastTransformations could have a base class without the 
key-related fields and the function.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
##########
@@ -236,26 +263,32 @@ public StreamExecutionEnvironment 
getExecutionEnvironment() {
 
        private <OUT> BroadcastStateTransformation<IN1, IN2, OUT> 
getBroadcastStateTransformation(
                        final String functionName,
-                       final TypeInformation<OUT> outTypeInfo,
-                       final TwoInputStreamOperator<IN1, IN2, OUT> operator) {
-
-               if (nonBroadcastStream instanceof KeyedStream) {
-                       return BroadcastStateTransformation.forKeyedStream(
-                                       functionName,
-                                       (KeyedStream<IN1, ?>) 
nonBroadcastStream,
-                                       broadcastStream,
-                                       SimpleOperatorFactory.of(operator),
-                                       outTypeInfo,
-                                       environment.getParallelism());
-               } else {
-                       return BroadcastStateTransformation.forNonKeyedStream(
-                                       functionName,
-                                       nonBroadcastStream,
-                                       broadcastStream,
-                                       SimpleOperatorFactory.of(operator),
-                                       outTypeInfo,
-                                       environment.getParallelism());
-               }
+                       BroadcastProcessFunction<IN1, IN2, OUT> userFunction,
+                       final TypeInformation<OUT> outTypeInfo) {
+
+               return BroadcastStateTransformation.forStream(
+                               functionName,
+                               nonBroadcastStream,
+                               broadcastStream,
+                               userFunction,
+                               broadcastStateDescriptors,
+                               outTypeInfo,
+                               environment.getParallelism());
+       }
+

Review comment:
       Now the two `getBroadcastStateTransformation()` methods have no 
additional logic and each has a single call-site, so why not inlining them in 
their call-sites?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/BroadcastStateTransformation.java
##########
@@ -53,111 +50,91 @@
 @Internal
 public class BroadcastStateTransformation<IN1, IN2, OUT> extends 
PhysicalTransformation<OUT> {
 
-       private final Transformation<IN1> nonBroadcastStream;
+       private final BroadcastProcessFunction<IN1, IN2, OUT> userFunction;
 
-       private final Transformation<IN2> broadcastStream;
+       private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
 
-       private final StreamOperatorFactory<OUT> operatorFactory;
+       private final Transformation<IN1> inputStream;
 
-       private final TypeInformation<?> stateKeyType;
+       private final Transformation<IN2> broadcastStream;
 
-       private final KeySelector<IN1, ?> keySelector;
+       private ChainingStrategy chainingStrategy = 
ChainingStrategy.DEFAULT_CHAINING_STRATEGY;
 
        private BroadcastStateTransformation(
                        final String name,
                        final Transformation<IN1> inputStream,
                        final Transformation<IN2> broadcastStream,
-                       final StreamOperatorFactory<OUT> operatorFactory,
-                       @Nullable final TypeInformation<?> keyType,
-                       @Nullable final KeySelector<IN1, ?> keySelector,
+                       final BroadcastProcessFunction<IN1, IN2, OUT> 
userFunction,
+                       final List<MapStateDescriptor<?, ?>> 
broadcastStateDescriptors,
                        final TypeInformation<OUT> outTypeInfo,
                        final int parallelism) {
                super(name, outTypeInfo, parallelism);
-               this.nonBroadcastStream = checkNotNull(inputStream);
+               this.inputStream = checkNotNull(inputStream);
                this.broadcastStream = checkNotNull(broadcastStream);
-               this.operatorFactory = checkNotNull(operatorFactory);
+               this.userFunction = userFunction;
+               this.broadcastStateDescriptors = broadcastStateDescriptors;
 
-               this.stateKeyType = keyType;
-               this.keySelector = keySelector;
-               updateManagedMemoryStateBackendUseCase(keySelector != null);
+               updateManagedMemoryStateBackendUseCase(false /* not keyed */);
        }
 
        public Transformation<IN2> getBroadcastStream() {
                return broadcastStream;
        }
 
        public Transformation<IN1> getNonBroadcastStream() {
-               return nonBroadcastStream;
+               return inputStream;
        }
 
-       public StreamOperatorFactory<OUT> getOperatorFactory() {
-               return operatorFactory;
+       public BroadcastProcessFunction<IN1, IN2, OUT> getUserFunction() {
+               return userFunction;
        }
 
-       public TypeInformation<?> getStateKeyType() {
-               return stateKeyType;
+       public List<MapStateDescriptor<?, ?>> getBroadcastStateDescriptors() {
+               return broadcastStateDescriptors;
        }
 
-       public KeySelector<IN1, ?> getKeySelector() {
-               return keySelector;
+       public ChainingStrategy getChainingStrategy() {
+               return chainingStrategy;
        }
 
        @Override
-       public void setChainingStrategy(ChainingStrategy strategy) {
-               this.operatorFactory.getChainingStrategy();
+       public void setChainingStrategy(ChainingStrategy chainingStrategy) {
+               this.chainingStrategy = checkNotNull(chainingStrategy);
        }
 
        @Override
        public List<Transformation<?>> getTransitivePredecessors() {
                final List<Transformation<?>> predecessors = new ArrayList<>();
                predecessors.add(this);
-               predecessors.add(nonBroadcastStream);
+               predecessors.add(inputStream);
                predecessors.add(broadcastStream);
                return predecessors;
        }
 
        @Override
        public List<Transformation<?>> getInputs() {
                final List<Transformation<?>> predecessors = new ArrayList<>();
-               predecessors.add(nonBroadcastStream);
+               predecessors.add(inputStream);
                predecessors.add(broadcastStream);
                return predecessors;
        }
 
        // ------------------------------- Static Constructors 
-------------------------------
 
-       public static <IN1, IN2, OUT> BroadcastStateTransformation<IN1, IN2, 
OUT> forNonKeyedStream(
-                       final String name,
-                       final DataStream<IN1> nonBroadcastStream,
-                       final BroadcastStream<IN2> broadcastStream,
-                       final StreamOperatorFactory<OUT> operatorFactory,
-                       final TypeInformation<OUT> outTypeInfo,
-                       final int parallelism) {
-               return new BroadcastStateTransformation<>(
-                               name,
-                               
checkNotNull(nonBroadcastStream).getTransformation(),
-                               
checkNotNull(broadcastStream).getTransformation(),
-                               operatorFactory,
-                               null,
-                               null,
-                               outTypeInfo,
-                               parallelism);
-       }
-
-       public static <IN1, IN2, OUT> BroadcastStateTransformation<IN1, IN2, 
OUT> forKeyedStream(
+       public static <IN1, IN2, OUT> BroadcastStateTransformation<IN1, IN2, 
OUT> forStream(

Review comment:
       We can make the constructor `public` now and remove the static 
constructors from here and from the `keyed` alternative as now we have two 
separate classes.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/KeyedBroadcastStateTransformation.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the transformation for the Broadcast State pattern. In a nutshell, 
this transformation
+ * allows to take a broadcasted (non-keyed) stream, connect it with another 
keyed or non-keyed
+ * stream, and apply a function on the resulting connected stream. This 
function will have access to
+ * all the elements that belong to the non-keyed, broadcasted side, as this is 
kept in Flink's
+ * state.
+ *
+ * <p>For more information see the
+ * <a 
href="https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html";>
+ * Broadcast State Pattern documentation page</a>.
+ *
+ * @param <IN1> The type of the elements in the non-broadcasted input.
+ * @param <IN2> The type of the elements in the broadcasted input.
+ * @param <OUT> The type of the elements that result from this transformation.
+ */
+@Internal
+public class KeyedBroadcastStateTransformation<KS, IN1, IN2, OUT> extends 
PhysicalTransformation<OUT> {
+
+       private final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> 
userFunction;
+
+       private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
+
+       private final Transformation<IN1> inputStream;
+
+       private final Transformation<IN2> broadcastStream;
+
+       private final TypeInformation<?> stateKeyType;
+
+       private final KeySelector<IN1, ?> keySelector;

Review comment:
       Same here.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/KeyedBroadcastStateTransformation.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the transformation for the Broadcast State pattern. In a nutshell, 
this transformation
+ * allows to take a broadcasted (non-keyed) stream, connect it with another 
keyed or non-keyed
+ * stream, and apply a function on the resulting connected stream. This 
function will have access to
+ * all the elements that belong to the non-keyed, broadcasted side, as this is 
kept in Flink's
+ * state.
+ *
+ * <p>For more information see the
+ * <a 
href="https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html";>
+ * Broadcast State Pattern documentation page</a>.
+ *
+ * @param <IN1> The type of the elements in the non-broadcasted input.
+ * @param <IN2> The type of the elements in the broadcasted input.
+ * @param <OUT> The type of the elements that result from this transformation.
+ */
+@Internal
+public class KeyedBroadcastStateTransformation<KS, IN1, IN2, OUT> extends 
PhysicalTransformation<OUT> {
+
+       private final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> 
userFunction;
+
+       private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
+
+       private final Transformation<IN1> inputStream;
+
+       private final Transformation<IN2> broadcastStream;
+
+       private final TypeInformation<?> stateKeyType;
+
+       private final KeySelector<IN1, ?> keySelector;
+
+       private ChainingStrategy chainingStrategy = 
ChainingStrategy.DEFAULT_CHAINING_STRATEGY;
+
+       private KeyedBroadcastStateTransformation(
+                       final String name,
+                       final Transformation<IN1> inputStream,
+                       final Transformation<IN2> broadcastStream,
+                       final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> 
userFunction,
+                       final List<MapStateDescriptor<?, ?>> 
broadcastStateDescriptors,
+                       final TypeInformation<?> keyType,
+                       final KeySelector<IN1, ?> keySelector,
+                       final TypeInformation<OUT> outTypeInfo,
+                       final int parallelism) {
+               super(name, outTypeInfo, parallelism);
+               this.inputStream = checkNotNull(inputStream);
+               this.broadcastStream = checkNotNull(broadcastStream);
+               this.userFunction = userFunction;
+               this.broadcastStateDescriptors = broadcastStateDescriptors;
+
+               this.stateKeyType = keyType;
+               this.keySelector = keySelector;
+               updateManagedMemoryStateBackendUseCase(true /* we have keyed 
state */);
+       }
+
+       public Transformation<IN2> getBroadcastStream() {
+               return broadcastStream;
+       }
+
+       public Transformation<IN1> getNonBroadcastStream() {
+               return inputStream;
+       }
+
+       public KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> 
getUserFunction() {
+               return userFunction;
+       }
+
+       public List<MapStateDescriptor<?, ?>> getBroadcastStateDescriptors() {
+               return broadcastStateDescriptors;
+       }
+
+       public TypeInformation<?> getStateKeyType() {
+               return stateKeyType;
+       }
+
+       public KeySelector<IN1, ?> getKeySelector() {
+               return keySelector;
+       }
+
+       public ChainingStrategy getChainingStrategy() {
+               return chainingStrategy;
+       }
+
+       @Override
+       public void setChainingStrategy(ChainingStrategy chainingStrategy) {
+               this.chainingStrategy = checkNotNull(chainingStrategy);
+       }
+
+       @Override
+       public List<Transformation<?>> getTransitivePredecessors() {
+               final List<Transformation<?>> predecessors = new ArrayList<>();
+               predecessors.add(this);
+               predecessors.add(inputStream);
+               predecessors.add(broadcastStream);
+               return predecessors;
+       }
+
+       @Override
+       public List<Transformation<?>> getInputs() {
+               final List<Transformation<?>> predecessors = new ArrayList<>();
+               predecessors.add(inputStream);
+               predecessors.add(broadcastStream);
+               return predecessors;
+       }
+
+       // ------------------------------- Static Constructors 
-------------------------------
+

Review comment:
       Now we can remove this and make the constructor `public`, right?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/KeyedBroadcastStateTransformation.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the transformation for the Broadcast State pattern. In a nutshell, 
this transformation
+ * allows to take a broadcasted (non-keyed) stream, connect it with another 
keyed or non-keyed
+ * stream, and apply a function on the resulting connected stream. This 
function will have access to
+ * all the elements that belong to the non-keyed, broadcasted side, as this is 
kept in Flink's
+ * state.
+ *
+ * <p>For more information see the
+ * <a 
href="https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html";>
+ * Broadcast State Pattern documentation page</a>.
+ *
+ * @param <IN1> The type of the elements in the non-broadcasted input.
+ * @param <IN2> The type of the elements in the broadcasted input.
+ * @param <OUT> The type of the elements that result from this transformation.
+ */
+@Internal
+public class KeyedBroadcastStateTransformation<KS, IN1, IN2, OUT> extends 
PhysicalTransformation<OUT> {
+
+       private final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> 
userFunction;
+
+       private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
+
+       private final Transformation<IN1> inputStream;
+
+       private final Transformation<IN2> broadcastStream;
+
+       private final TypeInformation<?> stateKeyType;

Review comment:
       Now this can have `KS` as a generic type, right? (for type safety)

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##########
@@ -735,4 +761,10 @@ public int hashCode() {
                        return inputEdge.hashCode();
                }
        }
+

Review comment:
       Why not putting it as a method in the `InputConfig` with a default 
implementation that returns `false`? I think that this will make the code nicer 
and also it will avoid checks like `inputConfig instanceof 
StreamConfig.NetworkInputConfig`

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
##########
@@ -84,7 +84,7 @@
 
        private String transformationUID;
        private String userHash;
-       private boolean sortedInputs = false;
+       private Map<Integer, StreamConfig.InputRequirement> inputRequirements = 
new HashMap<>();

Review comment:
       Why not making this `final`?
   
   Also nit-pick: I find it is nicer to initialize fields in the constructor 
but feel free to ignore this.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BroadcastStateTransformationTranslator.java
##########
@@ -51,14 +69,18 @@
                checkNotNull(transformation);
                checkNotNull(context);
 
+               CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT> operator = new 
CoBroadcastWithNonKeyedOperator<>(

Review comment:
       I like that we create the operator here as now we push further down the 
runtime-related stuff. In the future we could have a translator that gives, for 
example, not an operator but sth else.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
##########
@@ -238,10 +231,12 @@ private void setTypeSerializer(String key, 
TypeSerializer<?> typeWrapper) {
                }
        }
 
+       // TODO: this method is a bit misleading, because what it really does 
is setting up

Review comment:
       Can't we rename it to sth more descriptive?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to