This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3d146af119eb25d402daed2ed342c87d7e6ff615 Author: Xu Huang <huangxu.wal...@gmail.com> AuthorDate: Wed Jan 15 16:56:07 2025 +0800 [FLINK-37135][API] Introduce JoinFunction, JoinType and BuiltinFuncs for Join extension in DataStream V2 --- .../flink/datastream/api/builtin/BuiltinFuncs.java | 124 +++++++++++++++++++++ .../api/extension/join/JoinFunction.java | 43 +++++++ .../datastream/api/extension/join/JoinType.java | 32 ++++++ 3 files changed, 199 insertions(+) diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/builtin/BuiltinFuncs.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/builtin/BuiltinFuncs.java new file mode 100644 index 00000000000..76b8032cf3e --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/builtin/BuiltinFuncs.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.datastream.api.builtin;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.datastream.api.extension.join.JoinFunction;
+import org.apache.flink.datastream.api.extension.join.JoinType;
+import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+
+import java.lang.reflect.InvocationTargetException;
+
+/** Built-in functions for all extensions of DataStream V2. */
+@Experimental
+public class BuiltinFuncs {
+
+    // =================== Join ===========================
+
+    /**
+     * The implementation class providing the join process functions. Despite the name this holds
+     * a {@link Class}, not an instance. It is loaded by name because it lives in the
+     * flink-datastream implementation module rather than in this API module.
+     */
+    static final Class<?> JOIN_FUNCS_INSTANCE;
+
+    static {
+        try {
+            JOIN_FUNCS_INSTANCE =
+                    Class.forName("org.apache.flink.datastream.impl.builtin.BuiltinJoinFuncs");
+        } catch (ClassNotFoundException e) {
+            // Keep the original exception as the cause so the class-loading failure stays visible.
+            throw new RuntimeException(
+                    "Please ensure that flink-datastream is in your class path", e);
+        }
+    }
+
+    /**
+     * Wraps the {@link JoinFunction} and the {@link JoinType#INNER} join type within a process
+     * function that performs the join. Note that the returned process function should only be
+     * used with {@link KeyedPartitionStream}.
+     *
+     * @param joinFunction the user-defined logic applied to each joined pair of records
+     * @return a process function performing an inner join
+     */
+    public static <IN1, IN2, OUT> TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> join(
+            JoinFunction<IN1, IN2, OUT> joinFunction) {
+        return join(joinFunction, JoinType.INNER);
+    }
+
+    /**
+     * Wraps the {@link JoinFunction} and {@link JoinType} within a process function that performs
+     * the join. Note that the returned process function should only be used with {@link
+     * KeyedPartitionStream}.
+     *
+     * <p>The process function is obtained by reflectively invoking the static {@code
+     * join(JoinFunction, JoinType)} method of {@link #JOIN_FUNCS_INSTANCE}.
+     *
+     * @param joinFunction the user-defined logic applied to each joined pair of records
+     * @param joinType the type of join to perform
+     * @return a process function performing the requested join
+     * @throws RuntimeException if the implementation method cannot be found or accessed, or if it
+     *     fails; an unchecked exception thrown by the implementation is rethrown unchanged
+     */
+    @SuppressWarnings("unchecked")
+    public static <IN1, IN2, OUT> TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> join(
+            JoinFunction<IN1, IN2, OUT> joinFunction, JoinType joinType) {
+        try {
+            return (TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT>)
+                    JOIN_FUNCS_INSTANCE
+                            .getMethod("join", JoinFunction.class, JoinType.class)
+                            .invoke(null, joinFunction, joinType);
+        } catch (InvocationTargetException e) {
+            // Method#invoke wraps whatever the implementation threw; surface the real failure
+            // instead of burying it under a generic reflection wrapper.
+            Throwable cause = e.getCause();
+            if (cause instanceof RuntimeException) {
+                throw (RuntimeException) cause;
+            }
+            if (cause instanceof Error) {
+                throw (Error) cause;
+            }
+            throw new RuntimeException(cause);
+        } catch (ReflectiveOperationException e) {
+            throw new RuntimeException(
+                    "Failed to invoke join on " + JOIN_FUNCS_INSTANCE.getName(), e);
+        }
+    }
+
+    /**
+     * Inner join two {@link KeyedPartitionStream}.
+     *
+     * @param leftStream the first (left) input stream
+     * @param rightStream the second (right) input stream
+     * @param joinFunction the user-defined logic applied to each joined pair of records
+     * @return the stream of join results
+     */
+    public static <KEY, T1, T2, OUT> NonKeyedPartitionStream<OUT> join(
+            KeyedPartitionStream<KEY, T1> leftStream,
+            KeyedPartitionStream<KEY, T2> rightStream,
+            JoinFunction<T1, T2, OUT> joinFunction) {
+        return join(leftStream, rightStream, joinFunction, JoinType.INNER);
+    }
+
+    /**
+     * Join two {@link KeyedPartitionStream} with the given {@link JoinType}.
+     *
+     * @param leftStream the first (left) input stream
+     * @param rightStream the second (right) input stream
+     * @param joinFunction the user-defined logic applied to each joined pair of records
+     * @param joinType the type of join to perform
+     * @return the stream of join results
+     */
+    public static <KEY, T1, T2, OUT> NonKeyedPartitionStream<OUT> join(
+            KeyedPartitionStream<KEY, T1> leftStream,
+            KeyedPartitionStream<KEY, T2> rightStream,
+            JoinFunction<T1, T2, OUT> joinFunction,
+            JoinType joinType) {
+        return leftStream.connectAndProcess(rightStream, join(joinFunction, joinType));
+    }
+
+    /**
+     * Inner join two {@link NonKeyedPartitionStream}. The two streams are redistributed by their
+     * respective {@link KeySelector} before being joined.
+     *
+     * @param leftStream the first (left) input stream
+     * @param leftKeySelector extracts the join key from records of the left stream
+     * @param rightStream the second (right) input stream
+     * @param rightKeySelector extracts the join key from records of the right stream
+     * @param joinFunction the user-defined logic applied to each joined pair of records
+     * @return the stream of join results
+     */
+    public static <KEY, T1, T2, OUT> NonKeyedPartitionStream<OUT> join(
+            NonKeyedPartitionStream<T1> leftStream,
+            KeySelector<T1, KEY> leftKeySelector,
+            NonKeyedPartitionStream<T2> rightStream,
+            KeySelector<T2, KEY> rightKeySelector,
+            JoinFunction<T1, T2, OUT> joinFunction) {
+        return join(
+                leftStream,
+                leftKeySelector,
+                rightStream,
+                rightKeySelector,
+                joinFunction,
+                JoinType.INNER);
+    }
+
+    /**
+     * Join two {@link NonKeyedPartitionStream} with the given {@link JoinType}. The two streams
+     * are redistributed by their respective {@link KeySelector} before being joined.
+     *
+     * @param leftStream the first (left) input stream
+     * @param leftKeySelector extracts the join key from records of the left stream
+     * @param rightStream the second (right) input stream
+     * @param rightKeySelector extracts the join key from records of the right stream
+     * @param joinFunction the user-defined logic applied to each joined pair of records
+     * @param joinType the type of join to perform
+     * @return the stream of join results
+     */
+    public static <KEY, T1, T2, OUT> NonKeyedPartitionStream<OUT> join(
+            NonKeyedPartitionStream<T1> leftStream,
+            KeySelector<T1, KEY> leftKeySelector,
+            NonKeyedPartitionStream<T2> rightStream,
+            KeySelector<T2, KEY> rightKeySelector,
+            JoinFunction<T1, T2, OUT> joinFunction,
+            JoinType joinType) {
+        return join(
+                leftStream.keyBy(leftKeySelector),
+                rightStream.keyBy(rightKeySelector),
+                joinFunction,
+                joinType);
+    }
+}
diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/join/JoinFunction.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/join/JoinFunction.java
new file mode 100644
index 00000000000..79ebb4056f6
--- /dev/null
+++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/join/JoinFunction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.join;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.RuntimeContext;
+
+/**
+ * A functional interface that defines a join operation between two input records of types {@code
+ * IN1} and {@code IN2}. Note that this is specifically used in non-broadcast joins.
+ *
+ * <p>This interface is used to process a pair of records from two different data streams and to
+ * emit results of type {@code OUT} through a {@link Collector}. Implementations of this interface
+ * define the custom logic applied to each joined pair.
+ *
+ * @param <IN1> the type of the first (left) input record
+ * @param <IN2> the type of the second (right) input record
+ * @param <OUT> the type of the output record
+ */
+@FunctionalInterface
+@Experimental
+public interface JoinFunction<IN1, IN2, OUT> extends Function {
+
+    /**
+     * Processes one pair of joined records.
+     *
+     * @param leftRecord the record from the first (left) input
+     * @param rightRecord the record from the second (right) input
+     * @param output the collector used to emit results for this pair; presumably any number of
+     *     records (including none) may be emitted — confirm against {@link Collector}
+     * @param ctx the {@link RuntimeContext} available while processing this pair
+     * @throws Exception any failure raised by the user-defined join logic
+     */
+    void processRecord(IN1 leftRecord, IN2 rightRecord, Collector<OUT> output, RuntimeContext ctx)
+            throws Exception;
+}
diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/join/JoinType.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/join/JoinType.java
new file mode 100644
index 00000000000..7deab69da67
--- /dev/null
+++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/join/JoinType.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.join;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * The kind of join to perform. Only regular (non-window) joins are supported at the moment.
+ *
+ * <p>An outer join must be able to decide that no more data will arrive for a given join key,
+ * which is not possible for a non-window join over unbounded streams. For that reason only the
+ * {@link #INNER} type is offered for now; further types are planned to be added as needed.
+ */
+@Experimental
+public enum JoinType {
+    /** Inner join: only matching records from both inputs are passed to the join function. */
+    INNER
+}