This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b3e7f9bd972cd1924670b68915c14eb508714fc3
Author: Xu Huang <huangxu.wal...@gmail.com>
AuthorDate: Wed Jan 15 16:56:43 2025 +0800

    [FLINK-37135][runtime] Implement Join extension for DataStream V2
    
    This closes #25961
---
 .../datastream/impl/builtin/BuiltinJoinFuncs.java  |  38 ++
 .../TwoInputNonBroadcastJoinProcessFunction.java   |  59 ++++
 .../TwoInputNonBroadcastJoinProcessOperator.java   | 110 ++++++
 .../impl/stream/KeyedPartitionStreamImpl.java      |  53 ++-
 .../flink/datastream/impl/utils/StreamUtils.java   |  17 +
 .../test/streaming/api/datastream/JoinITCase.java  | 387 +++++++++++++++++++++
 6 files changed, 655 insertions(+), 9 deletions(-)

diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/builtin/BuiltinJoinFuncs.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/builtin/BuiltinJoinFuncs.java
new file mode 100644
index 00000000000..cbe6b386aaf
--- /dev/null
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/builtin/BuiltinJoinFuncs.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.builtin;
+
+import org.apache.flink.datastream.api.builtin.BuiltinFuncs;
+import org.apache.flink.datastream.api.extension.join.JoinFunction;
+import org.apache.flink.datastream.api.extension.join.JoinType;
+import 
org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
+import 
org.apache.flink.datastream.impl.extension.join.operators.TwoInputNonBroadcastJoinProcessFunction;
+
+/**
+ * Join-related implementations in {@link BuiltinFuncs}.
+ *
+ * <p>Holds the static factory that turns a user {@link JoinFunction} into a process function.
+ */
+public class BuiltinJoinFuncs {
+
+    /**
+     * Wraps a {@link JoinFunction} and a {@link JoinType} into a {@link
+     * TwoInputNonBroadcastStreamProcessFunction} that describes the join. The returned function
+     * should only be used with keyed streams.
+     *
+     * @param joinFunction the user-defined function applied to matched pairs of records
+     * @param joinType the requested join type
+     * @return a {@link TwoInputNonBroadcastJoinProcessFunction} holding both arguments
+     */
+    public static <IN1, IN2, OUT> TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> join(
+            JoinFunction<IN1, IN2, OUT> joinFunction, JoinType joinType) {
+        TwoInputNonBroadcastJoinProcessFunction<IN1, IN2, OUT> wrapped =
+                new TwoInputNonBroadcastJoinProcessFunction<>(joinFunction, joinType);
+        return wrapped;
+    }
+}
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/join/operators/TwoInputNonBroadcastJoinProcessFunction.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/join/operators/TwoInputNonBroadcastJoinProcessFunction.java
new file mode 100644
index 00000000000..0746894afa3
--- /dev/null
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/join/operators/TwoInputNonBroadcastJoinProcessFunction.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.join.operators;
+
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.extension.join.JoinFunction;
+import org.apache.flink.datastream.api.extension.join.JoinType;
+import 
org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
+
+/**
+ * A {@link TwoInputNonBroadcastStreamProcessFunction} that carries a user-defined {@link
+ * JoinFunction} together with the requested {@link JoinType}.
+ *
+ * <p>Its record callbacks are no-ops. When it is applied to two keyed streams, the join is run by
+ * {@link TwoInputNonBroadcastJoinProcessOperator}, which reads the wrapped function and join type
+ * through the getters of this class.
+ *
+ * <p>NOTE(review): this class does not override any lifecycle callbacks, so none are forwarded to
+ * the wrapped join function — confirm whether that is intended.
+ */
+public class TwoInputNonBroadcastJoinProcessFunction<IN1, IN2, OUT>
+        implements TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> {
+
+    /** The user-defined function applied to each pair of matching records. */
+    private final JoinFunction<IN1, IN2, OUT> joinFunction;
+
+    /** The kind of join requested by the user. */
+    private final JoinType joinType;
+
+    public TwoInputNonBroadcastJoinProcessFunction(
+            JoinFunction<IN1, IN2, OUT> function, JoinType type) {
+        this.joinFunction = function;
+        this.joinType = type;
+    }
+
+    /** No-op; the join operator consumes records of the first input itself. */
+    @Override
+    public void processRecordFromFirstInput(
+            IN1 record, Collector<OUT> output, PartitionedContext<OUT> ctx) throws Exception {
+        // No-op.
+    }
+
+    /** No-op; the join operator consumes records of the second input itself. */
+    @Override
+    public void processRecordFromSecondInput(
+            IN2 record, Collector<OUT> output, PartitionedContext<OUT> ctx) throws Exception {
+        // No-op.
+    }
+
+    /** Returns the wrapped user-defined join function. */
+    public JoinFunction<IN1, IN2, OUT> getJoinFunction() {
+        return this.joinFunction;
+    }
+
+    /** Returns the join type this function was created with. */
+    public JoinType getJoinType() {
+        return this.joinType;
+    }
+}
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/join/operators/TwoInputNonBroadcastJoinProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/join/operators/TwoInputNonBroadcastJoinProcessOperator.java
new file mode 100644
index 00000000000..135ca15b938
--- /dev/null
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/join/operators/TwoInputNonBroadcastJoinProcessOperator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.join.operators;
+
+import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.datastream.api.extension.join.JoinType;
+import 
org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+import 
org.apache.flink.datastream.impl.operators.KeyedTwoInputNonBroadcastProcessOperator;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.v2.ListStateDescriptor;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Operator for executing the Join operation in Join extension. Note that this 
should be executed
+ * with two {@link KeyedPartitionStream}.
+ */
+public class TwoInputNonBroadcastJoinProcessOperator<K, IN1, IN2, OUT>
+        extends KeyedTwoInputNonBroadcastProcessOperator<K, IN1, IN2, OUT> {
+
+    private final TwoInputNonBroadcastJoinProcessFunction<IN1, IN2, OUT> 
joinProcessFunction;
+
+    private final ListStateDescriptor<IN1> leftStateDescriptor;
+
+    private final ListStateDescriptor<IN2> rightStateDescriptor;
+
+    /** The state that stores the left input records. */
+    private transient ListState<IN1> leftState;
+
+    /** The state that stores the right input records. */
+    private transient ListState<IN2> rightState;
+
+    public TwoInputNonBroadcastJoinProcessOperator(
+            TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> 
userFunction,
+            ListStateDescriptor<IN1> leftStateDescriptor,
+            ListStateDescriptor<IN2> rightStateDescriptor) {
+        super(userFunction);
+        this.joinProcessFunction =
+                (TwoInputNonBroadcastJoinProcessFunction<IN1, IN2, OUT>) 
userFunction;
+        checkArgument(
+                joinProcessFunction.getJoinType() == JoinType.INNER,
+                "Currently only support INNER join.");
+        this.leftStateDescriptor = leftStateDescriptor;
+        this.rightStateDescriptor = rightStateDescriptor;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        leftState =
+                getOrCreateKeyedState(
+                        VoidNamespace.INSTANCE,
+                        VoidNamespaceSerializer.INSTANCE,
+                        leftStateDescriptor);
+        rightState =
+                getOrCreateKeyedState(
+                        VoidNamespace.INSTANCE,
+                        VoidNamespaceSerializer.INSTANCE,
+                        rightStateDescriptor);
+    }
+
+    @Override
+    public void processElement1(StreamRecord<IN1> element) throws Exception {
+        collector.setTimestampFromStreamRecord(element);
+        IN1 leftRecord = element.getValue();
+        Iterable<IN2> rightRecords = rightState.get();
+        if (rightRecords != null) {
+            for (IN2 rightRecord : rightRecords) {
+                joinProcessFunction
+                        .getJoinFunction()
+                        .processRecord(leftRecord, rightRecord, collector, 
partitionedContext);
+            }
+        }
+        leftState.add(leftRecord);
+    }
+
+    @Override
+    public void processElement2(StreamRecord<IN2> element) throws Exception {
+        collector.setTimestampFromStreamRecord(element);
+        Iterable<IN1> leftRecords = leftState.get();
+        IN2 rightRecord = element.getValue();
+        if (leftRecords != null) {
+            for (IN1 leftRecord : leftState.get()) {
+                joinProcessFunction
+                        .getJoinFunction()
+                        .processRecord(leftRecord, rightRecord, collector, 
partitionedContext);
+            }
+        }
+        rightState.add(rightRecord);
+    }
+}
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
index 6fca168b426..4bbabb9653f 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java
@@ -37,11 +37,14 @@ import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessCon
 import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndTwoNonKeyedPartitionStream;
 import org.apache.flink.datastream.api.stream.ProcessConfigurable;
 import org.apache.flink.datastream.impl.attribute.AttributeParser;
