[FLINK-3659] Expose broadcast state on DataStream API.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6c17bef Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6c17bef Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6c17bef Branch: refs/heads/master Commit: c6c17befe54d55755ebaf160ff20a11aa32bbbca Parents: 484fedd Author: kkloudas <[email protected]> Authored: Thu Dec 21 14:38:54 2017 +0100 Committer: kkloudas <[email protected]> Committed: Wed Feb 7 14:07:45 2018 +0100 ---------------------------------------------------------------------- .../datastream/BroadcastConnectedStream.java | 255 ++++++++ .../api/datastream/BroadcastStream.java | 87 +++ .../streaming/api/datastream/DataStream.java | 41 ++ .../co/BaseBroadcastProcessFunction.java | 105 +++ .../functions/co/BroadcastProcessFunction.java | 93 +++ .../co/KeyedBroadcastProcessFunction.java | 145 ++++ .../api/graph/StreamGraphGenerator.java | 2 +- .../co/CoBroadcastWithKeyedOperator.java | 324 +++++++++ .../co/CoBroadcastWithNonKeyedOperator.java | 228 +++++++ .../transformations/TwoInputTransformation.java | 4 +- .../flink/streaming/api/DataStreamTest.java | 186 ++++++ .../co/CoBroadcastWithKeyedOperatorTest.java | 655 +++++++++++++++++++ .../co/CoBroadcastWithNonKeyedOperatorTest.java | 497 ++++++++++++++ .../util/TwoInputStreamOperatorTestHarness.java | 2 +- 14 files changed, 2620 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java new file mode 100644 index 
0000000..aeb3bc2 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.Utils; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator; +import org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator; +import org.apache.flink.streaming.api.transformations.TwoInputTransformation; +import org.apache.flink.util.Preconditions; + +import java.util.Collections; + 
+import static java.util.Objects.requireNonNull; + +/** + * A BroadcastConnectedStream represents the result of connecting a keyed or non-keyed stream, + * with a {@link BroadcastStream} with {@link org.apache.flink.api.common.state.BroadcastState + * BroadcastState}. As in the case of {@link ConnectedStreams} these streams are useful for cases + * where operations on one stream directly affect the operations on the other stream, usually via + * shared state between the streams. + * + * <p>An example for the use of such connected streams would be to apply rules that change over time + * onto another, possibly keyed stream. The stream with the broadcast state has the rules, and will + * store them in the broadcast state, while the other stream will contain the elements to apply the + * rules to. By broadcasting the rules, these will be available in all parallel instances, and + * can be applied to all partitions of the other stream. + * + * @param <IN1> The input type of the non-broadcast side. + * @param <IN2> The input type of the broadcast side. + * @param <K> The key type of the elements in the {@link org.apache.flink.api.common.state.BroadcastState BroadcastState}. + * @param <V> The value type of the elements in the {@link org.apache.flink.api.common.state.BroadcastState BroadcastState}. 
+ */ +@PublicEvolving +public class BroadcastConnectedStream<IN1, IN2, K, V> { + + private final StreamExecutionEnvironment environment; + private final DataStream<IN1> inputStream1; + private final BroadcastStream<IN2, K, V> inputStream2; + private final MapStateDescriptor<K, V> broadcastStateDescriptor; + + protected BroadcastConnectedStream( + final StreamExecutionEnvironment env, + final DataStream<IN1> input1, + final BroadcastStream<IN2, K, V> input2, + final MapStateDescriptor<K, V> broadcastStateDescriptor) { + this.environment = requireNonNull(env); + this.inputStream1 = requireNonNull(input1); + this.inputStream2 = requireNonNull(input2); + this.broadcastStateDescriptor = requireNonNull(broadcastStateDescriptor); + } + + public StreamExecutionEnvironment getExecutionEnvironment() { + return environment; + } + + /** + * Returns the non-broadcast {@link DataStream}. + * + * @return The stream which, by convention, is not broadcasted. + */ + public DataStream<IN1> getFirstInput() { + return inputStream1; + } + + /** + * Returns the {@link BroadcastStream}. + * + * @return The stream which, by convention, is the broadcast one. + */ + public BroadcastStream<IN2, K, V> getSecondInput() { + return inputStream2; + } + + /** + * Gets the type of the first input. + * + * @return The type of the first input + */ + public TypeInformation<IN1> getType1() { + return inputStream1.getType(); + } + + /** + * Gets the type of the second input. + * + * @return The type of the second input + */ + public TypeInformation<IN2> getType2() { + return inputStream2.getType(); + } + + /** + * Assumes as inputs a {@link BroadcastStream} and a {@link KeyedStream} and applies the given + * {@link KeyedBroadcastProcessFunction} on them, thereby creating a transformed output stream. + * + * @param function The {@link KeyedBroadcastProcessFunction} that is called for each element in the stream. + * @param <OUT> The type of the output elements. 
+ * @return The transformed {@link DataStream}. + */ + @PublicEvolving + public <OUT> SingleOutputStreamOperator<OUT> process(final KeyedBroadcastProcessFunction<IN1, IN2, OUT> function) { + + TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType( + function, + KeyedBroadcastProcessFunction.class, + 0, + 1, + 2, + TypeExtractor.NO_INDEX, + TypeExtractor.NO_INDEX, + TypeExtractor.NO_INDEX, + getType1(), + getType2(), + Utils.getCallLocationName(), + true); + + return process(function, outTypeInfo); + } + + /** + * Assumes as inputs a {@link BroadcastStream} and a {@link KeyedStream} and applies the given + * {@link KeyedBroadcastProcessFunction} on them, thereby creating a transformed output stream. + * + * @param function The {@link KeyedBroadcastProcessFunction} that is called for each element in the stream. + * @param outTypeInfo The type of the output elements. + * @param <OUT> The type of the output elements. + * @return The transformed {@link DataStream}. + */ + @PublicEvolving + public <OUT> SingleOutputStreamOperator<OUT> process( + final KeyedBroadcastProcessFunction<IN1, IN2, OUT> function, + final TypeInformation<OUT> outTypeInfo) { + + Preconditions.checkNotNull(function); + Preconditions.checkArgument(inputStream1 instanceof KeyedStream, + "A KeyedBroadcastProcessFunction can only be used with a keyed stream as the second input."); + + TwoInputStreamOperator<IN1, IN2, OUT> operator = + new CoBroadcastWithKeyedOperator<>(function, Collections.singletonList(broadcastStateDescriptor)); + return transform("Co-Process-Broadcast-Keyed", outTypeInfo, operator); + } + + /** + * Assumes as inputs a {@link BroadcastStream} and a non-keyed {@link DataStream} and applies the given + * {@link BroadcastProcessFunction} on them, thereby creating a transformed output stream. + * + * @param function The {@link BroadcastProcessFunction} that is called for each element in the stream. + * @param <OUT> The type of the output elements. 
+ * @return The transformed {@link DataStream}. + */ + @PublicEvolving + public <OUT> SingleOutputStreamOperator<OUT> process(final BroadcastProcessFunction<IN1, IN2, OUT> function) { + + TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType( + function, + BroadcastProcessFunction.class, + 0, + 1, + 2, + TypeExtractor.NO_INDEX, + TypeExtractor.NO_INDEX, + TypeExtractor.NO_INDEX, + getType1(), + getType2(), + Utils.getCallLocationName(), + true); + + return process(function, outTypeInfo); + } + + /** + * Assumes as inputs a {@link BroadcastStream} and a non-keyed {@link DataStream} and applies the given + * {@link BroadcastProcessFunction} on them, thereby creating a transformed output stream. + * + * @param function The {@link BroadcastProcessFunction} that is called for each element in the stream. + * @param outTypeInfo The type of the output elements. + * @param <OUT> The type of the output elements. + * @return The transformed {@link DataStream}. + */ + @PublicEvolving + public <OUT> SingleOutputStreamOperator<OUT> process( + final BroadcastProcessFunction<IN1, IN2, OUT> function, + final TypeInformation<OUT> outTypeInfo) { + + Preconditions.checkNotNull(function); + Preconditions.checkArgument(!(inputStream1 instanceof KeyedStream), + "A BroadcastProcessFunction can only be used with a non-keyed stream as the second input."); + + TwoInputStreamOperator<IN1, IN2, OUT> operator = + new CoBroadcastWithNonKeyedOperator<>(function, Collections.singletonList(broadcastStateDescriptor)); + return transform("Co-Process-Broadcast", outTypeInfo, operator); + } + + @Internal + private <OUT> SingleOutputStreamOperator<OUT> transform( + final String functionName, + final TypeInformation<OUT> outTypeInfo, + final TwoInputStreamOperator<IN1, IN2, OUT> operator) { + + // read the output type of the input Transforms to coax out errors about MissingTypeInfo + inputStream1.getType(); + inputStream2.getType(); + + TwoInputTransformation<IN1, IN2, OUT> 
transform = new TwoInputTransformation<>( + inputStream1.getTransformation(), + inputStream2.getTransformation(), + functionName, + operator, + outTypeInfo, + environment.getParallelism()); + + if (inputStream1 instanceof KeyedStream) { + KeyedStream<IN1, ?> keyedInput1 = (KeyedStream<IN1, ?>) inputStream1; + TypeInformation<?> keyType1 = keyedInput1.getKeyType(); + transform.setStateKeySelectors(keyedInput1.getKeySelector(), null); + transform.setStateKeyType(keyType1); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + SingleOutputStreamOperator<OUT> returnStream = new SingleOutputStreamOperator(environment, transform); + + getExecutionEnvironment().addOperator(transform); + + return returnStream; + } + + protected <F> F clean(F f) { + return getExecutionEnvironment().clean(f); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastStream.java new file mode 100644 index 0000000..e21e36f --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastStream.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.transformations.StreamTransformation; + +import static java.util.Objects.requireNonNull; + +/** + * A {@code BroadcastStream} is a stream with {@link org.apache.flink.api.common.state.BroadcastState BroadcastState}. + * This can be created by any stream using the {@link DataStream#broadcast(MapStateDescriptor)} method and + * implicitly creates a state where the user can store elements of the created {@code BroadcastStream}. + * (see {@link BroadcastConnectedStream}). + * + * <p>Note that no further operation can be applied to these streams. The only available option is to connect them + * with a keyed or non-keyed stream, using the {@link KeyedStream#connect(BroadcastStream)} and the + * {@link DataStream#connect(BroadcastStream)} respectively. Applying these methods will result in a + * {@link BroadcastConnectedStream} for further processing. + * + * @param <T> The type of input/output elements. + * @param <K> The key type of the elements in the {@link org.apache.flink.api.common.state.BroadcastState BroadcastState}. + * @param <V> The value type of the elements in the {@link org.apache.flink.api.common.state.BroadcastState BroadcastState}.
+ */ +@PublicEvolving +public class BroadcastStream<T, K, V> { + + private final StreamExecutionEnvironment environment; + + private final DataStream<T> inputStream; + + /** + * The {@link org.apache.flink.api.common.state.StateDescriptor state descriptor} of the + * {@link org.apache.flink.api.common.state.BroadcastState broadcast state}. This state + * has a {@code key-value} format. + */ + private final MapStateDescriptor<K, V> broadcastStateDescriptor; + + protected BroadcastStream( + final StreamExecutionEnvironment env, + final DataStream<T> input, + final MapStateDescriptor<K, V> broadcastStateDescriptor) { + + this.environment = requireNonNull(env); + this.inputStream = requireNonNull(input); + this.broadcastStateDescriptor = requireNonNull(broadcastStateDescriptor); + } + + public TypeInformation<T> getType() { + return inputStream.getType(); + } + + public <F> F clean(F f) { + return environment.clean(f); + } + + public StreamTransformation<T> getTransformation() { + return inputStream.getTransformation(); + } + + public MapStateDescriptor<K, V> getBroadcastStateDescriptor() { + return broadcastStateDescriptor; + } + + public StreamExecutionEnvironment getEnvironment() { + return environment; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 83c1126..d859689 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -32,6 +32,7 @@ import org.apache.flink.api.common.io.OutputFormat; import 
org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -253,6 +254,30 @@ public class DataStream<T> { } /** + * Creates a new {@link BroadcastConnectedStream} by connecting the current + * {@link DataStream} or {@link KeyedStream} with a {@link BroadcastStream}. + * + * <p>The latter can be created using the {@link #broadcast(MapStateDescriptor)} method. + * + * <p>The resulting stream can be further processed using the {@code BroadcastConnectedStream.process(MyFunction)} + * method, where {@code MyFunction} can be either a + * {@link org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction KeyedBroadcastProcessFunction} + * or a {@link org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction BroadcastProcessFunction} + * depending on the current stream being a {@link KeyedStream} or not. + * + * @param broadcastStream The broadcast stream with the broadcast state to be connected with this stream. + * @return The {@link BroadcastConnectedStream}. + */ + @PublicEvolving + public <R, K, V> BroadcastConnectedStream<T, R, K, V> connect(BroadcastStream<R, K, V> broadcastStream) { + return new BroadcastConnectedStream<>( + environment, + this, + Preconditions.checkNotNull(broadcastStream), + broadcastStream.getBroadcastStateDescriptor()); + } + + /** * It creates a new {@link KeyedStream} that uses the provided key for partitioning * its operator states. * @@ -373,6 +398,22 @@ public class DataStream<T> { /** * Sets the partitioning of the {@link DataStream} so that the output elements + * are broadcasted to every parallel instance of the next operation. 
In addition, + * it implicitly creates a {@link org.apache.flink.api.common.state.BroadcastState broadcast state} + * which can be used to store the elements of the stream. + * + * @return A {@link BroadcastStream} which can be passed to {@link #connect(BroadcastStream)} to + * create a {@link BroadcastConnectedStream} for further processing of the elements. + */ + @PublicEvolving + public <K, V> BroadcastStream<T, K, V> broadcast(final MapStateDescriptor<K, V> broadcastStateDescriptor) { + Preconditions.checkNotNull(broadcastStateDescriptor); + final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>()); + return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptor); + } + + /** + * Sets the partitioning of the {@link DataStream} so that the output elements + * are shuffled uniformly randomly to the next operation. * * @return The DataStream with shuffle partitioning set. http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java new file mode 100644 index 0000000..9419d80 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.co; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.state.BroadcastState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReadOnlyBroadcastState; +import org.apache.flink.util.OutputTag; + +/** + * The base class containing the functionality available to all broadcast process functions. + * These include the {@link BroadcastProcessFunction} and the {@link KeyedBroadcastProcessFunction}. + */ +@PublicEvolving +public abstract class BaseBroadcastProcessFunction extends AbstractRichFunction { + + private static final long serialVersionUID = -131631008887478610L; + + /** + * The base context available to all methods in a broadcast process function. This + * includes {@link BroadcastProcessFunction BroadcastProcessFunctions} and + * {@link KeyedBroadcastProcessFunction KeyedBroadcastProcessFunctions}. + */ + abstract class BaseContext { + + /** + * Timestamp of the element currently being processed or timestamp of a firing timer. + * + * <p>This might be {@code null}, for example if the time characteristic of your program + * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+ */ + public abstract Long timestamp(); + + /** + * Emits a record to the side output identified by the {@link OutputTag}. + * + * @param outputTag the {@code OutputTag} that identifies the side output to emit to. + * @param value The record to emit. + */ + public abstract <X> void output(OutputTag<X> outputTag, X value); + + /** Returns the current processing time. */ + public abstract long currentProcessingTime(); + + /** Returns the current event-time watermark. */ + public abstract long currentWatermark(); + } + + /** + * A base {@link BaseContext context} available to the broadcasted stream side of + * a {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream BroadcastConnectedStream}. + * + * <p>Apart from the basic functionality of a {@link BaseContext context}, + * this also allows getting and updating the elements stored in the + * {@link BroadcastState broadcast state}. + * In other words, it gives read/write access to the broadcast state. + */ + public abstract class Context extends BaseContext { + + /** + * Fetches the {@link BroadcastState} described by the given {@link MapStateDescriptor}. + * + * @param stateDescriptor the {@link MapStateDescriptor} of the state to be fetched. + * @return The required {@link BroadcastState broadcast state}. + */ + public abstract <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor); + } + + /** + * A {@link BaseContext context} available to the non-broadcasted stream side of + * a {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream BroadcastConnectedStream}. + * + * <p>Apart from the basic functionality of a {@link BaseContext context}, + * this also allows getting a <b>read-only</b> view ({@link ReadOnlyBroadcastState}) of the elements stored in the + * broadcast state. + */ + public abstract class ReadOnlyContext extends BaseContext {
+ * + * @param stateDescriptor the {@link MapStateDescriptor} of the state to be fetched. + * @return The required read-only view of the broadcast state. + */ + public abstract <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java new file mode 100644 index 0000000..4dcc929 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.co; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.util.Collector; + +/** + * A function to be applied to a + * {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream BroadcastConnectedStream} that + * connects {@link org.apache.flink.streaming.api.datastream.BroadcastStream BroadcastStream}, i.e. a stream + * with broadcast state, with a <b>non-keyed</b> {@link org.apache.flink.streaming.api.datastream.DataStream DataStream}. + * + * <p>The stream with the broadcast state can be created using the + * {@link org.apache.flink.streaming.api.datastream.DataStream#broadcast(MapStateDescriptor) + * stream.broadcast(MapStateDescriptor)} method. + * + * <p>The user has to implement two methods: + * <ol> + * <li>the {@link #processBroadcastElement(Object, Context, Collector)} which will be applied to + * each element in the broadcast side + * <li> and the {@link #processElement(Object, ReadOnlyContext, Collector)} which will be applied to the + * non-broadcasted (non-keyed) side. + * </ol> + * + * <p>The {@code processBroadcastElement()} takes as an argument (among others) a context that allows it to + * read/write to the broadcast state, while the {@code processElement()} has read-only access to the broadcast state. + * + * @param <IN1> The input type of the non-broadcast side. + * @param <IN2> The input type of the broadcast side. + * @param <OUT> The output type of the operator. + */ +@PublicEvolving +public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction { + + private static final long serialVersionUID = 8352559162119034453L; + + /** + * This method is called for each element in the (non-broadcast) + * {@link org.apache.flink.streaming.api.datastream.DataStream data stream}.
+ * + * <p>This function can output zero or more elements using the {@link Collector} parameter, + * and query the current processing/event time. + * Finally, it has <b>read-only</b> access to the broadcast state. + * The context is only valid during the invocation of this method, do not store it. + * + * @param value The stream element. + * @param ctx A {@link ReadOnlyContext} that allows querying the timestamp of the element, + * querying the current processing/event time and reading the broadcast state. + * The context is only valid during the invocation of this method, do not store it. + * @param out The collector to emit resulting elements to + * @throws Exception The function may throw exceptions which cause the streaming program + * to fail and go into recovery. + */ + public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception; + + /** + * This method is called for each element in the + * {@link org.apache.flink.streaming.api.datastream.BroadcastStream broadcast stream}. + * + * <p>This function can output zero or more elements using the {@link Collector} parameter, + * query the current processing/event time, and also query and update the internal + * {@link org.apache.flink.api.common.state.BroadcastState broadcast state}. These can be done + * through the provided {@link Context}. + * The context is only valid during the invocation of this method, do not store it. + * + * @param value The stream element. + * @param ctx A {@link Context} that allows querying the timestamp of the element, + * querying the current processing/event time and updating the broadcast state. + * The context is only valid during the invocation of this method, do not store it. + * @param out The collector to emit resulting elements to + * @throws Exception The function may throw exceptions which cause the streaming program + * to fail and go into recovery.
+ */ + public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java new file mode 100644 index 0000000..9d14259 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.functions.co; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.util.Collector; + +/** + * A function to be applied to a + * {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream BroadcastConnectedStream} that + * connects {@link org.apache.flink.streaming.api.datastream.BroadcastStream BroadcastStream}, i.e. a stream + * with broadcast state, with a {@link org.apache.flink.streaming.api.datastream.KeyedStream KeyedStream}. + * + * <p>The stream with the broadcast state can be created using the + * {@link org.apache.flink.streaming.api.datastream.DataStream#broadcast(MapStateDescriptor) + * stream.broadcast(MapStateDescriptor)} method. + * + * <p>The user has to implement two methods: + * <ol> + * <li>the {@link #processBroadcastElement(Object, Context, Collector)} which will be applied to + * each element in the broadcast side + * <li> and the {@link #processElement(Object, KeyedReadOnlyContext, Collector)} which will be applied to the + * non-broadcasted/keyed side. + * </ol> + * + * <p>The {@code processBroadcastElement()} takes as an argument (among others) a context that allows it to + * read/write to the broadcast state and also apply a transformation to all (local) keyed states, while the + * {@code processElement()} has read-only access to the broadcast state, but can read/write to the keyed state and + * register timers. + * + * @param <IN1> The input type of the keyed (non-broadcast) side. + * @param <IN2> The input type of the broadcast side. + * @param <OUT> The output type of the operator.
+ */ +@PublicEvolving +public abstract class KeyedBroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction { + + private static final long serialVersionUID = -2584726797564976453L; + + /** + * This method is called for each element in the (non-broadcast) + * {@link org.apache.flink.streaming.api.datastream.KeyedStream keyed stream}. + * + * <p>It can output zero or more elements using the {@link Collector} parameter, + * query the current processing/event time, and also query and update the local keyed state. + * In addition, it can get a {@link TimerService} for registering timers and querying the time. + * Finally, it has <b>read-only</b> access to the broadcast state. + * The context is only valid during the invocation of this method, do not store it. + * + * @param value The stream element. + * @param ctx A {@link KeyedReadOnlyContext} that allows querying the timestamp of the element, + * querying the current processing/event time and iterating the broadcast state + * with <b>read-only</b> access. + * The context is only valid during the invocation of this method, do not store it. + * @param out The collector to emit resulting elements to + * @throws Exception The function may throw exceptions which cause the streaming program + * to fail and go into recovery. + */ + public abstract void processElement(final IN1 value, final KeyedReadOnlyContext ctx, final Collector<OUT> out) throws Exception; + + /** + * This method is called for each element in the + * {@link org.apache.flink.streaming.api.datastream.BroadcastStream broadcast stream}. + * + * <p>It can output zero or more elements using the {@link Collector} parameter, + * query the current processing/event time, and also query and update the internal + * {@link org.apache.flink.api.common.state.BroadcastState broadcast state}. These can + * be done through the provided {@link Context}. + * The context is only valid during the invocation of this method, do not store it. 
+ * + * @param value The stream element. + * @param ctx A {@link Context} that allows querying the timestamp of the element, + * querying the current processing/event time and updating the broadcast state. + * The context is only valid during the invocation of this method, do not store it. + * @param out The collector to emit resulting elements to + * @throws Exception The function may throw exceptions which cause the streaming program + * to fail and go into recovery. + */ + public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception; + + /** + * Called when a timer set using {@link TimerService} fires. + * + * @param timestamp The timestamp of the firing timer. + * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer, + * querying the current processing/event time, iterating the broadcast state + * with <b>read-only</b> access, querying the {@link TimeDomain} of the firing timer + * and getting a {@link TimerService} for registering timers and querying the time. + * The context is only valid during the invocation of this method, do not store it. + * @param out The collector for returning result values. + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. + */ + public void onTimer(final long timestamp, final OnTimerContext ctx, final Collector<OUT> out) throws Exception { + // the default implementation does nothing. + } + + /** + * A {@link BaseBroadcastProcessFunction.Context context} available to the keyed stream side of + * a {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream} (if any). 
+ * + * <p>Apart from the basic functionality of a {@link BaseBroadcastProcessFunction.Context context}, + * this also allows to get a <b>read-only</b> {@link Iterable} over the elements stored in the + * broadcast state and a {@link TimerService} for querying time and registering timers. + */ + public abstract class KeyedReadOnlyContext extends ReadOnlyContext { + + /** + * A {@link TimerService} for querying time and registering timers. + */ + public abstract TimerService timerService(); + } + + /** + * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}. + */ + public abstract class OnTimerContext extends KeyedReadOnlyContext { + + /** + * The {@link TimeDomain} of the firing timer, i.e. if it is + * event or processing time timer. + */ + public abstract TimeDomain timeDomain(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index 0a05f09..7d0333f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -586,7 +586,7 @@ public class StreamGraphGenerator { transform.getOutputType(), transform.getName()); - if (transform.getStateKeySelector1() != null) { + if (transform.getStateKeySelector1() != null || transform.getStateKeySelector2() != null) { TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig()); streamGraph.setTwoInputStateKey(transform.getId(), transform.getStateKeySelector1(), transform.getStateKeySelector2(), keySerializer); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java new file mode 100644 index 0000000..794b0db --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.co; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.BroadcastState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReadOnlyBroadcastState; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.SimpleTimerService; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.OutputTag; +import org.apache.flink.util.Preconditions; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A {@link TwoInputStreamOperator} for executing {@link KeyedBroadcastProcessFunction KeyedBroadcastProcessFunctions}. + * + * @param <KS> The key type of the input keyed stream. + * @param <IN1> The input type of the keyed (non-broadcast) side. + * @param <IN2> The input type of the broadcast side. + * @param <OUT> The output type of the operator. 
+ */ +@Internal +public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT> + extends AbstractUdfStreamOperator<OUT, KeyedBroadcastProcessFunction<IN1, IN2, OUT>> + implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<KS, VoidNamespace> { + + private static final long serialVersionUID = 5926499536290284870L; + + private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors; + + private transient TimestampedCollector<OUT> collector; + + private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates; + + private transient ReadWriteContextImpl rwContext; + + private transient ReadOnlyContextImpl rContext; + + private transient OnTimerContextImpl onTimerContext; + + public CoBroadcastWithKeyedOperator( + final KeyedBroadcastProcessFunction<IN1, IN2, OUT> function, + final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) { + super(function); + this.broadcastStateDescriptors = Preconditions.checkNotNull(broadcastStateDescriptors); + } + + @Override + public void open() throws Exception { + super.open(); + + InternalTimerService<VoidNamespace> internalTimerService = + getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this); + + TimerService timerService = new SimpleTimerService(internalTimerService); + + collector = new TimestampedCollector<>(output); + + this.broadcastStates = new HashMap<>(broadcastStateDescriptors.size()); + for (MapStateDescriptor<?, ?> descriptor: broadcastStateDescriptors) { + broadcastStates.put(descriptor, getOperatorStateBackend().getBroadcastState(descriptor)); + } + + rwContext = new ReadWriteContextImpl(userFunction, broadcastStates, timerService); + rContext = new ReadOnlyContextImpl(userFunction, broadcastStates, timerService); + onTimerContext = new OnTimerContextImpl(userFunction, broadcastStates, timerService); + } + + @Override + public void processElement1(StreamRecord<IN1> element) throws Exception { + collector.setTimestamp(element); + 
rContext.setElement(element); + userFunction.processElement(element.getValue(), rContext, collector); + rContext.setElement(null); + } + + @Override + public void processElement2(StreamRecord<IN2> element) throws Exception { + collector.setTimestamp(element); + rwContext.setElement(element); + userFunction.processBroadcastElement(element.getValue(), rwContext, collector); + rwContext.setElement(null); + } + + @Override + public void onEventTime(InternalTimer<KS, VoidNamespace> timer) throws Exception { + collector.setAbsoluteTimestamp(timer.getTimestamp()); + onTimerContext.timeDomain = TimeDomain.EVENT_TIME; + onTimerContext.timer = timer; + userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector); + onTimerContext.timeDomain = null; + onTimerContext.timer = null; + } + + @Override + public void onProcessingTime(InternalTimer<KS, VoidNamespace> timer) throws Exception { + collector.eraseTimestamp(); + onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME; + onTimerContext.timer = timer; + userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector); + onTimerContext.timeDomain = null; + onTimerContext.timer = null; + } + + private class ReadWriteContextImpl extends BaseBroadcastProcessFunction.Context { + + private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states; + + private final TimerService timerService; + + private StreamRecord<IN2> element; + + ReadWriteContextImpl ( + final KeyedBroadcastProcessFunction<IN1, IN2, OUT> function, + final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates, + final TimerService timerService) { + + function.super(); + this.states = Preconditions.checkNotNull(broadcastStates); + this.timerService = Preconditions.checkNotNull(timerService); + } + + void setElement(StreamRecord<IN2> e) { + this.element = e; + } + + @Override + public Long timestamp() { + checkState(element != null); + return element.hasTimestamp() ? element.getTimestamp() : null; // null if the record has no timestamp + } + + @Override + public <K, V> BroadcastState<K, V>
getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) { + Preconditions.checkNotNull(stateDescriptor); + BroadcastState<K, V> state = (BroadcastState<K, V>) states.get(stateDescriptor); + if (state == null) { + throw new IllegalArgumentException("The requested state does not exist. " + + "Check for typos in your state descriptor, or specify the state descriptor " + + "in the datastream.broadcast(...) call if you forgot to register it."); + } + return state; + } + + @Override + public <X> void output(OutputTag<X> outputTag, X value) { + checkArgument(outputTag != null, "OutputTag must not be null."); + output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp())); + } + + @Override + public long currentProcessingTime() { + return timerService.currentProcessingTime(); + } + + @Override + public long currentWatermark() { + return timerService.currentWatermark(); + } + } + + private class ReadOnlyContextImpl extends KeyedBroadcastProcessFunction<IN1, IN2, OUT>.KeyedReadOnlyContext { + + private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states; + + private final TimerService timerService; + + private StreamRecord<IN1> element; + + ReadOnlyContextImpl( + final KeyedBroadcastProcessFunction<IN1, IN2, OUT> function, + final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates, + final TimerService timerService) { + + function.super(); + this.states = Preconditions.checkNotNull(broadcastStates); + this.timerService = Preconditions.checkNotNull(timerService); + } + + void setElement(StreamRecord<IN1> e) { + this.element = e; + } + + @Override + public Long timestamp() { + checkState(element != null); + return element.hasTimestamp() ? 
element.getTimestamp() : null; + } + + @Override + public TimerService timerService() { + return timerService; + } + + @Override + public long currentProcessingTime() { + return timerService.currentProcessingTime(); + } + + @Override + public long currentWatermark() { + return timerService.currentWatermark(); + } + + @Override + public <X> void output(OutputTag<X> outputTag, X value) { + checkArgument(outputTag != null, "OutputTag must not be null."); + output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp())); + } + + @Override + public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) { + Preconditions.checkNotNull(stateDescriptor); + ReadOnlyBroadcastState<K, V> state = (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor); + if (state == null) { + throw new IllegalArgumentException("The requested state does not exist. " + + "Check for typos in your state descriptor, or specify the state descriptor " + + "in the datastream.broadcast(...) 
call if you forgot to register it."); + } + return state; + } + } + + private class OnTimerContextImpl extends KeyedBroadcastProcessFunction<IN1, IN2, OUT>.OnTimerContext { + + private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states; + + private final TimerService timerService; + + private TimeDomain timeDomain; + + private InternalTimer<KS, VoidNamespace> timer; + + OnTimerContextImpl( + final KeyedBroadcastProcessFunction<IN1, IN2, OUT> function, + final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates, + final TimerService timerService) { + + function.super(); + this.states = Preconditions.checkNotNull(broadcastStates); + this.timerService = Preconditions.checkNotNull(timerService); + } + + @Override + public Long timestamp() { + checkState(timer != null); + return timer.getTimestamp(); + } + + @Override + public TimeDomain timeDomain() { + checkState(timeDomain != null); + return timeDomain; + } + + @Override + public TimerService timerService() { + return timerService; + } + + @Override + public long currentProcessingTime() { + return timerService.currentProcessingTime(); + } + + @Override + public long currentWatermark() { + return timerService.currentWatermark(); + } + + @Override + public <X> void output(OutputTag<X> outputTag, X value) { + checkArgument(outputTag != null, "OutputTag must not be null."); + output.collect(outputTag, new StreamRecord<>(value, timer.getTimestamp())); + } + + @Override + public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) { + Preconditions.checkNotNull(stateDescriptor); + ReadOnlyBroadcastState<K, V> state = (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor); + if (state == null) { + throw new IllegalArgumentException("The requested state does not exist. " + + "Check for typos in your state descriptor, or specify the state descriptor " + + "in the datastream.broadcast(...) 
call if you forgot to register it."); + } + return state; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java new file mode 100644 index 0000000..25bf873 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.co; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.BroadcastState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReadOnlyBroadcastState; +import org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.OutputTag; +import org.apache.flink.util.Preconditions; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A {@link TwoInputStreamOperator} for executing {@link BroadcastProcessFunction BroadcastProcessFunctions}. + * + * @param <IN1> The input type of the non-keyed (non-broadcast) side. + * @param <IN2> The input type of the broadcast side. + * @param <OUT> The output type of the operator. + */ +@Internal +public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT> + extends AbstractUdfStreamOperator<OUT, BroadcastProcessFunction<IN1, IN2, OUT>> + implements TwoInputStreamOperator<IN1, IN2, OUT> { + + private static final long serialVersionUID = -1869740381935471752L; + + /** We listen to this ourselves because we don't have an {@link InternalTimerService}.
*/ + private long currentWatermark = Long.MIN_VALUE; + + private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors; + + private transient TimestampedCollector<OUT> collector; + + private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates; + + private transient ReadWriteContextImpl rwContext; + + private transient ReadOnlyContextImpl rContext; + + public CoBroadcastWithNonKeyedOperator( + final BroadcastProcessFunction<IN1, IN2, OUT> function, + final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) { + super(function); + this.broadcastStateDescriptors = Preconditions.checkNotNull(broadcastStateDescriptors); + } + + @Override + public void open() throws Exception { + super.open(); + + collector = new TimestampedCollector<>(output); + + this.broadcastStates = new HashMap<>(broadcastStateDescriptors.size()); + for (MapStateDescriptor<?, ?> descriptor: broadcastStateDescriptors) { + broadcastStates.put(descriptor, getOperatorStateBackend().getBroadcastState(descriptor)); + } + + rwContext = new ReadWriteContextImpl(userFunction, broadcastStates, getProcessingTimeService()); + rContext = new ReadOnlyContextImpl(userFunction, broadcastStates, getProcessingTimeService()); + } + + @Override + public void processElement1(StreamRecord<IN1> element) throws Exception { + collector.setTimestamp(element); + rContext.setElement(element); + userFunction.processElement(element.getValue(), rContext, collector); + rContext.setElement(null); + } + + @Override + public void processElement2(StreamRecord<IN2> element) throws Exception { + collector.setTimestamp(element); + rwContext.setElement(element); + userFunction.processBroadcastElement(element.getValue(), rwContext, collector); + rwContext.setElement(null); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + super.processWatermark(mark); + currentWatermark = mark.getTimestamp(); + } + + private class ReadWriteContextImpl extends 
BaseBroadcastProcessFunction.Context { + + private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states; + + private final ProcessingTimeService timerService; + + private StreamRecord<IN2> element; + + ReadWriteContextImpl( + final BroadcastProcessFunction<IN1, IN2, OUT> function, + final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates, + final ProcessingTimeService timerService) { + + function.super(); + this.states = Preconditions.checkNotNull(broadcastStates); + this.timerService = Preconditions.checkNotNull(timerService); + } + + void setElement(StreamRecord<IN2> e) { + this.element = e; + } + + @Override + public Long timestamp() { + checkState(element != null); + return element.hasTimestamp() ? element.getTimestamp() : null; // null if the record has no timestamp + } + + @Override + public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) { + Preconditions.checkNotNull(stateDescriptor); + BroadcastState<K, V> state = (BroadcastState<K, V>) states.get(stateDescriptor); + if (state == null) { + throw new IllegalArgumentException("The requested state does not exist. " + + "Check for typos in your state descriptor, or specify the state descriptor " + + "in the datastream.broadcast(...)
call if you forgot to register it."); + } + return state; + } + + @Override + public <X> void output(OutputTag<X> outputTag, X value) { + checkArgument(outputTag != null, "OutputTag must not be null."); + output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp())); + } + + @Override + public long currentProcessingTime() { + return timerService.getCurrentProcessingTime(); + } + + @Override + public long currentWatermark() { + return currentWatermark; + } + } + + private class ReadOnlyContextImpl extends BroadcastProcessFunction<IN1, IN2, OUT>.ReadOnlyContext { + + private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states; + + private final ProcessingTimeService timerService; + + private StreamRecord<IN1> element; + + ReadOnlyContextImpl( + final BroadcastProcessFunction<IN1, IN2, OUT> function, + final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates, + final ProcessingTimeService timerService) { + + function.super(); + this.states = Preconditions.checkNotNull(broadcastStates); + this.timerService = Preconditions.checkNotNull(timerService); + } + + void setElement(StreamRecord<IN1> e) { + this.element = e; + } + + @Override + public Long timestamp() { + checkState(element != null); + return element.hasTimestamp() ? 
element.getTimestamp() : null; + } + + @Override + public <X> void output(OutputTag<X> outputTag, X value) { + checkArgument(outputTag != null, "OutputTag must not be null."); + output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp())); + } + + @Override + public long currentProcessingTime() { + return timerService.getCurrentProcessingTime(); + } + + @Override + public long currentWatermark() { + return currentWatermark; + } + + @Override + public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) { + Preconditions.checkNotNull(stateDescriptor); + ReadOnlyBroadcastState<K, V> state = (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor); + if (state == null) { + throw new IllegalArgumentException("The requested state does not exist. " + + "Check for typos in your state descriptor, or specify the state descriptor " + + "in the datastream.broadcast(...) call if you forgot to register it."); + } + return state; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java index 5ee055c..1c75921 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/TwoInputTransformation.java @@ -31,8 +31,8 @@ import java.util.List; /** * This Transformation represents the application of a - * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to two input - * {@code StreamTransformations}. The result is again only one stream. 
+ * {@link TwoInputStreamOperator} to two input {@code StreamTransformations}. + * The result is again only one stream. * * @param <IN1> The type of the elements in the first input {@code StreamTransformation} * @param <IN2> The type of the elements in the second input {@code StreamTransformation} http://git-wip-us.apache.org/repos/asf/flink/blob/c6c17bef/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index 6fb06d3..59f54b5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.ResourceSpec; +import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; @@ -37,6 +38,7 @@ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.ConnectedStreams; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; @@ -45,9 +47,12 @@ import 
org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction; import org.apache.flink.streaming.api.functions.co.CoMapFunction; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction; @@ -57,6 +62,7 @@ import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.api.operators.ProcessOperator; import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; @@ -78,8 +84,12 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import javax.annotation.Nullable; + import java.lang.reflect.Method; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -754,6 +764,182 @@ public class DataStreamTest extends TestLogger { } @Test + public void testConnectWithBroadcastTranslation() throws Exception { /* End-to-end test: despite the name it calls env.execute(), so a real job runs. A keyed Long stream is connected with a broadcast String stream; every broadcast element is stored in broadcast state under the number after its colon, and TestBroadcastProcessFunction.onTimer asserts that the broadcast state equals the expected map. */ + + final
Map<Long, String> expected = new HashMap<>(); /* Key i maps to test:i. The values are the broadcast input, and the whole map is the state every keyed instance must have seen when its timer fires. */ + expected.put(0L, "test:0"); + expected.put(1L, "test:1"); + expected.put(2L, "test:2"); + expected.put(3L, "test:3"); + expected.put(4L, "test:4"); + expected.put(5L, "test:5"); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + final DataStream<Long> srcOne = env.generateSequence(0L, 5L) + .assignTimestampsAndWatermarks(new CustomWmEmitter<Long>() { + + @Override + public long extractTimestamp(Long element, long previousElementTimestamp) { + return element; + } + }).keyBy((KeySelector<Long, Long>) value -> value); /* Non-broadcast side: each element is its own key and its own event timestamp. */ + + final DataStream<String> srcTwo = env.fromCollection(expected.values()) + .assignTimestampsAndWatermarks(new CustomWmEmitter<String>() { + @Override + public long extractTimestamp(String element, long previousElementTimestamp) { + return Long.parseLong(element.split(":")[1]); + } + }); /* Broadcast side: the event timestamp is the number after the colon, so both inputs use the same timestamp values as the keys of expected. */ + + final BroadcastStream<String, Long, String> broadcast = srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR); + + // the timestamp should be high enough to trigger the timer after all the elements arrive.
+ final DataStream<String> output = srcOne.connect(broadcast).process( + new TestBroadcastProcessFunction(100000L, expected)); /* NOTE(review): every element timestamp lies in the range of the expected keys, far below 100000, so the timer presumably fires only on the watermark emitted at end of input - confirm. */ + + output.addSink(new DiscardingSink<>()); + env.execute(); + } + + private abstract static class CustomWmEmitter<T> implements AssignerWithPunctuatedWatermarks<T> { /* Punctuated assigner that emits a watermark equal to the extracted timestamp of every element; subclasses only implement extractTimestamp. */ + + @Nullable + @Override + public Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) { + return new Watermark(extractedTimestamp); + } + } + + private static class TestBroadcastProcessFunction extends KeyedBroadcastProcessFunction<Long, String, String> { /* Registers an event-time timer for each keyed element, copies every broadcast element into broadcast state, and in onTimer asserts that the broadcast state equals expectedState. */ + + private final Map<Long, String> expectedState; + + private final long timerTimestamp; + + static final MapStateDescriptor<Long, String> DESCRIPTOR = new MapStateDescriptor<>( + "broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO + ); /* Shared descriptor: used by the test to create the BroadcastStream and by this function to write and read the broadcast state. */ + + TestBroadcastProcessFunction( + final long timerTS, + final Map<Long, String> expectedBroadcastState + ) { + expectedState = expectedBroadcastState; + timerTimestamp = timerTS; + } + + @Override + public void processElement(Long value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception { /* Every element registers the same fixed timestamp, so the check in onTimer runs only after event time passes timerTimestamp. */ + ctx.timerService().registerEventTimeTimer(timerTimestamp); + } + + @Override + public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception { /* The numeric suffix after the colon becomes the state key, matching the keys of expectedState. */ + long key = Long.parseLong(value.split(":")[1]); + ctx.getBroadcastState(DESCRIPTOR).put(key, value); + } + + @Override + public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { /* Copy the read-only broadcast state into a plain HashMap so it can be compared with expectedState via Map equality. */ + Map<Long, String> map = new HashMap<>(); + for (Map.Entry<Long, String> entry : ctx.getBroadcastState(DESCRIPTOR).immutableEntries()) { + map.put(entry.getKey(), entry.getValue()); + } + Assert.assertEquals(expectedState, map); + } + } + + /** + * Tests that with a {@link KeyedStream} we have to provide a {@link KeyedBroadcastProcessFunction}.
+ */ + @Test(expected = IllegalArgumentException.class) + public void testFailedTranslationOnKeyed() { /* No job is executed here. The expected IllegalArgumentException is presumably raised by the process() call below while the graph is being built. */ + + final MapStateDescriptor<Long, String> descriptor = new MapStateDescriptor<>( + "broadcast", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO + ); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + final DataStream<Long> srcOne = env.generateSequence(0L, 5L) + .assignTimestampsAndWatermarks(new CustomWmEmitter<Long>() { + + @Override + public long extractTimestamp(Long element, long previousElementTimestamp) { + return element; + } + }).keyBy((KeySelector<Long, Long>) value -> value); /* Declared as DataStream, but the value is a keyed stream because of keyBy. */ + + final DataStream<String> srcTwo = env.fromElements("Test:0", "Test:1", "Test:2", "Test:3", "Test:4", "Test:5") + .assignTimestampsAndWatermarks(new CustomWmEmitter<String>() { + @Override + public long extractTimestamp(String element, long previousElementTimestamp) { + return Long.parseLong(element.split(":")[1]); + } + }); + + BroadcastStream<String, Long, String> broadcast = srcTwo.broadcast(descriptor); /* Deliberate mismatch below: a keyed input combined with a non-keyed BroadcastProcessFunction. */ + srcOne.connect(broadcast) + .process(new BroadcastProcessFunction<Long, String, String>() { + @Override + public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception { + // do nothing + } + + @Override + public void processElement(Long value, ReadOnlyContext ctx, Collector<String> out) throws Exception { + // do nothing + } + }); + } + + /** + * Tests that with a non-keyed stream we have to provide a {@link BroadcastProcessFunction}.
+ */ + @Test(expected = IllegalArgumentException.class) + public void testFailedTranslationOnNonKeyed() { /* No job is executed here. The expected IllegalArgumentException is presumably raised by the process() call below while the graph is being built. */ + + final MapStateDescriptor<Long, String> descriptor = new MapStateDescriptor<>( + "broadcast", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO + ); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + final DataStream<Long> srcOne = env.generateSequence(0L, 5L) + .assignTimestampsAndWatermarks(new CustomWmEmitter<Long>() { + + @Override + public long extractTimestamp(Long element, long previousElementTimestamp) { + return element; + } + }); /* Deliberately not keyed: there is no keyBy on this input. */ + + final DataStream<String> srcTwo = env.fromElements("Test:0", "Test:1", "Test:2", "Test:3", "Test:4", "Test:5") + .assignTimestampsAndWatermarks(new CustomWmEmitter<String>() { + @Override + public long extractTimestamp(String element, long previousElementTimestamp) { + return Long.parseLong(element.split(":")[1]); + } + }); + + BroadcastStream<String, Long, String> broadcast = srcTwo.broadcast(descriptor); /* Deliberate mismatch below: a non-keyed input combined with a KeyedBroadcastProcessFunction. */ + srcOne.connect(broadcast) + .process(new KeyedBroadcastProcessFunction<Long, String, String>() { + + @Override + public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception { + // do nothing + } + + @Override + public void processElement(Long value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception { + // do nothing + } + }); + } + + @Test public void operatorTest() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
