This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new db460814d365 fix(flink): Fix minibatch lookup for global record level 
index (#18759)
db460814d365 is described below

commit db460814d365d0a08bbf6bd0940cd88fc9e8be65
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon May 18 15:51:17 2026 +0800

    fix(flink): Fix minibatch lookup for global record level index (#18759)
---
 .../sink/partitioner/BucketAssignFunction.java     | 126 ++++++++++++++-------
 .../partitioner/MinibatchBucketAssignFunction.java |  67 +++++++++--
 .../index/GlobalRecordLevelIndexBackend.java       |  16 +--
 .../TestMinibatchBucketAssignFunction.java         |   4 +-
 .../index/TestGlobalRecordLevelIndexBackend.java   |  17 +--
 .../sink/utils/StreamWriteFunctionWrapper.java     |  67 +++++++++--
 .../apache/hudi/table/ITTestHoodieDataSource.java  |   7 ++
 7 files changed, 221 insertions(+), 83 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index c74bf0d0dd2b..7faff523e689 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -50,6 +50,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 
+import java.io.Serializable;
 import java.util.Objects;
 
 /**
@@ -101,6 +102,11 @@ public class BucketAssignFunction
   @Setter
   protected transient Correspondent correspondent;
 
+  /**
+   * Processor for data records selected by the write operation type.
+   */
+  private transient Processor recordProcessor;
+
   /**
    * If the index is global, update the index for the old partition path
    * if same key record with different partition path came in.
@@ -132,6 +138,7 @@ public class BucketAssignFunction
         HoodieTableType.valueOf(conf.get(FlinkOptions.TABLE_TYPE)),
         context,
         writeConfig);
+    this.recordProcessor = initRecordProcessor();
   }
 
   @Override
@@ -148,67 +155,102 @@ public class BucketAssignFunction
 
   @Override
   public void processElement(HoodieFlinkInternalRow value, Context ctx, 
Collector<HoodieFlinkInternalRow> out) throws Exception {
-    processRecord(value, ctx.getCurrentKey(), out);
+    if (value.isIndexRecord()) {
+      processIndexRecord(value, ctx.getCurrentKey());
+    } else {
+      recordProcessor.process(value, out);
+    }
   }
 
-  protected void processRecord(HoodieFlinkInternalRow record, String 
recordKey, Collector<HoodieFlinkInternalRow> out) throws Exception {
-    if (record.isIndexRecord()) {
-      indexBackend.update(
-          recordKey, new HoodieRecordGlobalLocation(record.getPartitionPath(), 
record.getInstantTime(), record.getFileId()));
-      return;
-    }
+  protected void processIndexRecord(
+      HoodieFlinkInternalRow record,
+      String recordKey) throws Exception {
+    indexBackend.update(
+        recordKey, new HoodieRecordGlobalLocation(record.getPartitionPath(), 
record.getInstantTime(), record.getFileId()));
+  }
+
+  protected void processChangingRecord(
+      HoodieFlinkInternalRow record,
+      String recordKey,
+      Collector<HoodieFlinkInternalRow> out,
+      HoodieRecordGlobalLocation prefetchedOldLoc,
+      boolean prefetched) throws Exception {
     // 1. put the record into the BucketAssigner;
     // 2. look up the state for location, if the record has a location, just 
send it out;
     // 3. if it is an INSERT, decide the location using the BucketAssigner 
then send it out.
     final String partitionPath = record.getPartitionPath();
     final HoodieRecordLocation location;
-    if (isChangingRecords) {
-      // Only changing records need looking up the index for the location,
-      // append only records are always recognized as INSERT.
-      // Structured as Tuple(partition, fileId, instantTime).
-      HoodieRecordGlobalLocation oldLoc = indexBackend.get(recordKey);
-      if (oldLoc != null) {
-        // Set up the instant time as "U" to mark the bucket as an update 
bucket.
-        String partitionFromState = oldLoc.getPartitionPath();
-        String fileIdFromState = oldLoc.getFileId();
-        if (!Objects.equals(partitionFromState, partitionPath)) {
-          if (globalIndex) {
-            // if partition path changes, emit a delete record for old 
partition path,
-            // then update the index state using location with new partition 
path.
-            RowData row = record.getRowData();
-            RowKind orginalRowKind = row.getRowKind();
-            row.setRowKind(RowKind.DELETE);
-            // the operationType field is used as the index operation type, 
and only 'I' and 'D' index operation will be written to the metadata table.
-            // for record key, whose partition path is updated, we simply 
ignore the DELETE index record, and the location for this key will be updated
-            // by the following INSERT index record.
-            HoodieFlinkInternalRow deleteRecord =
-                new HoodieFlinkInternalRow(record.getRecordKey(), 
partitionFromState, fileIdFromState, "U", "-U", false, row);
-            out.collect(deleteRecord);
-            row.setRowKind(orginalRowKind);
-          }
-          location = getNewRecordLocation(partitionPath);
-        } else {
-          location = oldLoc.toLocal("U");
-          this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
+    // Only changing records need looking up the index for the location,
+    // append only records are always recognized as INSERT.
+    // Structured as Tuple(partition, fileId, instantTime).
+    HoodieRecordGlobalLocation oldLoc = prefetched ? prefetchedOldLoc : 
indexBackend.get(recordKey);
+    if (oldLoc != null) {
+      // Set up the instant time as "U" to mark the bucket as an update bucket.
+      String partitionFromState = oldLoc.getPartitionPath();
+      String fileIdFromState = oldLoc.getFileId();
+      if (!Objects.equals(partitionFromState, partitionPath)) {
+        if (globalIndex) {
+          // if partition path changes, emit a delete record for old partition 
path,
+          // then update the index state using location with new partition 
path.
+          RowData row = record.getRowData();
+          RowKind orginalRowKind = row.getRowKind();
+          row.setRowKind(RowKind.DELETE);
+          // the operationType field is used as the index operation type, and 
only 'I' and 'D' index operation will be written to the metadata table.
+          // for record key, whose partition path is updated, we simply ignore 
the DELETE index record, and the location for this key will be updated
+          // by the following INSERT index record.
+          HoodieFlinkInternalRow deleteRecord =
+              new HoodieFlinkInternalRow(record.getRecordKey(), 
partitionFromState, fileIdFromState, "U", "-U", false, row);
+          out.collect(deleteRecord);
+          row.setRowKind(orginalRowKind);
         }
-      } else {
         location = getNewRecordLocation(partitionPath);
-      }
-      // refresh the index only when the location is updated.
-      if (oldLoc == null || !oldLoc.getFileId().equals(location.getFileId())) {
-        record.setOperationType("I");
-        this.indexBackend.update(recordKey, 
HoodieRecordGlobalLocation.fromLocal(partitionPath, location));
+      } else {
+        location = oldLoc.toLocal("U");
+        this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
       }
     } else {
-      log.warn("This branch should not be reached.");
       location = getNewRecordLocation(partitionPath);
     }
+    // refresh the index only when the location is updated.
+    if (oldLoc == null || !oldLoc.getFileId().equals(location.getFileId())) {
+      record.setOperationType("I");
+      this.indexBackend.update(recordKey, 
HoodieRecordGlobalLocation.fromLocal(partitionPath, location));
+    }
     record.setFileId(location.getFileId());
     record.setInstantTime(location.getInstantTime());
 
     out.collect(record);
   }
 
+  protected void processInsertRecord(
+      HoodieFlinkInternalRow record,
+      Collector<HoodieFlinkInternalRow> out) {
+    // Record is an INSERT, decide the location using the BucketAssigner then 
send it out.
+    final String partitionPath = record.getPartitionPath();
+    final HoodieRecordLocation location = getNewRecordLocation(partitionPath);
+    record.setFileId(location.getFileId());
+    record.setInstantTime(location.getInstantTime());
+    out.collect(record);
+  }
+
+  /**
+   * Initializes the processor for non-index records based on whether the 
write operation needs index lookup.
+   */
+  private Processor initRecordProcessor() {
+    if (isChangingRecords) {
+      return (value, out) -> processChangingRecord(value, 
value.getRecordKey(), out, null, false);
+    } else {
+      return this::processInsertRecord;
+    }
+  }
+
+  /**
+   * Processes regular data records after index records have been handled by 
the caller.
+   */
+  private interface Processor extends Serializable {
+    void process(HoodieFlinkInternalRow value, 
Collector<HoodieFlinkInternalRow> out) throws Exception;
+  }
+
   protected HoodieRecordLocation getNewRecordLocation(String partitionPath) {
     final BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath);
     final HoodieRecordLocation location;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
index 70b09621aa4d..cdf30c932eb7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java
@@ -20,6 +20,7 @@ package org.apache.hudi.sink.partitioner;
 
 import org.apache.hudi.adapter.ProcessFunctionAdapter;
 import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -36,8 +37,10 @@ import 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.util.Collector;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -58,6 +61,8 @@ public class MinibatchBucketAssignFunction
    * extending the BucketAssignFunction here, because BucketAssignFunction is a
    * KeyedProcessFunction while MinibatchBucketAssignFunction is a 
ProcessFunction.
    */
+  @VisibleForTesting
+  @Getter
   private final BucketAssignFunction delegateFunction;
 
   /**
@@ -76,11 +81,21 @@ public class MinibatchBucketAssignFunction
 
   private transient Collector<HoodieFlinkInternalRow> outCollector;
 
+  /**
+   * Processor for buffered data records selected by the write operation type.
+   */
+  private transient Processor minibatchProcessor;
+
   public MinibatchBucketAssignFunction(Configuration conf) {
+    this(conf, 
Math.max(conf.get(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE), 
FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE.defaultValue()));
+  }
+
+  @VisibleForTesting
+  public MinibatchBucketAssignFunction(Configuration conf, int miniBatchSize) {
     this.delegateFunction = new BucketAssignFunction(conf);
     this.isChangingRecords = WriteOperationType.isChangingRecords(
         WriteOperationType.fromValue(conf.get(FlinkOptions.OPERATION)));
-    this.miniBatchSize = 
Math.max(conf.get(FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE), 
FlinkOptions.INDEX_RLI_LOOKUP_MINIBATCH_SIZE.defaultValue());
+    this.miniBatchSize = miniBatchSize;
   }
 
   @Override
@@ -88,6 +103,7 @@ public class MinibatchBucketAssignFunction
     super.open(parameters);
     delegateFunction.open(parameters);
     this.recordBuffer = new ArrayList<>();
+    this.minibatchProcessor = initRecordProcessor();
   }
 
   @Override
@@ -102,7 +118,7 @@ public class MinibatchBucketAssignFunction
     }
     if (record.isIndexRecord()) {
       // handle index records immediately, do not need buffering
-      delegateFunction.processRecord(record, record.getRecordKey(), 
outCollector);
+      delegateFunction.processIndexRecord(record, record.getRecordKey());
     } else {
       // Add data records to the buffer
       recordBuffer.add(record);
@@ -120,20 +136,47 @@ public class MinibatchBucketAssignFunction
     if (recordBuffer.isEmpty()) {
       return;
     }
-
     // process batch of records.
+    minibatchProcessor.process(recordBuffer, out);
+    // Clear the buffer after processing
+    recordBuffer.clear();
+  }
+
+  /**
+   * Initializes the processor for buffered data records based on whether 
minibatch index lookup is needed.
+   */
+  private Processor initRecordProcessor() {
     if (isChangingRecords) {
-      List<String> recordKeys = 
recordBuffer.stream().map(HoodieFlinkInternalRow::getRecordKey).collect(Collectors.toList());
-      MinibatchIndexBackend minibatchIndexBackend = (MinibatchIndexBackend) 
delegateFunction.getIndexBackend();
-      // load the record location mapping into the record index cache.
-      minibatchIndexBackend.get(recordKeys);
-    }
-    for (HoodieFlinkInternalRow record: recordBuffer) {
-      delegateFunction.processRecord(record, record.getRecordKey(), out);
+      return new Processor() {
+        @Override
+        public void process(List<HoodieFlinkInternalRow> records, 
Collector<HoodieFlinkInternalRow> out) throws Exception {
+          List<String> recordKeys = 
records.stream().map(HoodieFlinkInternalRow::getRecordKey).collect(Collectors.toList());
+          MinibatchIndexBackend minibatchIndexBackend = 
(MinibatchIndexBackend) delegateFunction.getIndexBackend();
+          // get record locations by minibatch
+          Map<String, HoodieRecordGlobalLocation> recordLocations = 
minibatchIndexBackend.get(recordKeys);
+          for (HoodieFlinkInternalRow record: records) {
+            String recordKey = record.getRecordKey();
+            delegateFunction.processChangingRecord(record, recordKey, out, 
recordLocations.get(recordKey), true);
+          }
+        }
+      };
+    } else {
+      return new Processor() {
+        @Override
+        public void process(List<HoodieFlinkInternalRow> records, 
Collector<HoodieFlinkInternalRow> out) throws Exception {
+          for (HoodieFlinkInternalRow record: recordBuffer) {
+            delegateFunction.processInsertRecord(record, out);
+          }
+        }
+      };
     }
+  }
 