+import 
org.apache.flink.datastream.impl.extension.join.operators.TwoInputNonBroadcastJoinProcessFunction;
+import 
org.apache.flink.datastream.impl.extension.join.operators.TwoInputNonBroadcastJoinProcessOperator;
 import org.apache.flink.datastream.impl.operators.KeyedProcessOperator;
 import 
org.apache.flink.datastream.impl.operators.KeyedTwoInputBroadcastProcessOperator;
 import 
org.apache.flink.datastream.impl.operators.KeyedTwoInputNonBroadcastProcessOperator;
 import 
org.apache.flink.datastream.impl.operators.KeyedTwoOutputProcessOperator;
 import org.apache.flink.datastream.impl.utils.StreamUtils;
+import org.apache.flink.runtime.state.v2.ListStateDescriptor;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import 
org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
@@ -263,15 +266,22 @@ public class KeyedPartitionStreamImpl<K, V> extends 
AbstractDataStream<V>
                         getType(),
                         ((KeyedPartitionStreamImpl<K, T_OTHER>) 
other).getType());
 
-        KeyedTwoInputNonBroadcastProcessOperator<K, V, T_OTHER, OUT> 
processOperator =
-                new 
KeyedTwoInputNonBroadcastProcessOperator<>(processFunction);
-        Transformation<OUT> outTransformation =
-                StreamUtils.getTwoInputTransformation(
-                        "Keyed-TwoInput-Process",
-                        this,
-                        (KeyedPartitionStreamImpl<K, T_OTHER>) other,
-                        outTypeInfo,
-                        processOperator);
+        Transformation<OUT> outTransformation;
+
+        if (processFunction instanceof 
TwoInputNonBroadcastJoinProcessFunction) {
+            outTransformation = getJoinTransformation(other, processFunction, 
outTypeInfo);
+        } else {
+            KeyedTwoInputNonBroadcastProcessOperator<K, V, T_OTHER, OUT> 
processOperator =
+                    new 
KeyedTwoInputNonBroadcastProcessOperator<>(processFunction);
+            outTransformation =
+                    StreamUtils.getTwoInputTransformation(
+                            "Keyed-TwoInput-Process",
+                            this,
+                            (KeyedPartitionStreamImpl<K, T_OTHER>) other,
+                            outTypeInfo,
+                            processOperator);
+        }
+
         
