This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3d146af119eb25d402daed2ed342c87d7e6ff615
Author: Xu Huang <huangxu.wal...@gmail.com>
AuthorDate: Wed Jan 15 16:56:07 2025 +0800

    [FLINK-37135][API] Introduce JoinFunction, JoinType and BuiltinFuncs for 
Join extension in DataStream V2
---
 .../flink/datastream/api/builtin/BuiltinFuncs.java | 124 +++++++++++++++++++++
 .../api/extension/join/JoinFunction.java           |  43 +++++++
 .../datastream/api/extension/join/JoinType.java    |  32 ++++++
 3 files changed, 199 insertions(+)

diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/builtin/BuiltinFuncs.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/builtin/BuiltinFuncs.java
new file mode 100644
index 00000000000..76b8032cf3e
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/builtin/BuiltinFuncs.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.builtin;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.datastream.api.extension.join.JoinFunction;
+import org.apache.flink.datastream.api.extension.join.JoinType;
+import 
org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+
+/** Built-in functions for all extension of datastream v2. */
+@Experimental
+public class BuiltinFuncs {
+
+    // =================== Join ===========================
+
+    static final Class<?> JOIN_FUNCS_INSTANCE;
+
+    static {
+        try {
+            JOIN_FUNCS_INSTANCE =
+                    
Class.forName("org.apache.flink.datastream.impl.builtin.BuiltinJoinFuncs");
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Please ensure that flink-datastream in 
your class path");
+        }
+    }
+
+    /**
+     * Wrap the JoinFunction and INNER JoinType within a ProcessFunction to 
perform the Join
+     * operation. Note that the wrapped process function should only be used 
with KeyedStream.
+     */
+    public static <IN1, IN2, OUT> 
TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> join(
+            JoinFunction<IN1, IN2, OUT> joinFunction) {
+        return join(joinFunction, JoinType.INNER);
+    }
+
+    /**
+     * Wrap the JoinFunction and JoinType within a ProcessFunction to perform 
the Join operation.
+     * Note that the wrapped process function should only be used with 
KeyedStream.
+     */
+    public static <IN1, IN2, OUT> 
TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> join(
+            JoinFunction<IN1, IN2, OUT> joinFunction, JoinType joinType) {
+        try {
+            return (TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT>)
+                    JOIN_FUNCS_INSTANCE
+                            .getMethod("join", JoinFunction.class, 
JoinType.class)
+                            .invoke(null, joinFunction, joinType);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Inner join two {@link KeyedPartitionStream}. */
+    public static <KEY, T1, T2, OUT> NonKeyedPartitionStream<OUT> join(
+            KeyedPartitionStream<KEY, T1> leftStream,
+            KeyedPartitionStream<KEY, T2> rightStream,
+            JoinFunction<T1, T2, OUT> joinFunction) {
+        return join(leftStream, rightStream, joinFunction, JoinType.INNER);
+    }
+
+    /** Join two {@link KeyedPartitionStream} with the type of {@link 
JoinType}. */
+    public static <KEY, T1, T2, OUT> NonKeyedPartitionStream<OUT> join(
+            KeyedPartitionStream<KEY, T1> leftStream,
+            KeyedPartitionStream<KEY, T2> rightStream,
+            JoinFunction<T1, T2, OUT> joinFunction,
+            JoinType joinType) {
+        return leftStream.connectAndProcess(rightStream, join(joinFunction, 
joinType));
+    }
+
+    /**
+     * Inner join two {@link NonKeyedPartitionStream}. The two streams will be 
redistributed by
+     * {@link KeySelector} respectively.
+     */
+    public static <KEY, T1, T2, OUT> NonKeyedPartitionStream<OUT> join(
+            NonKeyedPartitionStream<T1> leftStream,
+            KeySelector<T1, KEY> leftKeySelector,
+            NonKeyedPartitionStream<T2> rightStream,
+            KeySelector<T2, KEY> rightKeySelector,
+            JoinFunction<T1, T2, OUT> joinFunction) {
+        return join(
+                leftStream,
+                leftKeySelector,
+                rightStream,
+                rightKeySelector,
+                joinFunction,
+                JoinType.INNER);
+    }
+
+    /**
+     * Join two {@link NonKeyedPartitionStream} with the type of {@link 
JoinType}. The two streams
+     * will be redistributed by {@link KeySelector} respectively.
+     */
+    public static <KEY, T1, T2, OUT> NonKeyedPartitionStream<OUT> join(
+            NonKeyedPartitionStream<T1> leftStream,
+            KeySelector<T1, KEY> leftKeySelector,
+            NonKeyedPartitionStream<T2> rightStream,
+            KeySelector<T2, KEY> rightKeySelector,
+            JoinFunction<T1, T2, OUT> joinFunction,
+            JoinType joinType) {
+        return join(
+                leftStream.keyBy(leftKeySelector),
+                rightStream.keyBy(rightKeySelector),
+                joinFunction,
+                joinType);
+    }
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/join/JoinFunction.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/join/JoinFunction.java
new file mode 100644
index 00000000000..79ebb4056f6
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/join/JoinFunction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.join;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.RuntimeContext;
+
+/**
+ * A functional interface that defines a join operation between two input 
records of types {@code
+ * IN1} and {@code IN2}. Note that this is specifically used in non-broadcast 
joins.
+ *
+ * <p>This interface is used to process a pair of records from two different 
data streams and
+ * produce an output record of type {@code OUT}. Implementations of this 
interface can be used to
+ * define custom join logic in stream processing frameworks.
+ *
+ * @param <IN1> the type of the first input record
+ * @param <IN2> the type of the second input record
+ * @param <OUT> the type of the output record
+ */
+@FunctionalInterface
+@Experimental
+public interface JoinFunction<IN1, IN2, OUT> extends Function {
+    void processRecord(IN1 leftRecord, IN2 rightRecord, Collector<OUT> output, 
RuntimeContext ctx)
+            throws Exception;
+}
diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/join/JoinType.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/join/JoinType.java
new file mode 100644
index 00000000000..7deab69da67
--- /dev/null
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/join/JoinType.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.join;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * The type/algorithm of join operation. Currently, we only support regular 
Join (Non-Window Join).
+ * Outer joins require the ability to determine when there is no more data for 
a specific join key,
+ * which is not feasible for non-window join over unbounded streams. 
Therefore, we have only
+ * introduced the INNER join type and plan to introduce more join types as 
needed in the future.
+ */
+@Experimental
+public enum JoinType {
+    INNER,
+}

Reply via email to