[FLINK-2819] Add Windowed Join/CoGroup Operator Based on Tagged Union. Right now, this does everything in memory, so the JVM will blow up if the data for one key becomes too large.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8634dbbe Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8634dbbe Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8634dbbe Branch: refs/heads/master Commit: 8634dbbe998af53471bed2f3a6557c722bb37b87 Parents: 47b5cb7 Author: Aljoscha Krettek <[email protected]> Authored: Tue Oct 6 16:33:04 2015 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Oct 7 22:08:25 2015 +0200 ---------------------------------------------------------------------- .../api/datastream/CoGroupedStreams.java | 563 +++++++++++++++++++ .../streaming/api/datastream/DataStream.java | 36 +- .../streaming/api/datastream/JoinedStreams.java | 328 +++++++++++ .../datastream/temporal/StreamJoinOperator.java | 274 --------- .../datastream/temporal/TemporalOperator.java | 124 ---- .../api/datastream/temporal/TemporalWindow.java | 45 -- .../windowing/ReduceWindowFunction.java | 32 +- .../flink/streaming/api/CoGroupJoinITCase.java | 372 ++++++++++++ .../streaming/api/WindowCrossJoinTest.java | 143 ----- .../api/operators/co/SelfConnectionTest.java | 61 -- .../streaming/examples/join/WindowJoin.java | 128 +++-- .../examples/join/util/WindowJoinData.java | 70 +-- .../scala/examples/join/WindowJoin.scala | 43 +- .../join/WindowJoinITCase.java | 101 ++-- .../join/WindowJoinITCase.java | 101 ++-- .../streaming/api/scala/CoGroupedStreams.scala | 294 ++++++++++ .../flink/streaming/api/scala/DataStream.scala | 24 +- .../streaming/api/scala/JoinedStreams.scala | 303 ++++++++++ .../api/scala/StreamJoinOperator.scala | 203 ------- .../streaming/api/scala/TemporalOperator.scala | 51 -- .../streaming/api/scala/CoGroupJoinITCase.scala | 274 +++++++++ .../StreamingScalaAPICompletenessTest.scala | 14 +- 22 files changed, 2409 insertions(+), 1175 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java new file mode 100644 index 0000000..e1f1a96 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java @@ -0,0 +1,563 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.datastream; + +import com.google.common.collect.Lists; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.evictors.Evictor; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +import java.io.IOException; +import java.util.List; + +/** + *{@code CoGroupedStreams} represents two {@link DataStream DataStreams} that have been co-grouped. + * A streaming co-group operation is evaluated over elements in a window. + * + * <p> + * To finalize co-group operation you also need to specify a {@link KeySelector} for + * both the first and second input and a {@link WindowAssigner}. + * + * <p> + * Note: Right now, the groups are being built in memory so you need to ensure that they don't + * get too big. Otherwise the JVM might crash. 
+ * + * <p> + * Example: + * + * <pre> {@code + * DataStream<Tuple2<String, Integer>> one = ...; + * DataStream<Tuple2<String, Integer>> two = ...; + * + * DataStream<T> result = one.coGroup(two) + * .where(new MyFirstKeySelector()) + * .equalTo(new MyFirstKeySelector()) + * .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) + * .apply(new MyCoGroupFunction()); + * } </pre> + */ +public class CoGroupedStreams { + + /** + * A co-group operation that does not yet have its {@link KeySelector KeySelectors} defined. + * + * @param <T1> Type of the elements from the first input + * @param <T2> Type of the elements from the second input + */ + public static class Unspecified<T1, T2> { + DataStream<T1> input1; + DataStream<T2> input2; + + protected Unspecified(DataStream<T1> input1, + DataStream<T2> input2) { + this.input1 = input1; + this.input2 = input2; + } + + /** + * Specifies a {@link KeySelector} for elements from the first input. + */ + public <KEY> WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector) { + return new WithKey<>(input1, input2, input1.clean(keySelector), null); + } + + /** + * Specifies a {@link KeySelector} for elements from the second input. + */ + public <KEY> WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector) { + return new WithKey<>(input1, input2, null, input1.clean(keySelector)); + } + } + + /** + * A co-group operation that has {@link KeySelector KeySelectors} defined for either both or + * one input. + * + * <p> + * You need to specify a {@code KeySelector} for both inputs using {@link #where(KeySelector)} + * and {@link #equalTo(KeySelector)} before you can proceed with specifying a + * {@link WindowAssigner} using {@link #window(WindowAssigner)}. + * + * @param <T1> Type of the elements from the first input + * @param <T2> Type of the elements from the second input + * @param <KEY> Type of the key. 
This must be the same for both inputs + */ + public static class WithKey<T1, T2, KEY> { + DataStream<T1> input1; + DataStream<T2> input2; + + KeySelector<T1, KEY> keySelector1; + KeySelector<T2, KEY> keySelector2; + + protected WithKey(DataStream<T1> input1, DataStream<T2> input2, KeySelector<T1, KEY> keySelector1, KeySelector<T2, KEY> keySelector2) { + this.input1 = input1; + this.input2 = input2; + + this.keySelector1 = keySelector1; + this.keySelector2 = keySelector2; + } + + /** + * Specifies a {@link KeySelector} for elements from the first input. + */ + public WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector) { + return new CoGroupedStreams.WithKey<>(input1, input2, input1.clean(keySelector), keySelector2); + } + + /** + * Specifies a {@link KeySelector} for elements from the second input. + */ + public CoGroupedStreams.WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector) { + return new CoGroupedStreams.WithKey<>(input1, input2, keySelector1, input1.clean(keySelector)); + } + + /** + * Specifies the window on which the co-group operation works. + */ + public <W extends Window> CoGroupedStreams.WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) { + if (keySelector1 == null || keySelector2 == null) { + throw new UnsupportedOperationException("You first need to specify KeySelectors for both inputs using where() and equalTo()."); + + } + return new WithWindow<>(input1, input2, keySelector1, keySelector2, assigner, null, null); + } + } + + /** + * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs as + * well as a {@link WindowAssigner}. + * + * @param <T1> Type of the elements from the first input + * @param <T2> Type of the elements from the second input + * @param <KEY> Type of the key. This must be the same for both inputs + * @param <W> Type of {@link Window} on which the co-group operation works. 
+ */ + public static class WithWindow<T1, T2, KEY, W extends Window> { + private final DataStream<T1> input1; + private final DataStream<T2> input2; + + private final KeySelector<T1, KEY> keySelector1; + private final KeySelector<T2, KEY> keySelector2; + + private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner; + + private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger; + + private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor; + + protected WithWindow(DataStream<T1> input1, + DataStream<T2> input2, + KeySelector<T1, KEY> keySelector1, + KeySelector<T2, KEY> keySelector2, + WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner, + Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger, + Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) { + this.input1 = input1; + this.input2 = input2; + + this.keySelector1 = keySelector1; + this.keySelector2 = keySelector2; + + this.windowAssigner = windowAssigner; + this.trigger = trigger; + this.evictor = evictor; + } + + /** + * Sets the {@code Trigger} that should be used to trigger window emission. + */ + public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) { + return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, newTrigger, evictor); + } + + /** + * Sets the {@code Evictor} that should be used to evict elements from a window before emission. + * + * <p> + * Note: When using an evictor window performance will degrade significantly, since + * pre-aggregation of window results cannot be used. + */ + public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) { + return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, trigger, newEvictor); + } + + /** + * Completes the co-group operation with the user function that is executed + * for windowed groups. 
+ */ + public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) { + + TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType( + function, + CoGroupFunction.class, + true, + true, + input1.getType(), + input2.getType(), + "CoGroup", + false); + + return apply(function, resultType); + } + + /** + * Completes the co-group operation with the user function that is executed + * for windowed groups. + */ + public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) { + //clean the closure + function = input1.getExecutionEnvironment().clean(function); + + DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1 + .map(new Input1Tagger<T1, T2>()) + .returns(new UnionTypeInfo<>(input1.getType(), input2.getType())); + DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2 + .map(new Input2Tagger<T1, T2>()) + .returns(new UnionTypeInfo<>(input1.getType(), input2.getType())); + + WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowOp = taggedInput1 + .union(taggedInput2) + .keyBy(new UnionKeySelector<>(keySelector1, keySelector2)) + .window(windowAssigner); + + if (trigger != null) { + windowOp.trigger(trigger); + } + if (evictor != null) { + windowOp.evictor(evictor); + } + + return windowOp.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType); + } + } + + /** + * Creates a new co-group operation from the two given inputs. + */ + public static <T1, T2> Unspecified<T1, T2> createCoGroup(DataStream<T1> input1, DataStream<T2> input2) { + return new Unspecified<>(input1, input2); + } + + /** + * Internal class for implementing tagged union co-group. 
+ */ + public static class TaggedUnion<T1, T2> { + private final T1 one; + private final T2 two; + + private TaggedUnion(T1 one, T2 two) { + this.one = one; + this.two = two; + } + + public boolean isOne() { + return one != null; + } + + public boolean isTwo() { + return two != null; + } + + public T1 getOne() { + return one; + } + + public T2 getTwo() { + return two; + } + + public static <T1, T2> TaggedUnion<T1, T2> one(T1 one) { + return new TaggedUnion<>(one, null); + } + + public static <T1, T2> TaggedUnion<T1, T2> two(T2 two) { + return new TaggedUnion<>(null, two); + } + } + + private static class UnionTypeInfo<T1, T2> extends TypeInformation<TaggedUnion<T1, T2>> { + private static final long serialVersionUID = 1L; + + TypeInformation<T1> oneType; + TypeInformation<T2> twoType; + + public UnionTypeInfo(TypeInformation<T1> oneType, + TypeInformation<T2> twoType) { + this.oneType = oneType; + this.twoType = twoType; + } + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 2; + } + + @Override + public int getTotalFields() { + return 2; + } + + @Override + @SuppressWarnings("unchecked, rawtypes") + public Class<TaggedUnion<T1, T2>> getTypeClass() { + return (Class) TaggedUnion.class; + } + + @Override + public boolean isKeyType() { + return true; + } + + @Override + public TypeSerializer<TaggedUnion<T1, T2>> createSerializer(ExecutionConfig config) { + return new UnionSerializer<>(oneType.createSerializer(config), twoType.createSerializer(config)); + } + + @Override + public String toString() { + return "TaggedUnion<" + oneType + ", " + twoType + ">"; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof UnionTypeInfo) { + @SuppressWarnings("unchecked") + UnionTypeInfo<T1, T2> unionTypeInfo = (UnionTypeInfo<T1, T2>) obj; + + return unionTypeInfo.canEqual(this) && oneType.equals(unionTypeInfo.oneType) && 
twoType.equals(unionTypeInfo.twoType); + } else { + return false; + } + } + + @Override + public int hashCode() { + return 31 * oneType.hashCode() + twoType.hashCode(); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof UnionTypeInfo; + } + } + + private static class UnionSerializer<T1, T2> extends TypeSerializer<TaggedUnion<T1, T2>> { + private static final long serialVersionUID = 1L; + + private final TypeSerializer<T1> oneSerializer; + private final TypeSerializer<T2> twoSerializer; + + public UnionSerializer(TypeSerializer<T1> oneSerializer, + TypeSerializer<T2> twoSerializer) { + this.oneSerializer = oneSerializer; + this.twoSerializer = twoSerializer; + } + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer<TaggedUnion<T1, T2>> duplicate() { + return this; + } + + @Override + public TaggedUnion<T1, T2> createInstance() { + return null; + } + + @Override + public TaggedUnion<T1, T2> copy(TaggedUnion<T1, T2> from) { + if (from.isOne()) { + return TaggedUnion.one(oneSerializer.copy(from.getOne())); + } else { + return TaggedUnion.two(twoSerializer.copy(from.getTwo())); + } + } + + @Override + public TaggedUnion<T1, T2> copy(TaggedUnion<T1, T2> from, TaggedUnion<T1, T2> reuse) { + if (from.isOne()) { + return TaggedUnion.one(oneSerializer.copy(from.getOne())); + } else { + return TaggedUnion.two(twoSerializer.copy(from.getTwo())); + } } + + @Override + public int getLength() { + return 0; + } + + @Override + public void serialize(TaggedUnion<T1, T2> record, DataOutputView target) throws IOException { + if (record.isOne()) { + target.writeByte(1); + oneSerializer.serialize(record.getOne(), target); + } else { + target.writeByte(2); + twoSerializer.serialize(record.getTwo(), target); + } + } + + @Override + public TaggedUnion<T1, T2> deserialize(DataInputView source) throws IOException { + byte tag = source.readByte(); + if (tag == 1) { + return 
TaggedUnion.one(oneSerializer.deserialize(source)); + } else { + return TaggedUnion.two(twoSerializer.deserialize(source)); + } + } + + @Override + public TaggedUnion<T1, T2> deserialize(TaggedUnion<T1, T2> reuse, + DataInputView source) throws IOException { + byte tag = source.readByte(); + if (tag == 1) { + return TaggedUnion.one(oneSerializer.deserialize(source)); + } else { + return TaggedUnion.two(twoSerializer.deserialize(source)); + } + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + byte tag = source.readByte(); + target.writeByte(tag); + if (tag == 1) { + oneSerializer.copy(source, target); + } else { + twoSerializer.copy(source, target); + } + } + + @Override + public int hashCode() { + return 31 * oneSerializer.hashCode() + twoSerializer.hashCode(); + } + + @Override + @SuppressWarnings("unchecked") + public boolean equals(Object obj) { + if (obj instanceof UnionSerializer) { + UnionSerializer<T1, T2> other = (UnionSerializer<T1, T2>) obj; + + return other.canEqual(this) && oneSerializer.equals(other.oneSerializer) && twoSerializer.equals(other.twoSerializer); + } else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof UnionSerializer; + } + } + + private static class Input1Tagger<T1, T2> implements MapFunction<T1, TaggedUnion<T1, T2>> { + private static final long serialVersionUID = 1L; + + @Override + public TaggedUnion<T1, T2> map(T1 value) throws Exception { + return TaggedUnion.one(value); + } + } + + private static class Input2Tagger<T1, T2> implements MapFunction<T2, TaggedUnion<T1, T2>> { + private static final long serialVersionUID = 1L; + + @Override + public TaggedUnion<T1, T2> map(T2 value) throws Exception { + return TaggedUnion.two(value); + } + } + + private static class UnionKeySelector<T1, T2, KEY> implements KeySelector<TaggedUnion<T1, T2>, KEY> { + private static final long serialVersionUID = 1L; + + private final KeySelector<T1, 
KEY> keySelector1; + private final KeySelector<T2, KEY> keySelector2; + + public UnionKeySelector(KeySelector<T1, KEY> keySelector1, + KeySelector<T2, KEY> keySelector2) { + this.keySelector1 = keySelector1; + this.keySelector2 = keySelector2; + } + + @Override + public KEY getKey(TaggedUnion<T1, T2> value) throws Exception{ + if (value.isOne()) { + return keySelector1.getKey(value.getOne()); + } else { + return keySelector2.getKey(value.getTwo()); + } + } + } + + private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window> + extends WrappingFunction<CoGroupFunction<T1, T2, T>> + implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> { + private static final long serialVersionUID = 1L; + + public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) { + super(userFunction); + } + + @Override + public void apply(KEY key, + W window, + Iterable<TaggedUnion<T1, T2>> values, + Collector<T> out) throws Exception { + List<T1> oneValues = Lists.newArrayList(); + List<T2> twoValues = Lists.newArrayList(); + for (TaggedUnion<T1, T2> val: values) { + if (val.isOne()) { + oneValues.add(val.getOne()); + } else { + twoValues.add(val.getTwo()); + } + } + wrappedFunction.coGroup(oneValues, twoValues, out); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 8de1a0d..0be1d56 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -44,7 +44,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.collector.selector.OutputSelector; -import org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.TimestampExtractor; import org.apache.flink.streaming.api.functions.sink.FileSinkFunctionByMillis; @@ -618,30 +617,19 @@ public class DataStream<T> { } /** - * Initiates a temporal Join transformation. <br/> - * A temporal Join transformation joins the elements of two - * {@link DataStream}s on key equality over a specified time window. - * - * <p> - * This method returns a {@link StreamJoinOperator} on which the - * {@link StreamJoinOperator#onWindow(long, java.util.concurrent.TimeUnit)} - * should be called to define the window, and then the - * {@link StreamJoinOperator.JoinWindow#where(int...)} and - * {@link StreamJoinOperator.JoinPredicate#equalTo(int...)} can be used to define - * the join keys. - * <p> - * The user can also use the - * {@link org.apache.flink.streaming.api.datastream.temporal.StreamJoinOperator.JoinPredicate.JoinedStream#with} - * to apply a custom join function. - * - * @param dataStreamToJoin - * The other DataStream with which this DataStream is joined. - * @return A {@link StreamJoinOperator} to continue the definition of the - * Join transformation. - * + * Creates a co-group operation. See {@link CoGroupedStreams} for an example of how the keys + * and window can be specified. + */ + public <T2> CoGroupedStreams.Unspecified<T, T2> coGroup(DataStream<T2> otherStream) { + return CoGroupedStreams.createCoGroup(this, otherStream); + } + + /** + * Creates a join operation. 
See {@link JoinedStreams} for an example of how the keys + * and window can be specified. */ - public <IN2> StreamJoinOperator<T, IN2> join(DataStream<IN2> dataStreamToJoin) { - return new StreamJoinOperator<T, IN2>(this, dataStreamToJoin); + public <T2> JoinedStreams.Unspecified<T, T2> join(DataStream<T2> otherStream) { + return JoinedStreams.createJoin(this, otherStream); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java new file mode 100644 index 0000000..ee848e3 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.operators.translation.WrappingFunction; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.evictors.Evictor; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.util.Collector; + +/** + *{@code JoinedStreams} represents two {@link DataStream DataStreams} that have been joined. + * A streaming join operation is evaluated over elements in a window. + * + * <p> + * To finalize the join operation you also need to specify a {@link KeySelector} for + * both the first and second input and a {@link WindowAssigner}. + * + * <p> + * Note: Right now, the join is being evaluated in memory so you need to ensure that the number + * of elements per key does not get too high. Otherwise the JVM might crash. + * + * <p> + * Example: + * + * <pre> {@code + * DataStream<Tuple2<String, Integer>> one = ...; + * DataStream<Tuple2<String, Integer>> two = ...; + * + * DataStream<T> result = one.join(two) + * .where(new MyFirstKeySelector()) + * .equalTo(new MyFirstKeySelector()) + * .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) + * .apply(new MyJoinFunction()); + * } </pre> + */ +public class JoinedStreams extends CoGroupedStreams{ + + /** + * A join operation that does not yet have its {@link KeySelector KeySelectors} defined. 
+ * + * @param <T1> Type of the elements from the first input + * @param <T2> Type of the elements from the second input + */ + public static class Unspecified<T1, T2> { + DataStream<T1> input1; + DataStream<T2> input2; + + protected Unspecified(DataStream<T1> input1, + DataStream<T2> input2) { + this.input1 = input1; + this.input2 = input2; + } + + /** + * Specifies a {@link KeySelector} for elements from the first input. + */ + public <KEY> WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector) { + return new WithKey<>(input1, input2, keySelector, null); + } + + /** + * Specifies a {@link KeySelector} for elements from the second input. + */ + public <KEY> WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector) { + return new WithKey<>(input1, input2, null, keySelector); + } + } + + /** + * A join operation that has {@link KeySelector KeySelectors} defined for either both or + * one input. + * + * <p> + * You need to specify a {@code KeySelector} for both inputs using {@link #where(KeySelector)} + * and {@link #equalTo(KeySelector)} before you can proceed with specifying a + * {@link WindowAssigner} using {@link #window(WindowAssigner)}. + * + * @param <T1> Type of the elements from the first input + * @param <T2> Type of the elements from the second input + * @param <KEY> Type of the key. This must be the same for both inputs + */ + public static class WithKey<T1, T2, KEY> { + DataStream<T1> input1; + DataStream<T2> input2; + + KeySelector<T1, KEY> keySelector1; + KeySelector<T2, KEY> keySelector2; + + protected WithKey(DataStream<T1> input1, DataStream<T2> input2, KeySelector<T1, KEY> keySelector1, KeySelector<T2, KEY> keySelector2) { + this.input1 = input1; + this.input2 = input2; + + this.keySelector1 = keySelector1; + this.keySelector2 = keySelector2; + } + + /** + * Specifies a {@link KeySelector} for elements from the first input. 
+ */ + public WithKey<T1, T2, KEY> where(KeySelector<T1, KEY> keySelector) { + return new JoinedStreams.WithKey<>(input1, input2, keySelector, keySelector2); + } + + /** + * Specifies a {@link KeySelector} for elements from the second input. + */ + public JoinedStreams.WithKey<T1, T2, KEY> equalTo(KeySelector<T2, KEY> keySelector) { + return new JoinedStreams.WithKey<>(input1, input2, keySelector1, keySelector); + } + + /** + * Specifies the window on which the join operation works. + */ + public <W extends Window> JoinedStreams.WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) { + if (keySelector1 == null || keySelector2 == null) { + throw new UnsupportedOperationException("You first need to specify KeySelectors for both inputs using where() and equalTo()."); + + } + return new WithWindow<>(input1, input2, keySelector1, keySelector2, assigner, null, null); + } + } + + /** + * A join operation that has {@link KeySelector KeySelectors} defined for both inputs as + * well as a {@link WindowAssigner}. The trigger and evictor are optional and stay {@code null} until set via {@code trigger()} / {@code evictor()}. + * + * @param <T1> Type of the elements from the first input + * @param <T2> Type of the elements from the second input + * @param <KEY> Type of the key. This must be the same for both inputs + * @param <W> Type of {@link Window} on which the join operation works. + */ + public static class WithWindow<T1, T2, KEY, W extends Window> { + private final DataStream<T1> input1; + private final DataStream<T2> input2; + + private final KeySelector<T1, KEY> keySelector1; + private final KeySelector<T2, KEY> keySelector2; + + private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner; + + private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger; + + private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor; + + protected WithWindow(DataStream<T1> input1, + DataStream<T2> input2, + KeySelector<T1, KEY> keySelector1, + KeySelector<T2, KEY> keySelector2, + WindowAssigner<? 
super TaggedUnion<T1, T2>, W> windowAssigner, + Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger, + Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) { + this.input1 = input1; + this.input2 = input2; + + this.keySelector1 = keySelector1; + this.keySelector2 = keySelector2; + + this.windowAssigner = windowAssigner; + this.trigger = trigger; + this.evictor = evictor; + } + + /** + * Sets the {@code Trigger} that should be used to trigger window emission. + */ + public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) { + return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, newTrigger, evictor); + } + + /** + * Sets the {@code Evictor} that should be used to evict elements from a window before emission. + * + * <p> + * Note: When using an evictor window performance will degrade significantly, since + * pre-aggregation of window results cannot be used. + */ + public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) { + return new WithWindow<>(input1, input2, keySelector1, keySelector2, windowAssigner, trigger, newEvictor); + } + + /** + * Completes the join operation with the user function that is executed + * for each combination of elements with the same key in a window. The result type is extracted from the function's {@link JoinFunction} signature (plain element inputs, result returned, so neither Iterables nor a Collector are involved). + */ + public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) { + TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType( + function, + JoinFunction.class, + false, + false, + input1.getType(), + input2.getType(), + "Join", + false); + + return apply(function, resultType); + } + + /** + * Completes the join operation with the user function that is executed + * for each combination of elements with the same key in a window. 
+ */ + public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) { + //clean the closure + function = input1.getExecutionEnvironment().clean(function); + + return input1.coGroup(input2) + .where(keySelector1) + .equalTo(keySelector2) + .window(windowAssigner) + .trigger(trigger) + .evictor(evictor) + .apply(new FlatJoinCoGroupFunction<>(function), resultType); + + } + + /** + * Completes the join operation with the user function that is executed + * for each combination of elements with the same key in a window. The result type is extracted from the function's {@link FlatJoinFunction} signature (plain element inputs, results emitted through a Collector). + */ + public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) { + TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType( + function, + FlatJoinFunction.class, + false, + true, + input1.getType(), + input2.getType(), + "Join", + false); + + return apply(function, resultType); + } + + /** + * Completes the join operation with the user function that is executed + * for each combination of elements with the same key in a window. + */ + public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) { + //clean the closure + function = input1.getExecutionEnvironment().clean(function); + + return input1.coGroup(input2) + .where(keySelector1) + .equalTo(keySelector2) + .window(windowAssigner) + .trigger(trigger) + .evictor(evictor) + .apply(new JoinCoGroupFunction<>(function), resultType); + + } + } + + /** + * Creates a new join operation from the two given inputs. + */ + public static <T1, T2> Unspecified<T1, T2> createJoin(DataStream<T1> input1, DataStream<T2> input2) { + return new Unspecified<>(input1, input2); + } + + /** + * CoGroup function that does a nested-loop join to get the join result. 
+ */ + private static class JoinCoGroupFunction<T1, T2, T> + extends WrappingFunction<JoinFunction<T1, T2, T>> + implements CoGroupFunction<T1, T2, T> { + private static final long serialVersionUID = 1L; + + public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) { + super(wrappedFunction); + } + + @Override + public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception { + for (T1 val1: first) { + for (T2 val2: second) { + out.collect(wrappedFunction.join(val1, val2)); + } + } + } + } + + /** + * CoGroup function that does a nested-loop join to get the join result (FlatJoin version): the wrapped {@link FlatJoinFunction} is called once for every pair of elements from the two inputs and emits its results directly to the {@link Collector}; the second input is iterated again for each element of the first. + */ + private static class FlatJoinCoGroupFunction<T1, T2, T> + extends WrappingFunction<FlatJoinFunction<T1, T2, T>> + implements CoGroupFunction<T1, T2, T> { + private static final long serialVersionUID = 1L; + + public FlatJoinCoGroupFunction(FlatJoinFunction<T1, T2, T> wrappedFunction) { + super(wrappedFunction); + } + + @Override + public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception { + for (T1 val1: first) { + for (T2 val2: second) { + wrappedFunction.join(val1, val2, out); + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java deleted file mode 100644 index 4a5622d..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamJoinOperator.java +++ /dev/null @@ -1,274 +0,0 @@ -/* - * Licensed 
to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.datastream.temporal; - -import java.util.concurrent.TimeUnit; - -import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.operators.Keys; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.util.keys.KeySelectorUtil; - -public class StreamJoinOperator<I1, I2> extends - TemporalOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> { - - public StreamJoinOperator(DataStream<I1> input1, DataStream<I2> input2) { - super(input1, input2); - } - - @Override - protected JoinWindow<I1, I2> createNextWindowOperator() { - return new JoinWindow<I1, I2>(this); - } - - public static class JoinWindow<I1, I2> implements TemporalWindow<JoinWindow<I1, I2>> { - - private StreamJoinOperator<I1, I2> op; - private TypeInformation<I1> 
type1; - - private JoinWindow(StreamJoinOperator<I1, I2> operator) { - this.op = operator; - this.type1 = op.input1.getType(); - } - - /** - * Continues a temporal Join transformation. <br/> - * Defines the {@link Tuple} fields of the first join {@link DataStream} - * that should be used as join keys.<br/> - * <b>Note: Fields can only be selected as join keys on Tuple - * DataStreams.</b><br/> - * - * @param fields - * The indexes of the other Tuple fields of the first join - * DataStreams that should be used as keys. - * @return An incomplete Join transformation. Call - * {@link JoinPredicate#equalTo} to continue the Join. - */ - public JoinPredicate<I1, I2> where(int... fields) { - return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys<I1>(fields, type1), type1, op.input1.getExecutionEnvironment().getConfig())); - } - - /** - * Continues a temporal join transformation. <br/> - * Defines the fields of the first join {@link DataStream} that should - * be used as grouping keys. Fields are the names of member fields of - * the underlying type of the data stream. - * - * @param fields - * The fields of the first join DataStream that should be - * used as keys. - * @return An incomplete Join transformation. Call - * {@link JoinPredicate#equalTo} to continue the Join. - */ - public JoinPredicate<I1, I2> where(String... fields) { - return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys( - new Keys.ExpressionKeys<I1>(fields, type1), type1, op.input1.getExecutionEnvironment().getConfig())); - } - - /** - * Continues a temporal Join transformation and defines a - * {@link KeySelector} function for the first join {@link DataStream} - * .</br> The KeySelector function is called for each element of the - * first DataStream and extracts a single key value on which the - * DataStream is joined. 
</br> - * - * @param keySelector - * The KeySelector function which extracts the key values - * from the DataStream on which it is joined. - * @return An incomplete Join transformation. Call - * {@link JoinPredicate#equalTo} to continue the Join. - */ - public <K> JoinPredicate<I1, I2> where(KeySelector<I1, K> keySelector) { - return new JoinPredicate<I1, I2>(op, keySelector); - } - - @Override - public JoinWindow<I1, I2> every(long length, TimeUnit timeUnit) { - return every(timeUnit.toMillis(length)); - } - - @Override - public JoinWindow<I1, I2> every(long length) { - op.slideInterval = length; - return this; - } - - // ---------------------------------------------------------------------------------------- - - } - - /** - * Intermediate step of a temporal Join transformation. <br/> - * To continue the Join transformation, select the join key of the second - * input {@link DataStream} by calling {@link JoinPredicate#equalTo} - * - */ - public static class JoinPredicate<I1, I2> { - - private StreamJoinOperator<I1, I2> op; - private KeySelector<I1, ?> keys1; - private KeySelector<I2, ?> keys2; - private TypeInformation<I2> type2; - - private JoinPredicate(StreamJoinOperator<I1, I2> operator, KeySelector<I1, ?> keys1) { - this.op = operator; - this.keys1 = keys1; - this.type2 = op.input2.getType(); - } - - /** - * Creates a temporal Join transformation and defines the {@link Tuple} - * fields of the second join {@link DataStream} that should be used as - * join keys.<br/> - * </p> The resulting operator wraps each pair of joining elements in a - * Tuple2<I1,I2>(first, second). To use a different wrapping function - * use {@link JoinedStream#with(JoinFunction)} - * - * @param fields - * The indexes of the Tuple fields of the second join - * DataStream that should be used as keys. - * @return A streaming join operator. Call {@link JoinedStream#with} to - * apply a custom wrapping - */ - public JoinedStream<I1, I2, Tuple2<I1, I2>> equalTo(int... 
fields) { - keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields, type2), - type2, op.input1.getExecutionEnvironment().getConfig()); - return createJoinOperator(); - } - - /** - * Creates a temporal Join transformation and defines the fields of the - * second join {@link DataStream} that should be used as join keys. </p> - * The resulting operator wraps each pair of joining elements in a - * Tuple2<I1,I2>(first, second). To use a different wrapping function - * use {@link JoinedStream#with(JoinFunction)} - * - * @param fields - * The fields of the second join DataStream that should be - * used as keys. - * @return A streaming join operator. Call {@link JoinedStream#with} to - * apply a custom wrapping - */ - public JoinedStream<I1, I2, Tuple2<I1, I2>> equalTo(String... fields) { - this.keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields, - type2), type2, op.input1.getExecutionEnvironment().getConfig()); - return createJoinOperator(); - } - - /** - * Creates a temporal Join transformation and defines a - * {@link KeySelector} function for the second join {@link DataStream} - * .</br> The KeySelector function is called for each element of the - * second DataStream and extracts a single key value on which the - * DataStream is joined. </p> The resulting operator wraps each pair of - * joining elements in a Tuple2<I1,I2>(first, second). To use a - * different wrapping function use - * {@link JoinedStream#with(JoinFunction)} - * - * - * @param keySelector - * The KeySelector function which extracts the key values - * from the second DataStream on which it is joined. - * @return A streaming join operator. 
Call {@link JoinedStream#with} to - * apply a custom wrapping - */ - public <K> JoinedStream<I1, I2, Tuple2<I1, I2>> equalTo(KeySelector<I2, K> keySelector) { - this.keys2 = keySelector; - return createJoinOperator(); - } - - private JoinedStream<I1, I2, Tuple2<I1, I2>> createJoinOperator() { - -// JoinFunction<I1, I2, Tuple2<I1, I2>> joinFunction = new DefaultJoinFunction<I1, I2>(); -// -// JoinWindowFunction<I1, I2, Tuple2<I1, I2>> joinWindowFunction = getJoinWindowFunction( -// joinFunction, this); -// -// TypeInformation<Tuple2<I1, I2>> outType = new TupleTypeInfo<Tuple2<I1, I2>>( -// op.input1.getType(), op.input2.getType()); - -// return new JoinedStream<I1, I2, Tuple2<I1, I2>>(this, op.input1 -// .keyBy(keys1) -// .connect(op.input2.keyBy(keys2)) -// .addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize, -// op.slideInterval, op.timeStamp1, op.timeStamp2)); - return null; - } - - public static class JoinedStream<I1, I2, R> extends - SingleOutputStreamOperator<R, JoinedStream<I1, I2, R>> { - private final JoinPredicate<I1, I2> predicate; - - private JoinedStream(JoinPredicate<I1, I2> predicate, DataStream<R> ds) { - super(ds.getExecutionEnvironment(), ds.getTransformation()); - this.predicate = predicate; - } - - /** - * Completes a stream join. </p> The resulting operator wraps each pair - * of joining elements using the user defined {@link JoinFunction} - * - * @return The joined data stream. 
- */ - @SuppressWarnings("unchecked") - public <OUT> SingleOutputStreamOperator<OUT, ?> with(JoinFunction<I1, I2, OUT> joinFunction) { - - TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction, - predicate.op.input1.getType(), predicate.op.input2.getType()); - -// JoinWindowFunction<I1, I2, OUT> joinWindowFunction = getJoinWindowFunction(joinFunction, predicate); -// - -// return new JoinedStream<I1, I2, OUT>( -// predicate, predicate.op.input1 -// .keyBy(predicate.keys1) -// .connect(predicate.op.input2.keyBy(predicate.keys2)) -// .addGeneralWindowCombine(joinWindowFunction, outType, predicate.op.windowSize, -// predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2)); - return null; - } - } - } - - public static final class DefaultJoinFunction<I1, I2> implements - JoinFunction<I1, I2, Tuple2<I1, I2>> { - - private static final long serialVersionUID = 1L; - private final Tuple2<I1, I2> outTuple = new Tuple2<I1, I2>(); - - @Override - public Tuple2<I1, I2> join(I1 first, I2 second) throws Exception { - outTuple.f0 = first; - outTuple.f1 = second; - return outTuple; - } - } - -// private static <I1, I2, OUT> JoinWindowFunction<I1, I2, OUT> getJoinWindowFunction( -// JoinFunction<I1, I2, OUT> joinFunction, JoinPredicate<I1, I2> predicate) { -// return new JoinWindowFunction<I1, I2, OUT>(predicate.keys1, predicate.keys2, joinFunction); -// } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java deleted file mode 100644 index 
9da00f2..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalOperator.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.datastream.temporal; - -import java.util.concurrent.TimeUnit; - -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp; -import org.apache.flink.streaming.api.windowing.helper.Timestamp; -import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; - -public abstract class TemporalOperator<I1, I2, OP extends TemporalWindow<OP>> { - - public final DataStream<I1> input1; - public final DataStream<I2> input2; - - public long windowSize; - public long slideInterval; - - public TimestampWrapper<I1> timeStamp1; - public TimestampWrapper<I2> timeStamp2; - - public TemporalOperator(DataStream<I1> input1, DataStream<I2> input2) { - if (input1 == null || input2 == null) { - throw new NullPointerException(); - } - this.input1 = input1; - this.input2 = input2; - } - - /** - * Continues a temporal transformation.<br/> - * Defines the window size on which the two DataStreams will be transformed. - * To define sliding windows call {@link TemporalWindow#every} on the - * resulting operator. - * - * @param length - * The size of the window in milliseconds. - * @param timeUnit - * The unit if time to be used - * @return An incomplete temporal transformation. - */ - @SuppressWarnings("unchecked") - public OP onWindow(long length, TimeUnit timeUnit) { - return onWindow(timeUnit.toMillis(length), - (TimestampWrapper<I1>) SystemTimestamp.getWrapper(), - (TimestampWrapper<I2>) SystemTimestamp.getWrapper()); - } - - /** - * Continues a temporal transformation.<br/> - * Defines the window size on which the two DataStreams will be - * transformed.To define sliding windows call {@link TemporalWindow#every} - * on the resulting operator. - * - * @param length - * The size of the window in milliseconds. - * @param timeStamp1 - * The timestamp used to extract time from the elements of the - * first data stream. 
- * @param timeStamp2 - * The timestamp used to extract time from the elements of the - * second data stream. - * @return An incomplete temporal transformation. - */ - public OP onWindow(long length, Timestamp<I1> timeStamp1, Timestamp<I2> timeStamp2) { - return onWindow(length, timeStamp1, timeStamp2, 0); - } - - /** - * Continues a temporal transformation.<br/> - * Defines the window size on which the two DataStreams will be - * transformed.To define sliding windows call {@link TemporalWindow#every} - * on the resulting operator. - * - * @param length - * The size of the window in milliseconds. - * @param timeStamp1 - * The timestamp used to extract time from the elements of the - * first data stream. - * @param timeStamp2 - * The timestamp used to extract time from the elements of the - * second data stream. - * @param startTime - * The start time to measure the first window - * @return An incomplete temporal transformation. - */ - public OP onWindow(long length, Timestamp<I1> timeStamp1, Timestamp<I2> timeStamp2, - long startTime) { - return onWindow(length, new TimestampWrapper<I1>(timeStamp1, startTime), - new TimestampWrapper<I2>(timeStamp2, startTime)); - } - - private OP onWindow(long length, TimestampWrapper<I1> timeStamp1, - TimestampWrapper<I2> timeStamp2) { - - this.windowSize = length; - this.slideInterval = length; - - this.timeStamp1 = timeStamp1; - this.timeStamp2 = timeStamp2; - - return createNextWindowOperator(); - } - - protected abstract OP createNextWindowOperator(); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalWindow.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalWindow.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalWindow.java deleted file mode 100644 index 49c75c4..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/TemporalWindow.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.datastream.temporal; - -import java.util.concurrent.TimeUnit; - -public interface TemporalWindow<T> { - - /** - * Defines the slide interval for this temporal operator - * - * @param length - * Length of the window - * @param timeUnit - * Unit of time - * @return The temporal operator with slide interval specified - */ - public T every(long length, TimeUnit timeUnit); - - /** - * Defines the slide interval for this temporal operator - * - * @param length - * Length of the window - * @return The temporal operator with slide interval specified - */ - public T every(long length); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java index 042fe18..edd8a34 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceWindowFunction.java @@ -18,37 +18,17 @@ package org.apache.flink.streaming.api.functions.windowing; import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.functions.util.FunctionUtils; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.streaming.api.windowing.windows.Window; 
import org.apache.flink.util.Collector; -public class ReduceWindowFunction<K, W extends Window, T> extends RichWindowFunction<T, T, K, W> { +public class ReduceWindowFunction<K, W extends Window, T> + extends WrappingFunction<ReduceFunction<T>> + implements WindowFunction<T, T, K, W> { private static final long serialVersionUID = 1L; - private final ReduceFunction<T> reduceFunction; - public ReduceWindowFunction(ReduceFunction<T> reduceFunction) { - this.reduceFunction = reduceFunction; - } - - @Override - public void setRuntimeContext(RuntimeContext ctx) { - super.setRuntimeContext(ctx); - FunctionUtils.setFunctionRuntimeContext(reduceFunction, ctx); - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - FunctionUtils.openFunction(reduceFunction, parameters); - } - - @Override - public void close() throws Exception { - super.close(); - FunctionUtils.closeFunction(reduceFunction); + super(reduceFunction); } @Override @@ -59,7 +39,7 @@ public class ReduceWindowFunction<K, W extends Window, T> extends RichWindowFunc if (result == null) { result = v; } else { - result = reduceFunction.reduce(result, v); + result = wrappedFunction.reduce(result, v); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java new file mode 100644 index 0000000..c06a608 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java @@ -0,0 +1,372 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more 
+* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.flink.streaming.api; + +import com.google.common.collect.Lists; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.TimestampExtractor; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.util.Collector; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase { + + private static List<String> testResults; + + @Test + public void testCoGroup() 
throws Exception { + + testResults = Lists.newArrayList(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.getConfig().enableTimestamps(); + + DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { + ctx.collect(Tuple2.of("a", 0)); + ctx.collect(Tuple2.of("a", 1)); + ctx.collect(Tuple2.of("a", 2)); + + ctx.collect(Tuple2.of("b", 3)); + ctx.collect(Tuple2.of("b", 4)); + ctx.collect(Tuple2.of("b", 5)); + + ctx.collect(Tuple2.of("a", 6)); + ctx.collect(Tuple2.of("a", 7)); + ctx.collect(Tuple2.of("a", 8)); + } + + @Override + public void cancel() { + } + }).extractTimestamp(new Tuple2TimestampExtractor()); + + DataStream<Tuple2<String, Integer>> source2 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception { + ctx.collect(Tuple2.of("a", 0)); + ctx.collect(Tuple2.of("a", 1)); + + ctx.collect(Tuple2.of("b", 3)); + + ctx.collect(Tuple2.of("c", 6)); + ctx.collect(Tuple2.of("c", 7)); + ctx.collect(Tuple2.of("c", 8)); + } + + @Override + public void cancel() { + } + }).extractTimestamp(new Tuple2TimestampExtractor()); + + + source1.coGroup(source2) + .where(new Tuple2KeyExtractor()) + .equalTo(new Tuple2KeyExtractor()) + .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .apply(new CoGroupFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, String>() { + @Override + public void coGroup(Iterable<Tuple2<String, Integer>> first, + Iterable<Tuple2<String, Integer>> second, + Collector<String> out) throws Exception { + StringBuilder result = new StringBuilder(); + result.append("F:"); + for (Tuple2<String, Integer> t: first) { + 
result.append(t.toString()); + } + result.append(" S:"); + for (Tuple2<String, Integer> t: second) { + result.append(t.toString()); + } + out.collect(result.toString()); + } + }) + .addSink(new SinkFunction<String>() { + @Override + public void invoke(String value) throws Exception { + testResults.add(value); + } + }); + + env.execute("CoGroup Test"); + + List<String> expectedResult = Lists.newArrayList( + "F:(a,0)(a,1)(a,2) S:(a,0)(a,1)", + "F:(b,3)(b,4)(b,5) S:(b,3)", + "F:(a,6)(a,7)(a,8) S:", + "F: S:(c,6)(c,7)(c,8)"); + + Collections.sort(expectedResult); + Collections.sort(testResults); + + Assert.assertEquals(expectedResult, testResults); + } + + @Test + public void testJoin() throws Exception { + + testResults = Lists.newArrayList(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.getConfig().enableTimestamps(); + + DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception { + ctx.collect(Tuple3.of("a", "x", 0)); + ctx.collect(Tuple3.of("a", "y", 1)); + ctx.collect(Tuple3.of("a", "z", 2)); + + ctx.collect(Tuple3.of("b", "u", 3)); + ctx.collect(Tuple3.of("b", "w", 5)); + + ctx.collect(Tuple3.of("a", "i", 6)); + ctx.collect(Tuple3.of("a", "j", 7)); + ctx.collect(Tuple3.of("a", "k", 8)); + } + + @Override + public void cancel() { + } + }).extractTimestamp(new Tuple3TimestampExtractor()); + + DataStream<Tuple3<String, String, Integer>> source2 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception { + ctx.collect(Tuple3.of("a", "u", 0)); + ctx.collect(Tuple3.of("a", "w", 1)); + + ctx.collect(Tuple3.of("b", 
"i", 3)); + ctx.collect(Tuple3.of("b", "k", 5)); + + ctx.collect(Tuple3.of("a", "x", 6)); + ctx.collect(Tuple3.of("a", "z", 8)); + } + + @Override + public void cancel() { + } + }).extractTimestamp(new Tuple3TimestampExtractor()); + + + source1.join(source2) + .where(new Tuple3KeyExtractor()) + .equalTo(new Tuple3KeyExtractor()) + .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() { + @Override + public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception { + return first + ":" + second; + } + }) + .addSink(new SinkFunction<String>() { + @Override + public void invoke(String value) throws Exception { + testResults.add(value); + } + }); + + env.execute("Join Test"); + + List<String> expectedResult = Lists.newArrayList( + "(a,x,0):(a,u,0)", + "(a,x,0):(a,w,1)", + "(a,y,1):(a,u,0)", + "(a,y,1):(a,w,1)", + "(a,z,2):(a,u,0)", + "(a,z,2):(a,w,1)", + "(b,u,3):(b,i,3)", + "(b,u,3):(b,k,5)", + "(b,w,5):(b,i,3)", + "(b,w,5):(b,k,5)", + "(a,i,6):(a,x,6)", + "(a,i,6):(a,z,8)", + "(a,j,7):(a,x,6)", + "(a,j,7):(a,z,8)", + "(a,k,8):(a,x,6)", + "(a,k,8):(a,z,8)"); + + Collections.sort(expectedResult); + Collections.sort(testResults); + + Assert.assertEquals(expectedResult, testResults); + } + + @Test + public void testSelfJoin() throws Exception { + + testResults = Lists.newArrayList(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.getConfig().enableTimestamps(); + + DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception { + ctx.collect(Tuple3.of("a", "x", 0)); + ctx.collect(Tuple3.of("a", "y", 1)); + 
ctx.collect(Tuple3.of("a", "z", 2)); + + ctx.collect(Tuple3.of("b", "u", 3)); + ctx.collect(Tuple3.of("b", "w", 5)); + + ctx.collect(Tuple3.of("a", "i", 6)); + ctx.collect(Tuple3.of("a", "j", 7)); + ctx.collect(Tuple3.of("a", "k", 8)); + } + + @Override + public void cancel() { + } + }).extractTimestamp(new Tuple3TimestampExtractor()); + + source1.join(source1) + .where(new Tuple3KeyExtractor()) + .equalTo(new Tuple3KeyExtractor()) + .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() { + @Override + public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception { + return first + ":" + second; + } + }) + .addSink(new SinkFunction<String>() { + @Override + public void invoke(String value) throws Exception { + testResults.add(value); + } + }); + + env.execute("Self-Join Test"); + + List<String> expectedResult = Lists.newArrayList( + "(a,x,0):(a,x,0)", + "(a,x,0):(a,y,1)", + "(a,x,0):(a,z,2)", + "(a,y,1):(a,x,0)", + "(a,y,1):(a,y,1)", + "(a,y,1):(a,z,2)", + "(a,z,2):(a,x,0)", + "(a,z,2):(a,y,1)", + "(a,z,2):(a,z,2)", + "(b,u,3):(b,u,3)", + "(b,u,3):(b,w,5)", + "(b,w,5):(b,u,3)", + "(b,w,5):(b,w,5)", + "(a,i,6):(a,i,6)", + "(a,i,6):(a,j,7)", + "(a,i,6):(a,k,8)", + "(a,j,7):(a,i,6)", + "(a,j,7):(a,j,7)", + "(a,j,7):(a,k,8)", + "(a,k,8):(a,i,6)", + "(a,k,8):(a,j,7)", + "(a,k,8):(a,k,8)"); + + Collections.sort(expectedResult); + Collections.sort(testResults); + + Assert.assertEquals(expectedResult, testResults); + } + + private static class Tuple2TimestampExtractor implements TimestampExtractor<Tuple2<String, Integer>> { + private static final long serialVersionUID = 1L; + + @Override + public long extractTimestamp(Tuple2<String, Integer> element, long currentTimestamp) { + return element.f1; + } + + @Override + public long emitWatermark(Tuple2<String, Integer> element, long currentTimestamp) { + return 
element.f1 - 1; + } + + @Override + public long getCurrentWatermark() { + return Long.MIN_VALUE; + } + } + + private static class Tuple3TimestampExtractor implements TimestampExtractor<Tuple3<String, String, Integer>> { + private static final long serialVersionUID = 1L; + + @Override + public long extractTimestamp(Tuple3<String, String, Integer> element, long currentTimestamp) { + return element.f2; + } + + @Override + public long emitWatermark(Tuple3<String, String, Integer> element, long currentTimestamp) { + return element.f2 - 1; + } + + @Override + public long getCurrentWatermark() { + return Long.MIN_VALUE; + } + } + + private static class Tuple2KeyExtractor implements KeySelector<Tuple2<String,Integer>, String> { + private static final long serialVersionUID = 1L; + + @Override + public String getKey(Tuple2<String, Integer> value) throws Exception { + return value.f0; + } + } + + private static class Tuple3KeyExtractor implements KeySelector<Tuple3<String, String, Integer>, String> { + private static final long serialVersionUID = 1L; + + @Override + public String getKey(Tuple3<String, String, Integer> value) throws Exception { + return value.f0; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java deleted file mode 100644 index 7d2a131..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license 
agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api; - -import static org.junit.Assert.assertEquals; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashSet; - -import org.apache.flink.api.common.functions.CrossFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.windowing.helper.Timestamp; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; -import org.apache.flink.streaming.util.TestListResultSink; -import org.apache.flink.streaming.util.TestStreamEnvironment; -import org.junit.Ignore; -import org.junit.Test; - -public class WindowCrossJoinTest extends StreamingMultipleProgramsTestBase { - - private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> joinExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>(); - private static ArrayList<Tuple2<Tuple2<Integer, String>, Integer>> crossExpectedResults = new ArrayList<Tuple2<Tuple2<Integer, String>, Integer>>(); - - private static class MyTimestamp<T> implements 
Timestamp<T> { - - private static final long serialVersionUID = 1L; - - @Override - public long getTimestamp(T value) { - return 101L; - } - } - - /** - * TODO: enable once new join operator is ready - * @throws Exception - */ - @Ignore - @Test - public void test() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - env.setBufferTimeout(1); - - TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>> joinResultSink = - new TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>>(); - TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>> crossResultSink = - new TestListResultSink<Tuple2<Tuple2<Integer, String>, Integer>>(); - - ArrayList<Tuple2<Integer, String>> in1 = new ArrayList<Tuple2<Integer, String>>(); - ArrayList<Tuple1<Integer>> in2 = new ArrayList<Tuple1<Integer>>(); - - in1.add(new Tuple2<Integer, String>(10, "a")); - in1.add(new Tuple2<Integer, String>(20, "b")); - in1.add(new Tuple2<Integer, String>(20, "x")); - in1.add(new Tuple2<Integer, String>(0, "y")); - - in2.add(new Tuple1<Integer>(0)); - in2.add(new Tuple1<Integer>(5)); - in2.add(new Tuple1<Integer>(20)); - - joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>( - new Tuple2<Integer, String>(20, "b"), 20)); - joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>( - new Tuple2<Integer, String>(20, "x"), 20)); - joinExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>( - new Tuple2<Integer, String>(0, "y"), 0)); - - crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>( - new Tuple2<Integer, String>(10, "a"), 0)); - crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>( - new Tuple2<Integer, String>(10, "a"), 5)); - crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>( - new Tuple2<Integer, String>(10, "a"), 20)); - crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>( - new Tuple2<Integer, 
String>(20, "b"), 0)); - crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>( - new Tuple2<Integer, String>(20, "b"), 5)); - crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>( - new Tuple2<Integer, String>(20, "b"), 20)); - crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>( - new Tuple2<Integer, String>(20, "x"), 0)); - crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>( - new Tuple2<Integer, String>(20, "x"), 5)); - crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>( - new Tuple2<Integer, String>(20, "x"), 20)); - crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>( - new Tuple2<Integer, String>(0, "y"), 0)); - crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>( - new Tuple2<Integer, String>(0, "y"), 5)); - crossExpectedResults.add(new Tuple2<Tuple2<Integer, String>, Integer>( - new Tuple2<Integer, String>(0, "y"), 20)); - - DataStream<Tuple2<Integer, String>> inStream1 = env.fromCollection(in1); - DataStream<Tuple1<Integer>> inStream2 = env.fromCollection(in2); - - inStream1 - .join(inStream2) - .onWindow(1000, new MyTimestamp<Tuple2<Integer, String>>(), - new MyTimestamp<Tuple1<Integer>>(), 100).where(0).equalTo(0) - .map(new ResultMap()) - .addSink(joinResultSink); - - env.execute(); - - assertEquals(new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(joinExpectedResults), - new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(joinResultSink.getResult())); - assertEquals(new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(crossExpectedResults), - new HashSet<Tuple2<Tuple2<Integer, String>, Integer>>(crossResultSink.getResult())); - } - - private static class ResultMap implements - MapFunction<Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>, - Tuple2<Tuple2<Integer, String>, Integer>> { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<Tuple2<Integer, String>, Integer> map(Tuple2<Tuple2<Integer, 
String>, Tuple1<Integer>> value) throws Exception { - return new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/8634dbbe/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java index fc9de1d..c116c01 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java @@ -47,67 +47,6 @@ public class SelfConnectionTest extends StreamingMultipleProgramsTestBase { private static List<String> expected; /** - * TODO: enable once new join operator is implemented - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Ignore - @Test - public void sameDataStreamTest() { - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(3); - - TestListResultSink<String> resultSink = new TestListResultSink<String>(); - - Timestamp<Integer> timeStamp = new IntegerTimestamp(); - - KeySelector keySelector = new KeySelector<Integer, Integer>() { - - private static final long serialVersionUID = 1L; - - @Override - public Integer getKey(Integer value) throws Exception { - return value; - } - }; - - DataStream<Integer> src = env.fromElements(1, 3, 5); - - @SuppressWarnings("unused") - DataStreamSink<Tuple2<Integer, Integer>> dataStream = - src.join(src).onWindow(50L, timeStamp, 
timeStamp).where(keySelector).equalTo(keySelector) - .map(new MapFunction<Tuple2<Integer, Integer>, String>() { - - private static final long serialVersionUID = 1L; - - @Override - public String map(Tuple2<Integer, Integer> value) throws Exception { - return value.toString(); - } - }) - .addSink(resultSink); - - - try { - env.execute(); - - expected = new ArrayList<String>(); - - expected.addAll(Arrays.asList("(1,1)", "(3,3)", "(5,5)")); - - List<String> result = resultSink.getResult(); - - Collections.sort(expected); - Collections.sort(result); - - assertEquals(expected, result); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - } - - /** * We connect two different data streams in a chain to a CoMap. */ @Test