outTransformation.setAttribute(AttributeParser.parseAttribute(processFunction));
         environment.addOperator(outTransformation);
         return StreamUtils.wrapWithConfigureHandle(
@@ -389,6 +399,31 @@ public class KeyedPartitionStreamImpl<K, V> extends 
AbstractDataStream<V>
                         TypeExtractor.getKeySelectorTypes(newKeySelector, 
outputStream.getType())));
     }
 
+    /**
+     * Builds the transformation that joins this stream with {@code other} through a {@link
+     * TwoInputNonBroadcastJoinProcessOperator}.
+     *
+     * @param other the keyed stream providing the second (right) input of the join
+     * @param processFunction the join process function; must be a {@link
+     *     TwoInputNonBroadcastJoinProcessFunction}
+     * @param outTypeInfo the type information of the join output
+     * @return the transformation executing the join
+     */
+    public <T_OTHER, OUT> Transformation<OUT> getJoinTransformation(
+            KeyedPartitionStream<K, T_OTHER> other,
+            TwoInputNonBroadcastStreamProcessFunction<V, T_OTHER, OUT> processFunction,
+            TypeInformation<OUT> outTypeInfo) {
+        KeyedPartitionStreamImpl<K, T_OTHER> otherStream =
+                (KeyedPartitionStreamImpl<K, T_OTHER>) other;
+        // Reuse each stream's own type information. Re-extracting it from the raw class would drop
+        // generic parameters (e.g. for Tuple2, which then fails) and any explicitly supplied type.
+        ListStateDescriptor<V> leftStateDesc =
+                new ListStateDescriptor<>("join-left-state", getType());
+        ListStateDescriptor<T_OTHER> rightStateDesc =
+                new ListStateDescriptor<>("join-right-state", otherStream.getType());
+        TwoInputNonBroadcastJoinProcessOperator<K, V, T_OTHER, OUT> joinProcessOperator =
+                new TwoInputNonBroadcastJoinProcessOperator<>(
+                        processFunction, leftStateDesc, rightStateDesc);
+        return StreamUtils.getTwoInputTransformation(
+                "Keyed-Join-Process", this, otherStream, outTypeInfo, joinProcessOperator);
+    }
+
     public TypeInformation<K> getKeyType() {
         return keyType;
     }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java
