This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e9e8262b146 feat: Support bucket assigning based on record level index (#17803)
1e9e8262b146 is described below

commit 1e9e8262b146f622313230990ceefd93ff773458
Author: Shuo Cheng <[email protected]>
AuthorDate: Tue Jan 13 17:51:00 2026 +0800

    feat: Support bucket assigning based on record level index (#17803)
---
 .../apache/hudi/index/FlinkHoodieIndexFactory.java |   2 +
 .../sink/partitioner/BucketAssignFunction.java     |  41 +++----
 .../partitioner/index/FlinkStateIndexBackend.java  |  52 ++++++++
 .../hudi/sink/partitioner/index/IndexBackend.java  |  65 ++++++++++
 .../partitioner/index/IndexBackendFactory.java     |  94 +++++++++++++++
 .../partitioner/index/MinibatchIndexBackend.java   |  36 ++++++
 .../partitioner/index/RecordLevelIndexBackend.java | 134 +++++++++++++++++++++
 .../org/apache/hudi/table/HoodieTableFactory.java  |  19 ++-
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java |  46 +++++++
 .../org/apache/hudi/sink/TestWriteMergeOnRead.java |   5 +
 .../hudi/sink/TestWriteMergeOnReadWithCompact.java |   5 +
 .../index/TestRecordLevelIndexBackend.java         | 110 +++++++++++++++++
 .../sink/utils/StreamWriteFunctionWrapper.java     |  14 ++-
 .../apache/hudi/table/TestHoodieTableFactory.java  |   6 +
 14 files changed, 599 insertions(+), 30 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
index 2024c6016759..feca357bc493 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
@@ -46,6 +46,8 @@ public final class FlinkHoodieIndexFactory {
       case FLINK_STATE:
         // Flink state index stores the index mappings with a state-backend,
         // instantiates an in-memory HoodieIndex component as a placeholder.
+      case GLOBAL_RECORD_LEVEL_INDEX:
+        // todo: aligned with the flink state index currently, may need further improvement.
       case INMEMORY:
         return new FlinkInMemoryStateIndex(context, config);
       case BLOOM:
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index e7ddb83e6ddf..201699940158 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -30,16 +30,14 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.sink.partitioner.index.IndexBackend;
+import org.apache.hudi.sink.partitioner.index.IndexBackendFactory;
 import org.apache.hudi.table.action.commit.BucketInfo;
 import org.apache.hudi.util.FlinkTaskContextSupplier;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.utils.RuntimeContextUtils;
-import org.apache.hudi.utils.StateTtlConfigUtils;
 
 import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -70,7 +68,7 @@ public class BucketAssignFunction
     implements CheckpointedFunction, CheckpointListener {
 
   /**
-   * Index cache(speed-up) state for the underneath file based(BloomFilter) indices.
+   * Index cache(speed-up) for the underneath file based indices.
    * When a record came in, we do these check:
    *
    * <ul>
@@ -80,7 +78,7 @@ public class BucketAssignFunction
   *   <li>If it does not, use the {@link BucketAssigner} to generate a new bucket ID</li>
    * </ul>
    */
-  private ValueState<HoodieRecordGlobalLocation> indexState;
+  private transient IndexBackend indexBackend;
 
   /**
    * Bucket assigner to assign new bucket IDs or reuse existing ones.
@@ -125,34 +123,27 @@ public class BucketAssignFunction
   }
 
   @Override
-  public void snapshotState(FunctionSnapshotContext context) {
-    this.bucketAssigner.reset();
+  public void initializeState(FunctionInitializationContext context) throws Exception {
+    this.indexBackend = IndexBackendFactory.create(conf, context, getRuntimeContext());
   }
 
   @Override
-  public void initializeState(FunctionInitializationContext context) {
-    ValueStateDescriptor<HoodieRecordGlobalLocation> indexStateDesc =
-        new ValueStateDescriptor<>(
-            "indexState",
-            TypeInformation.of(HoodieRecordGlobalLocation.class));
-    double ttl = conf.get(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 1000;
-    if (ttl > 0) {
-      indexStateDesc.enableTimeToLive(StateTtlConfigUtils.createTtlConfig((long) ttl));
-    }
-    indexState = context.getKeyedStateStore().getState(indexStateDesc);
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    this.bucketAssigner.reset();
+    this.indexBackend.onCheckpoint(context.getCheckpointId());
   }
 
   @Override
   public void processElement(HoodieFlinkInternalRow value, Context ctx, Collector<HoodieFlinkInternalRow> out) throws Exception {
     if (value.isIndexRecord()) {
-      this.indexState.update(
-          new HoodieRecordGlobalLocation(value.getPartitionPath(), value.getInstantTime(), value.getFileId()));
+      indexBackend.update(
+          ctx.getCurrentKey(), new HoodieRecordGlobalLocation(value.getPartitionPath(), value.getInstantTime(), value.getFileId()));
     } else {
-      processRecord(value, out);
+      processRecord(value, ctx, out);
     }
   }
 
-  private void processRecord(HoodieFlinkInternalRow record, Collector<HoodieFlinkInternalRow> out) throws Exception {
+  private void processRecord(HoodieFlinkInternalRow record, Context ctx, Collector<HoodieFlinkInternalRow> out) throws Exception {
     // 1. put the record into the BucketAssigner;
     // 2. look up the state for location, if the record has a location, just send it out;
     // 3. if it is an INSERT, decide the location using the BucketAssigner then send it out.
@@ -162,7 +153,7 @@ public class BucketAssignFunction
       // Only changing records need looking up the index for the location,
       // append only records are always recognized as INSERT.
       // Structured as Tuple(partition, fileId, instantTime).
-      HoodieRecordGlobalLocation oldLoc = indexState.value();
+      HoodieRecordGlobalLocation oldLoc = indexBackend.get(ctx.getCurrentKey());
       if (oldLoc != null) {
        // Set up the instant time as "U" to mark the bucket as an update bucket.
         String partitionFromState = oldLoc.getPartitionPath();
@@ -186,7 +177,7 @@ public class BucketAssignFunction
         location = getNewRecordLocation(partitionPath);
       }
       // always refresh the index
-      this.indexState.update(HoodieRecordGlobalLocation.fromLocal(partitionPath, location));
+      this.indexBackend.update(ctx.getCurrentKey(), HoodieRecordGlobalLocation.fromLocal(partitionPath, location));
     } else {
       location = getNewRecordLocation(partitionPath);
     }
@@ -219,6 +210,8 @@ public class BucketAssignFunction
   public void notifyCheckpointComplete(long checkpointId) {
     // Refresh the table state when there are new commits.
     this.bucketAssigner.reload(checkpointId);
+    // todo #17700: check the file based mapping between checkpoint id and instant to get the latest successful instant.
+    this.indexBackend.onCommitSuccess(checkpointId - 1);
   }
 
   @Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/FlinkStateIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/FlinkStateIndexBackend.java
new file mode 100644
index 000000000000..c952f8f047d1
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/FlinkStateIndexBackend.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+
+import org.apache.flink.api.common.state.ValueState;
+
+import java.io.IOException;
+
+/**
+ * An implementation of {@link IndexBackend} based on flink keyed value state.
+ */
+public class FlinkStateIndexBackend implements IndexBackend {
+
+  private final ValueState<HoodieRecordGlobalLocation> indexState;
+
+  public FlinkStateIndexBackend(ValueState<HoodieRecordGlobalLocation> indexState) {
+    this.indexState = indexState;
+  }
+
+  @Override
+  public HoodieRecordGlobalLocation get(String recordKey) throws IOException {
+    return indexState.value();
+  }
+
+  @Override
+  public void update(String recordKey, HoodieRecordGlobalLocation recordGlobalLocation) throws IOException {
+    this.indexState.update(recordGlobalLocation);
+  }
+
+  @Override
+  public void close() throws IOException {
+    // do nothing.
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
new file mode 100644
index 000000000000..e89bb4fe9ebe
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * An interface that provides an abstraction for managing record location mappings in the index system.
+ * It serves as the backend for storing and retrieving the location of records identified by their unique keys.
+ */
+public interface IndexBackend extends Closeable {
+
+  /**
+   * Retrieves the global location of a record based on its key.
+   *
+   * @param recordKey the unique key identifying the record
+   * @return the global location of the record, or null if the record is not found in the index
+   */
+  HoodieRecordGlobalLocation get(String recordKey) throws IOException;
+
+  /**
+   * Updates the global location of a record in the index.
+   *
+   * @param recordKey the unique key identifying the record
+   * @param recordGlobalLocation the new global location of the record
+   */
+  void update(String recordKey, HoodieRecordGlobalLocation recordGlobalLocation) throws IOException;
+
+  /**
+   * Listener method called when the bucket assign operator finishes the checkpoint with {@code checkpointId}.
+   *
+   * @param checkpointId checkpoint id.
+   */
+  default void onCheckpoint(long checkpointId) {
+    // do nothing.
+  }
+
+  /**
+   * Listener method called when the instant associated with {@code checkpointId} is committed successfully.
+   *
+   * @param checkpointId checkpoint id.
+   */
+  default void onCommitSuccess(long checkpointId) {
+    // do nothing.
+  }
+}
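
For reference, the call pattern this interface is designed for can be read off the BucketAssignFunction changes above; the following is a minimal sketch of that lifecycle (variable names such as recordKey, partitionPath and location are placeholders, not code from this patch):

    // created in initializeState(...)
    IndexBackend indexBackend = IndexBackendFactory.create(conf, context, getRuntimeContext());

    // per element, keyed by record key: look up the previous location, then refresh it
    HoodieRecordGlobalLocation oldLoc = indexBackend.get(recordKey);
    if (oldLoc != null) {
      // known location: route the record to its existing file group as an UPDATE
    } else {
      // unknown location: let the BucketAssigner pick a new bucket for the INSERT
    }
    indexBackend.update(recordKey, HoodieRecordGlobalLocation.fromLocal(partitionPath, location));

    // lifecycle hooks
    indexBackend.onCheckpoint(checkpointId);        // from snapshotState(...)
    indexBackend.onCommitSuccess(checkpointId - 1); // from notifyCheckpointComplete(...)
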
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
new file mode 100644
index 000000000000..b8410e2a2386
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.utils.RuntimeContextUtils;
+import org.apache.hudi.utils.StateTtlConfigUtils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import java.util.stream.StreamSupport;
+
+/**
+ * Factory to create an {@link IndexBackend} based on the configured index type.
+ */
+public class IndexBackendFactory {
+  public static IndexBackend create(Configuration conf, FunctionInitializationContext context, RuntimeContext runtimeContext) throws Exception {
+    HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
+    switch (indexType) {
+      case FLINK_STATE:
+        ValueStateDescriptor<HoodieRecordGlobalLocation> indexStateDesc =
+            new ValueStateDescriptor<>(
+                "indexState",
+                TypeInformation.of(HoodieRecordGlobalLocation.class));
+        double ttl = conf.get(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 1000;
+        if (ttl > 0) {
+          indexStateDesc.enableTimeToLive(StateTtlConfigUtils.createTtlConfig((long) ttl));
+        }
+        ValueState<HoodieRecordGlobalLocation> indexState = context.getKeyedStateStore().getState(indexStateDesc);
+        ValidationUtils.checkArgument(indexState != null, "indexState should not be null when using FLINK_STATE index!");
+        return new FlinkStateIndexBackend(indexState);
+      case GLOBAL_RECORD_LEVEL_INDEX:
+        ListState<JobID> jobIdState = context.getOperatorStateStore().getListState(
+            new ListStateDescriptor<>(
+                "bucket-assign-job-id-state",
+                TypeInformation.of(JobID.class)
+            ));
+        long initCheckpointId = -1;
+        if (context.isRestored()) {
+          int attemptId = RuntimeContextUtils.getAttemptNumber(runtimeContext);
+          initCheckpointId = initCheckpointId(attemptId, jobIdState, context.getRestoredCheckpointId().orElse(-1L), runtimeContext);
+        }
+        // set the jobId state with current job id.
+        jobIdState.clear();
+        jobIdState.add(RuntimeContextUtils.getJobId(runtimeContext));
+        return new RecordLevelIndexBackend(conf, initCheckpointId);
+      default:
+        throw new UnsupportedOperationException("Index type " + indexType + " is not supported for bucket assigning yet.");
+    }
+  }
+
+  private static long initCheckpointId(int attemptId, ListState<JobID> jobIdState, long restoredCheckpointId, RuntimeContext runtimeContext) throws Exception {
+    if (attemptId <= 0) {
+      // returns early if the job/task is initially started.
+      return -1;
+    }
+    JobID currentJobId = RuntimeContextUtils.getJobId(runtimeContext);
+    if (StreamSupport.stream(jobIdState.get().spliterator(), false)
+        .noneMatch(currentJobId::equals)) {
+      // do not set up the checkpoint id if the state comes from the old job.
+      return -1;
+    }
+    // sets up the known checkpoint id as the last successful checkpoint id for purposes of cache cleaning.
+    return restoredCheckpointId;
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java
new file mode 100644
index 000000000000..0b79ee10bcdf
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Index delegator which supports mini-batch index operations.
+ */
+public interface MinibatchIndexBackend extends IndexBackend {
+
+  Map<String, HoodieRecordGlobalLocation> get(List<String> recordKey) throws IOException;
+
+  void update(List<Pair<String, HoodieRecordGlobalLocation>> recordKeysAndLocations) throws IOException;
+}
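
A short sketch of how the mini-batch variant might be exercised (the keys here are illustrative; as implemented by RecordLevelIndexBackend below, keys that miss both the cache and the record level index still appear in the result map with a null location so the input order is preserved):

    List<String> keys = Arrays.asList("id1", "id2", "unknown_key");
    Map<String, HoodieRecordGlobalLocation> locations = minibatchIndexBackend.get(keys);
    for (Map.Entry<String, HoodieRecordGlobalLocation> entry : locations.entrySet()) {
      if (entry.getValue() == null) {
        // no known location: treat the record as an INSERT
      } else {
        // known location: route to the existing file group as an UPDATE
      }
    }
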
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
new file mode 100644
index 000000000000..bcd0dcc4b7b1
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.HoodieDataUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.util.StreamerUtil;
+
+import lombok.Getter;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An implementation of {@link IndexBackend} based on the record level index in metadata table.
+ */
+public class RecordLevelIndexBackend implements MinibatchIndexBackend {
+  @VisibleForTesting
+  @Getter
+  private final RecordIndexCache recordIndexCache;
+  private final Configuration conf;
+  private final HoodieTableMetaClient metaClient;
+  private HoodieTableMetadata metadataTable;
+
+  public RecordLevelIndexBackend(Configuration conf, long initCheckpointId) {
+    this.metaClient = StreamerUtil.createMetaClient(conf);
+    this.conf = conf;
+    this.recordIndexCache = new RecordIndexCache(conf, initCheckpointId);
+    reloadMetadataTable();
+  }
+
+  @Override
+  public HoodieRecordGlobalLocation get(String recordKey) throws IOException {
+    return get(Collections.singletonList(recordKey)).get(recordKey);
+  }
+
+  @Override
+  public void update(String recordKey, HoodieRecordGlobalLocation recordGlobalLocation) {
+    recordIndexCache.update(recordKey, recordGlobalLocation);
+  }
+
+  @Override
+  public Map<String, HoodieRecordGlobalLocation> get(List<String> recordKeys) throws IOException {
+    // use a linked hash map to keep the natural order.
+    Map<String, HoodieRecordGlobalLocation> keysAndLocations = new LinkedHashMap<>();
+    List<String> missedKeys = new ArrayList<>();
+    for (String key: recordKeys) {
+      HoodieRecordGlobalLocation location = recordIndexCache.get(key);
+      if (location == null) {
+        missedKeys.add(key);
+      }
+      // insert anyway even if the location is null to keep the natural order.
+      keysAndLocations.put(key, location);
+    }
+    if (!missedKeys.isEmpty()) {
+      HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
+          metadataTable.readRecordIndexLocationsWithKeys(HoodieListData.eager(missedKeys));
+      List<Pair<String, HoodieRecordGlobalLocation>> recordIndexLocations = HoodieDataUtils.dedupeAndCollectAsList(recordIndexData);
+      recordIndexLocations.forEach(keyAndLocation -> {
+        recordIndexCache.update(keyAndLocation.getKey(), keyAndLocation.getValue());
+        keysAndLocations.put(keyAndLocation.getKey(), keyAndLocation.getValue());
+      });
+    }
+    return keysAndLocations;
+  }
+
+  @Override
+  public void update(List<Pair<String, HoodieRecordGlobalLocation>> recordKeysAndLocations) throws IOException {
+    recordKeysAndLocations.forEach(keyAndLocation -> recordIndexCache.update(keyAndLocation.getKey(), keyAndLocation.getValue()));
+  }
+
+  @Override
+  public void onCheckpoint(long checkpointId) {
+    recordIndexCache.addCheckpointCache(checkpointId);
+  }
+
+  @Override
+  public void onCommitSuccess(long checkpointId) {
+    recordIndexCache.clean(checkpointId);
+    this.metaClient.reloadActiveTimeline();
+    reloadMetadataTable();
+  }
+
+  private void reloadMetadataTable() {
+    this.metadataTable = metaClient.getTableFormat().getMetadataFactory().create(
+        HoodieFlinkEngineContext.DEFAULT,
+        metaClient.getStorage(),
+        StreamerUtil.metadataConfig(conf),
+        conf.get(FlinkOptions.PATH));
+  }
+
+  @Override
+  public void close() throws IOException {
+    this.recordIndexCache.close();
+    if (this.metadataTable == null) {
+      return;
+    }
+    try {
+      this.metadataTable.close();
+    } catch (Exception e) {
+      throw new HoodieException("Exception happened during close metadata table.", e);
+    }
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 87006a03d630..05d04553d0c0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -18,10 +18,12 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -181,9 +183,14 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
    * Validate the index type.
    */
   private void checkIndexType(Configuration conf) {
-    String indexType = conf.get(FlinkOptions.INDEX_TYPE);
-    if (!StringUtils.isNullOrEmpty(indexType)) {
-      HoodieIndexConfig.INDEX_TYPE.checkValues(indexType);
+    String indexTypeStr = conf.get(FlinkOptions.INDEX_TYPE);
+    if (!StringUtils.isNullOrEmpty(indexTypeStr)) {
+      HoodieIndexConfig.INDEX_TYPE.checkValues(indexTypeStr);
+    }
+    HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
+    if (indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX) {
+      ValidationUtils.checkArgument(conf.get(FlinkOptions.METADATA_ENABLED),
+          "Metadata table should be enabled when index.type is GLOBAL_RECORD_LEVEL_INDEX.");
     }
   }
 
@@ -401,6 +408,12 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
         && OptionsResolver.isCowTable(conf)) {
       conf.set(FlinkOptions.PRE_COMBINE, true);
     }
+    HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
+    // enable hoodie record index if the index type is configured as GLOBAL_RECORD_LEVEL_INDEX.
+    if (indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX) {
+      conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(), "true");
+      conf.set(FlinkOptions.INDEX_GLOBAL_ENABLED, true);
+    }
   }
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index eebcdd8c5c97..6d89f521e995 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -19,13 +19,18 @@
 package org.apache.hudi.sink;
 
 import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.util.HoodieDataUtils;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -35,12 +40,14 @@ import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.FileGroupReaderBasedMergeHandle;
 import org.apache.hudi.io.HoodieWriteMergeHandle;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.sink.utils.TestWriteBase;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestUtils;
 
 import org.apache.flink.configuration.Configuration;
 import org.junit.jupiter.api.Test;
@@ -49,10 +56,12 @@ import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -818,4 +827,41 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
     }
     testHarness.checkpointComplete(1).checkWrittenData(EXPECTED1).end();
   }
+
+  @Test
+  public void testBucketAssignWithRLI() throws Exception {
+    // use record level index
+    conf.set(FlinkOptions.INDEX_TYPE, GLOBAL_RECORD_LEVEL_INDEX.name());
+    conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(), "true");
+    TestHarness testHarness =
+        preparePipeline(conf)
+            .consume(TestData.DATA_SET_INSERT)
+            // no checkpoint, so the coordinator does not accept any events
+            .checkpoint(1)
+            .assertNextEvent(4, "par1,par2,par3,par4")
+            .checkpointComplete(1)
+            .checkWrittenData(EXPECTED1);
+
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    HoodieTableMetadata metadataTable = metaClient.getTableFormat().getMetadataFactory().create(
+        HoodieFlinkEngineContext.DEFAULT,
+        metaClient.getStorage(),
+        StreamerUtil.metadataConfig(conf),
+        conf.get(FlinkOptions.PATH));
+
+    // validate the record level index data
+    String firstCommitTime = TestUtils.getLastCompleteInstant(conf.get(FlinkOptions.PATH));
+    Map<String, HoodieRecordGlobalLocation> result = HoodieDataUtils.dedupeAndCollectAsMap(
+        metadataTable.readRecordIndexLocationsWithKeys(
+            HoodieListData.eager(Arrays.asList("id1", "id2", "id3", "id4"))));
+    assertEquals(4, result.size());
+    result.values().forEach(location -> assertEquals(firstCommitTime, location.getInstantTime()));
+
+    testHarness.consume(TestData.DATA_SET_INSERT)
+        .checkpoint(2)
+        .assertNextEvent(4, "par1,par2,par3,par4")
+        .checkpointComplete(2)
+        .checkWrittenData(EXPECTED1)
+        .end();
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index 6deef9590170..ff00ae50220d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -253,6 +253,11 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
         .end();
   }
 
+  @Override
+  public void testBucketAssignWithRLI() throws Exception {
+    // can be re-enabled after #17701
+  }
+
   @Override
   protected Map<String, String> getExpectedBeforeCheckpointComplete() {
     return EXPECTED1;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 7515f8904327..63d087c5b28c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -354,6 +354,11 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
     pipeline2.end();
   }
 
+  @Override
+  public void testBucketAssignWithRLI() throws Exception {
+    // can be re-enabled after #17701
+  }
+
   @Override
   protected HoodieTableType getTableType() {
     return HoodieTableType.MERGE_ON_READ;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
new file mode 100644
index 000000000000..0b10bc4f4dc9
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestUtils;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+
+/**
+ * Test cases for {@link RecordLevelIndexBackend}.
+ */
+public class TestRecordLevelIndexBackend {
+
+  private Configuration conf;
+
+  @TempDir
+  File tempFile;
+
+  @BeforeEach
+  void beforeEach() throws IOException {
+    conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.TABLE_TYPE, COPY_ON_WRITE.name());
+    conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(), "true");
+    StreamerUtil.initTableIfNotExists(conf);
+  }
+
+  @Test
+  void testRecordLevelIndexBackend() throws Exception {
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    String firstCommitTime = TestUtils.getLastCompleteInstant(tempFile.toURI().toString());
+
+    try (RecordLevelIndexBackend recordLevelIndexBackend = new RecordLevelIndexBackend(conf, -1)) {
+      // get record location
+      HoodieRecordGlobalLocation location = recordLevelIndexBackend.get("id1");
+      assertNotNull(location);
+      assertEquals("par1", location.getPartitionPath());
+      assertEquals(firstCommitTime, location.getInstantTime());
+
+      // get record location with a non-existent key
+      location = recordLevelIndexBackend.get("new_key");
+      assertNull(location);
+
+      // get record locations for multiple record keys
+      Map<String, HoodieRecordGlobalLocation> locations = recordLevelIndexBackend.get(Arrays.asList("id1", "id2", "id3"));
+      assertEquals(3, locations.size());
+      locations.values().forEach(Assertions::assertNotNull);
+
+      // get record locations for multiple record keys including a non-existent key
+      locations = recordLevelIndexBackend.get(Arrays.asList("id1", "id2", "new_key"));
+      assertEquals(3, locations.size());
+      assertNull(locations.get("new_key"));
+
+      // new checkpoint
+      recordLevelIndexBackend.onCheckpoint(1);
+
+      // update record location
+      HoodieRecordGlobalLocation newLocation = new HoodieRecordGlobalLocation("par5", "1003", "file_id_4");
+      recordLevelIndexBackend.update("new_key", newLocation);
+      location = recordLevelIndexBackend.get("new_key");
+      assertEquals(newLocation, location);
+
+      // previous instant commit success, clean
+      recordLevelIndexBackend.onCommitSuccess(1);
+      assertEquals(1, recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
+      // the cache will only contain 'new_key', others are cleaned.
+      location = recordLevelIndexBackend.getRecordIndexCache().get("new_key");
+      assertEquals(newLocation, location);
+      location = recordLevelIndexBackend.getRecordIndexCache().get("id1");
+      assertNull(location);
+    }
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 5359baaf8e87..bd4c5f7153b0 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorCo
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
 import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
@@ -63,6 +64,9 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * A wrapper class to manipulate the instance {@link StreamWriteFunction} for testing.
  *
@@ -81,6 +85,7 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
   private StreamWriteOperatorCoordinator coordinator;
   private final MockStateInitializationContext stateInitializationContext;
   private final TreeMap<Long, byte[]> coordinatorStateStore;
+  private final KeyedProcessFunction.Context context;
 
   /**
    * Function that converts row data to HoodieRecord.
@@ -140,6 +145,7 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
         .setExecutionConfig(new ExecutionConfig().enableObjectReuse())
         .build();
     this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf, this.streamTask, this.streamConfig);
+    this.context = mock(KeyedProcessFunction.Context.class);
   }
 
   public void openFunction() throws Exception {
@@ -164,7 +170,8 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
       Collector<HoodieFlinkInternalRow> collector = RecordsCollector.getInstance();
       for (HoodieFlinkInternalRow bootstrapRecord : output.getRecords()) {
         stateInitializationContext.getKeyedStateStore().setCurrentKey(bootstrapRecord.getRecordKey());
-        bucketAssignerFunction.processElement(bootstrapRecord, null, collector);
+        when(context.getCurrentKey()).thenReturn(bootstrapRecord.getRecordKey());
+        bucketAssignerFunction.processElement(bootstrapRecord, context, collector);
         bucketAssignFunctionContext.setCurrentKey(bootstrapRecord.getRecordKey());
       }
     }
@@ -180,7 +187,8 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
     HoodieFlinkInternalRow hoodieRecord = toHoodieFunction.map((RowData) record);
     stateInitializationContext.getKeyedStateStore().setCurrentKey(hoodieRecord.getRecordKey());
     RecordsCollector<HoodieFlinkInternalRow> collector = RecordsCollector.getInstance();
-    bucketAssignerFunction.processElement(hoodieRecord, null, collector);
+    when(context.getCurrentKey()).thenReturn(hoodieRecord.getRecordKey());
+    bucketAssignerFunction.processElement(hoodieRecord, context, collector);
     bucketAssignFunctionContext.setCurrentKey(hoodieRecord.getRecordKey());
     for (HoodieFlinkInternalRow row: collector.getVal()) {
       writeFunction.processElement(row, null, null);
@@ -210,7 +218,7 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
     if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
       bootstrapOperator.snapshotState(null);
     }
-    bucketAssignerFunction.snapshotState(null);
+    bucketAssignerFunction.snapshotState(new MockFunctionSnapshotContext(checkpointId));
 
     writeFunction.snapshotState(new MockFunctionSnapshotContext(checkpointId));
     stateInitializationContext.checkpointBegin(checkpointId);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index adce67c00f30..bc6be839cc03 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -215,6 +215,12 @@ public class TestHoodieTableFactory {
     this.conf.set(FlinkOptions.INDEX_TYPE, "BUCKET");
     final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema, "f2");
     assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext3));
+
+    // GLOBAL_RECORD_LEVEL_INDEX with the metadata table disabled should fail validation
+    this.conf.set(FlinkOptions.INDEX_TYPE, "GLOBAL_RECORD_LEVEL_INDEX");
+    this.conf.set(FlinkOptions.METADATA_ENABLED, false);
+    final MockContext sourceContext4 = MockContext.getInstance(this.conf, schema, "f2");
+    assertThrows(IllegalArgumentException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext4));
   }
 
   @Test


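Usage note: a minimal configuration sketch, derived from the options exercised in the tests above rather than from shipped documentation. Enabling the new index type on a Flink writer means pointing index.type at GLOBAL_RECORD_LEVEL_INDEX and keeping the metadata table enabled (checkIndexType validates this); HoodieTableFactory then auto-enables the record level index options, shown explicitly here for clarity:

    Configuration conf = new Configuration();
    conf.set(FlinkOptions.PATH, "/tmp/hudi_table"); // hypothetical table path
    conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
    conf.set(FlinkOptions.METADATA_ENABLED, true);
    // set automatically by HoodieTableFactory for GLOBAL_RECORD_LEVEL_INDEX:
    conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(), "true");
    conf.set(FlinkOptions.INDEX_GLOBAL_ENABLED, true);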