-    // Clear the buffer after processing
-    recordBuffer.clear();
+  /**
+   * Processes buffered data records while keeping index records out of the 
minibatch buffer.
+   */
+  private interface Processor extends Serializable {
+    void process(List<HoodieFlinkInternalRow> records, 
Collector<HoodieFlinkInternalRow> out) throws Exception;
   }
 
   @Override
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
index cef722c87a4b..a95e18e863a1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.HoodieDataUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -38,8 +37,7 @@ import org.apache.flink.configuration.Configuration;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -74,7 +72,7 @@ public class GlobalRecordLevelIndexBackend implements 
MinibatchIndexBackend {
 
   @Override
   public HoodieRecordGlobalLocation get(String recordKey) throws IOException {
-    return get(Collections.singletonList(recordKey)).get(recordKey);
+    throw new UnsupportedOperationException(this.getClass().getSimpleName() + 
" doesn't support lookup with a single key.");
   }
 
   @Override
@@ -84,22 +82,20 @@ public class GlobalRecordLevelIndexBackend implements 
MinibatchIndexBackend {
 
   @Override
   public Map<String, HoodieRecordGlobalLocation> get(List<String> recordKeys) 
throws IOException {
-    // use a linked hash map to keep the natural order.
-    Map<String, HoodieRecordGlobalLocation> keysAndLocations = new 
LinkedHashMap<>();
+    Map<String, HoodieRecordGlobalLocation> keysAndLocations = new HashMap<>();
     List<String> missedKeys = new ArrayList<>();
     for (String key: recordKeys) {
       HoodieRecordGlobalLocation location = recordIndexCache.get(key);
       if (location == null) {
         missedKeys.add(key);
+      } else {
+        keysAndLocations.put(key, location);
       }
-      // insert anyway even the location is null to keep the natural order.
-      keysAndLocations.put(key, location);
     }
     if (!missedKeys.isEmpty()) {
       HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
           
metadataTable.readRecordIndexLocationsWithKeys(HoodieListData.eager(missedKeys));
-      List<Pair<String, HoodieRecordGlobalLocation>> recordIndexLocations = 
HoodieDataUtils.dedupeAndCollectAsList(recordIndexData);
-      recordIndexLocations.forEach(keyAndLocation -> {
+      recordIndexData.forEach(keyAndLocation -> {
         recordIndexCache.update(keyAndLocation.getKey(), 
keyAndLocation.getValue());
         keysAndLocations.put(keyAndLocation.getKey(), 
keyAndLocation.getValue());
       });
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
index 055a85de0695..a98a95529fc1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
@@ -58,8 +58,8 @@ public class TestMinibatchBucketAssignFunction {
     final String basePath = tempFile.getAbsolutePath();
     conf = TestConfigurations.getDefaultConf(basePath);
     
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
 "true");
-    conf.set(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    conf.set(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
   }
 
   @BeforeEach
@@ -244,4 +244,4 @@ public class TestMinibatchBucketAssignFunction {
     // Close should not throw any exceptions
     testHarness.close();
   }
-}
\ No newline at end of file
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
index 396a57185cec..3b3f72a9fcb5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
@@ -36,6 +36,7 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -43,6 +44,7 @@ import java.util.UUID;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
 import static org.mockito.Mockito.mock;
@@ -74,14 +76,15 @@ public class TestGlobalRecordLevelIndexBackend {
 
     try (GlobalRecordLevelIndexBackend globalRecordLevelIndexBackend = new 
GlobalRecordLevelIndexBackend(conf, -1)) {
       // get record location
-      HoodieRecordGlobalLocation location = 
globalRecordLevelIndexBackend.get("id1");
+      HoodieRecordGlobalLocation location = 
globalRecordLevelIndexBackend.get(Collections.singletonList("id1")).get("id1");
       assertNotNull(location);
       assertEquals("par1", location.getPartitionPath());
       assertEquals(firstCommitTime, location.getInstantTime());
 
       // get record location with non existed key
-      location = globalRecordLevelIndexBackend.get("new_key");
+      location = 
globalRecordLevelIndexBackend.get(Collections.singletonList("new_key")).get("new_key");
       assertNull(location);
+      assertThrows(UnsupportedOperationException.class, () -> 
globalRecordLevelIndexBackend.get("id1"));
 
       // get records locations for multiple record keys
       Map<String, HoodieRecordGlobalLocation> locations = 
globalRecordLevelIndexBackend.get(Arrays.asList("id1", "id2", "id3"));
@@ -90,7 +93,7 @@ public class TestGlobalRecordLevelIndexBackend {
 
       // get records locations for multiple record keys with unexisted key
       locations = globalRecordLevelIndexBackend.get(Arrays.asList("id1", 
"id2", "new_key"));
-      assertEquals(3, locations.size());
+      assertEquals(2, locations.size());
       assertNull(locations.get("new_key"));
 
       // new checkpoint
@@ -99,7 +102,7 @@ public class TestGlobalRecordLevelIndexBackend {
       // update record location
       HoodieRecordGlobalLocation newLocation = new 
HoodieRecordGlobalLocation("par5", "1003", "file_id_4");
       globalRecordLevelIndexBackend.update("new_key", newLocation);
-      location = globalRecordLevelIndexBackend.get("new_key");
+      location = 
globalRecordLevelIndexBackend.get(Collections.singletonList("new_key")).get("new_key");
       assertEquals(newLocation, location);
 
       // previous instant commit success, clean
@@ -182,9 +185,9 @@ public class TestGlobalRecordLevelIndexBackend {
       // cache for the oldest ckp id will be cleaned
       
assertNull(globalRecordLevelIndexBackend.getRecordIndexCache().getCaches().get(-1L));
       // caches for the latest 3 ckp id still in the cache
-      assertEquals("par1", 
globalRecordLevelIndexBackend.get("id2_0").getPartitionPath());
-      assertEquals("par1", 
globalRecordLevelIndexBackend.get("id3_0").getPartitionPath());
-      assertEquals("par1", 
globalRecordLevelIndexBackend.get("id4_0").getPartitionPath());
+      assertEquals("par1", 
globalRecordLevelIndexBackend.get(Collections.singletonList("id2_0")).get("id2_0").getPartitionPath());
+      assertEquals("par1", 
globalRecordLevelIndexBackend.get(Collections.singletonList("id3_0")).get("id3_0").getPartitionPath());
+      assertEquals("par1", 
globalRecordLevelIndexBackend.get(Collections.singletonList("id4_0")).get("id4_0").getPartitionPath());
     }
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index dac07150295e..dec0936f05a6 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -35,6 +35,7 @@ import org.apache.hudi.sink.bootstrap.RLIBootstrapOperator;
 import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
+import org.apache.hudi.sink.partitioner.MinibatchBucketAssignFunction;
 import org.apache.hudi.sink.partitioner.index.IndexWriteFunction;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
 import org.apache.hudi.util.HoodieSchemaConverter;
@@ -109,6 +110,10 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
    * Function that assigns bucket ID.
    */
   private BucketAssignFunction bucketAssignerFunction;
+  /**
+   * Function that assigns bucket ID with minibatch RLI lookup.
+   */
+  private MinibatchBucketAssignFunction minibatchBucketAssignerFunction;
   /**
    * BucketAssignOperator context.
    **/
@@ -177,11 +182,19 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     toHoodieFunction.setRuntimeContext(runtimeContext);
     toHoodieFunction.open(conf);
 
-    bucketAssignerFunction = new BucketAssignFunction(conf);
-    bucketAssignerFunction.setRuntimeContext(runtimeContext);
-    bucketAssignerFunction.setCorrespondent(correspondent);
-    bucketAssignerFunction.open(conf);
-    bucketAssignerFunction.initializeState(this.stateInitializationContext);
+    if (useMinibatchBucketAssignFunction()) {
+      minibatchBucketAssignerFunction = new 
MinibatchBucketAssignFunction(conf, 1);
+      minibatchBucketAssignerFunction.setRuntimeContext(runtimeContext);
+      minibatchBucketAssignerFunction.setCorrespondent(correspondent);
+      minibatchBucketAssignerFunction.open(conf);
+      
minibatchBucketAssignerFunction.initializeState(this.stateInitializationContext);
+    } else {
+      bucketAssignerFunction = new BucketAssignFunction(conf);
+      bucketAssignerFunction.setRuntimeContext(runtimeContext);
+      bucketAssignerFunction.setCorrespondent(correspondent);
+      bucketAssignerFunction.open(conf);
+      bucketAssignerFunction.initializeState(this.stateInitializationContext);
+    }
 
     if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
       bootstrapOperator = isStreamingWriteIndexEnabled ? Mockito.spy(new 
RLIBootstrapOperator(conf)) : new BootstrapOperator(conf);
@@ -216,7 +229,11 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     
stateInitializationContext.getKeyedStateStore().setCurrentKey(hoodieRecord.getRecordKey());
     RecordsCollector<HoodieFlinkInternalRow> collector = 
RecordsCollector.getInstance(rowType);
     when(context.getCurrentKey()).thenReturn(hoodieRecord.getRecordKey());
-    bucketAssignerFunction.processElement(hoodieRecord, context, collector);
+    if (useMinibatchBucketAssignFunction()) {
+      minibatchBucketAssignerFunction.processElement(hoodieRecord, null, 
collector);
+    } else {
+      bucketAssignerFunction.processElement(hoodieRecord, context, collector);
+    }
     bucketAssignFunctionContext.setCurrentKey(hoodieRecord.getRecordKey());
     RecordsCollector<RowData> indexRecordCollector = 
RecordsCollector.getInstance();
     for (HoodieFlinkInternalRow row: collector.getVal()) {
@@ -268,7 +285,12 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
       bootstrapOperator.snapshotState(new 
MockStateSnapshotContext(checkpointId));
     }
-    bucketAssignerFunction.snapshotState(new 
MockFunctionSnapshotContext(checkpointId));
+    if (useMinibatchBucketAssignFunction()) {
+      minibatchBucketAssignerFunction.prepareSnapshotPreBarrier(checkpointId);
+      minibatchBucketAssignerFunction.snapshotState(new 
MockFunctionSnapshotContext(checkpointId));
+    } else {
+      bucketAssignerFunction.snapshotState(new 
MockFunctionSnapshotContext(checkpointId));
+    }
 
     writeFunction.snapshotState(new MockFunctionSnapshotContext(checkpointId));
     if (isStreamingWriteIndexEnabled) {
@@ -294,6 +316,13 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
   }
 
   public void endInput() {
+    if (useMinibatchBucketAssignFunction()) {
+      try {
+        minibatchBucketAssignerFunction.endInput();
+      } catch (Exception e) {
+        throw new HoodieException(e);
+      }
+    }
     writeFunction.endInput();
     if (isStreamingWriteIndexEnabled) {
       indexWriteFunction.endInput();
@@ -304,7 +333,15 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     stateInitializationContext.checkpointSuccess(checkpointId);
     indexStateInitializationContext.checkpointSuccess(checkpointId);
     coordinator.notifyCheckpointComplete(checkpointId);
-    this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
+    if (useMinibatchBucketAssignFunction()) {
+      try {
+        
this.minibatchBucketAssignerFunction.notifyCheckpointComplete(checkpointId);
+      } catch (Exception e) {
+        throw new HoodieException(e);
+      }
+    } else {
+      this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
+    }
     if (asyncCompaction) {
       try {
         compactFunctionWrapper.compact(checkpointId);
@@ -367,7 +404,11 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
   public void close() throws Exception {
     coordinator.close();
     ioManager.close();
-    bucketAssignerFunction.close();
+    if (useMinibatchBucketAssignFunction()) {
+      minibatchBucketAssignerFunction.close();
+    } else {
+      bucketAssignerFunction.close();
+    }
     writeFunction.close();
     if (indexWriteFunction != null) {
       indexWriteFunction.close();
@@ -388,7 +429,9 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
 
   @Override
   public BucketAssignFunction getBucketAssignFunction() {
-    return this.bucketAssignerFunction;
+    return useMinibatchBucketAssignFunction()
+        ? this.minibatchBucketAssignerFunction.getDelegateFunction()
+        : this.bucketAssignerFunction;
   }
 
   public boolean isKeyInState(HoodieKey hoodieKey) {
@@ -425,6 +468,10 @@ public class StreamWriteFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
     indexWriteFunction.open(conf);
   }
 
+  private boolean useMinibatchBucketAssignFunction() {
+    return OptionsResolver.isGlobalRecordLevelIndex(conf) && 
!conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
+  }
+
   private void setupIndexBootstrapFunction() {
     bootstrapOperator = Mockito.spy(new RLIBootstrapOperator(conf));
     CollectOutputAdapter<HoodieFlinkInternalRow> output = new 
CollectOutputAdapter<>();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 2da870f1ef8c..5d6a5daa061e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -3167,6 +3167,7 @@ public class ITTestHoodieDataSource {
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .options(getDefaultKeys())
         .option(FlinkOptions.INDEX_TYPE, 
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name())
+        .option(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, false)
         .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
         .option(FlinkOptions.TABLE_TYPE, tableType.name())
         .end();
@@ -3176,6 +3177,12 @@ public class ITTestHoodieDataSource {
     List<Row> result1 = CollectionUtil.iterableToList(
         () -> tableEnv.sqlQuery("select * from t1").execute().collect());
     assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);
+
+    // insert another batch of records, so that minibatch lookup results are 
not empty
+    execInsertSql(tableEnv, TestSQL.UPDATE_INSERT_T1);
+    result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result1, TestData.DATA_SET_SOURCE_MERGED);
   }
 
   @ParameterizedTest

Reply via email to