This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b3e7f9bd972cd1924670b68915c14eb508714fc3 Author: Xu Huang <huangxu.wal...@gmail.com> AuthorDate: Wed Jan 15 16:56:43 2025 +0800 [FLINK-37135][runtime] Implement Join extension for DataStream V2 This closes #25961 --- .../datastream/impl/builtin/BuiltinJoinFuncs.java | 38 ++ .../TwoInputNonBroadcastJoinProcessFunction.java | 59 ++++ .../TwoInputNonBroadcastJoinProcessOperator.java | 110 ++++++ .../impl/stream/KeyedPartitionStreamImpl.java | 53 ++- .../flink/datastream/impl/utils/StreamUtils.java | 17 + .../test/streaming/api/datastream/JoinITCase.java | 387 +++++++++++++++++++++ 6 files changed, 655 insertions(+), 9 deletions(-) diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/builtin/BuiltinJoinFuncs.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/builtin/BuiltinJoinFuncs.java new file mode 100644 index 00000000000..cbe6b386aaf --- /dev/null +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/builtin/BuiltinJoinFuncs.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.builtin; + +import org.apache.flink.datastream.api.builtin.BuiltinFuncs; +import org.apache.flink.datastream.api.extension.join.JoinFunction; +import org.apache.flink.datastream.api.extension.join.JoinType; +import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.join.operators.TwoInputNonBroadcastJoinProcessFunction; + +/** Join-related implementations in {@link BuiltinFuncs}. */ +public class BuiltinJoinFuncs { + + /** + * Wrap the JoinFunction and JoinType within a ProcessFunction to perform the Join operation. + * Note that the wrapped process function should only be used with KeyedStream. + */ + public static <IN1, IN2, OUT> TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> join( + JoinFunction<IN1, IN2, OUT> joinFunction, JoinType joinType) { + return new TwoInputNonBroadcastJoinProcessFunction<>(joinFunction, joinType); + } +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/join/operators/TwoInputNonBroadcastJoinProcessFunction.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/join/operators/TwoInputNonBroadcastJoinProcessFunction.java new file mode 100644 index 00000000000..0746894afa3 --- /dev/null +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/join/operators/TwoInputNonBroadcastJoinProcessFunction.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.extension.join.operators; + +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.extension.join.JoinFunction; +import org.apache.flink.datastream.api.extension.join.JoinType; +import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; + +/** + * Wrap the user-defined {@link JoinFunction} as {@link TwoInputNonBroadcastStreamProcessFunction} + * to execute the Join operation within Join extension. + */ +public class TwoInputNonBroadcastJoinProcessFunction<IN1, IN2, OUT> + implements TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> { + + private final JoinFunction<IN1, IN2, OUT> joinFunction; + + private final JoinType joinType; + + public TwoInputNonBroadcastJoinProcessFunction( + JoinFunction<IN1, IN2, OUT> joinFunction, JoinType joinType) { + this.joinFunction = joinFunction; + this.joinType = joinType; + } + + @Override + public void processRecordFromFirstInput( + IN1 record, Collector<OUT> output, PartitionedContext<OUT> ctx) throws Exception {} + + @Override + public void processRecordFromSecondInput( + IN2 record, Collector<OUT> output, PartitionedContext<OUT> ctx) throws Exception {} + + public JoinFunction<IN1, IN2, OUT> getJoinFunction() { + return joinFunction; + } + + public JoinType getJoinType() { + return joinType; + } +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/join/operators/TwoInputNonBroadcastJoinProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/join/operators/TwoInputNonBroadcastJoinProcessOperator.java new file mode 100644 index 00000000000..135ca15b938 --- /dev/null +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/join/operators/TwoInputNonBroadcastJoinProcessOperator.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.extension.join.operators; + +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.datastream.api.extension.join.JoinType; +import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream; +import org.apache.flink.datastream.impl.operators.KeyedTwoInputNonBroadcastProcessOperator; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.v2.ListStateDescriptor; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Operator for executing the Join operation in Join extension. Note that this should be executed + * with two {@link KeyedPartitionStream}. + */ +public class TwoInputNonBroadcastJoinProcessOperator<K, IN1, IN2, OUT> + extends KeyedTwoInputNonBroadcastProcessOperator<K, IN1, IN2, OUT> { + + private final TwoInputNonBroadcastJoinProcessFunction<IN1, IN2, OUT> joinProcessFunction; + + private final ListStateDescriptor<IN1> leftStateDescriptor; + + private final ListStateDescriptor<IN2> rightStateDescriptor; + + /** The state that stores the left input records. */ + private transient ListState<IN1> leftState; + + /** The state that stores the right input records. */ + private transient ListState<IN2> rightState; + + public TwoInputNonBroadcastJoinProcessOperator( + TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> userFunction, + ListStateDescriptor<IN1> leftStateDescriptor, + ListStateDescriptor<IN2> rightStateDescriptor) { + super(userFunction); + this.joinProcessFunction = + (TwoInputNonBroadcastJoinProcessFunction<IN1, IN2, OUT>) userFunction; + checkArgument( + joinProcessFunction.getJoinType() == JoinType.INNER, + "Currently only support INNER join."); + this.leftStateDescriptor = leftStateDescriptor; + this.rightStateDescriptor = rightStateDescriptor; + } + + @Override + public void open() throws Exception { + super.open(); + leftState = + getOrCreateKeyedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + leftStateDescriptor); + rightState = + getOrCreateKeyedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + rightStateDescriptor); + } + + @Override + public void processElement1(StreamRecord<IN1> element) throws Exception { + collector.setTimestampFromStreamRecord(element); + IN1 leftRecord = element.getValue(); + Iterable<IN2> rightRecords = rightState.get(); + if (rightRecords != null) { + for (IN2 rightRecord : rightRecords) { + joinProcessFunction + .getJoinFunction() + .processRecord(leftRecord, rightRecord, collector, partitionedContext); + } + } + leftState.add(leftRecord); + } + + @Override + public void processElement2(StreamRecord<IN2> element) throws Exception { + collector.setTimestampFromStreamRecord(element); + Iterable<IN1> leftRecords = leftState.get(); + IN2 rightRecord = element.getValue(); + if (leftRecords != null) { + for (IN1 leftRecord : leftState.get()) { + joinProcessFunction + .getJoinFunction() + .processRecord(leftRecord, rightRecord, collector, partitionedContext); + } + } + rightState.add(rightRecord); + } +} diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java index 6fca168b426..4bbabb9653f 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java @@ -37,11 +37,14 @@ import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessCon import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream; import org.apache.flink.datastream.api.stream.ProcessConfigurable; import org.apache.flink.datastream.impl.attribute.AttributeParser; +import org.apache.flink.datastream.impl.extension.join.operators.TwoInputNonBroadcastJoinProcessFunction; +import org.apache.flink.datastream.impl.extension.join.operators.TwoInputNonBroadcastJoinProcessOperator; import org.apache.flink.datastream.impl.operators.KeyedProcessOperator; import org.apache.flink.datastream.impl.operators.KeyedTwoInputBroadcastProcessOperator; import org.apache.flink.datastream.impl.operators.KeyedTwoInputNonBroadcastProcessOperator; import org.apache.flink.datastream.impl.operators.KeyedTwoOutputProcessOperator; import org.apache.flink.datastream.impl.utils.StreamUtils; +import org.apache.flink.runtime.state.v2.ListStateDescriptor; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation; import org.apache.flink.streaming.api.transformations.PartitionTransformation; @@ -263,15 +266,22 @@ public class KeyedPartitionStreamImpl<K, V> extends AbstractDataStream<V> getType(), ((KeyedPartitionStreamImpl<K, T_OTHER>) other).getType()); - KeyedTwoInputNonBroadcastProcessOperator<K, V, T_OTHER, OUT> processOperator = - new KeyedTwoInputNonBroadcastProcessOperator<>(processFunction); - Transformation<OUT> outTransformation = - StreamUtils.getTwoInputTransformation( - "Keyed-TwoInput-Process", - this, - (KeyedPartitionStreamImpl<K, T_OTHER>) other, - outTypeInfo, - processOperator); + Transformation<OUT> outTransformation; + + if (processFunction instanceof TwoInputNonBroadcastJoinProcessFunction) { + outTransformation = getJoinTransformation(other, processFunction, outTypeInfo); + } else { + KeyedTwoInputNonBroadcastProcessOperator<K, V, T_OTHER, OUT> processOperator = + new KeyedTwoInputNonBroadcastProcessOperator<>(processFunction); + outTransformation = + StreamUtils.getTwoInputTransformation( + "Keyed-TwoInput-Process", + this, + (KeyedPartitionStreamImpl<K, T_OTHER>) other, + outTypeInfo, + processOperator); + } + outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(outTransformation); return StreamUtils.wrapWithConfigureHandle( @@ -389,6 +399,31 @@ public class KeyedPartitionStreamImpl<K, V> extends AbstractDataStream<V> TypeExtractor.getKeySelectorTypes(newKeySelector, outputStream.getType()))); } + public <T_OTHER, OUT> Transformation<OUT> getJoinTransformation( + KeyedPartitionStream<K, T_OTHER> other, + TwoInputNonBroadcastStreamProcessFunction<V, T_OTHER, OUT> processFunction, + TypeInformation<OUT> outTypeInfo) { + ListStateDescriptor<V> leftStateDesc = + new ListStateDescriptor<>( + "join-left-state", TypeExtractor.createTypeInfo(getType().getTypeClass())); + ListStateDescriptor<T_OTHER> rightStateDesc = + new ListStateDescriptor<>( + "join-right-state", + TypeExtractor.createTypeInfo( + ((KeyedPartitionStreamImpl<Object, T_OTHER>) other) + .getType() + .getTypeClass())); + TwoInputNonBroadcastJoinProcessOperator<K, V, T_OTHER, OUT> joinProcessOperator = + new TwoInputNonBroadcastJoinProcessOperator<>( + processFunction, leftStateDesc, rightStateDesc); + return StreamUtils.getTwoInputTransformation( + "Keyed-Join-Process", + this, + (KeyedPartitionStreamImpl<K, T_OTHER>) other, + outTypeInfo, + joinProcessOperator); + } + public TypeInformation<K> getKeyType() { return keyType; } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java index 688b24e7fb1..6ec420a8135 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java @@ -26,6 +26,7 @@ import org.apache.flink.api.connector.dsv2.WrappedSink; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.datastream.api.extension.join.JoinFunction; import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction; import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; @@ -33,6 +34,7 @@ import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; import org.apache.flink.datastream.api.stream.GlobalStream.ProcessConfigurableAndGlobalStream; import org.apache.flink.datastream.api.stream.KeyedPartitionStream.ProcessConfigurableAndKeyedPartitionStream; import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream; +import org.apache.flink.datastream.impl.extension.join.operators.TwoInputNonBroadcastJoinProcessFunction; import org.apache.flink.datastream.impl.stream.AbstractDataStream; import org.apache.flink.datastream.impl.stream.GlobalStreamImpl; import org.apache.flink.datastream.impl.stream.KeyedPartitionStreamImpl; @@ -84,6 +86,21 @@ public final class StreamUtils { TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> processFunction, TypeInformation<IN1> in1TypeInformation, TypeInformation<IN2> in2TypeInformation) { + if (processFunction instanceof TwoInputNonBroadcastJoinProcessFunction) { + return TypeExtractor.getBinaryOperatorReturnType( + ((TwoInputNonBroadcastJoinProcessFunction<IN1, IN2, OUT>) processFunction) + .getJoinFunction(), + JoinFunction.class, + 0, + 1, + 2, + TypeExtractor.NO_INDEX, + in1TypeInformation, + in2TypeInformation, + Utils.getCallLocationName(), + true); + } + return TypeExtractor.getBinaryOperatorReturnType( processFunction, TwoInputNonBroadcastStreamProcessFunction.class, diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/JoinITCase.java new file mode 100644 index 00000000000..2c892feb693 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/JoinITCase.java @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.streaming.api.datastream; + +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.connector.dsv2.WrappedSink; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.builtin.BuiltinFuncs; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.context.RuntimeContext; +import org.apache.flink.datastream.api.extension.join.JoinFunction; +import org.apache.flink.datastream.api.extension.join.JoinType; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test the Join extension on DataStream V2. */ +class JoinITCase implements Serializable { + private transient ExecutionEnvironment env; + private static List<String> sinkResults; + + @BeforeEach + void before() throws Exception { + env = ExecutionEnvironment.getInstance(); + sinkResults = new ArrayList<>(); + } + + @AfterEach + void after() throws Exception { + sinkResults.clear(); + } + + @Test + void testInnerJoinWithSameKey() throws Exception { + NonKeyedPartitionStream<KeyAndValue> source1 = + getSourceStream( + Arrays.asList( + KeyAndValue.of("key", 0), + KeyAndValue.of("key", 1), + KeyAndValue.of("key", 2)), + "source1"); + NonKeyedPartitionStream<KeyAndValue> source2 = + getSourceStream( + Arrays.asList( + KeyAndValue.of("key", 0), + KeyAndValue.of("key", 1), + KeyAndValue.of("key", 2)), + "source2"); + + NonKeyedPartitionStream<String> joinedStream = + BuiltinFuncs.join( + source1, + (KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key, + source2, + (KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key, + new TestJoinFunction(), + JoinType.INNER); + + joinedStream.toSink(new WrappedSink<>(new TestSink())); + env.execute("testInnerJoinWithSameKey"); + + expectInAnyOrder( + "key:0:0", "key:0:1", "key:0:2", "key:1:0", "key:1:1", "key:1:2", "key:2:0", + "key:2:1", "key:2:2"); + } + + @Test + void testInnerJoinWithMultipleKeys() throws Exception { + NonKeyedPartitionStream<KeyAndValue> source1 = + getSourceStream( + Arrays.asList( + KeyAndValue.of("key0", 0), + KeyAndValue.of("key1", 1), + KeyAndValue.of("key2", 2), + KeyAndValue.of("key2", 3)), + "source1"); + NonKeyedPartitionStream<KeyAndValue> source2 = + getSourceStream( + Arrays.asList( + KeyAndValue.of("key2", 4), + KeyAndValue.of("key2", 5), + KeyAndValue.of("key0", 6), + KeyAndValue.of("key1", 7)), + "source2"); + + NonKeyedPartitionStream<String> joinedStream = + BuiltinFuncs.join( + source1, + (KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key, + source2, + (KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key, + new TestJoinFunction(), + JoinType.INNER); + + joinedStream.toSink(new WrappedSink<>(new TestSink())); + env.execute("testInnerJoinWithMultipleKeys"); + + expectInAnyOrder("key0:0:6", "key1:1:7", "key2:2:4", "key2:2:5", "key2:3:4", "key2:3:5"); + } + + @Test + void testInnerJoinWhenLeftInputNoData() throws Exception { + NonKeyedPartitionStream<KeyAndValue> source1 = + getSourceStream( + Arrays.asList( + KeyAndValue.of("key0", 0), + KeyAndValue.of("key1", 1), + KeyAndValue.of("key2", 2), + KeyAndValue.of("key2", 3)), + "source1"); + NonKeyedPartitionStream<KeyAndValue> source2 = + getSourceStream(new ArrayList<>(), "source2"); + + NonKeyedPartitionStream<String> joinedStream = + BuiltinFuncs.join( + source1, + (KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key, + source2, + (KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key, + new TestJoinFunction(), + JoinType.INNER); + + joinedStream.toSink(new WrappedSink<>(new TestSink())); + env.execute("testInnerJoinWhenLeftInputNoData"); + + expectInAnyOrder(); + } + + @Test + void testInnerJoinWhenRightInputNoData() throws Exception { + NonKeyedPartitionStream<KeyAndValue> source1 = + getSourceStream(new ArrayList<>(), "source1"); + NonKeyedPartitionStream<KeyAndValue> source2 = + getSourceStream( + Arrays.asList( + KeyAndValue.of("key0", 0), + KeyAndValue.of("key1", 1), + KeyAndValue.of("key2", 2), + KeyAndValue.of("key2", 3)), + "source2"); + + NonKeyedPartitionStream<String> joinedStream = + BuiltinFuncs.join( + source1, + (KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key, + source2, + (KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key, + new TestJoinFunction(), + JoinType.INNER); + + joinedStream.toSink(new WrappedSink<>(new TestSink())); + env.execute("testInnerJoinWhenRightInputNoData"); + + expectInAnyOrder(); + } + + /** Test Join using {@link BuiltinFuncs#join(JoinFunction)}. */ + @Test + void testJoinWithWrappedJoinProcessFunction() throws Exception { + NonKeyedPartitionStream<KeyAndValue> source1 = + getSourceStream( + Arrays.asList( + KeyAndValue.of("key", 0), + KeyAndValue.of("key", 1), + KeyAndValue.of("key", 2)), + "source1"); + NonKeyedPartitionStream<KeyAndValue> source2 = + getSourceStream( + Arrays.asList( + KeyAndValue.of("key", 0), + KeyAndValue.of("key", 1), + KeyAndValue.of("key", 2)), + "source2"); + + KeyedPartitionStream<String, KeyAndValue> keyedStream1 = + source1.keyBy((KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key); + KeyedPartitionStream<String, KeyAndValue> keyedStream2 = + source2.keyBy((KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key); + TwoInputNonBroadcastStreamProcessFunction<KeyAndValue, KeyAndValue, String> + wrappedJoinProcessFunction = BuiltinFuncs.join(new TestJoinFunction()); + NonKeyedPartitionStream<String> joinedStream = + keyedStream1.connectAndProcess(keyedStream2, wrappedJoinProcessFunction); + + joinedStream.toSink(new WrappedSink<>(new TestSink())); + env.execute("testInnerJoinWithSameKey"); + + expectInAnyOrder( + "key:0:0", "key:0:1", "key:0:2", "key:1:0", "key:1:1", "key:1:2", "key:2:0", + "key:2:1", "key:2:2"); + } + + /** + * Test Join using {@link BuiltinFuncs#join(KeyedPartitionStream, KeyedPartitionStream, + * JoinFunction)}. + */ + @Test + void testJoinWithKeyedStream() throws Exception { + NonKeyedPartitionStream<KeyAndValue> source1 = + getSourceStream( + Arrays.asList( + KeyAndValue.of("key", 0), + KeyAndValue.of("key", 1), + KeyAndValue.of("key", 2)), + "source1"); + NonKeyedPartitionStream<KeyAndValue> source2 = + getSourceStream( + Arrays.asList( + KeyAndValue.of("key", 0), + KeyAndValue.of("key", 1), + KeyAndValue.of("key", 2)), + "source2"); + + KeyedPartitionStream<String, KeyAndValue> keyedStream1 = + source1.keyBy((KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key); + KeyedPartitionStream<String, KeyAndValue> keyedStream2 = + source2.keyBy((KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key); + + NonKeyedPartitionStream<String> joinedStream = + BuiltinFuncs.join(keyedStream1, keyedStream2, new TestJoinFunction()); + + joinedStream.toSink(new WrappedSink<>(new TestSink())); + env.execute("testInnerJoinWithSameKey"); + + expectInAnyOrder( + "key:0:0", "key:0:1", "key:0:2", "key:1:0", "key:1:1", "key:1:2", "key:2:0", + "key:2:1", "key:2:2"); + } + + /** + * Test Join using {@link BuiltinFuncs#join(NonKeyedPartitionStream, KeySelector, + * NonKeyedPartitionStream, KeySelector, JoinFunction)}. + */ + @Test + void testJoinWithNonKeyedStream() throws Exception { + NonKeyedPartitionStream<KeyAndValue> source1 = + getSourceStream( + Arrays.asList( + KeyAndValue.of("key", 0), + KeyAndValue.of("key", 1), + KeyAndValue.of("key", 2)), + "source1"); + NonKeyedPartitionStream<KeyAndValue> source2 = + getSourceStream( + Arrays.asList( + KeyAndValue.of("key", 0), + KeyAndValue.of("key", 1), + KeyAndValue.of("key", 2)), + "source2"); + + NonKeyedPartitionStream<String> joinedStream = + BuiltinFuncs.join( + source1, + (KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key, + source2, + (KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key, + new TestJoinFunction()); + + joinedStream.toSink(new WrappedSink<>(new TestSink())); + env.execute("testInnerJoinWithSameKey"); + + expectInAnyOrder( + "key:0:0", "key:0:1", "key:0:2", "key:1:0", "key:1:1", "key:1:2", "key:2:0", + "key:2:1", "key:2:2"); + } + + private static class KeyAndValue { + public final String key; + public final int value; + + public KeyAndValue(String key, int value) { + this.key = key; + this.value = value; + } + + public static KeyAndValue of(String key, int value) { + return new KeyAndValue(key, value); + } + } + + private NonKeyedPartitionStream<KeyAndValue> getSourceStream( + Collection<KeyAndValue> data, String sourceName) { + if (data.isEmpty()) { + return getEmptySourceStream(data, sourceName); + } + return env.fromSource(DataStreamV2SourceUtils.fromData(data), sourceName); + } + + private NonKeyedPartitionStream<KeyAndValue> getEmptySourceStream( + Collection<KeyAndValue> data, String sourceName) { + // Since env#fromSource fails when the data is empty, we add a dummy record and then filter + // it out. + data.add(KeyAndValue.of("", -1)); + NonKeyedPartitionStream<KeyAndValue> source = + env.fromSource(DataStreamV2SourceUtils.fromData(data), sourceName); + return source.process( + new OneInputStreamProcessFunction<KeyAndValue, KeyAndValue>() { + @Override + public void processRecord( + KeyAndValue record, + Collector<KeyAndValue> output, + PartitionedContext<KeyAndValue> ctx) + throws Exception {} + }); + } + + private static class TestJoinFunction + implements JoinFunction<KeyAndValue, KeyAndValue, String> { + + @Override + public void processRecord( + KeyAndValue leftRecord, + KeyAndValue rightRecord, + Collector<String> output, + RuntimeContext ctx) + throws Exception { + assertThat(leftRecord.key).isNotNull(); + assertThat(leftRecord.key).isEqualTo(rightRecord.key); + String result = leftRecord.key + ":" + leftRecord.value + ":" + rightRecord.value; + output.collect(result); + } + } + + private static class TestSink implements Sink<String> { + + @Override + public SinkWriter<String> createWriter(WriterInitContext context) throws IOException { + return new TestSinkWriter(); + } + } + + private static class TestSinkWriter implements SinkWriter<String> { + @Override + public void write(String element, Context context) + throws IOException, InterruptedException { + sinkResults.add(element); + } + + @Override + public void flush(boolean endOfInput) throws IOException, InterruptedException {} + + @Override + public void close() throws Exception {} + } + + private static void expectInAnyOrder(String... expected) { + List<String> listExpected = Arrays.asList(expected); + Collections.sort(listExpected); + Collections.sort(sinkResults); + assertThat(listExpected).isEqualTo(sinkResults); + } +}