[FLINK-3659] Expose broadcast state on DataStream API.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6c17bef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6c17bef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6c17bef

Branch: refs/heads/master
Commit: c6c17befe54d55755ebaf160ff20a11aa32bbbca
Parents: 484fedd
Author: kkloudas <[email protected]>
Authored: Thu Dec 21 14:38:54 2017 +0100
Committer: kkloudas <[email protected]>
Committed: Wed Feb 7 14:07:45 2018 +0100

----------------------------------------------------------------------
 .../datastream/BroadcastConnectedStream.java    | 255 ++++++++
 .../api/datastream/BroadcastStream.java         |  87 +++
 .../streaming/api/datastream/DataStream.java    |  41 ++
 .../co/BaseBroadcastProcessFunction.java        | 105 +++
 .../functions/co/BroadcastProcessFunction.java  |  93 +++
 .../co/KeyedBroadcastProcessFunction.java       | 145 ++++
 .../api/graph/StreamGraphGenerator.java         |   2 +-
 .../co/CoBroadcastWithKeyedOperator.java        | 324 +++++++++
 .../co/CoBroadcastWithNonKeyedOperator.java     | 228 +++++++
 .../transformations/TwoInputTransformation.java |   4 +-
 .../flink/streaming/api/DataStreamTest.java     | 186 ++++++
 .../co/CoBroadcastWithKeyedOperatorTest.java    | 655 +++++++++++++++++++
 .../co/CoBroadcastWithNonKeyedOperatorTest.java | 497 ++++++++++++++
 .../util/TwoInputStreamOperatorTestHarness.java |   2 +-
 14 files changed, 2620 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
