This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5180b49a4e77 feat(flink): lookup join with retry and async 
capabilities (#18193)
5180b49a4e77 is described below

commit 5180b49a4e7767b73141799588140e933f5aa283
Author: Vova Kolmakov <[email protected]>
AuthorDate: Mon Feb 16 12:00:37 2026 +0700

    feat(flink): lookup join with retry and async capabilities (#18193)
    
    Co-authored-by: Vova Kolmakov <[email protected]>
---
 .../apache/hudi/configuration/FlinkOptions.java    | 11 +++
 .../org/apache/hudi/table/HoodieTableSource.java   | 10 ++-
 .../table/lookup/AsyncLookupFunctionWrapper.java   | 87 ++++++++++++++++++++++
 .../hudi/table/lookup/HoodieLookupFunction.java    | 34 ++++-----
 .../table/lookup/LookupRuntimeProviderFactory.java | 33 ++++++++
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 56 +++++++++++++-
 6 files changed, 207 insertions(+), 24 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 5dee4b8b1213..b2ac00c2171b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -1301,6 +1301,17 @@ public class FlinkOptions extends HoodieConfig {
           .withDescription(
               "The cache TTL (e.g. 10min) for the build table in lookup 
join.");
 
  /** Whether the lookup join runs asynchronously; defaults to synchronous lookup. */
  public static final ConfigOption<Boolean> LOOKUP_ASYNC =
      key("lookup.async")
          .booleanType()
          .defaultValue(false)
          .withDescription("Whether to enable async lookup join.");

  /** Thread-pool size for async lookup; only takes effect when {@link #LOOKUP_ASYNC} is true. */
  public static final ConfigOption<Integer> LOOKUP_ASYNC_THREAD_NUMBER =
      key("lookup.async-thread-number")
          .intType()
          .defaultValue(16)
          .withDescription("The thread number for lookup async.");
 
   // -------------------------------------------------------------------------
   //  Utilities
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 94eb813b01cb..5f898e73ef57 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -20,7 +20,6 @@ package org.apache.hudi.table;
 
 import org.apache.hudi.adapter.DataStreamScanProviderAdapter;
 import org.apache.hudi.adapter.InputFormatSourceFunctionAdapter;
-import org.apache.hudi.adapter.TableFunctionProviderAdapter;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -72,6 +71,7 @@ import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.table.format.mor.MergeOnReadTableState;
 import org.apache.hudi.table.lookup.HoodieLookupFunction;
 import org.apache.hudi.table.lookup.HoodieLookupTableReader;
+import org.apache.hudi.table.lookup.LookupRuntimeProviderFactory;
 import org.apache.hudi.util.DataTypeUtils;
 import org.apache.hudi.util.ExpressionUtils;
 import org.apache.hudi.util.ChangelogModes;
@@ -125,6 +125,8 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.hudi.configuration.FlinkOptions.LOOKUP_ASYNC;
+import static 
org.apache.hudi.configuration.FlinkOptions.LOOKUP_ASYNC_THREAD_NUMBER;
 import static org.apache.hudi.configuration.FlinkOptions.LOOKUP_JOIN_CACHE_TTL;
 import static 
org.apache.hudi.configuration.HadoopConfigurations.getParquetConf;
 import static org.apache.hudi.util.ExpressionUtils.filterSimpleCallExpression;
@@ -399,14 +401,16 @@ public class HoodieTableSource extends FileIndexReader 
implements
   @Override
   public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) 
{
     Duration duration = conf.get(LOOKUP_JOIN_CACHE_TTL);
-    return TableFunctionProviderAdapter.of(
+    boolean asyncEnabled = conf.get(LOOKUP_ASYNC);
+    int asyncThreadNumber = conf.get(LOOKUP_ASYNC_THREAD_NUMBER);
+    return LookupRuntimeProviderFactory.create(
         new HoodieLookupFunction(
             new HoodieLookupTableReader(this::getBatchInputFormat, conf),
             (RowType) getProducedDataType().notNull().getLogicalType(),
             getLookupKeys(context.getKeys()),
             duration,
             conf
-        ));
+        ), asyncEnabled, asyncThreadNumber);
   }
 
   private DataType getProducedDataType() {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/AsyncLookupFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/AsyncLookupFunctionWrapper.java
new file mode 100644
index 000000000000..af33caaaa03d
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/AsyncLookupFunctionWrapper.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.lookup;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncLookupFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.LookupFunction;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/** A {@link AsyncLookupFunction} to wrap sync function. */
+public class AsyncLookupFunctionWrapper extends AsyncLookupFunction {
+
+  private final LookupFunction function;
+  private final int threadNumber;
+
+  private transient ExecutorService lazyExecutor;
+
+  public AsyncLookupFunctionWrapper(LookupFunction function, int threadNumber) 
{
+    this.function = function;
+    this.threadNumber = threadNumber;
+  }
+
+  @Override
+  public void open(FunctionContext context) throws Exception {
+    function.open(context);
+  }
+
+  private Collection<RowData> lookup(RowData keyRow) {
+    ClassLoader cl = Thread.currentThread().getContextClassLoader();
+    
Thread.currentThread().setContextClassLoader(AsyncLookupFunctionWrapper.class.getClassLoader());
+    try {
+      synchronized (function) {
+        return function.lookup(keyRow);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    } finally {
+      Thread.currentThread().setContextClassLoader(cl);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
+    return CompletableFuture.supplyAsync(() -> lookup(keyRow), executor());
+  }
+
+  @Override
+  public void close() throws Exception {
+    function.close();
+    if (lazyExecutor != null) {
+      lazyExecutor.shutdownNow();
+      lazyExecutor = null;
+    }
+  }
+
+  private ExecutorService executor() {
+    if (lazyExecutor == null) {
+      lazyExecutor = Executors.newFixedThreadPool(threadNumber,
+          new ExecutorThreadFactory(Thread.currentThread().getName() + 
"-async"));
+    }
+    return lazyExecutor;
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
index d6b2db86b775..6663eefd01a7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
@@ -27,20 +27,21 @@ import org.apache.hudi.util.StreamerUtil;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.LookupFunction;
 import org.apache.flink.table.runtime.typeutils.InternalSerializers;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.FlinkRuntimeException;
 
+import java.io.Closeable;
+import java.io.Serializable;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -51,7 +52,9 @@ import java.util.Map;
  * <p>Note: reference Flink FileSystemLookupFunction to avoid additional 
connector jar dependencies.
  */
 @Slf4j
-public class HoodieLookupFunction extends TableFunction<RowData> {
+public class HoodieLookupFunction  extends LookupFunction implements 
Serializable, Closeable {
+
+  private static final long serialVersionUID = 1L;
 
   // the max number of retries before throwing exception, in case of failure 
to load the table
   // into cache
@@ -73,6 +76,7 @@ public class HoodieLookupFunction extends 
TableFunction<RowData> {
   private transient HoodieTableMetaClient metaClient;
   private transient HoodieInstant currentCommit;
   private final Configuration conf;
+  protected FunctionContext functionContext;
 
   public HoodieLookupFunction(
       HoodieLookupTableReader partitionReader,
@@ -94,7 +98,7 @@ public class HoodieLookupFunction extends 
TableFunction<RowData> {
 
   @Override
   public void open(FunctionContext context) throws Exception {
-    super.open(context);
+    functionContext = context;
     cache = new HashMap<>();
     nextLoadTime = -1L;
     org.apache.hadoop.conf.Configuration hadoopConf = 
HadoopConfigurations.getHadoopConf(conf);
@@ -102,18 +106,12 @@ public class HoodieLookupFunction extends 
TableFunction<RowData> {
   }
 
   @Override
-  public TypeInformation<RowData> getResultType() {
-    return InternalTypeInfo.of(rowType);
-  }
-
-  public void eval(Object... values) {
-    checkCacheReload();
-    RowData lookupKey = GenericRowData.of(values);
-    List<RowData> matchedRows = cache.get(lookupKey);
-    if (matchedRows != null) {
-      for (RowData matchedRow : matchedRows) {
-        collect(matchedRow);
-      }
+  public Collection<RowData> lookup(RowData keyRow) {
+    try {
+      checkCacheReload();
+      return cache.get(keyRow);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
     }
   }
 
@@ -189,7 +187,7 @@ public class HoodieLookupFunction extends 
TableFunction<RowData> {
   }
 
   @Override
-  public void close() throws Exception {
+  public void close() {
     // no operation
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/LookupRuntimeProviderFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/LookupRuntimeProviderFactory.java
new file mode 100644
index 000000000000..36aaba4e9800
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/LookupRuntimeProviderFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.lookup;
+
+import 
org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
+import 
org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
+import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
+
+/** Factory to create {@link LookupRuntimeProvider}. */
+public class LookupRuntimeProviderFactory {
+
+  public static LookupRuntimeProvider create(HoodieLookupFunction function, 
boolean enableAsync, int asyncThreadNumber) {
+    return enableAsync
+        ? AsyncLookupFunctionProvider.of(new 
AsyncLookupFunctionWrapper(function, asyncThreadNumber))
+        : LookupFunctionProvider.of(function);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 6c1cc68705cc..239a254a4f5f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -105,6 +105,7 @@ import static 
org.apache.hudi.utils.TestData.assertRowsEquals;
 import static org.apache.hudi.utils.TestData.assertRowsEqualsUnordered;
 import static org.apache.hudi.utils.TestData.map;
 import static org.apache.hudi.utils.TestData.row;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertLinesMatch;
@@ -825,8 +826,8 @@ public class ITTestHoodieDataSource {
   }
 
   @ParameterizedTest
-  @EnumSource(value = HoodieTableType.class)
-  void testLookupJoin(HoodieTableType tableType) {
+  @MethodSource("tableTypeAndAsyncLookupParams")
+  void testLookupJoin(HoodieTableType tableType, boolean async) {
     TableEnvironment tableEnv = streamTableEnv;
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath() + "/t1")
@@ -849,7 +850,7 @@ public class ITTestHoodieDataSource {
 
     // Join two hudi tables with the same data
     String sql = "insert into t2 select b.* from t1_view o "
-        + "       join t1/*+ OPTIONS('lookup.join.cache.ttl'= '2 day') */  "
+        + "       join t1/*+ OPTIONS('lookup.join.cache.ttl'= '2 day', 
'lookup.async'='" + async + "') */  "
         + "       FOR SYSTEM_TIME AS OF o.proc_time AS b on o.uuid = b.uuid";
     execInsertSql(tableEnv, sql);
     List<Row> result = CollectionUtil.iterableToList(
@@ -858,6 +859,42 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result, TestData.DATA_SET_SOURCE_INSERT);
   }
 
+  private void initTablesForLookupJoin(HoodieTableType tableType) {
+    String tDDL = "create table T(i INT PRIMARY KEY NOT ENFORCED, `proctime` 
AS PROCTIME())"
+        + " with ('connector'='hudi', 'path'='" + tempFile.getAbsolutePath() + 
"/T')";
+    streamTableEnv.executeSql(tDDL);
+    String dimDDL = "CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, 
k1 INT, k2 INT) "
+        + "with ('connector'='hudi', 'table.type'='" + tableType + "',"
+        + " 'path'='" + tempFile.getAbsolutePath() + "/DIM', 
'continuous.discovery-interval'='1 ms')";
+    streamTableEnv.executeSql(dimDDL);
+  }
+
  // Verifies lookup join (sync and async) against both COW and MOR dimension tables,
  // including cache refresh after the dimension table is updated.
  @ParameterizedTest
  @MethodSource("tableTypeAndAsyncLookupParams")
  void testLookup(HoodieTableType tableType, boolean async) {
    initTablesForLookupJoin(tableType);
    execInsertSql(streamTableEnv, "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
    execInsertSql(streamTableEnv, "INSERT INTO T VALUES (1), (2), (3)");

    // Left join so probe rows without a dimension match still appear (with nulls).
    String query = "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS('lookup.async'='" + async
        + "', 'lookup.join.cache.ttl'='1s') */ for system_time as of T.proctime AS D ON T.i = D.i";
    List<Row> result = CollectionUtil.iterableToList(() -> streamTableEnv.executeSql(query).collect());
    assertThat(result).containsExactlyInAnyOrder(
        Row.of(1, 11, 111, 1111),
        Row.of(2, 22, 222, 2222),
        Row.of(3, null, null, null));

    // Upsert key 2 and add keys 3/4; the 1s cache TTL forces a reload so the
    // second query must observe the updated dimension data.
    execInsertSql(streamTableEnv, "INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
    execInsertSql(streamTableEnv, "INSERT INTO T VALUES (1), (2), (3), (4)");

    result = CollectionUtil.iterableToList(() -> streamTableEnv.executeSql(query).collect());
    assertThat(result).containsExactlyInAnyOrder(
        Row.of(1, 11, 111, 1111),
        Row.of(2, 44, 444, 4444),
        Row.of(3, 33, 333, 3333),
        Row.of(4, null, null, null));
  }
+
   @ParameterizedTest
   @EnumSource(value = ExecMode.class)
   void testWriteAndReadParMiddle(ExecMode execMode) throws Exception {
@@ -3148,6 +3185,19 @@ public class ITTestHoodieDataSource {
     return Stream.of(data).map(Arguments::of);
   }
 
+  /**
+   * Return test params => (table type, async lookup).
+   */
+  private static Stream<Arguments> tableTypeAndAsyncLookupParams() {
+    Object[][] data = new Object[][] {
+        {HoodieTableType.COPY_ON_WRITE, false},
+        {HoodieTableType.COPY_ON_WRITE, true},
+        {HoodieTableType.MERGE_ON_READ, false},
+        {HoodieTableType.MERGE_ON_READ, true}
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
+
   private static Stream<Arguments> parametersForMetaColumnsSkip() {
     Object[][] data =
         new Object[][] {

Reply via email to