This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5180b49a4e77 feat(flink): lookup join with retry and async capabilities (#18193)
5180b49a4e77 is described below
commit 5180b49a4e7767b73141799588140e933f5aa283
Author: Vova Kolmakov <[email protected]>
AuthorDate: Mon Feb 16 12:00:37 2026 +0700
feat(flink): lookup join with retry and async capabilities (#18193)
Co-authored-by: Vova Kolmakov <[email protected]>
---
.../apache/hudi/configuration/FlinkOptions.java | 11 +++
.../org/apache/hudi/table/HoodieTableSource.java | 10 ++-
.../table/lookup/AsyncLookupFunctionWrapper.java | 87 ++++++++++++++++++++++
.../hudi/table/lookup/HoodieLookupFunction.java | 34 ++++-----
.../table/lookup/LookupRuntimeProviderFactory.java | 33 ++++++++
.../apache/hudi/table/ITTestHoodieDataSource.java | 56 +++++++++++++-
6 files changed, 207 insertions(+), 24 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 5dee4b8b1213..b2ac00c2171b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -1301,6 +1301,17 @@ public class FlinkOptions extends HoodieConfig {
.withDescription(
"The cache TTL (e.g. 10min) for the build table in lookup
join.");
+ // Toggles the asynchronous lookup join; the option is off (false) unless set explicitly.
+ public static final ConfigOption<Boolean> LOOKUP_ASYNC =
+ key("lookup.async")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to enable async lookup join.");
+
+ // Number of threads used for async lookups; defaults to 16.
+ public static final ConfigOption<Integer> LOOKUP_ASYNC_THREAD_NUMBER =
+ key("lookup.async-thread-number")
+ .intType()
+ .defaultValue(16)
+ .withDescription("The thread number for lookup async.");
// -------------------------------------------------------------------------
// Utilities
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 94eb813b01cb..5f898e73ef57 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -20,7 +20,6 @@ package org.apache.hudi.table;
import org.apache.hudi.adapter.DataStreamScanProviderAdapter;
import org.apache.hudi.adapter.InputFormatSourceFunctionAdapter;
-import org.apache.hudi.adapter.TableFunctionProviderAdapter;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
@@ -72,6 +71,7 @@ import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
import org.apache.hudi.table.lookup.HoodieLookupFunction;
import org.apache.hudi.table.lookup.HoodieLookupTableReader;
+import org.apache.hudi.table.lookup.LookupRuntimeProviderFactory;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.ExpressionUtils;
import org.apache.hudi.util.ChangelogModes;
@@ -125,6 +125,8 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.hudi.configuration.FlinkOptions.LOOKUP_ASYNC;
+import static
org.apache.hudi.configuration.FlinkOptions.LOOKUP_ASYNC_THREAD_NUMBER;
import static org.apache.hudi.configuration.FlinkOptions.LOOKUP_JOIN_CACHE_TTL;
import static
org.apache.hudi.configuration.HadoopConfigurations.getParquetConf;
import static org.apache.hudi.util.ExpressionUtils.filterSimpleCallExpression;
@@ -399,14 +401,16 @@ public class HoodieTableSource extends FileIndexReader
implements
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context)
{
Duration duration = conf.get(LOOKUP_JOIN_CACHE_TTL);
- return TableFunctionProviderAdapter.of(
+ boolean asyncEnabled = conf.get(LOOKUP_ASYNC);
+ int asyncThreadNumber = conf.get(LOOKUP_ASYNC_THREAD_NUMBER);
+ return LookupRuntimeProviderFactory.create(
new HoodieLookupFunction(
new HoodieLookupTableReader(this::getBatchInputFormat, conf),
(RowType) getProducedDataType().notNull().getLogicalType(),
getLookupKeys(context.getKeys()),
duration,
conf
- ));
+ ), asyncEnabled, asyncThreadNumber);
}
private DataType getProducedDataType() {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/AsyncLookupFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/AsyncLookupFunctionWrapper.java
new file mode 100644
index 000000000000..af33caaaa03d
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/AsyncLookupFunctionWrapper.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.lookup;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncLookupFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.LookupFunction;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * An {@link AsyncLookupFunction} that adapts a synchronous {@link LookupFunction}.
+ *
+ * <p>Each {@link #asyncLookup(RowData)} call is handed to a fixed-size thread pool that is
+ * created lazily on the first lookup. Calls into the wrapped function are serialized by
+ * locking on the function instance, so at most one lookup runs against it at a time
+ * (presumably because the wrapped function is not thread-safe; confirm against the delegate).
+ */
+public class AsyncLookupFunctionWrapper extends AsyncLookupFunction {
+
+  private final LookupFunction function;
+  private final int threadNumber;
+
+  // Created on first use in executor(); transient, so it is never serialized with the function.
+  private transient ExecutorService lazyExecutor;
+
+  /**
+   * @param function     the synchronous lookup function to delegate to
+   * @param threadNumber the size of the thread pool that runs the lookups
+   */
+  public AsyncLookupFunctionWrapper(LookupFunction function, int threadNumber) {
+    this.function = function;
+    this.threadNumber = threadNumber;
+  }
+
+  /** Opens the wrapped function; the thread pool is not created here. */
+  @Override
+  public void open(FunctionContext context) throws Exception {
+    function.open(context);
+  }
+
+  /**
+   * Runs one synchronous lookup on the calling (pool) thread.
+   *
+   * <p>The thread's context class loader is switched to the loader of this class for the
+   * duration of the call and restored afterwards.
+   *
+   * @throws UncheckedIOException if the wrapped function throws an {@link IOException}
+   */
+  private Collection<RowData> lookup(RowData keyRow) {
+    ClassLoader cl = Thread.currentThread().getContextClassLoader();
+    Thread.currentThread().setContextClassLoader(AsyncLookupFunctionWrapper.class.getClassLoader());
+    try {
+      // Only one pool thread at a time may call into the delegate.
+      synchronized (function) {
+        return function.lookup(keyRow);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    } finally {
+      Thread.currentThread().setContextClassLoader(cl);
+    }
+  }
+
+  /** Submits the lookup for {@code keyRow} to the thread pool and returns its future. */
+  @Override
+  public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
+    return CompletableFuture.supplyAsync(() -> lookup(keyRow), executor());
+  }
+
+  /**
+   * Closes the wrapped function and shuts down the thread pool if it was created.
+   *
+   * @throws Exception if closing the wrapped function fails; the pool is shut down regardless
+   */
+  @Override
+  public void close() throws Exception {
+    try {
+      function.close();
+    } finally {
+      // Shut the pool down even when closing the delegate fails, otherwise its threads leak.
+      if (lazyExecutor != null) {
+        lazyExecutor.shutdownNow();
+        lazyExecutor = null;
+      }
+    }
+  }
+
+  /** Returns the thread pool, creating it on first use with a name derived from the calling thread. */
+  private ExecutorService executor() {
+    if (lazyExecutor == null) {
+      lazyExecutor = Executors.newFixedThreadPool(threadNumber,
+          new ExecutorThreadFactory(Thread.currentThread().getName() + "-async"));
+    }
+    return lazyExecutor;
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
index d6b2db86b775..6663eefd01a7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
@@ -27,20 +27,21 @@ import org.apache.hudi.util.StreamerUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;
+import java.io.Closeable;
+import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -51,7 +52,9 @@ import java.util.Map;
* <p>Note: reference Flink FileSystemLookupFunction to avoid additional
connector jar dependencies.
*/
@Slf4j
-public class HoodieLookupFunction extends TableFunction<RowData> {
+public class HoodieLookupFunction extends LookupFunction implements
Serializable, Closeable {
+
+ private static final long serialVersionUID = 1L;
// the max number of retries before throwing exception, in case of failure
to load the table
// into cache
@@ -73,6 +76,7 @@ public class HoodieLookupFunction extends
TableFunction<RowData> {
private transient HoodieTableMetaClient metaClient;
private transient HoodieInstant currentCommit;
private final Configuration conf;
+ protected FunctionContext functionContext;
public HoodieLookupFunction(
HoodieLookupTableReader partitionReader,
@@ -94,7 +98,7 @@ public class HoodieLookupFunction extends
TableFunction<RowData> {
@Override
public void open(FunctionContext context) throws Exception {
- super.open(context);
+ functionContext = context;
cache = new HashMap<>();
nextLoadTime = -1L;
org.apache.hadoop.conf.Configuration hadoopConf =
HadoopConfigurations.getHadoopConf(conf);
@@ -102,18 +106,12 @@ public class HoodieLookupFunction extends
TableFunction<RowData> {
}
@Override
- public TypeInformation<RowData> getResultType() {
- return InternalTypeInfo.of(rowType);
- }
-
- public void eval(Object... values) {
- checkCacheReload();
- RowData lookupKey = GenericRowData.of(values);
- List<RowData> matchedRows = cache.get(lookupKey);
- if (matchedRows != null) {
- for (RowData matchedRow : matchedRows) {
- collect(matchedRow);
- }
+ public Collection<RowData> lookup(RowData keyRow) {
+ try {
+ checkCacheReload();
+ return cache.get(keyRow);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
@@ -189,7 +187,7 @@ public class HoodieLookupFunction extends
TableFunction<RowData> {
}
@Override
- public void close() throws Exception {
+ public void close() {
// no operation
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/LookupRuntimeProviderFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/LookupRuntimeProviderFactory.java
new file mode 100644
index 000000000000..36aaba4e9800
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/LookupRuntimeProviderFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.lookup;
+
+import
org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
+import
org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
+import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
+
+/** Builds the {@link LookupRuntimeProvider} used for a Hudi lookup join. */
+public class LookupRuntimeProviderFactory {
+
+  /**
+   * Wraps the given lookup function in a runtime provider.
+   *
+   * @param function          the synchronous Hudi lookup function
+   * @param enableAsync       when {@code true}, the function is wrapped in an
+   *                          {@link AsyncLookupFunctionWrapper} and exposed as an async provider;
+   *                          otherwise it is exposed as a synchronous provider
+   * @param asyncThreadNumber thread count passed to the async wrapper; unused in sync mode
+   * @return the sync or async lookup runtime provider
+   */
+  public static LookupRuntimeProvider create(HoodieLookupFunction function, boolean enableAsync, int asyncThreadNumber) {
+    if (!enableAsync) {
+      return LookupFunctionProvider.of(function);
+    }
+    AsyncLookupFunctionWrapper asyncFunction = new AsyncLookupFunctionWrapper(function, asyncThreadNumber);
+    return AsyncLookupFunctionProvider.of(asyncFunction);
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 6c1cc68705cc..239a254a4f5f 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -105,6 +105,7 @@ import static
org.apache.hudi.utils.TestData.assertRowsEquals;
import static org.apache.hudi.utils.TestData.assertRowsEqualsUnordered;
import static org.apache.hudi.utils.TestData.map;
import static org.apache.hudi.utils.TestData.row;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertLinesMatch;
@@ -825,8 +826,8 @@ public class ITTestHoodieDataSource {
}
@ParameterizedTest
- @EnumSource(value = HoodieTableType.class)
- void testLookupJoin(HoodieTableType tableType) {
+ @MethodSource("tableTypeAndAsyncLookupParams")
+ void testLookupJoin(HoodieTableType tableType, boolean async) {
TableEnvironment tableEnv = streamTableEnv;
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath() + "/t1")
@@ -849,7 +850,7 @@ public class ITTestHoodieDataSource {
// Join two hudi tables with the same data
String sql = "insert into t2 select b.* from t1_view o "
- + " join t1/*+ OPTIONS('lookup.join.cache.ttl'= '2 day') */ "
+ + " join t1/*+ OPTIONS('lookup.join.cache.ttl'= '2 day',
'lookup.async'='" + async + "') */ "
+ " FOR SYSTEM_TIME AS OF o.proc_time AS b on o.uuid = b.uuid";
execInsertSql(tableEnv, sql);
List<Row> result = CollectionUtil.iterableToList(
@@ -858,6 +859,42 @@ public class ITTestHoodieDataSource {
assertRowsEquals(result, TestData.DATA_SET_SOURCE_INSERT);
}
+ /**
+ * Registers the two Hudi tables used by the lookup test in {@code streamTableEnv}:
+ * {@code T} (an INT key plus a computed processing-time column) and {@code DIM}
+ * (the looked-up table, created with the given table type). Both live under {@code tempFile}.
+ */
+ private void initTablesForLookupJoin(HoodieTableType tableType) {
+ String tDDL = "create table T(i INT PRIMARY KEY NOT ENFORCED, `proctime`
AS PROCTIME())"
+ + " with ('connector'='hudi', 'path'='" + tempFile.getAbsolutePath() +
"/T')";
+ streamTableEnv.executeSql(tDDL);
+ String dimDDL = "CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT,
k1 INT, k2 INT) "
+ + "with ('connector'='hudi', 'table.type'='" + tableType + "',"
+ + " 'path'='" + tempFile.getAbsolutePath() + "/DIM',
'continuous.discovery-interval'='1 ms')";
+ streamTableEnv.executeSql(dimDDL);
+ }
+
+ /**
+ * Checks a LEFT lookup join of T against DIM for each table type, in sync and async mode:
+ * matching keys return the DIM columns, missing keys return nulls, and a second run of the
+ * same query reflects rows written to DIM after the first run.
+ */
+ @ParameterizedTest
+ @MethodSource("tableTypeAndAsyncLookupParams")
+ void testLookup(HoodieTableType tableType, boolean async) {
+ initTablesForLookupJoin(tableType);
+ // First round: DIM holds keys 1 and 2, the probe side holds keys 1, 2 and 3.
+ execInsertSql(streamTableEnv, "INSERT INTO DIM VALUES (1, 11, 111, 1111),
(2, 22, 222, 2222)");
+ execInsertSql(streamTableEnv, "INSERT INTO T VALUES (1), (2), (3)");
+
+ String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+
OPTIONS('lookup.async'='" + async
+ + "', 'lookup.join.cache.ttl'='1s') */ for system_time as of
T.proctime AS D ON T.i = D.i";
+ List<Row> result = CollectionUtil.iterableToList(() ->
streamTableEnv.executeSql(query).collect());
+ // Key 3 has no DIM row yet, so the left join pads it with nulls.
+ assertThat(result).containsExactlyInAnyOrder(
+ Row.of(1, 11, 111, 1111),
+ Row.of(2, 22, 222, 2222),
+ Row.of(3, null, null, null));
+
+ // Second round: write key 2 again with new values, add key 3 to DIM, and add key 4 to T.
+ execInsertSql(streamTableEnv, "INSERT INTO DIM VALUES (2, 44, 444, 4444),
(3, 33, 333, 3333)");
+ execInsertSql(streamTableEnv, "INSERT INTO T VALUES (1), (2), (3), (4)");
+
+ result = CollectionUtil.iterableToList(() ->
streamTableEnv.executeSql(query).collect());
+ // Key 2 must show the rewritten values, key 3 is now matched, and key 4 is unmatched.
+ assertThat(result).containsExactlyInAnyOrder(
+ Row.of(1, 11, 111, 1111),
+ Row.of(2, 44, 444, 4444),
+ Row.of(3, 33, 333, 3333),
+ Row.of(4, null, null, null));
+ }
+
@ParameterizedTest
@EnumSource(value = ExecMode.class)
void testWriteAndReadParMiddle(ExecMode execMode) throws Exception {
@@ -3148,6 +3185,19 @@ public class ITTestHoodieDataSource {
return Stream.of(data).map(Arguments::of);
}
+  /**
+   * Supplies the arguments for the parameterized lookup tests: each table type
+   * (COPY_ON_WRITE, MERGE_ON_READ) paired with async lookup disabled and enabled.
+   */
+  private static Stream<Arguments> tableTypeAndAsyncLookupParams() {
+    return Stream.of(
+        Arguments.of(HoodieTableType.COPY_ON_WRITE, false),
+        Arguments.of(HoodieTableType.COPY_ON_WRITE, true),
+        Arguments.of(HoodieTableType.MERGE_ON_READ, false),
+        Arguments.of(HoodieTableType.MERGE_ON_READ, true));
+  }
+
private static Stream<Arguments> parametersForMetaColumnsSkip() {
Object[][] data =
new Object[][] {