new file mode 100644
index 0000000..aeb3bc2
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import 
org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator;
+import 
org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A BroadcastConnectedStream represents the result of connecting a keyed or 
non-keyed stream,
+ * with a {@link BroadcastStream} with {@link 
org.apache.flink.api.common.state.BroadcastState
+ * BroadcastState}. As in the case of {@link ConnectedStreams} these streams 
are useful for cases
+ * where operations on one stream directly affect the operations on the other 
stream, usually via
+ * shared state between the streams.
+ *
+ * <p>An example for the use of such connected streams would be to apply rules 
that change over time
+ * onto another, possibly keyed stream. The stream with the broadcast state 
has the rules, and will
+ * store them in the broadcast state, while the other stream will contain the 
elements to apply the
+ * rules to. By broadcasting the rules, these will be available in all 
parallel instances, and
+ * can be applied to all partitions of the other stream.
+ *
+ * @param <IN1> The input type of the non-broadcast side.
+ * @param <IN2> The input type of the broadcast side.
+ * @param <K> The key type of the elements in the {@link 
org.apache.flink.api.common.state.BroadcastState BroadcastState}.
+ * @param <V> The value type of the elements in the {@link 
org.apache.flink.api.common.state.BroadcastState BroadcastState}.
+ */
+@PublicEvolving
+public class BroadcastConnectedStream<IN1, IN2, K, V> {
+
+       private final StreamExecutionEnvironment environment;
+       private final DataStream<IN1> inputStream1;
+       private final BroadcastStream<IN2, K, V> inputStream2;
+       private final MapStateDescriptor<K, V> broadcastStateDescriptor;
+
+       protected BroadcastConnectedStream(
+                       final StreamExecutionEnvironment env,
+                       final DataStream<IN1> input1,
+                       final BroadcastStream<IN2, K, V> input2,
+                       final MapStateDescriptor<K, V> 
broadcastStateDescriptor) {
+               this.environment = requireNonNull(env);
+               this.inputStream1 = requireNonNull(input1);
+               this.inputStream2 = requireNonNull(input2);
+               this.broadcastStateDescriptor = 
requireNonNull(broadcastStateDescriptor);
+       }
+
+       public StreamExecutionEnvironment getExecutionEnvironment() {
+               return environment;
+       }
+
+       /**
+        * Returns the non-broadcast {@link DataStream}.
+        *
+        * @return The stream which, by convention, is not broadcasted.
+        */
+       public DataStream<IN1> getFirstInput() {
+               return inputStream1;
+       }
+
+       /**
+        * Returns the {@link BroadcastStream}.
+        *
+        * @return The stream which, by convention, is the broadcast one.
+        */
+       public BroadcastStream<IN2, K, V> getSecondInput() {
+               return inputStream2;
+       }
+
+       /**
+        * Gets the type of the first input.
+        *
+        * @return The type of the first input
+        */
+       public TypeInformation<IN1> getType1() {
+               return inputStream1.getType();
+       }
+
+       /**
+        * Gets the type of the second input.
+        *
+        * @return The type of the second input
+        */
+       public TypeInformation<IN2> getType2() {
+               return inputStream2.getType();
+       }
+
+       /**
+        * Assumes as inputs a {@link BroadcastStream} and a {@link 
KeyedStream} and applies the given
+        * {@link KeyedBroadcastProcessFunction} on them, thereby creating a 
transformed output stream.
+        *
+        * @param function The {@link KeyedBroadcastProcessFunction} that is 
called for each element in the stream.
+        * @param <OUT> The type of the output elements.
+        * @return The transformed {@link DataStream}.
+        */
+       @PublicEvolving
+       public <OUT> SingleOutputStreamOperator<OUT> process(final 
KeyedBroadcastProcessFunction<IN1, IN2, OUT> function) {
+
+               TypeInformation<OUT> outTypeInfo = 
TypeExtractor.getBinaryOperatorReturnType(
+                               function,
+                               KeyedBroadcastProcessFunction.class,
+                               0,
+                               1,
+                               2,
+                               TypeExtractor.NO_INDEX,
+                               TypeExtractor.NO_INDEX,
+                               TypeExtractor.NO_INDEX,
+                               getType1(),
+                               getType2(),
+                               Utils.getCallLocationName(),
+                               true);
+
+               return process(function, outTypeInfo);
+       }
+
+       /**
+        * Applies the given {@link KeyedBroadcastProcessFunction} to the connected streams, thereby
+        * creating a transformed output stream. The non-broadcast (first) input must be a
+        * {@link KeyedStream}; the second input is the {@link BroadcastStream}.
+        *
+        * <p>The call is rejected if {@code function} is {@code null} or if the non-broadcast input
+        * is not a {@link KeyedStream}.
+        *
+        * @param function The {@link KeyedBroadcastProcessFunction} that is called for each element in the stream.
+        * @param outTypeInfo The type of the output elements.
+        * @param <OUT> The type of the output elements.
+        * @return The transformed {@link DataStream}.
+        */
+       @PublicEvolving
+       public <OUT> SingleOutputStreamOperator<OUT> process(
+                       final KeyedBroadcastProcessFunction<IN1, IN2, OUT> function,
+                       final TypeInformation<OUT> outTypeInfo) {
+
+               Preconditions.checkNotNull(function);
+               // inputStream1 is the non-broadcast side, i.e. the FIRST input of this connected stream.
+               Preconditions.checkArgument(inputStream1 instanceof KeyedStream,
+                               "A KeyedBroadcastProcessFunction can only be used with a keyed stream as the first input.");
+
+               // Pass the user function through this class's clean(...) helper (delegates to the
+               // execution environment) before handing it to the operator.
+               TwoInputStreamOperator<IN1, IN2, OUT> operator =
+                               new CoBroadcastWithKeyedOperator<>(clean(function), Collections.singletonList(broadcastStateDescriptor));
+               return transform("Co-Process-Broadcast-Keyed", outTypeInfo, operator);
+       }
+
+       /**
+        * Assumes as inputs a {@link BroadcastStream} and a non-keyed {@link 
DataStream} and applies the given
+        * {@link BroadcastProcessFunction} on them, thereby creating a 
transformed output stream.
+        *
+        * @param function The {@link BroadcastProcessFunction} that is called 
for each element in the stream.
+        * @param <OUT> The type of the output elements.
+        * @return The transformed {@link DataStream}.
+        */
+       @PublicEvolving
+       public <OUT> SingleOutputStreamOperator<OUT> process(final 
BroadcastProcessFunction<IN1, IN2, OUT> function) {
+
+               TypeInformation<OUT> outTypeInfo = 
TypeExtractor.getBinaryOperatorReturnType(
+                               function,
+                               BroadcastProcessFunction.class,
+                               0,
+                               1,
+                               2,
+                               TypeExtractor.NO_INDEX,
+                               TypeExtractor.NO_INDEX,
+                               TypeExtractor.NO_INDEX,
+                               getType1(),
+                               getType2(),
+                               Utils.getCallLocationName(),
+                               true);
+
+               return process(function, outTypeInfo);
+       }
+
+       /**
+        * Applies the given {@link BroadcastProcessFunction} to the connected streams, thereby
+        * creating a transformed output stream. The non-broadcast (first) input must be a non-keyed
+        * {@link DataStream}; the second input is the {@link BroadcastStream}.
+        *
+        * <p>The call is rejected if {@code function} is {@code null} or if the non-broadcast input
+        * is a {@link KeyedStream}.
+        *
+        * @param function The {@link BroadcastProcessFunction} that is called for each element in the stream.
+        * @param outTypeInfo The type of the output elements.
+        * @param <OUT> The type of the output elements.
+        * @return The transformed {@link DataStream}.
+        */
+       @PublicEvolving
+       public <OUT> SingleOutputStreamOperator<OUT> process(
+                       final BroadcastProcessFunction<IN1, IN2, OUT> function,
+                       final TypeInformation<OUT> outTypeInfo) {
+
+               Preconditions.checkNotNull(function);
+               // inputStream1 is the non-broadcast side, i.e. the FIRST input of this connected stream.
+               Preconditions.checkArgument(!(inputStream1 instanceof KeyedStream),
+                               "A BroadcastProcessFunction can only be used with a non-keyed stream as the first input.");
+
+               // Pass the user function through this class's clean(...) helper (delegates to the
+               // execution environment) before handing it to the operator.
+               TwoInputStreamOperator<IN1, IN2, OUT> operator =
+                               new CoBroadcastWithNonKeyedOperator<>(clean(function), Collections.singletonList(broadcastStateDescriptor));
+               return transform("Co-Process-Broadcast", outTypeInfo, operator);
+       }
+
+       @Internal
+       private <OUT> SingleOutputStreamOperator<OUT> transform(
+                       final String functionName,
+                       final TypeInformation<OUT> outTypeInfo,
+                       final TwoInputStreamOperator<IN1, IN2, OUT> operator) {
+
+               // read the output type of the input Transforms to coax out 
errors about MissingTypeInfo
+               inputStream1.getType();
+               inputStream2.getType();
+
+               TwoInputTransformation<IN1, IN2, OUT> transform = new 
TwoInputTransformation<>(
+                               inputStream1.getTransformation(),
+                               inputStream2.getTransformation(),
+                               functionName,
+                               operator,
+                               outTypeInfo,
+                               environment.getParallelism());
+
+               if (inputStream1 instanceof KeyedStream) {
+                       KeyedStream<IN1, ?> keyedInput1 = (KeyedStream<IN1, ?>) 
inputStream1;
+                       TypeInformation<?> keyType1 = keyedInput1.getKeyType();
+                       
transform.setStateKeySelectors(keyedInput1.getKeySelector(), null);
+                       transform.setStateKeyType(keyType1);
+               }
+
+               @SuppressWarnings({ "unchecked", "rawtypes" })
+               SingleOutputStreamOperator<OUT> returnStream = new 
SingleOutputStreamOperator(environment, transform);
+
+               getExecutionEnvironment().addOperator(transform);
+
+               return returnStream;
+       }
+
+       protected <F> F clean(F f) {
+               return getExecutionEnvironment().clean(f);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastStream.java
new file mode 100644
index 0000000..e21e36f
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastStream.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.StreamTransformation;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A {@code BroadcastStream} is a stream with {@link 
org.apache.flink.api.common.state.BroadcastState BroadcastState}.
+ * This can be created by any stream using the {@link 
DataStream#broadcast(MapStateDescriptor)} method and
+ * implicitly creates a state where the user can store elements of the created 
{@code BroadcastStream}.
+ * (see {@link BroadcastConnectedStream}).
+ *
+ * <p>Note that no further operation can be applied to these streams. The only 
available option is to connect them
+ * with a keyed or non-keyed stream, using the {@link 
KeyedStream#connect(BroadcastStream)} and the
+ * {@link DataStream#connect(BroadcastStream)} respectively. Applying these 
methods will result in a
+ * {@link BroadcastConnectedStream} for further processing.
+ *
+ * @param <T> The type of input/output elements.
+ * @param <K> The key type of the elements in the {@link 
org.apache.flink.api.common.state.BroadcastState BroadcastState}.
+ * @param <V> The value type of the elements in the {@link 
org.apache.flink.api.common.state.BroadcastState BroadcastState}.
+ */
+@PublicEvolving
+public class BroadcastStream<T, K, V> {
+
+       private final StreamExecutionEnvironment environment;
+
+       private final DataStream<T> inputStream;
+
+       /**
+        * The {@link org.apache.flink.api.common.state.StateDescriptor state 
descriptor} of the
+        * {@link org.apache.flink.api.common.state.BroadcastState broadcast 
state}. This state
+        * has a {@code key-value} format.
+        */
+       private final MapStateDescriptor<K, V> broadcastStateDescriptor;
+
+       protected BroadcastStream(
+                       final StreamExecutionEnvironment env,
+                       final DataStream<T> input,
+                       final MapStateDescriptor<K, V> 
broadcastStateDescriptor) {
+
+               this.environment = requireNonNull(env);
+               this.inputStream = requireNonNull(input);
+               this.broadcastStateDescriptor = 
requireNonNull(broadcastStateDescriptor);
+       }
+
+       public TypeInformation<T> getType() {
+               return inputStream.getType();
+       }
+
+       public <F> F clean(F f) {
+               return environment.clean(f);
+       }
+
+       public StreamTransformation<T> getTransformation() {
+               return inputStream.getTransformation();
+       }
+
+       public MapStateDescriptor<K, V> getBroadcastStateDescriptor() {
+               return broadcastStateDescriptor;
+       }
+
+       public StreamExecutionEnvironment getEnvironment() {
+               return environment;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 83c1126..d859689 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -253,6 +254,30 @@ public class DataStream<T> {
        }
 
        /**
+        * Creates a new {@link BroadcastConnectedStream} by connecting the 
current
+        * {@link DataStream} or {@link KeyedStream} with a {@link 
BroadcastStream}.
+        *
+        * <p>The latter can be created using the {@link 
#broadcast(MapStateDescriptor)} method.
+        *
+        * <p>The resulting stream can be further processed using the {@code 
BroadcastConnectedStream.process(MyFunction)}
+        * method, where {@code MyFunction} can be either a
+        * {@link 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction 
KeyedBroadcastProcessFunction}
+        * or a {@link 
org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction 
BroadcastProcessFunction}
+        * depending on the current stream being a {@link KeyedStream} or not.
+        *
+        * @param broadcastStream The broadcast stream with the broadcast state 
to be connected with this stream.
+        * @return The {@link BroadcastConnectedStream}.
+        */
+       @PublicEvolving
+       public <R, K, V> BroadcastConnectedStream<T, R, K, V> 
connect(BroadcastStream<R, K, V> broadcastStream) {
+               return new BroadcastConnectedStream<>(
+                               environment,
+                               this,
+                               Preconditions.checkNotNull(broadcastStream),
+                               broadcastStream.getBroadcastStateDescriptor());
+       }
+
+       /**
         * It creates a new {@link KeyedStream} that uses the provided key for 
partitioning
         * its operator states.
         *
@@ -373,6 +398,22 @@ public class DataStream<T> {
 
        /**
         * Sets the partitioning of the {@link DataStream} so that the output 
elements
+        * are broadcasted to every parallel instance of the next operation. In 
addition,
+        * it implicitly creates a {@link 
org.apache.flink.api.common.state.BroadcastState broadcast state}
+        * which can be used to store the elements of the stream.
+        *
+        * @return A {@link BroadcastStream} which can be used in the {@link 
#connect(BroadcastStream)} to
+        * create a {@link BroadcastConnectedStream} for further processing of 
the elements.
+        */
+       @PublicEvolving
+       public <K, V> BroadcastStream<T, K, V> broadcast(final 
MapStateDescriptor<K, V> broadcastStateDescriptor) {
+               Preconditions.checkNotNull(broadcastStateDescriptor);
+               final DataStream<T> broadcastStream = setConnectionType(new 
BroadcastPartitioner<>());
+               return new BroadcastStream<>(environment, broadcastStream, 
broadcastStateDescriptor);
+       }
+
+       /**
+        * Sets the partitioning of the {@link DataStream} so that the output 
elements
         * are shuffled uniformly randomly to the next operation.
         *
         * @return The DataStream with shuffle partitioning set.

http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java
new file mode 100644
index 0000000..9419d80
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * The base class containing the functionality available to all broadcast 
process functions.
+ * These include the {@link BroadcastProcessFunction} and the {@link 
KeyedBroadcastProcessFunction}.
+ */
+@PublicEvolving
+public abstract class BaseBroadcastProcessFunction extends 
AbstractRichFunction {
+
+       private static final long serialVersionUID = -131631008887478610L;
+
+       /**
+        * The base context available to all methods in a broadcast process 
function. This
+        * includes {@link BroadcastProcessFunction BroadcastProcessFunctions} 
and
+        * {@link KeyedBroadcastProcessFunction KeyedBroadcastProcessFunctions}.
+        */
+       abstract class BaseContext {
+
+               /**
+                * Timestamp of the element currently being processed or 
timestamp of a firing timer.
+                *
+                * <p>This might be {@code null}, for example if the time 
characteristic of your program
+                * is set to {@link 
org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+                */
+               public abstract Long timestamp();
+
+               /**
+                * Emits a record to the side output identified by the {@link 
OutputTag}.
+                *
+                * @param outputTag the {@code OutputTag} that identifies the 
side output to emit to.
+                * @param value The record to emit.
+                */
+               public abstract <X> void output(OutputTag<X> outputTag, X 
value);
+
+               /** Returns the current processing time. */
+               public abstract long currentProcessingTime();
+
+               /** Returns the current event-time watermark. */
+               public abstract long currentWatermark();
+       }
+
+       /**
+        * A base {@link BaseContext context} available to the broadcasted 
stream side of
+        * a {@link 
org.apache.flink.streaming.api.datastream.BroadcastConnectedStream 
BroadcastConnectedStream}.
+        *
+        * <p>Apart from the basic functionality of a {@link BaseContext 
context},
+        * this also allows to get and update the elements stored in the
+        * {@link BroadcastState broadcast state}.
+        * In other words, it gives read/write access to the broadcast state.
+        */
+       public abstract class Context extends BaseContext {
+
+               /**
+                * Fetches the {@link BroadcastState} with the specified name.
+                *
+                * @param stateDescriptor the {@link MapStateDescriptor} of the 
state to be fetched.
+                * @return The required {@link BroadcastState broadcast state}.
+                */
+               public abstract <K, V> BroadcastState<K, V> 
getBroadcastState(MapStateDescriptor<K, V> stateDescriptor);
+       }
+
+       /**
+        * A {@link BaseContext context} available to the non-broadcasted 
stream side of
+        * a {@link 
org.apache.flink.streaming.api.datastream.BroadcastConnectedStream 
BroadcastConnectedStream}.
+        *
+        * <p>Apart from the basic functionality of a {@link BaseContext 
context},
+        * this also allows to get a <b>read-only</b> view ({@link ReadOnlyBroadcastState}) of the
+        * broadcast state.
+        */
+       public abstract class ReadOnlyContext extends BaseContext {
+
+               /**
+                * Fetches a read-only view of the broadcast state with the 
specified name.
+                *
+                * @param stateDescriptor the {@link MapStateDescriptor} of the 
state to be fetched.
+                * @return The required read-only view of the broadcast state.
+                */
+               public abstract <K, V> ReadOnlyBroadcastState<K, V> 
getBroadcastState(MapStateDescriptor<K, V> stateDescriptor);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
new file mode 100644
index 0000000..4dcc929
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.util.Collector;
+
+/**
+ * A function to be applied to a
+ * {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream BroadcastConnectedStream} that
+ * connects {@link org.apache.flink.streaming.api.datastream.BroadcastStream BroadcastStream}, i.e. a stream
+ * with broadcast state, with a <b>non-keyed</b> {@link org.apache.flink.streaming.api.datastream.DataStream DataStream}.
+ *
+ * <p>The stream with the broadcast state can be created using the
+ * {@link org.apache.flink.streaming.api.datastream.DataStream#broadcast(MapStateDescriptor)
+ * stream.broadcast(MapStateDescriptor)} method.
+ *
+ * <p>The user has to implement two methods:
+ * <ol>
+ *     <li>the {@link #processBroadcastElement(Object, Context, Collector)} which will be applied to
+ *     each element in the broadcast side
+ *     <li> and the {@link #processElement(Object, ReadOnlyContext, Collector)} which will be applied to the
+ *     non-broadcasted side.
+ * </ol>
+ *
+ * <p>The {@code processBroadcastElement()} takes as argument (among others) a context that allows it to
+ * read/write to the broadcast state, while the {@code processElement()} has read-only access to the broadcast state.
+ *
+ * @param <IN1> The input type of the non-broadcast side.
+ * @param <IN2> The input type of the broadcast side.
+ * @param <OUT> The output type of the operator.
+ */
+@PublicEvolving
+public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends 
BaseBroadcastProcessFunction {
+
+       private static final long serialVersionUID = 8352559162119034453L;
+
+       /**
+        * This method is called for each element in the (non-broadcast)
+        * {@link org.apache.flink.streaming.api.datastream.DataStream data stream}.
+        *
+        * <p>This function can output zero or more elements using the {@link Collector} parameter
+        * and query the current processing/event time.
+        * In addition, it has <b>read-only</b> access to the broadcast state.
+        * The context is only valid during the invocation of this method, do not store it.
+        *
+        * @param value The stream element.
+        * @param ctx A {@link ReadOnlyContext} that allows querying the timestamp of the element,
+        *            querying the current processing/event time and reading the broadcast state.
+        *            The context is only valid during the invocation of this method, do not store it.
+        * @param out The collector to emit resulting elements to
+        * @throws Exception The function may throw exceptions which cause the streaming program
+        *                   to fail and go into recovery.
+        */
+       public abstract void processElement(IN1 value, ReadOnlyContext ctx, 
Collector<OUT> out) throws Exception;
+
+       /**
+        * This method is called for each element in the
+        * {@link org.apache.flink.streaming.api.datastream.BroadcastStream broadcast stream}.
+        *
+        * <p>This function can output zero or more elements using the {@link Collector} parameter,
+        * query the current processing/event time, and also query and update the internal
+        * {@link org.apache.flink.api.common.state.BroadcastState broadcast state}. These can be done
+        * through the provided {@link Context}.
+        * The context is only valid during the invocation of this method, do not store it.
+        *
+        * @param value The stream element.
+        * @param ctx A {@link Context} that allows querying the timestamp of the element,
+        *            querying the current processing/event time and updating the broadcast state.
+        *            The context is only valid during the invocation of this method, do not store it.
+        * @param out The collector to emit resulting elements to
+        * @throws Exception The function may throw exceptions which cause the streaming program
+        *                   to fail and go into recovery.
+        */
+       public abstract void processBroadcastElement(IN2 value, Context ctx, 
Collector<OUT> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
new file mode 100644
index 0000000..9d14259
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+/**
+ * A function to be applied to a
+ * {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream BroadcastConnectedStream} that
+ * connects {@link org.apache.flink.streaming.api.datastream.BroadcastStream BroadcastStream}, i.e. a stream
+ * with broadcast state, with a {@link org.apache.flink.streaming.api.datastream.KeyedStream KeyedStream}.
+ *
+ * <p>The stream with the broadcast state can be created using the
+ * {@link org.apache.flink.streaming.api.datastream.DataStream#broadcast(MapStateDescriptor)
+ * stream.broadcast(MapStateDescriptor)} method.
+ *
+ * <p>The user has to implement two methods:
+ * <ol>
+ *     <li>the {@link #processBroadcastElement(Object, Context, Collector)} which will be applied to
+ *     each element in the broadcast side
+ *     <li> and the {@link #processElement(Object, KeyedReadOnlyContext, Collector)} which will be applied to the
+ *     non-broadcasted/keyed side.
+ * </ol>
+ *
+ * <p>The {@code processBroadcastElement()} takes as an argument (among others) a context that allows it to
+ * read/write to the broadcast state, while the
+ * {@code processElement()} has read-only access to the broadcast state, but can read/write to the keyed state and
+ * register timers.
+ *
+ * @param <IN1> The input type of the keyed (non-broadcast) side.
+ * @param <IN2> The input type of the broadcast side.
+ * @param <OUT> The output type of the operator.
+ */
+@PublicEvolving
+public abstract class KeyedBroadcastProcessFunction<IN1, IN2, OUT> extends 
BaseBroadcastProcessFunction {
+
+       private static final long serialVersionUID = -2584726797564976453L;
+
+       /**
+        * This method is called for each element in the (non-broadcast)
+        * {@link org.apache.flink.streaming.api.datastream.KeyedStream keyed stream}.
+        *
+        * <p>It can output zero or more elements using the {@link Collector} parameter,
+        * query the current processing/event time, and also query and update the local keyed state.
+        * In addition, it can get a {@link TimerService} for registering timers and querying the time.
+        * Finally, it has <b>read-only</b> access to the broadcast state.
+        * The context is only valid during the invocation of this method, do not store it.
+        *
+        * @param value The stream element.
+        * @param ctx A {@link KeyedReadOnlyContext} that allows querying the timestamp of the element,
+        *            querying the current processing/event time and reading the broadcast state
+        *            with <b>read-only</b> access.
+        *            The context is only valid during the invocation of this method, do not store it.
+        * @param out The collector to emit resulting elements to
+        * @throws Exception The function may throw exceptions which cause the streaming program
+        *                   to fail and go into recovery.
+        */
+       public abstract void processElement(final IN1 value, final 
KeyedReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
+
+       /**
+        * This method is called for each element in the
+        * {@link org.apache.flink.streaming.api.datastream.BroadcastStream broadcast stream}.
+        *
+        * <p>It can output zero or more elements using the {@link Collector} parameter,
+        * query the current processing/event time, and also query and update the internal
+        * {@link org.apache.flink.api.common.state.BroadcastState broadcast state}. These can
+        * be done through the provided {@link Context}.
+        * The context is only valid during the invocation of this method, do not store it.
+        *
+        * @param value The stream element.
+        * @param ctx A {@link Context} that allows querying the timestamp of the element,
+        *            querying the current processing/event time and updating the broadcast state.
+        *            The context is only valid during the invocation of this method, do not store it.
+        * @param out The collector to emit resulting elements to
+        * @throws Exception The function may throw exceptions which cause the streaming program
+        *                   to fail and go into recovery.
+        */
+       public abstract void processBroadcastElement(final IN2 value, final 
Context ctx, final Collector<OUT> out) throws Exception;
+
+       /**
+        * Called when a timer set using {@link TimerService} fires.
+        *
+        * @param timestamp The timestamp of the firing timer.
+        * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
+        *            querying the current processing/event time, reading the broadcast state
+        *            with <b>read-only</b> access, querying the {@link TimeDomain} of the firing timer
+        *            and getting a {@link TimerService} for registering timers and querying the time.
+        *            The context is only valid during the invocation of this method, do not store it.
+        * @param out The collector for returning result values.
+        *
+        * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+        *                   to fail and may trigger recovery.
+        */
+       public void onTimer(final long timestamp, final OnTimerContext ctx, 
final Collector<OUT> out) throws Exception {
+               // the default implementation does nothing.
+       }
+
+       /**
+        * A {@link BaseBroadcastProcessFunction.ReadOnlyContext context} available to the keyed stream side of
+        * a {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream BroadcastConnectedStream}.
+        *
+        * <p>Apart from the <b>read-only</b> access to the broadcast state offered by every
+        * {@link BaseBroadcastProcessFunction.ReadOnlyContext read-only context},
+        * this also provides a {@link TimerService} for querying time and registering timers.
+        */
+       public abstract class KeyedReadOnlyContext extends ReadOnlyContext {
+
+               /**
+                * A {@link TimerService} for querying time and registering timers.
+                */
+               public abstract TimerService timerService();
+       }
+
+       /**
+        * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
+        */
+       public abstract class OnTimerContext extends KeyedReadOnlyContext {
+
+               /**
+                * The {@link TimeDomain} of the firing timer, i.e. whether it is an
+                * event-time or a processing-time timer.
+                */
+               public abstract TimeDomain timeDomain();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 0a05f09..7d0333f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -586,7 +586,7 @@ public class StreamGraphGenerator {
                                transform.getOutputType(),
                                transform.getName());
 
-               if (transform.getStateKeySelector1() != null) {
+               if (transform.getStateKeySelector1() != null || 
transform.getStateKeySelector2() != null) {
                        TypeSerializer<?> keySerializer = 
transform.getStateKeyType().createSerializer(env.getConfig());
                        streamGraph.setTwoInputStateKey(transform.getId(), 
transform.getStateKeySelector1(), transform.getStateKeySelector2(), 
keySerializer);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
new file mode 100644
index 0000000..794b0db
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import 
org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link TwoInputStreamOperator} for executing {@link KeyedBroadcastProcessFunction KeyedBroadcastProcessFunctions}.
+ *
+ * <p>Elements of the first (keyed) input are passed to {@code processElement()} together with a context that
+ * offers read-only access to the broadcast state, while elements of the second (broadcast) input are passed to
+ * {@code processBroadcastElement()} together with a context that offers read/write access. Firing timers are
+ * forwarded to {@code onTimer()}.
+ *
+ * @param <KS> The key type of the input keyed stream.
+ * @param <IN1> The input type of the keyed (non-broadcast) side.
+ * @param <IN2> The input type of the broadcast side.
+ * @param <OUT> The output type of the operator.
+ */
+@Internal
+public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
+               extends AbstractUdfStreamOperator<OUT, KeyedBroadcastProcessFunction<IN1, IN2, OUT>>
+               implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<KS, VoidNamespace> {
+
+       private static final long serialVersionUID = 5926499536290284870L;
+
+       /** Descriptors of the broadcast states that are registered when the operator is opened. */
+       private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
+
+       private transient TimestampedCollector<OUT> collector;
+
+       /** The registered broadcast states, keyed by the descriptor they were created from. */
+       private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates;
+
+       /** Context handed to the user function for elements of the broadcast side. */
+       private transient ReadWriteContextImpl rwContext;
+
+       /** Context handed to the user function for elements of the keyed side. */
+       private transient ReadOnlyContextImpl rContext;
+
+       /** Context handed to the user function when a timer fires. */
+       private transient OnTimerContextImpl onTimerContext;
+
+       /**
+        * Creates the operator.
+        *
+        * @param function The user function to execute.
+        * @param broadcastStateDescriptors The descriptors of the broadcast states to register, must not be null.
+        */
+       public CoBroadcastWithKeyedOperator(
+                       final KeyedBroadcastProcessFunction<IN1, IN2, OUT> function,
+                       final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) {
+               super(function);
+               this.broadcastStateDescriptors = Preconditions.checkNotNull(broadcastStateDescriptors);
+       }
+
+       /**
+        * Creates the timer service, the output collector, one broadcast state per registered descriptor
+        * and the three contexts that are re-used for every invocation of the user function.
+        */
+       @Override
+       public void open() throws Exception {
+               super.open();
+
+               InternalTimerService<VoidNamespace> internalTimerService =
+                               getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
+
+               TimerService timerService = new SimpleTimerService(internalTimerService);
+
+               collector = new TimestampedCollector<>(output);
+
+               this.broadcastStates = new HashMap<>(broadcastStateDescriptors.size());
+               for (MapStateDescriptor<?, ?> descriptor: broadcastStateDescriptors) {
+                       broadcastStates.put(descriptor, getOperatorStateBackend().getBroadcastState(descriptor));
+               }
+
+               rwContext = new ReadWriteContextImpl(userFunction, broadcastStates, timerService);
+               rContext = new ReadOnlyContextImpl(userFunction, broadcastStates, timerService);
+               onTimerContext = new OnTimerContextImpl(userFunction, broadcastStates, timerService);
+       }
+
+       /** Processes an element of the keyed side; the read-only context is bound to it only for this call. */
+       @Override
+       public void processElement1(StreamRecord<IN1> element) throws Exception {
+               collector.setTimestamp(element);
+               rContext.setElement(element);
+               userFunction.processElement(element.getValue(), rContext, collector);
+               rContext.setElement(null);
+       }
+
+       /** Processes an element of the broadcast side; the read/write context is bound to it only for this call. */
+       @Override
+       public void processElement2(StreamRecord<IN2> element) throws Exception {
+               collector.setTimestamp(element);
+               rwContext.setElement(element);
+               userFunction.processBroadcastElement(element.getValue(), rwContext, collector);
+               rwContext.setElement(null);
+       }
+
+       /** Forwards a firing event-time timer to the user function; emitted records carry the timer's timestamp. */
+       @Override
+       public void onEventTime(InternalTimer<KS, VoidNamespace> timer) throws Exception {
+               collector.setAbsoluteTimestamp(timer.getTimestamp());
+               onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
+               onTimerContext.timer = timer;
+               userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+               onTimerContext.timeDomain = null;
+               onTimerContext.timer = null;
+       }
+
+       /** Forwards a firing processing-time timer to the user function; emitted records carry no timestamp. */
+       @Override
+       public void onProcessingTime(InternalTimer<KS, VoidNamespace> timer) throws Exception {
+               collector.eraseTimestamp();
+               onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
+               onTimerContext.timer = timer;
+               userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+               onTimerContext.timeDomain = null;
+               onTimerContext.timer = null;
+       }
+
+       /**
+        * Looks up the broadcast state that was registered for the given descriptor. Shared by all contexts
+        * so that the check and the error message exist only once.
+        *
+        * @param states The registered broadcast states.
+        * @param stateDescriptor The descriptor of the requested state, must not be null.
+        * @return The registered state, never null.
+        * @throws IllegalArgumentException If no state was registered for the descriptor.
+        */
+       private static BroadcastState<?, ?> lookupBroadcastState(
+                       final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states,
+                       final MapStateDescriptor<?, ?> stateDescriptor) {
+
+               Preconditions.checkNotNull(stateDescriptor);
+               final BroadcastState<?, ?> state = states.get(stateDescriptor);
+               if (state == null) {
+                       throw new IllegalArgumentException("The requested state does not exist. " +
+                                       "Check for typos in your state descriptor, or specify the state descriptor " +
+                                       "in the datastream.broadcast(...) call if you forgot to register it.");
+               }
+               return state;
+       }
+
+       /** The read/write context passed to {@code processBroadcastElement()}. */
+       private class ReadWriteContextImpl extends BaseBroadcastProcessFunction.Context {
+
+               private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
+
+               private final TimerService timerService;
+
+               /** The broadcast element currently being processed, {@code null} outside of an invocation. */
+               private StreamRecord<IN2> element;
+
+               ReadWriteContextImpl (
+                               final KeyedBroadcastProcessFunction<IN1, IN2, OUT> function,
+                               final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
+                               final TimerService timerService) {
+
+                       function.super();
+                       this.states = Preconditions.checkNotNull(broadcastStates);
+                       this.timerService = Preconditions.checkNotNull(timerService);
+               }
+
+               void setElement(StreamRecord<IN2> e) {
+                       this.element = e;
+               }
+
+               @Override
+               public Long timestamp() {
+                       checkState(element != null);
+                       // Report a missing timestamp as null, exactly like the keyed-side context does,
+                       // instead of boxing whatever getTimestamp() yields for a record without one.
+                       return element.hasTimestamp() ? element.getTimestamp() : null;
+               }
+
+               @Override
+               @SuppressWarnings("unchecked") // the state was created from this very descriptor in open()
+               public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
+                       return (BroadcastState<K, V>) lookupBroadcastState(states, stateDescriptor);
+               }
+
+               @Override
+               public <X> void output(OutputTag<X> outputTag, X value) {
+                       checkArgument(outputTag != null, "OutputTag must not be null.");
+                       output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
+               }
+
+               @Override
+               public long currentProcessingTime() {
+                       return timerService.currentProcessingTime();
+               }
+
+               @Override
+               public long currentWatermark() {
+                       return timerService.currentWatermark();
+               }
+       }
+
+       /** The read-only context passed to {@code processElement()}. */
+       private class ReadOnlyContextImpl extends KeyedBroadcastProcessFunction<IN1, IN2, OUT>.KeyedReadOnlyContext {
+
+               private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
+
+               private final TimerService timerService;
+
+               /** The keyed element currently being processed, {@code null} outside of an invocation. */
+               private StreamRecord<IN1> element;
+
+               ReadOnlyContextImpl(
+                               final KeyedBroadcastProcessFunction<IN1, IN2, OUT> function,
+                               final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
+                               final TimerService timerService) {
+
+                       function.super();
+                       this.states = Preconditions.checkNotNull(broadcastStates);
+                       this.timerService = Preconditions.checkNotNull(timerService);
+               }
+
+               void setElement(StreamRecord<IN1> e) {
+                       this.element = e;
+               }
+
+               @Override
+               public Long timestamp() {
+                       checkState(element != null);
+                       return element.hasTimestamp() ? element.getTimestamp() : null;
+               }
+
+               @Override
+               public TimerService timerService() {
+                       return timerService;
+               }
+
+               @Override
+               public long currentProcessingTime() {
+                       return timerService.currentProcessingTime();
+               }
+
+               @Override
+               public long currentWatermark() {
+                       return timerService.currentWatermark();
+               }
+
+               @Override
+               public <X> void output(OutputTag<X> outputTag, X value) {
+                       checkArgument(outputTag != null, "OutputTag must not be null.");
+                       output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
+               }
+
+               @Override
+               @SuppressWarnings("unchecked") // the state was created from this very descriptor in open()
+               public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
+                       return (ReadOnlyBroadcastState<K, V>) lookupBroadcastState(states, stateDescriptor);
+               }
+       }
+
+       /** The context passed to {@code onTimer()}; the operator sets the timer and its domain before each call. */
+       private class OnTimerContextImpl extends KeyedBroadcastProcessFunction<IN1, IN2, OUT>.OnTimerContext {
+
+               private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
+
+               private final TimerService timerService;
+
+               /** The domain of the firing timer, {@code null} outside of an invocation. */
+               private TimeDomain timeDomain;
+
+               /** The firing timer, {@code null} outside of an invocation. */
+               private InternalTimer<KS, VoidNamespace> timer;
+
+               OnTimerContextImpl(
+                               final KeyedBroadcastProcessFunction<IN1, IN2, OUT> function,
+                               final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
+                               final TimerService timerService) {
+
+                       function.super();
+                       this.states = Preconditions.checkNotNull(broadcastStates);
+                       this.timerService = Preconditions.checkNotNull(timerService);
+               }
+
+               @Override
+               public Long timestamp() {
+                       checkState(timer != null);
+                       return timer.getTimestamp();
+               }
+
+               @Override
+               public TimeDomain timeDomain() {
+                       checkState(timeDomain != null);
+                       return timeDomain;
+               }
+
+               @Override
+               public TimerService timerService() {
+                       return timerService;
+               }
+
+               @Override
+               public long currentProcessingTime() {
+                       return timerService.currentProcessingTime();
+               }
+
+               @Override
+               public long currentWatermark() {
+                       return timerService.currentWatermark();
+               }
+
+               @Override
+               public <X> void output(OutputTag<X> outputTag, X value) {
+                       checkArgument(outputTag != null, "OutputTag must not be null.");
+                       output.collect(outputTag, new StreamRecord<>(value, timer.getTimestamp()));
+               }
+
+               @Override
+               @SuppressWarnings("unchecked") // the state was created from this very descriptor in open()
+               public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
+                       return (ReadOnlyBroadcastState<K, V>) lookupBroadcastState(states, stateDescriptor);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
new file mode 100644
index 0000000..25bf873
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
+import 
org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link TwoInputStreamOperator} for executing {@link 
BroadcastProcessFunction BroadcastProcessFunctions}.
+ *
+ * @param <IN1> The input type of the non-keyed (non-broadcast) side.
+ * @param <IN2> The input type of the broadcast side.
+ * @param <OUT> The output type of the operator.
+ */
+@Internal
+public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>
+               extends AbstractUdfStreamOperator<OUT, 
BroadcastProcessFunction<IN1, IN2, OUT>>
+               implements TwoInputStreamOperator<IN1, IN2, OUT> {
+
+       private static final long serialVersionUID = -1869740381935471752L;
+
+       /** We listen to this ourselves because we don't have an {@link 
InternalTimerService}. */
+       private long currentWatermark = Long.MIN_VALUE;
+
+       private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
+
+       private transient TimestampedCollector<OUT> collector;
+
+       private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> 
broadcastStates;
+
+       private transient ReadWriteContextImpl rwContext;
+
+       private transient ReadOnlyContextImpl rContext;
+
+       /**
+        * Creates the operator.
+        *
+        * @param function the user function executed by this operator.
+        * @param broadcastStateDescriptors descriptors of the broadcast states made available
+        *                                  to the function; must not be {@code null}.
+        */
+       public CoBroadcastWithNonKeyedOperator(
+                       final BroadcastProcessFunction<IN1, IN2, OUT> function,
+                       final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) {
+               super(function);
+               Preconditions.checkNotNull(broadcastStateDescriptors);
+               this.broadcastStateDescriptors = broadcastStateDescriptors;
+       }
+
+       /**
+        * Sets up the timestamped collector, obtains one broadcast state per registered
+        * descriptor from the operator state backend, and creates the read-write and
+        * read-only contexts that are handed to the user function.
+        */
+       @Override
+       public void open() throws Exception {
+               super.open();
+
+               collector = new TimestampedCollector<>(output);
+
+               this.broadcastStates = new HashMap<>(broadcastStateDescriptors.size());
+               for (MapStateDescriptor<?, ?> descriptor : broadcastStateDescriptors) {
+                       final BroadcastState<?, ?> state = getOperatorStateBackend().getBroadcastState(descriptor);
+                       broadcastStates.put(descriptor, state);
+               }
+
+               final ProcessingTimeService processingTimeService = getProcessingTimeService();
+               rwContext = new ReadWriteContextImpl(userFunction, broadcastStates, processingTimeService);
+               rContext = new ReadOnlyContextImpl(userFunction, broadcastStates, processingTimeService);
+       }
+
+       /**
+        * Handles an element of the non-broadcast input: the element is exposed through the
+        * read-only context for the duration of the user function call and cleared afterwards.
+        */
+       @Override
+       public void processElement1(StreamRecord<IN1> element) throws Exception {
+               collector.setTimestamp(element);
+               rContext.setElement(element);
+
+               final IN1 value = element.getValue();
+               userFunction.processElement(value, rContext, collector);
+
+               rContext.setElement(null);
+       }
+
+       /**
+        * Handles an element of the broadcast input: the element is exposed through the
+        * read-write context for the duration of the user function call and cleared afterwards.
+        */
+       @Override
+       public void processElement2(StreamRecord<IN2> element) throws Exception {
+               collector.setTimestamp(element);
+               rwContext.setElement(element);
+
+               final IN2 value = element.getValue();
+               userFunction.processBroadcastElement(value, rwContext, collector);
+
+               rwContext.setElement(null);
+       }
+
+       /**
+        * Lets the superclass process the watermark first and then remembers its timestamp,
+        * which is what the contexts report as the current watermark.
+        */
+       @Override
+       public void processWatermark(Watermark mark) throws Exception {
+               super.processWatermark(mark);
+               final long watermarkTimestamp = mark.getTimestamp();
+               this.currentWatermark = watermarkTimestamp;
+       }
+
+       /**
+        * Context handed to the user function while an element of the broadcast input is
+        * processed. It gives access to the registered {@link BroadcastState broadcast states},
+        * to side outputs, and to the current processing time and watermark.
+        */
+       private class ReadWriteContextImpl extends BaseBroadcastProcessFunction.Context {
+
+               private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
+
+               private final ProcessingTimeService timerService;
+
+               /** The broadcast element currently being processed; reset to {@code null} after each call. */
+               private StreamRecord<IN2> element;
+
+               ReadWriteContextImpl(
+                               final BroadcastProcessFunction<IN1, IN2, OUT> function,
+                               final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
+                               final ProcessingTimeService timerService) {
+
+                       function.super();
+                       this.states = Preconditions.checkNotNull(broadcastStates);
+                       this.timerService = Preconditions.checkNotNull(timerService);
+               }
+
+               void setElement(StreamRecord<IN2> e) {
+                       this.element = e;
+               }
+
+               /**
+                * Returns the timestamp of the current element, or {@code null} if the element
+                * carries no timestamp.
+                *
+                * @throws IllegalStateException if no element is currently being processed.
+                */
+               @Override
+               public Long timestamp() {
+                       checkState(element != null);
+                       // report a missing timestamp as null, exactly like ReadOnlyContextImpl#timestamp()
+                       return element.hasTimestamp() ? element.getTimestamp() : null;
+               }
+
+               /**
+                * Returns the broadcast state registered under {@code stateDescriptor}.
+                *
+                * @throws IllegalArgumentException if no state was registered for the descriptor.
+                */
+               @Override
+               public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
+                       Preconditions.checkNotNull(stateDescriptor);
+                       BroadcastState<K, V> state = (BroadcastState<K, V>) states.get(stateDescriptor);
+                       if (state == null) {
+                               throw new IllegalArgumentException("The requested state does not exist. " +
+                                               "Check for typos in your state descriptor, or specify the state descriptor " +
+                                               "in the datastream.broadcast(...) call if you forgot to register it.");
+                       }
+                       return state;
+               }
+
+               /** Emits {@code value} to the given side output, stamped with the current element's timestamp. */
+               @Override
+               public <X> void output(OutputTag<X> outputTag, X value) {
+                       checkArgument(outputTag != null, "OutputTag must not be null.");
+                       output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
+               }
+
+               @Override
+               public long currentProcessingTime() {
+                       return timerService.getCurrentProcessingTime();
+               }
+
+               /** Returns the last watermark timestamp recorded by the enclosing operator. */
+               @Override
+               public long currentWatermark() {
+                       return currentWatermark;
+               }
+       }
+
+       /**
+        * Context handed to the user function while an element of the non-broadcast input is
+        * processed. Broadcast states are only exposed as {@link ReadOnlyBroadcastState}.
+        */
+       private class ReadOnlyContextImpl extends BroadcastProcessFunction<IN1, IN2, OUT>.ReadOnlyContext {
+
+               private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
+
+               private final ProcessingTimeService timerService;
+
+               /** The element currently being processed; reset to {@code null} after each call. */
+               private StreamRecord<IN1> element;
+
+               ReadOnlyContextImpl(
+                               final BroadcastProcessFunction<IN1, IN2, OUT> function,
+                               final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
+                               final ProcessingTimeService timerService) {
+
+                       function.super();
+                       Preconditions.checkNotNull(broadcastStates);
+                       this.states = broadcastStates;
+                       Preconditions.checkNotNull(timerService);
+                       this.timerService = timerService;
+               }
+
+               void setElement(StreamRecord<IN1> e) {
+                       this.element = e;
+               }
+
+               /**
+                * Returns the timestamp of the current element, or {@code null} if it has none.
+                *
+                * @throws IllegalStateException if no element is currently being processed.
+                */
+               @Override
+               public Long timestamp() {
+                       checkState(element != null);
+                       if (!element.hasTimestamp()) {
+                               return null;
+                       }
+                       return element.getTimestamp();
+               }
+
+               /** Emits {@code value} to the given side output, stamped with the current element's timestamp. */
+               @Override
+               public <X> void output(OutputTag<X> outputTag, X value) {
+                       checkArgument(outputTag != null, "OutputTag must not be null.");
+                       final StreamRecord<X> sideRecord = new StreamRecord<>(value, element.getTimestamp());
+                       output.collect(outputTag, sideRecord);
+               }
+
+               @Override
+               public long currentProcessingTime() {
+                       return timerService.getCurrentProcessingTime();
+               }
+
+               /** Returns the last watermark timestamp recorded by the enclosing operator. */
+               @Override
+               public long currentWatermark() {
+                       return currentWatermark;
+               }
+
+               /**
+                * Returns the read-only view of the broadcast state registered under
+                * {@code stateDescriptor}.
+                *
+                * @throws IllegalArgumentException if no state was registered for the descriptor.
+                */
+               @Override
+               public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
+                       Preconditions.checkNotNull(stateDescriptor);
+                       final ReadOnlyBroadcastState<K, V> registered =
+                                       (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor);
+                       if (registered != null) {
+                               return registered;
+                       }
+                       throw new IllegalArgumentException("The requested state does not exist. " +
+                                               "Check for typos in your state descriptor, or specify the state descriptor " +
+                                               "in the datastream.broadcast(...) call if you forgot to register it.");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
index 5ee055c..1c75921 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java
@@ -31,8 +31,8 @@ import java.util.List;
 
 /**
  * This Transformation represents the application of a
- * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to 
two input
- * {@code StreamTransformations}. The result is again only one stream.
+ * {@link TwoInputStreamOperator} to two input {@code StreamTransformations}.
+ * The result is again only one stream.
  *
  * @param <IN1> The type of the elements in the first input {@code 
StreamTransformation}
  * @param <IN2> The type of the elements in the second input {@code 
StreamTransformation}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 6fb06d3..59f54b5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
@@ -37,6 +38,7 @@ import 
org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
 import org.apache.flink.streaming.api.datastream.ConnectedStreams;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -45,9 +47,12 @@ import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
@@ -57,6 +62,7 @@ import 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
@@ -78,8 +84,12 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import javax.annotation.Nullable;
+
 import java.lang.reflect.Method;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -754,6 +764,182 @@ public class DataStreamTest extends TestLogger {
        }
 
        @Test
+       public void testConnectWithBroadcastTranslation() throws Exception {
+               // Runs a keyed stream connected to a broadcast stream end to end; the process
+               // function itself asserts that the broadcast state equals the expected map.
+
+               final Map<Long, String> expected = new HashMap<>();
+               for (long key = 0L; key <= 5L; key++) {
+                       expected.put(key, "test:" + key);
+               }
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+               final CustomWmEmitter<Long> valueAsTimestamp = new CustomWmEmitter<Long>() {
+                       @Override
+                       public long extractTimestamp(Long element, long previousElementTimestamp) {
+                               return element;
+                       }
+               };
+               final DataStream<Long> srcOne = env.generateSequence(0L, 5L)
+                               .assignTimestampsAndWatermarks(valueAsTimestamp)
+                               .keyBy((KeySelector<Long, Long>) value -> value);
+
+               final CustomWmEmitter<String> suffixAsTimestamp = new CustomWmEmitter<String>() {
+                       @Override
+                       public long extractTimestamp(String element, long previousElementTimestamp) {
+                               return Long.parseLong(element.split(":")[1]);
+                       }
+               };
+               final DataStream<String> srcTwo = env.fromCollection(expected.values())
+                               .assignTimestampsAndWatermarks(suffixAsTimestamp);
+
+               final BroadcastStream<String, Long, String> broadcast =
+                               srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR);
+
+               // the timestamp should be high enough to trigger the timer after all the elements arrive.
+               final TestBroadcastProcessFunction function = new TestBroadcastProcessFunction(100000L, expected);
+               final DataStream<String> output = srcOne.connect(broadcast).process(function);
+
+               output.addSink(new DiscardingSink<>());
+               env.execute();
+       }
+
+       /**
+        * Punctuated assigner that emits, for every element, a watermark equal to the
+        * timestamp extracted from that element. Subclasses supply the timestamp extraction.
+        */
+       private abstract static class CustomWmEmitter<T> implements AssignerWithPunctuatedWatermarks<T> {
+
+               @Nullable
+               @Override
+               public Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) {
+                       final Watermark perElementWatermark = new Watermark(extractedTimestamp);
+                       return perElementWatermark;
+               }
+       }
+
+       /**
+        * Test function that stores every broadcast element in the broadcast state, registers an
+        * event-time timer for every non-broadcast element, and compares the broadcast state
+        * with the expected map whenever a timer fires.
+        */
+       private static class TestBroadcastProcessFunction extends KeyedBroadcastProcessFunction<Long, String, String> {
+
+               static final MapStateDescriptor<Long, String> DESCRIPTOR = new MapStateDescriptor<>(
+                               "broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+               );
+
+               private final long timerTimestamp;
+
+               private final Map<Long, String> expectedState;
+
+               TestBroadcastProcessFunction(
+                               final long timerTS,
+                               final Map<Long, String> expectedBroadcastState
+               ) {
+                       this.timerTimestamp = timerTS;
+                       this.expectedState = expectedBroadcastState;
+               }
+
+               @Override
+               public void processElement(Long value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+                       ctx.timerService().registerEventTimeTimer(timerTimestamp);
+               }
+
+               @Override
+               public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
+                       // elements look like "<prefix>:<number>"; the number is used as the state key
+                       final String suffix = value.split(":")[1];
+                       final long key = Long.parseLong(suffix);
+                       ctx.getBroadcastState(DESCRIPTOR).put(key, value);
+               }
+
+               @Override
+               public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
+                       final Map<Long, String> actualState = new HashMap<>();
+                       ctx.getBroadcastState(DESCRIPTOR)
+                                       .immutableEntries()
+                                       .forEach(entry -> actualState.put(entry.getKey(), entry.getValue()));
+                       Assert.assertEquals(expectedState, actualState);
+               }
+       }
+
+       /**
+        * Verifies that connecting a {@link KeyedStream} with a broadcast stream rejects a plain
+        * {@link BroadcastProcessFunction}: a {@link KeyedBroadcastProcessFunction} is required.
+        */
+       @Test(expected = IllegalArgumentException.class)
+       public void testFailedTranslationOnKeyed() {
+
+               final MapStateDescriptor<Long, String> descriptor = new MapStateDescriptor<>(
+                               "broadcast", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+               final CustomWmEmitter<Long> valueAsTimestamp = new CustomWmEmitter<Long>() {
+                       @Override
+                       public long extractTimestamp(Long element, long previousElementTimestamp) {
+                               return element;
+                       }
+               };
+               final DataStream<Long> srcOne = env.generateSequence(0L, 5L)
+                               .assignTimestampsAndWatermarks(valueAsTimestamp)
+                               .keyBy((KeySelector<Long, Long>) value -> value);
+
+               final CustomWmEmitter<String> suffixAsTimestamp = new CustomWmEmitter<String>() {
+                       @Override
+                       public long extractTimestamp(String element, long previousElementTimestamp) {
+                               return Long.parseLong(element.split(":")[1]);
+                       }
+               };
+               final DataStream<String> srcTwo = env.fromElements("Test:0", "Test:1", "Test:2", "Test:3", "Test:4", "Test:5")
+                               .assignTimestampsAndWatermarks(suffixAsTimestamp);
+
+               final BroadcastStream<String, Long, String> broadcast = srcTwo.broadcast(descriptor);
+
+               final BroadcastProcessFunction<Long, String, String> nonKeyedFunction =
+                               new BroadcastProcessFunction<Long, String, String>() {
+                                       @Override
+                                       public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
+                                               // do nothing
+                                       }
+
+                                       @Override
+                                       public void processElement(Long value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
+                                               // do nothing
+                                       }
+                               };
+
+               // expected to throw: the first input is keyed but the function is the non-keyed variant
+               srcOne.connect(broadcast).process(nonKeyedFunction);
+       }
+
+       /**
+        * Verifies that connecting a non-keyed stream with a broadcast stream rejects a
+        * {@link KeyedBroadcastProcessFunction}: a {@link BroadcastProcessFunction} is required.
+        */
+       @Test(expected = IllegalArgumentException.class)
+       public void testFailedTranslationOnNonKeyed() {
+
+               final MapStateDescriptor<Long, String> descriptor = new MapStateDescriptor<>(
+                               "broadcast", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+               final CustomWmEmitter<Long> valueAsTimestamp = new CustomWmEmitter<Long>() {
+                       @Override
+                       public long extractTimestamp(Long element, long previousElementTimestamp) {
+                               return element;
+                       }
+               };
+               final DataStream<Long> srcOne = env.generateSequence(0L, 5L)
+                               .assignTimestampsAndWatermarks(valueAsTimestamp);
+
+               final CustomWmEmitter<String> suffixAsTimestamp = new CustomWmEmitter<String>() {
+                       @Override
+                       public long extractTimestamp(String element, long previousElementTimestamp) {
+                               return Long.parseLong(element.split(":")[1]);
+                       }
+               };
+               final DataStream<String> srcTwo = env.fromElements("Test:0", "Test:1", "Test:2", "Test:3", "Test:4", "Test:5")
+                               .assignTimestampsAndWatermarks(suffixAsTimestamp);
+
+               final BroadcastStream<String, Long, String> broadcast = srcTwo.broadcast(descriptor);
+
+               final KeyedBroadcastProcessFunction<Long, String, String> keyedFunction =
+                               new KeyedBroadcastProcessFunction<Long, String, String>() {
+
+                                       @Override
+                                       public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
+                                               // do nothing
+                                       }
+
+                                       @Override
+                                       public void processElement(Long value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+                                               // do nothing
+                                       }
+                               };
+
+               // expected to throw: the first input is not keyed but the function is the keyed variant
+               srcOne.connect(broadcast).process(keyedFunction);
+       }
+
+       @Test
        public void operatorTest() {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 

Reply via email to