This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1e9e8262b146 feat: Support bucket assigning based on record level index (#17803)
1e9e8262b146 is described below
commit 1e9e8262b146f622313230990ceefd93ff773458
Author: Shuo Cheng <[email protected]>
AuthorDate: Tue Jan 13 17:51:00 2026 +0800
feat: Support bucket assigning based on record level index (#17803)
---
.../apache/hudi/index/FlinkHoodieIndexFactory.java | 2 +
.../sink/partitioner/BucketAssignFunction.java | 41 +++----
.../partitioner/index/FlinkStateIndexBackend.java | 52 ++++++++
.../hudi/sink/partitioner/index/IndexBackend.java | 65 ++++++++++
.../partitioner/index/IndexBackendFactory.java | 94 +++++++++++++++
.../partitioner/index/MinibatchIndexBackend.java | 36 ++++++
.../partitioner/index/RecordLevelIndexBackend.java | 134 +++++++++++++++++++++
.../org/apache/hudi/table/HoodieTableFactory.java | 19 ++-
.../org/apache/hudi/sink/TestWriteCopyOnWrite.java | 46 +++++++
.../org/apache/hudi/sink/TestWriteMergeOnRead.java | 5 +
.../hudi/sink/TestWriteMergeOnReadWithCompact.java | 5 +
.../index/TestRecordLevelIndexBackend.java | 110 +++++++++++++++++
.../sink/utils/StreamWriteFunctionWrapper.java | 14 ++-
.../apache/hudi/table/TestHoodieTableFactory.java | 6 +
14 files changed, 599 insertions(+), 30 deletions(-)
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
index 2024c6016759..feca357bc493 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
@@ -46,6 +46,8 @@ public final class FlinkHoodieIndexFactory {
case FLINK_STATE:
// Flink state index stores the index mappings with a state-backend,
// instantiates an in-memory HoodieIndex component as a placeholder.
+ case GLOBAL_RECORD_LEVEL_INDEX:
+ // todo: aligned with flink state index currently, may need further improving.
case INMEMORY:
return new FlinkInMemoryStateIndex(context, config);
case BLOOM:
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index e7ddb83e6ddf..201699940158 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -30,16 +30,14 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.sink.partitioner.index.IndexBackend;
+import org.apache.hudi.sink.partitioner.index.IndexBackendFactory;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.util.FlinkTaskContextSupplier;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.utils.RuntimeContextUtils;
-import org.apache.hudi.utils.StateTtlConfigUtils;
import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -70,7 +68,7 @@ public class BucketAssignFunction
implements CheckpointedFunction, CheckpointListener {
/**
- * Index cache(speed-up) state for the underneath file based(BloomFilter) indices.
+ * Index cache (speed-up) for the underlying file-based indices.
* When a record came in, we do these check:
*
* <ul>
@@ -80,7 +78,7 @@ public class BucketAssignFunction
* <li>If it does not, use the {@link BucketAssigner} to generate a new bucket ID</li>
* </ul>
*/
- private ValueState<HoodieRecordGlobalLocation> indexState;
+ private transient IndexBackend indexBackend;
/**
* Bucket assigner to assign new bucket IDs or reuse existing ones.
@@ -125,34 +123,27 @@ public class BucketAssignFunction
}
@Override
- public void snapshotState(FunctionSnapshotContext context) {
- this.bucketAssigner.reset();
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ this.indexBackend = IndexBackendFactory.create(conf, context, getRuntimeContext());
}
@Override
- public void initializeState(FunctionInitializationContext context) {
- ValueStateDescriptor<HoodieRecordGlobalLocation> indexStateDesc =
- new ValueStateDescriptor<>(
- "indexState",
- TypeInformation.of(HoodieRecordGlobalLocation.class));
- double ttl = conf.get(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 1000;
- if (ttl > 0) {
- indexStateDesc.enableTimeToLive(StateTtlConfigUtils.createTtlConfig((long) ttl));
- }
- indexState = context.getKeyedStateStore().getState(indexStateDesc);
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ this.bucketAssigner.reset();
+ this.indexBackend.onCheckpoint(context.getCheckpointId());
}
@Override
public void processElement(HoodieFlinkInternalRow value, Context ctx, Collector<HoodieFlinkInternalRow> out) throws Exception {
if (value.isIndexRecord()) {
- this.indexState.update(
- new HoodieRecordGlobalLocation(value.getPartitionPath(), value.getInstantTime(), value.getFileId()));
+ indexBackend.update(
+ ctx.getCurrentKey(), new HoodieRecordGlobalLocation(value.getPartitionPath(), value.getInstantTime(), value.getFileId()));
} else {
- processRecord(value, out);
+ processRecord(value, ctx, out);
}
}
- private void processRecord(HoodieFlinkInternalRow record, Collector<HoodieFlinkInternalRow> out) throws Exception {
+ private void processRecord(HoodieFlinkInternalRow record, Context ctx, Collector<HoodieFlinkInternalRow> out) throws Exception {
// 1. put the record into the BucketAssigner;
// 2. look up the state for location, if the record has a location, just send it out;
// 3. if it is an INSERT, decide the location using the BucketAssigner then send it out.
@@ -162,7 +153,7 @@ public class BucketAssignFunction
// Only changing records need looking up the index for the location,
// append only records are always recognized as INSERT.
// Structured as Tuple(partition, fileId, instantTime).
- HoodieRecordGlobalLocation oldLoc = indexState.value();
+ HoodieRecordGlobalLocation oldLoc = indexBackend.get(ctx.getCurrentKey());
if (oldLoc != null) {
// Set up the instant time as "U" to mark the bucket as an update bucket.
String partitionFromState = oldLoc.getPartitionPath();
@@ -186,7 +177,7 @@ public class BucketAssignFunction
location = getNewRecordLocation(partitionPath);
}
// always refresh the index
- this.indexState.update(HoodieRecordGlobalLocation.fromLocal(partitionPath, location));
+ this.indexBackend.update(ctx.getCurrentKey(), HoodieRecordGlobalLocation.fromLocal(partitionPath, location));
} else {
location = getNewRecordLocation(partitionPath);
}
@@ -219,6 +210,8 @@ public class BucketAssignFunction
public void notifyCheckpointComplete(long checkpointId) {
// Refresh the table state when there are new commits.
this.bucketAssigner.reload(checkpointId);
+ // todo #17700: check the file based mapping between checkpoint id and instant to get the latest successful instant.
+ this.indexBackend.onCommitSuccess(checkpointId - 1);
}
@Override
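A condensed sketch of the call order the rewired function now drives against the index backend over one checkpoint cycle (the driver method, record key and location values below are illustrative only, not part of the patch):

    // Illustrative driver; names and ordering are taken from the hunks above.
    import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
    import org.apache.hudi.sink.partitioner.index.IndexBackend;

    public class BucketAssignLifecycleSketch {
      static void oneCheckpointCycle(IndexBackend indexBackend, long checkpointId) throws Exception {
        // processElement(): look up the previous location, assign a new one if absent,
        // then refresh the index (key and location values are made up).
        if (indexBackend.get("record-key") == null) {
          indexBackend.update("record-key",
              new HoodieRecordGlobalLocation("par1", "20260113175100000", "f-0"));
        }
        // snapshotState(): the bucket assigner is reset and the backend is notified.
        indexBackend.onCheckpoint(checkpointId);
        // notifyCheckpointComplete(): table state reloads and the previous checkpoint's
        // cache is treated as committed (checkpointId - 1; see the #17700 todo above).
        indexBackend.onCommitSuccess(checkpointId - 1);
      }
    }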
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/FlinkStateIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/FlinkStateIndexBackend.java
new file mode 100644
index 000000000000..c952f8f047d1
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/FlinkStateIndexBackend.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+
+import org.apache.flink.api.common.state.ValueState;
+
+import java.io.IOException;
+
+/**
+ * An implementation of {@link IndexBackend} based on flink keyed value state.
+ */
+public class FlinkStateIndexBackend implements IndexBackend {
+
+ private final ValueState<HoodieRecordGlobalLocation> indexState;
+
+ public FlinkStateIndexBackend(ValueState<HoodieRecordGlobalLocation> indexState) {
+ this.indexState = indexState;
+ }
+
+ @Override
+ public HoodieRecordGlobalLocation get(String recordKey) throws IOException {
+ return indexState.value();
+ }
+
+ @Override
+ public void update(String recordKey, HoodieRecordGlobalLocation recordGlobalLocation) throws IOException {
+ this.indexState.update(recordGlobalLocation);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing.
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
new file mode 100644
index 000000000000..e89bb4fe9ebe
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackend.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * An interface that provides an abstraction for managing record location mappings in the index system.
+ * It serves as the backend for storing and retrieving the location of records identified by their unique keys.
+ */
+public interface IndexBackend extends Closeable {
+
+ /**
+ * Retrieves the global location of a record based on its key.
+ *
+ * @param recordKey the unique key identifying the record
+ * @return the global location of the record, or null if the record is not found in the index
+ */
+ HoodieRecordGlobalLocation get(String recordKey) throws IOException;
+
+ /**
+ * Updates the global location of a record in the index.
+ *
+ * @param recordKey the unique key identifying the record
+ * @param recordGlobalLocation the new global location of the record
+ */
+ void update(String recordKey, HoodieRecordGlobalLocation recordGlobalLocation) throws IOException;
+
+ /**
+ * Listener method called when the bucket assign operator finishes the checkpoint with {@code checkpointId}.
+ *
+ * @param checkpointId checkpoint id.
+ */
+ default void onCheckpoint(long checkpointId) {
+ // do nothing.
+ }
+
+ /**
+ * Listener method called when the instant associated with {@code checkpointId} is committed successfully.
+ *
+ * @param checkpointId checkpoint id.
+ */
+ default void onCommitSuccess(long checkpointId) {
+ // do nothing.
+ }
+}
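As a reading aid, a minimal hypothetical in-memory implementation of this interface (a plain HashMap relying on the default no-op checkpoint hooks; illustrative only, not part of this patch):

    import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
    import org.apache.hudi.sink.partitioner.index.IndexBackend;

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical toy backend: every record location lives in a heap map,
    // with no TTL and no checkpoint awareness.
    public class InMemoryIndexBackendSketch implements IndexBackend {
      private final Map<String, HoodieRecordGlobalLocation> cache = new HashMap<>();

      @Override
      public HoodieRecordGlobalLocation get(String recordKey) throws IOException {
        return cache.get(recordKey); // null when the key has never been indexed
      }

      @Override
      public void update(String recordKey, HoodieRecordGlobalLocation recordGlobalLocation) throws IOException {
        cache.put(recordKey, recordGlobalLocation);
      }

      @Override
      public void close() throws IOException {
        cache.clear();
      }
    }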
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
new file mode 100644
index 000000000000..b8410e2a2386
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.utils.RuntimeContextUtils;
+import org.apache.hudi.utils.StateTtlConfigUtils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import java.util.stream.StreamSupport;
+
+/**
+ * Factory to create an {@link IndexBackend} based on the configured index type.
+ */
+public class IndexBackendFactory {
+ public static IndexBackend create(Configuration conf, FunctionInitializationContext context, RuntimeContext runtimeContext) throws Exception {
+ HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
+ switch (indexType) {
+ case FLINK_STATE:
+ ValueStateDescriptor<HoodieRecordGlobalLocation> indexStateDesc =
+ new ValueStateDescriptor<>(
+ "indexState",
+ TypeInformation.of(HoodieRecordGlobalLocation.class));
+ double ttl = conf.get(FlinkOptions.INDEX_STATE_TTL) * 24 * 60 * 60 * 1000;
+ if (ttl > 0) {
+ indexStateDesc.enableTimeToLive(StateTtlConfigUtils.createTtlConfig((long) ttl));
+ }
+ ValueState<HoodieRecordGlobalLocation> indexState = context.getKeyedStateStore().getState(indexStateDesc);
+ ValidationUtils.checkArgument(indexState != null, "indexState should not be null when using FLINK_STATE index!");
+ return new FlinkStateIndexBackend(indexState);
+ case GLOBAL_RECORD_LEVEL_INDEX:
+ ListState<JobID> jobIdState = context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>(
+ "bucket-assign-job-id-state",
+ TypeInformation.of(JobID.class)
+ ));
+ long initCheckpointId = -1;
+ if (context.isRestored()) {
+ int attemptId = RuntimeContextUtils.getAttemptNumber(runtimeContext);
+ initCheckpointId = initCheckpointId(attemptId, jobIdState, context.getRestoredCheckpointId().orElse(-1L), runtimeContext);
+ }
+ // set the jobId state with current job id.
+ jobIdState.clear();
+ jobIdState.add(RuntimeContextUtils.getJobId(runtimeContext));
+ return new RecordLevelIndexBackend(conf, initCheckpointId);
+ default:
+ throw new UnsupportedOperationException("Index type " + indexType + " is not supported for bucket assigning yet.");
+ }
+ }
+
+ private static long initCheckpointId(int attemptId, ListState<JobID> jobIdState, long restoredCheckpointId, RuntimeContext runtimeContext) throws Exception {
+ if (attemptId <= 0) {
+ // returns early if the job/task is initially started.
+ return -1;
+ }
+ JobID currentJobId = RuntimeContextUtils.getJobId(runtimeContext);
+ if (StreamSupport.stream(jobIdState.get().spliterator(), false)
+ .noneMatch(currentJobId::equals)) {
+ // do not set up the checkpoint id if the state comes from the old job.
+ return -1;
+ }
+ // sets up the known checkpoint id as the last successful checkpoint id for purposes of cache cleaning.
+ return restoredCheckpointId;
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java
new file mode 100644
index 000000000000..0b79ee10bcdf
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/MinibatchIndexBackend.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Index delegator which supports mini-batch index operations.
+ */
+public interface MinibatchIndexBackend extends IndexBackend {
+
+ Map<String, HoodieRecordGlobalLocation> get(List<String> recordKey) throws IOException;
+
+ void update(List<Pair<String, HoodieRecordGlobalLocation>> recordKeysAndLocations) throws IOException;
+}
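A short usage sketch for the batched variant (the keys are illustrative; the ordering and null-for-miss behavior matches the RecordLevelIndexBackend implementation below):

    import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
    import org.apache.hudi.sink.partitioner.index.MinibatchIndexBackend;

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Map;

    public class MinibatchLookupSketch {
      // One call resolves a whole batch of keys instead of one point lookup per record.
      static Map<String, HoodieRecordGlobalLocation> lookup(MinibatchIndexBackend backend) throws IOException {
        Map<String, HoodieRecordGlobalLocation> locations =
            backend.get(Arrays.asList("id1", "unknown-key", "id2"));
        // The returned map keeps the input key order; keys that are neither cached
        // nor present in the record level index map to null.
        return locations;
      }
    }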
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
new file mode 100644
index 000000000000..bcd0dcc4b7b1
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.HoodieDataUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.util.StreamerUtil;
+
+import lombok.Getter;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An implementation of {@link IndexBackend} based on the record level index in metadata table.
+ */
+public class RecordLevelIndexBackend implements MinibatchIndexBackend {
+ @VisibleForTesting
+ @Getter
+ private final RecordIndexCache recordIndexCache;
+ private final Configuration conf;
+ private final HoodieTableMetaClient metaClient;
+ private HoodieTableMetadata metadataTable;
+
+ public RecordLevelIndexBackend(Configuration conf, long initCheckpointId) {
+ this.metaClient = StreamerUtil.createMetaClient(conf);
+ this.conf = conf;
+ this.recordIndexCache = new RecordIndexCache(conf, initCheckpointId);
+ reloadMetadataTable();
+ }
+
+ @Override
+ public HoodieRecordGlobalLocation get(String recordKey) throws IOException {
+ return get(Collections.singletonList(recordKey)).get(recordKey);
+ }
+
+ @Override
+ public void update(String recordKey, HoodieRecordGlobalLocation recordGlobalLocation) {
+ recordIndexCache.update(recordKey, recordGlobalLocation);
+ }
+
+ @Override
+ public Map<String, HoodieRecordGlobalLocation> get(List<String> recordKeys) throws IOException {
+ // use a linked hash map to keep the natural order.
+ Map<String, HoodieRecordGlobalLocation> keysAndLocations = new LinkedHashMap<>();
+ List<String> missedKeys = new ArrayList<>();
+ for (String key: recordKeys) {
+ HoodieRecordGlobalLocation location = recordIndexCache.get(key);
+ if (location == null) {
+ missedKeys.add(key);
+ }
+ // insert anyway even if the location is null, to keep the natural order.
+ keysAndLocations.put(key, location);
+ }
+ if (!missedKeys.isEmpty()) {
+ HoodiePairData<String, HoodieRecordGlobalLocation> recordIndexData =
+ metadataTable.readRecordIndexLocationsWithKeys(HoodieListData.eager(missedKeys));
+ List<Pair<String, HoodieRecordGlobalLocation>> recordIndexLocations = HoodieDataUtils.dedupeAndCollectAsList(recordIndexData);
+ recordIndexLocations.forEach(keyAndLocation -> {
+ recordIndexCache.update(keyAndLocation.getKey(), keyAndLocation.getValue());
+ keysAndLocations.put(keyAndLocation.getKey(), keyAndLocation.getValue());
+ });
+ }
+ return keysAndLocations;
+ }
+
+ @Override
+ public void update(List<Pair<String, HoodieRecordGlobalLocation>> recordKeysAndLocations) throws IOException {
+ recordKeysAndLocations.forEach(keyAndLocation -> recordIndexCache.update(keyAndLocation.getKey(), keyAndLocation.getValue()));
+ }
+
+ @Override
+ public void onCheckpoint(long checkpointId) {
+ recordIndexCache.addCheckpointCache(checkpointId);
+ }
+
+ @Override
+ public void onCommitSuccess(long checkpointId) {
+ recordIndexCache.clean(checkpointId);
+ this.metaClient.reloadActiveTimeline();
+ reloadMetadataTable();
+ }
+
+ private void reloadMetadataTable() {
+ this.metadataTable = metaClient.getTableFormat().getMetadataFactory().create(
+ HoodieFlinkEngineContext.DEFAULT,
+ metaClient.getStorage(),
+ StreamerUtil.metadataConfig(conf),
+ conf.get(FlinkOptions.PATH));
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.recordIndexCache.close();
+ if (this.metadataTable == null) {
+ return;
+ }
+ try {
+ this.metadataTable.close();
+ } catch (Exception e) {
+ throw new HoodieException("Exception happened during close metadata table.", e);
+ }
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 87006a03d630..05d04553d0c0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -18,10 +18,12 @@
package org.apache.hudi.table;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
@@ -181,9 +183,14 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
* Validate the index type.
*/
private void checkIndexType(Configuration conf) {
- String indexType = conf.get(FlinkOptions.INDEX_TYPE);
- if (!StringUtils.isNullOrEmpty(indexType)) {
- HoodieIndexConfig.INDEX_TYPE.checkValues(indexType);
+ String indexTypeStr = conf.get(FlinkOptions.INDEX_TYPE);
+ if (!StringUtils.isNullOrEmpty(indexTypeStr)) {
+ HoodieIndexConfig.INDEX_TYPE.checkValues(indexTypeStr);
+ }
+ HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
+ if (indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX) {
+ ValidationUtils.checkArgument(conf.get(FlinkOptions.METADATA_ENABLED),
+ "Metadata table should be enabled when index.type is
GLOBAL_RECORD_LEVEL_INDEX.");
}
}
@@ -401,6 +408,12 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
&& OptionsResolver.isCowTable(conf)) {
conf.set(FlinkOptions.PRE_COMBINE, true);
}
+ HoodieIndex.IndexType indexType = OptionsResolver.getIndexType(conf);
+ // enable hoodie record index if the index type is configured as GLOBAL_RECORD_LEVEL_INDEX.
+ if (indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX) {
+ conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(), "true");
+ conf.set(FlinkOptions.INDEX_GLOBAL_ENABLED, true);
+ }
}
/**
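For reference, a minimal sketch of the writer configuration this validation and setup path targets (the table path is illustrative; the option names come from the hunks above):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    public class RliWriterConfigSketch {
      static Configuration writerConf() {
        Configuration conf = new Configuration();
        conf.set(FlinkOptions.PATH, "file:///tmp/hudi/t1");              // illustrative table path
        conf.set(FlinkOptions.INDEX_TYPE, "GLOBAL_RECORD_LEVEL_INDEX");
        conf.set(FlinkOptions.METADATA_ENABLED, true);                   // required, otherwise checkIndexType fails
        // The factory then flips HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP
        // and FlinkOptions.INDEX_GLOBAL_ENABLED automatically (see the second hunk above).
        return conf;
      }
    }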
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index eebcdd8c5c97..6d89f521e995 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -19,13 +19,18 @@
package org.apache.hudi.sink;
import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.util.HoodieDataUtils;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
@@ -35,12 +40,14 @@ import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.FileGroupReaderBasedMergeHandle;
import org.apache.hudi.io.HoodieWriteMergeHandle;
+import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.sink.utils.TestWriteBase;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestUtils;
import org.apache.flink.configuration.Configuration;
import org.junit.jupiter.api.Test;
@@ -49,10 +56,12 @@ import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.hudi.index.HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -818,4 +827,41 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
}
testHarness.checkpointComplete(1).checkWrittenData(EXPECTED1).end();
}
+
+ @Test
+ public void testBucketAssignWithRLI() throws Exception {
+ // use record level index
+ conf.set(FlinkOptions.INDEX_TYPE, GLOBAL_RECORD_LEVEL_INDEX.name());
+ conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(), "true");
+ TestHarness testHarness =
+ preparePipeline(conf)
+ .consume(TestData.DATA_SET_INSERT)
+ // no checkpoint, so the coordinator does not accept any events
+ .checkpoint(1)
+ .assertNextEvent(4, "par1,par2,par3,par4")
+ .checkpointComplete(1)
+ .checkWrittenData(EXPECTED1);
+
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+ HoodieTableMetadata metadataTable = metaClient.getTableFormat().getMetadataFactory().create(
+ HoodieFlinkEngineContext.DEFAULT,
+ metaClient.getStorage(),
+ StreamerUtil.metadataConfig(conf),
+ conf.get(FlinkOptions.PATH));
+
+ // validate the record level index data
+ String firstCommitTime = TestUtils.getLastCompleteInstant(conf.get(FlinkOptions.PATH));
+ Map<String, HoodieRecordGlobalLocation> result = HoodieDataUtils.dedupeAndCollectAsMap(
+ metadataTable.readRecordIndexLocationsWithKeys(
+ HoodieListData.eager(Arrays.asList("id1", "id2", "id3", "id4"))));
+ assertEquals(4, result.size());
+ result.values().forEach(location -> assertEquals(firstCommitTime, location.getInstantTime()));
+
+ testHarness.consume(TestData.DATA_SET_INSERT)
+ .checkpoint(2)
+ .assertNextEvent(4, "par1,par2,par3,par4")
+ .checkpointComplete(2)
+ .checkWrittenData(EXPECTED1)
+ .end();
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index 6deef9590170..ff00ae50220d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -253,6 +253,11 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
.end();
}
+ @Override
+ public void testBucketAssignWithRLI() throws Exception {
+ // can be re-enabled after #17701
+ }
+
@Override
protected Map<String, String> getExpectedBeforeCheckpointComplete() {
return EXPECTED1;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index 7515f8904327..63d087c5b28c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -354,6 +354,11 @@ public class TestWriteMergeOnReadWithCompact extends TestWriteCopyOnWrite {
pipeline2.end();
}
+ @Override
+ public void testBucketAssignWithRLI() throws Exception {
+ // can be re-enabled after #17701
+ }
+
@Override
protected HoodieTableType getTableType() {
return HoodieTableType.MERGE_ON_READ;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
new file mode 100644
index 000000000000..0b10bc4f4dc9
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestUtils;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+
+/**
+ * Test cases for {@link RecordLevelIndexBackend}.
+ */
+public class TestRecordLevelIndexBackend {
+
+ private Configuration conf;
+
+ @TempDir
+ File tempFile;
+
+ @BeforeEach
+ void beforeEach() throws IOException {
+ conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.TABLE_TYPE, COPY_ON_WRITE.name());
+ conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(), "true");
+ StreamerUtil.initTableIfNotExists(conf);
+ }
+
+ @Test
+ void testRecordLevelIndexBackend() throws Exception {
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ String firstCommitTime = TestUtils.getLastCompleteInstant(tempFile.toURI().toString());
+
+ try (RecordLevelIndexBackend recordLevelIndexBackend = new RecordLevelIndexBackend(conf, -1)) {
+ // get record location
+ HoodieRecordGlobalLocation location = recordLevelIndexBackend.get("id1");
+ assertNotNull(location);
+ assertEquals("par1", location.getPartitionPath());
+ assertEquals(firstCommitTime, location.getInstantTime());
+
+ // get record location with a non-existent key
+ location = recordLevelIndexBackend.get("new_key");
+ assertNull(location);
+
+ // get record locations for multiple record keys
+ Map<String, HoodieRecordGlobalLocation> locations = recordLevelIndexBackend.get(Arrays.asList("id1", "id2", "id3"));
+ assertEquals(3, locations.size());
+ locations.values().forEach(Assertions::assertNotNull);
+
+ // get record locations for multiple record keys including a non-existent key
+ locations = recordLevelIndexBackend.get(Arrays.asList("id1", "id2", "new_key"));
+ assertEquals(3, locations.size());
+ assertNull(locations.get("new_key"));
+
+ // new checkpoint
+ recordLevelIndexBackend.onCheckpoint(1);
+
+ // update record location
+ HoodieRecordGlobalLocation newLocation = new HoodieRecordGlobalLocation("par5", "1003", "file_id_4");
+ recordLevelIndexBackend.update("new_key", newLocation);
+ location = recordLevelIndexBackend.get("new_key");
+ assertEquals(newLocation, location);
+
+ // previous instant commit success, clean
+ recordLevelIndexBackend.onCommitSuccess(1);
+ assertEquals(1, recordLevelIndexBackend.getRecordIndexCache().getCaches().size());
+ // the cache will only contain 'new_key', others are cleaned.
+ location = recordLevelIndexBackend.getRecordIndexCache().get("new_key");
+ assertEquals(newLocation, location);
+ location = recordLevelIndexBackend.getRecordIndexCache().get("id1");
+ assertNull(location);
+ }
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 5359baaf8e87..bd4c5f7153b0 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorCo
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
@@ -63,6 +64,9 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* A wrapper class to manipulate the instance {@link StreamWriteFunction} for testing.
*
@@ -81,6 +85,7 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
private StreamWriteOperatorCoordinator coordinator;
private final MockStateInitializationContext stateInitializationContext;
private final TreeMap<Long, byte[]> coordinatorStateStore;
+ private final KeyedProcessFunction.Context context;
/**
* Function that converts row data to HoodieRecord.
@@ -140,6 +145,7 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
.setExecutionConfig(new ExecutionConfig().enableObjectReuse())
.build();
this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf, this.streamTask, this.streamConfig);
+ this.context = mock(KeyedProcessFunction.Context.class);
}
public void openFunction() throws Exception {
@@ -164,7 +170,8 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
Collector<HoodieFlinkInternalRow> collector = RecordsCollector.getInstance();
for (HoodieFlinkInternalRow bootstrapRecord : output.getRecords()) {
stateInitializationContext.getKeyedStateStore().setCurrentKey(bootstrapRecord.getRecordKey());
- bucketAssignerFunction.processElement(bootstrapRecord, null, collector);
+ when(context.getCurrentKey()).thenReturn(bootstrapRecord.getRecordKey());
+ bucketAssignerFunction.processElement(bootstrapRecord, context, collector);
bucketAssignFunctionContext.setCurrentKey(bootstrapRecord.getRecordKey());
}
}
@@ -180,7 +187,8 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
HoodieFlinkInternalRow hoodieRecord = toHoodieFunction.map((RowData) record);
stateInitializationContext.getKeyedStateStore().setCurrentKey(hoodieRecord.getRecordKey());
RecordsCollector<HoodieFlinkInternalRow> collector = RecordsCollector.getInstance();
- bucketAssignerFunction.processElement(hoodieRecord, null, collector);
+ when(context.getCurrentKey()).thenReturn(hoodieRecord.getRecordKey());
+ bucketAssignerFunction.processElement(hoodieRecord, context, collector);
bucketAssignFunctionContext.setCurrentKey(hoodieRecord.getRecordKey());
for (HoodieFlinkInternalRow row: collector.getVal()) {
writeFunction.processElement(row, null, null);
@@ -210,7 +218,7 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
bootstrapOperator.snapshotState(null);
}
- bucketAssignerFunction.snapshotState(null);
+ bucketAssignerFunction.snapshotState(new MockFunctionSnapshotContext(checkpointId));
writeFunction.snapshotState(new MockFunctionSnapshotContext(checkpointId));
stateInitializationContext.checkpointBegin(checkpointId);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index adce67c00f30..bc6be839cc03 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -215,6 +215,12 @@ public class TestHoodieTableFactory {
this.conf.set(FlinkOptions.INDEX_TYPE, "BUCKET");
final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema, "f2");
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext3));
+
+ // GLOBAL_RECORD_LEVEL_INDEX requires the metadata table to be enabled, otherwise validation fails
+ this.conf.set(FlinkOptions.INDEX_TYPE, "GLOBAL_RECORD_LEVEL_INDEX");
+ this.conf.set(FlinkOptions.METADATA_ENABLED, false);
+ final MockContext sourceContext4 = MockContext.getInstance(this.conf, schema, "f2");
+ assertThrows(IllegalArgumentException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext4));
}
@Test