index 688b24e7fb1..6ec420a8135 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.connector.dsv2.WrappedSink;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.datastream.api.extension.join.JoinFunction;
 import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
 import 
org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
 import 
org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
@@ -33,6 +34,7 @@ import 
org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
 import 
org.apache.flink.datastream.api.stream.GlobalStream.ProcessConfigurableAndGlobalStream;
 import 
org.apache.flink.datastream.api.stream.KeyedPartitionStream.ProcessConfigurableAndKeyedPartitionStream;
 import 
org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.ProcessConfigurableAndNonKeyedPartitionStream;
+import 
org.apache.flink.datastream.impl.extension.join.operators.TwoInputNonBroadcastJoinProcessFunction;
 import org.apache.flink.datastream.impl.stream.AbstractDataStream;
 import org.apache.flink.datastream.impl.stream.GlobalStreamImpl;
 import org.apache.flink.datastream.impl.stream.KeyedPartitionStreamImpl;
@@ -84,6 +86,21 @@ public final class StreamUtils {
                     TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> 
processFunction,
                     TypeInformation<IN1> in1TypeInformation,
                     TypeInformation<IN2> in2TypeInformation) {
+        if (processFunction instanceof 
TwoInputNonBroadcastJoinProcessFunction) {
+            return TypeExtractor.getBinaryOperatorReturnType(
+                    ((TwoInputNonBroadcastJoinProcessFunction<IN1, IN2, OUT>) 
processFunction)
+                            .getJoinFunction(),
+                    JoinFunction.class,
+                    0,
+                    1,
+                    2,
+                    TypeExtractor.NO_INDEX,
+                    in1TypeInformation,
+                    in2TypeInformation,
+                    Utils.getCallLocationName(),
+                    true);
+        }
+
         return TypeExtractor.getBinaryOperatorReturnType(
                 processFunction,
                 TwoInputNonBroadcastStreamProcessFunction.class,
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/JoinITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/JoinITCase.java
new file mode 100644
index 00000000000..2c892feb693
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/datastream/JoinITCase.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.api.datastream;
+
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.builtin.BuiltinFuncs;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.context.RuntimeContext;
+import org.apache.flink.datastream.api.extension.join.JoinFunction;
+import org.apache.flink.datastream.api.extension.join.JoinType;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import 
org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test the Join extension on DataStream V2. */
+class JoinITCase implements Serializable {
+    /** Environment used to build and run the job of the current test. */
+    private transient ExecutionEnvironment env;
+
+    /** Sink output checked by the assertions; presumably filled by {@code TestSink}. */
+    private static List<String> sinkResults;
+
+    @BeforeEach
+    void before() throws Exception {
+        this.env = ExecutionEnvironment.getInstance();
+        JoinITCase.sinkResults = new ArrayList<>();
+    }
+
+    @AfterEach
+    void after() throws Exception {
+        // Drop whatever this test produced so nothing is carried over to the next test.
+        JoinITCase.sinkResults.clear();
+    }
+
+    @Test
+    void testInnerJoinWithSameKey() throws Exception {
+        // Every record shares one key, so each left record must pair with each right record.
+        List<KeyAndValue> leftData =
+                Arrays.asList(
+                        KeyAndValue.of("key", 0),
+                        KeyAndValue.of("key", 1),
+                        KeyAndValue.of("key", 2));
+        List<KeyAndValue> rightData =
+                Arrays.asList(
+                        KeyAndValue.of("key", 0),
+                        KeyAndValue.of("key", 1),
+                        KeyAndValue.of("key", 2));
+        KeySelector<KeyAndValue, String> keyOf = keyAndValue -> keyAndValue.key;
+
+        NonKeyedPartitionStream<String> joinedStream =
+                BuiltinFuncs.join(
+                        getSourceStream(leftData, "source1"),
+                        keyOf,
+                        getSourceStream(rightData, "source2"),
+                        keyOf,
+                        new TestJoinFunction(),
+                        JoinType.INNER);
+
+        joinedStream.toSink(new WrappedSink<>(new TestSink()));
+        env.execute("testInnerJoinWithSameKey");
+
+        expectInAnyOrder(
+                "key:0:0", "key:0:1", "key:0:2", "key:1:0", "key:1:1", "key:1:2", "key:2:0",
+                "key:2:1", "key:2:2");
+    }
+
+    @Test
+    void testInnerJoinWithMultipleKeys() throws Exception {
+        // Records only pair up with records of the other side that carry the same key.
+        List<KeyAndValue> leftData =
+                Arrays.asList(
+                        KeyAndValue.of("key0", 0),
+                        KeyAndValue.of("key1", 1),
+                        KeyAndValue.of("key2", 2),
+                        KeyAndValue.of("key2", 3));
+        List<KeyAndValue> rightData =
+                Arrays.asList(
+                        KeyAndValue.of("key2", 4),
+                        KeyAndValue.of("key2", 5),
+                        KeyAndValue.of("key0", 6),
+                        KeyAndValue.of("key1", 7));
+        KeySelector<KeyAndValue, String> keyOf = keyAndValue -> keyAndValue.key;
+
+        NonKeyedPartitionStream<String> joinedStream =
+                BuiltinFuncs.join(
+                        getSourceStream(leftData, "source1"),
+                        keyOf,
+                        getSourceStream(rightData, "source2"),
+                        keyOf,
+                        new TestJoinFunction(),
+                        JoinType.INNER);
+
+        joinedStream.toSink(new WrappedSink<>(new TestSink()));
+        env.execute("testInnerJoinWithMultipleKeys");
+
+        expectInAnyOrder("key0:0:6", "key1:1:7", "key2:2:4", "key2:2:5", "key2:3:4", "key2:3:5");
+    }
+
+    /** An inner join whose left (first) input is empty must not emit anything. */
+    @Test
+    void testInnerJoinWhenLeftInputNoData() throws Exception {
+        // The left input is source1; keep it empty so the test matches its name.
+        NonKeyedPartitionStream<KeyAndValue> source1 =
+                getSourceStream(new ArrayList<>(), "source1");
+        NonKeyedPartitionStream<KeyAndValue> source2 =
+                getSourceStream(
+                        Arrays.asList(
+                                KeyAndValue.of("key0", 0),
+                                KeyAndValue.of("key1", 1),
+                                KeyAndValue.of("key2", 2),
+                                KeyAndValue.of("key2", 3)),
+                        "source2");
+
+        NonKeyedPartitionStream<String> joinedStream =
+                BuiltinFuncs.join(
+                        source1,
+                        (KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key,
+                        source2,
+                        (KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key,
+                        new TestJoinFunction(),
+                        JoinType.INNER);
+
+        joinedStream.toSink(new WrappedSink<>(new TestSink()));
+        env.execute("testInnerJoinWhenLeftInputNoData");
+
+        expectInAnyOrder();
+    }
+
+    /** An inner join whose right (second) input is empty must not emit anything. */
+    @Test
+    void testInnerJoinWhenRightInputNoData() throws Exception {
+        // The right input is source2; keep it empty so the test matches its name.
+        NonKeyedPartitionStream<KeyAndValue> source1 =
+                getSourceStream(
+                        Arrays.asList(
+                                KeyAndValue.of("key0", 0),
+                                KeyAndValue.of("key1", 1),
+                                KeyAndValue.of("key2", 2),
+                                KeyAndValue.of("key2", 3)),
+                        "source1");
+        NonKeyedPartitionStream<KeyAndValue> source2 =
+                getSourceStream(new ArrayList<>(), "source2");
+
+        NonKeyedPartitionStream<String> joinedStream =
+                BuiltinFuncs.join(
+                        source1,
+                        (KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key,
+                        source2,
+                        (KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key,
+                        new TestJoinFunction(),
+                        JoinType.INNER);
+
+        joinedStream.toSink(new WrappedSink<>(new TestSink()));
+        env.execute("testInnerJoinWhenRightInputNoData");
+
+        expectInAnyOrder();
+    }
+
+    /** Test Join using {@link BuiltinFuncs#join(JoinFunction)}. */
+    @Test
+    void testJoinWithWrappedJoinProcessFunction() throws Exception {
+        NonKeyedPartitionStream<KeyAndValue> source1 =
+                getSourceStream(
+                        Arrays.asList(
+                                KeyAndValue.of("key", 0),
+                                KeyAndValue.of("key", 1),
+                                KeyAndValue.of("key", 2)),
+                        "source1");
+        NonKeyedPartitionStream<KeyAndValue> source2 =
+                getSourceStream(
+                        Arrays.asList(
+                                KeyAndValue.of("key", 0),
+                                KeyAndValue.of("key", 1),
+                                KeyAndValue.of("key", 2)),
+                        "source2");
+
+        KeyedPartitionStream<String, KeyAndValue> keyedStream1 =
+                source1.keyBy((KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key);
+        KeyedPartitionStream<String, KeyAndValue> keyedStream2 =
+                source2.keyBy((KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key);
+        // connectAndProcess on keyed streams recognises the wrapped join function and routes it to
+        // the dedicated join operator.
+        TwoInputNonBroadcastStreamProcessFunction<KeyAndValue, KeyAndValue, String>
+                wrappedJoinProcessFunction = BuiltinFuncs.join(new TestJoinFunction());
+        NonKeyedPartitionStream<String> joinedStream =
+                keyedStream1.connectAndProcess(keyedStream2, wrappedJoinProcessFunction);
+
+        joinedStream.toSink(new WrappedSink<>(new TestSink()));
+        env.execute("testJoinWithWrappedJoinProcessFunction");
+
+        expectInAnyOrder(
+                "key:0:0", "key:0:1", "key:0:2", "key:1:0", "key:1:1", "key:1:2", "key:2:0",
+                "key:2:1", "key:2:2");
+    }
+
+    /**
+     * Test Join using {@link BuiltinFuncs#join(KeyedPartitionStream, KeyedPartitionStream,
+     * JoinFunction)}.
+     */
+    @Test
+    void testJoinWithKeyedStream() throws Exception {
+        NonKeyedPartitionStream<KeyAndValue> source1 =
+                getSourceStream(
+                        Arrays.asList(
+                                KeyAndValue.of("key", 0),
+                                KeyAndValue.of("key", 1),
+                                KeyAndValue.of("key", 2)),
+                        "source1");
+        NonKeyedPartitionStream<KeyAndValue> source2 =
+                getSourceStream(
+                        Arrays.asList(
+                                KeyAndValue.of("key", 0),
+                                KeyAndValue.of("key", 1),
+                                KeyAndValue.of("key", 2)),
+                        "source2");
+
+        KeyedPartitionStream<String, KeyAndValue> keyedStream1 =
+                source1.keyBy((KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key);
+        KeyedPartitionStream<String, KeyAndValue> keyedStream2 =
+                source2.keyBy((KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key);
+
+        NonKeyedPartitionStream<String> joinedStream =
+                BuiltinFuncs.join(keyedStream1, keyedStream2, new TestJoinFunction());
+
+        joinedStream.toSink(new WrappedSink<>(new TestSink()));
+        env.execute("testJoinWithKeyedStream");
+
+        expectInAnyOrder(
+                "key:0:0", "key:0:1", "key:0:2", "key:1:0", "key:1:1", "key:1:2", "key:2:0",
+                "key:2:1", "key:2:2");
+    }
+
+    /**
+     * Test Join using {@link BuiltinFuncs#join(NonKeyedPartitionStream, KeySelector,
+     * NonKeyedPartitionStream, KeySelector, JoinFunction)}.
+     */
+    @Test
+    void testJoinWithNonKeyedStream() throws Exception {
+        NonKeyedPartitionStream<KeyAndValue> source1 =
+                getSourceStream(
+                        Arrays.asList(
+                                KeyAndValue.of("key", 0),
+                                KeyAndValue.of("key", 1),
+                                KeyAndValue.of("key", 2)),
+                        "source1");
+        NonKeyedPartitionStream<KeyAndValue> source2 =
+                getSourceStream(
+                        Arrays.asList(
+                                KeyAndValue.of("key", 0),
+                                KeyAndValue.of("key", 1),
+                                KeyAndValue.of("key", 2)),
+                        "source2");
+
+        NonKeyedPartitionStream<String> joinedStream =
+                BuiltinFuncs.join(
+                        source1,
+                        (KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key,
+                        source2,
+                        (KeySelector<KeyAndValue, String>) keyAndValue -> keyAndValue.key,
+                        new TestJoinFunction());
+
+        joinedStream.toSink(new WrappedSink<>(new TestSink()));
+        env.execute("testJoinWithNonKeyedStream");
+
+        expectInAnyOrder(
+                "key:0:0", "key:0:1", "key:0:2", "key:1:0", "key:1:1", "key:1:2", "key:2:0",
+                "key:2:1", "key:2:2");
+    }
+
+    /** Test input record: a string join key paired with an int payload, both final. */
+    private static class KeyAndValue {
+        public final String key;
+        public final int value;
+
+        /** Convenience factory; equivalent to calling the constructor. */
+        public static KeyAndValue of(String key, int value) {
+            return new KeyAndValue(key, value);
+        }
+
+        public KeyAndValue(String k, int v) {
+            key = k;
+            value = v;
+        }
+    }
+
+    /**
+     * Creates a source stream named {@code sourceName} that emits {@code data}. Empty input is
+     * routed through {@code getEmptySourceStream}, because {@code env#fromSource} cannot be used
+     * with an empty collection directly.
+     */
+    private NonKeyedPartitionStream<KeyAndValue> getSourceStream(
+            Collection<KeyAndValue> data, String sourceName) {
+        return data.isEmpty()
+                ? getEmptySourceStream(data, sourceName)
+                : env.fromSource(DataStreamV2SourceUtils.fromData(data), sourceName);
+    }
+
+    /**
+     * Creates a source stream named {@code sourceName} that emits no records.
+     *
+     * <p>Since env#fromSource fails when the data is empty, a single dummy record is emitted and
+     * then dropped by a downstream process function that never collects anything.
+     *
+     * <p>The dummy record is added to a copy of {@code data}. The caller's collection is
+     * therefore never mutated, and fixed-size or immutable inputs (for example
+     * {@code Arrays.asList()} or {@code Collections.emptyList()}) do not throw on {@code add}.
+     */
+    private NonKeyedPartitionStream<KeyAndValue> getEmptySourceStream(
+            Collection<KeyAndValue> data, String sourceName) {
+        List<KeyAndValue> dataWithDummy = new java.util.ArrayList<>(data);
+        dataWithDummy.add(KeyAndValue.of("", -1));
+        NonKeyedPartitionStream<KeyAndValue> source =
+                env.fromSource(DataStreamV2SourceUtils.fromData(dataWithDummy), sourceName);
+        return source.process(
+                new OneInputStreamProcessFunction<KeyAndValue, KeyAndValue>() {
+                    @Override
+                    public void processRecord(
+                            KeyAndValue record,
+                            Collector<KeyAndValue> output,
+                            PartitionedContext<KeyAndValue> ctx)
+                            throws Exception {
+                        // Intentionally emit nothing: every record, including the dummy, is
+                        // dropped so that the resulting stream is empty.
+                    }
+                });
+    }
+
+    /**
+     * Join function that checks both sides share a non-null key and emits
+     * {@code "key:leftValue:rightValue"}.
+     */
+    private static class TestJoinFunction
+            implements JoinFunction<KeyAndValue, KeyAndValue, String> {
+
+        @Override
+        public void processRecord(
+                KeyAndValue leftRecord,
+                KeyAndValue rightRecord,
+                Collector<String> output,
+                RuntimeContext ctx)
+                throws Exception {
+            String joinKey = leftRecord.key;
+            assertThat(joinKey).isNotNull();
+            // Records paired by the join must carry the same key on both sides.
+            assertThat(joinKey).isEqualTo(rightRecord.key);
+            output.collect(
+                    String.join(
+                            ":",
+                            joinKey,
+                            String.valueOf(leftRecord.value),
+                            String.valueOf(rightRecord.value)));
+        }
+    }
+
+    /** Sink whose writers are {@code TestSinkWriter} instances. */
+    private static class TestSink implements Sink<String> {
+
+        @Override
+        public SinkWriter<String> createWriter(WriterInitContext context) throws IOException {
+            SinkWriter<String> writer = new TestSinkWriter();
+            return writer;
+        }
+    }
+
+    /** Writer that appends each written element to the static {@code sinkResults} list. */
+    private static class TestSinkWriter implements SinkWriter<String> {
+
+        @Override
+        public void close() throws Exception {
+            // Nothing to release.
+        }
+
+        @Override
+        public void flush(boolean endOfInput) throws IOException, InterruptedException {
+            // Elements are stored directly in write(), so there is nothing buffered to flush.
+        }
+
+        @Override
+        public void write(String element, Context context)
+                throws IOException, InterruptedException {
+            sinkResults.add(element);
+        }
+    }
+
+    /**
+     * Asserts that the collected sink output equals {@code expected} as a multiset, meaning that
+     * order is ignored but duplicates are counted.
+     *
+     * <p>The sink output is passed as AssertJ's "actual" value so that failure messages label the
+     * two sides correctly. The order-insensitive assertion also avoids sorting, and thereby
+     * mutating, the shared {@code sinkResults} list.
+     */
+    private static void expectInAnyOrder(String... expected) {
+        assertThat(sinkResults).containsExactlyInAnyOrder(expected);
+    }
+}


Reply via email to