This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c2b401ed70ff feat(flink): Support bootstrap from RLI to local RocksDB
for flink bucket assigner (#18254)
c2b401ed70ff is described below
commit c2b401ed70ff63ab26ceb0d776f29884ece781db
Author: Shuo Cheng <[email protected]>
AuthorDate: Tue Mar 24 16:18:37 2026 +0800
feat(flink): Support bootstrap from RLI to local RocksDB for flink bucket
assigner (#18254)
* Support scanning all record index locations.
* Support RocksDB index backend.
* Add a bootstrap operator for bootstrapping RLI into the RocksDB index backend.
* Block RLI bootstrap on job restart/failover until all pending instants
are committed successfully.
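For readers wiring this up, a minimal configuration sketch (not part of the patch): the raw "index.type" key and the sample path below are illustrative; INDEX_BOOTSTRAP_ENABLED and the new INDEX_BOOTSTRAP_ROCKSDB_PATH are the FlinkOptions touched by this change.

// Sketch only: enabling RLI bootstrap into the local RocksDB backend for the Flink bucket assigner.
import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class RliBootstrapConfigSketch {
  public static Configuration rliBootstrapConf(String tablePath) {
    Configuration conf = new Configuration();
    conf.set(FlinkOptions.PATH, tablePath);
    // GLOBAL_RECORD_LEVEL_INDEX makes OptionsResolver.isRecordLevelIndex(conf) return true
    conf.setString("index.type", "GLOBAL_RECORD_LEVEL_INDEX");
    // combined with the RLI index type, this selects the new RLIBootstrapOperator path
    conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
    // new option from this patch: local RocksDB directory used by the bucket assigner
    conf.set(FlinkOptions.INDEX_BOOTSTRAP_ROCKSDB_PATH, "/tmp/hudi-rli-rocksdb");
    return conf;
  }
}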
---
.../run/strategy/JavaExecutionStrategy.java | 2 +-
.../hudi/client/TestJavaHoodieBackedMetadata.java | 56 +++++
.../MultipleSparkJobExecutionStrategy.java | 2 +-
...SparkJobConsistentHashingExecutionStrategy.java | 2 +-
.../hudi/common/data/HoodieListPairData.java | 8 +
.../apache/hudi/common/data/HoodiePairData.java | 11 +
.../hudi/metadata/HoodieBackedTableMetadata.java | 67 ++++++
.../apache/hudi/metadata/HoodieTableMetadata.java | 13 ++
.../hudi/util}/LazyConcatenatingIterator.java | 4 +-
.../common/data/TestHoodieListDataPairData.java | 18 ++
.../TestHoodieBackedTableMetadataDataCleanup.java | 2 +-
.../hudi/util}/TestLazyConcatenatingIterator.java | 3 +-
.../apache/hudi/configuration/FlinkOptions.java | 18 ++
.../apache/hudi/configuration/OptionsResolver.java | 4 +
.../hudi/sink/StreamWriteOperatorCoordinator.java | 13 ++
.../sink/bootstrap/AbstractBootstrapOperator.java | 83 ++++++++
.../hudi/sink/bootstrap/BootstrapOperator.java | 50 +----
.../hudi/sink/bootstrap/RLIBootstrapOperator.java | 232 +++++++++++++++++++++
.../org/apache/hudi/sink/event/Correspondent.java | 38 ++++
.../partitioner/index/IndexBackendFactory.java | 30 +--
.../sink/partitioner/index/IndexWriteFunction.java | 1 +
.../partitioner/index/RocksDBIndexBackend.java | 53 +++++
.../org/apache/hudi/sink/utils/CommitGuard.java | 25 +++
.../org/apache/hudi/sink/utils/EventBuffers.java | 22 +-
.../java/org/apache/hudi/sink/utils/Pipelines.java | 20 +-
.../org/apache/hudi/table/HoodieTableFactory.java | 4 +
.../org/apache/hudi/util/FlinkWriteClients.java | 4 +
.../sink/TestStreamWriteOperatorCoordinator.java | 28 +++
.../partitioner/index/TestRocksDBIndexBackend.java | 53 +++++
.../apache/hudi/sink/utils/TestCommitGuard.java | 78 +++++++
.../apache/hudi/sink/utils/TestEventBuffers.java | 121 +++++++++++
.../apache/hudi/table/ITTestHoodieDataSource.java | 26 ++-
.../apache/hudi/utils/TestFlinkWriteClients.java | 17 ++
33 files changed, 1032 insertions(+), 76 deletions(-)
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index f0a0cf1c47df..279bda6358fc 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -23,7 +23,7 @@ import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.JavaTaskContextSupplier;
-import org.apache.hudi.client.utils.LazyConcatenatingIterator;
+import org.apache.hudi.util.LazyConcatenatingIterator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index e1917cc7cc35..9563439c39eb 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -170,6 +170,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -1292,6 +1293,61 @@ public class TestJavaHoodieBackedMetadata extends
TestHoodieMetadataBase {
}
}
+ @Test
+ public void testReadRecordIndexLocationsByBucketId() throws Exception {
+ init(HoodieTableType.COPY_ON_WRITE);
+ HoodieEngineContext engineContext = new
HoodieJavaEngineContext(storageConf);
+
+ HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .enable(true)
+ .withEnableGlobalRecordLevelIndex(true)
+ .withRecordIndexFileGroupCount(3, 3)
+ .build())
+ .build();
+
+ try (HoodieJavaWriteClient client = new
HoodieJavaWriteClient(engineContext, writeConfig)) {
+ String instantTime = client.startCommit();
+ List<HoodieRecord> records = dataGen.generateInserts(instantTime, 120);
+ List<WriteStatus> writeStatuses = client.insert(records, instantTime);
+ client.commit(instantTime, writeStatuses);
+ assertNoWriteErrors(writeStatuses);
+
+ HoodieBackedTableMetadata metadataReader = (HoodieBackedTableMetadata)
metadata(client);
+ int bucketCount =
metadataReader.getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX);
+ assertEquals(3, bucketCount);
+
+ long totalRecords = metadataReader.readRecordIndexLocations(fileSlices
-> fileSlices).count();
+ assertEquals(records.size(), totalRecords);
+ }
+ }
+
+ @Test
+ public void
testReadRecordIndexLocationsByBucketIdFailsWhenRecordIndexDisabled() throws
Exception {
+ init(HoodieTableType.COPY_ON_WRITE);
+ HoodieEngineContext engineContext = new
HoodieJavaEngineContext(storageConf);
+
+ HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .enable(true)
+ .withEnableGlobalRecordLevelIndex(false)
+ .build())
+ .build();
+
+ try (HoodieJavaWriteClient client = new
HoodieJavaWriteClient(engineContext, writeConfig)) {
+ String instantTime = client.startCommit();
+ List<HoodieRecord> records = dataGen.generateInserts(instantTime, 20);
+ List<WriteStatus> writeStatuses = client.insert(records, instantTime);
+ client.commit(instantTime, writeStatuses);
+ assertNoWriteErrors(writeStatuses);
+
+ HoodieBackedTableMetadata metadataReader = (HoodieBackedTableMetadata)
metadata(client);
+ IllegalStateException exception =
+ assertThrows(IllegalStateException.class, () ->
metadataReader.readRecordIndexLocations(fileSlices -> fileSlices));
+ assertTrue(exception.getMessage().contains("Record index is not
initialized in MDT"));
+ }
+ }
+
// Some operations are not feasible with test table infra. hence using write
client to test those cases.
/**
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 9976793199d8..c29fc9b926d9 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -25,7 +25,7 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.client.utils.LazyConcatenatingIterator;
+import org.apache.hudi.util.LazyConcatenatingIterator;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
index 960107d06547..7b49e7972f47 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
@@ -20,7 +20,7 @@ package org.apache.hudi.client.clustering.run.strategy;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.utils.LazyConcatenatingIterator;
+import org.apache.hudi.util.LazyConcatenatingIterator;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.ReaderContextFactory;
import org.apache.hudi.common.engine.TaskContextSupplier;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
index 66e39d28b38a..1ade14e3e107 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
@@ -36,6 +36,7 @@ import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
@@ -85,6 +86,13 @@ public class HoodieListPairData<K, V> extends
HoodieBaseListData<Pair<K, V>> imp
return collectAsList();
}
+ @Override
+ public void forEach(Consumer<Pair<K, V>> consumer) {
+ try (Stream<Pair<K, V>> stream = asStream()) {
+ stream.sequential().forEach(consumer);
+ }
+ }
+
@Override
public void persist(String cacheConfig) {
// no-op
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
index bc8e309dd728..a6e79f0b2030 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
@@ -30,6 +30,7 @@ import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
/**
* An abstraction for pairs of key in type K and value in type V to store the
reference
@@ -169,6 +170,16 @@ public interface HoodiePairData<K, V> extends Serializable
{
*/
List<Pair<K, V>> collectAsList();
+ /**
+ * Applies the given action to each pair in this dataset.
+ *
+ * <p>Implementations may execute this action in a streaming manner and
avoid materializing
+ * the full dataset into memory.
+ */
+ default void forEach(Consumer<Pair<K, V>> consumer) {
+ collectAsList().forEach(consumer);
+ }
+
/**
* @return the deduce number of shuffle partitions
*/
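A small sketch of how the new forEach hook is meant to be consumed (sample keys and values are made up); the HoodieListPairData override streams the pairs instead of materializing collectAsList() first.

import org.apache.hudi.common.data.HoodieListPairData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.util.collection.Pair;

import java.util.Arrays;

public class PairDataForEachExample {
  public static void main(String[] args) {
    // lazy() wraps a plain List; forEach applies the action sequentially, one pair at a time
    HoodiePairData<String, Integer> pairs = HoodieListPairData.lazy(Arrays.asList(
        Pair.of("key-1", 1),
        Pair.of("key-2", 2)));
    pairs.forEach(p -> System.out.println(p.getLeft() + " -> " + p.getRight()));
  }
}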
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index d4d50c52fafb..304ea39ffe23 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -72,6 +72,7 @@ import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.util.LazyConcatenatingIterator;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -91,6 +92,7 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -355,6 +357,71 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
});
}
+ @Override
+ public HoodiePairData<String, HoodieRecordGlobalLocation>
readRecordIndexLocations(
+ SerializableFunctionUnchecked<List<FileSlice>, List<FileSlice>>
fileSlicesFilter) {
+
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX),
+ "Record index is not initialized in MDT");
+
+ return dataCleanupManager.ensureDataCleanupOnException(v -> {
+ // Get all file slices for the record index partition
+ List<FileSlice> fileSlices = partitionFileSliceMap.computeIfAbsent(
+ RECORD_INDEX.getPartitionPath(),
+ k -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(
+ metadataMetaClient, getMetadataFileSystemView(),
RECORD_INDEX.getPartitionPath()));
+
+ List<FileSlice> targetFileSlices = fileSlicesFilter.apply(fileSlices);
+ if (targetFileSlices.isEmpty()) {
+ return HoodieListPairData.eager(Collections.emptyList());
+ }
+
+ List<Supplier<ClosableIterator<HoodieRecord<HoodieMetadataPayload>>>>
iteratorSuppliers =
+ targetFileSlices.stream().map(targetFileSlice -> {
+ return new
Supplier<ClosableIterator<HoodieRecord<HoodieMetadataPayload>>>() {
+ @Override
+ public ClosableIterator<HoodieRecord<HoodieMetadataPayload>>
get() {
+ return scanRecordsItr(
+ targetFileSlice,
+ metadataRecord -> {
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(Option.of(metadataRecord));
+ String rowKey = payload.key != null ? payload.key :
metadataRecord.get(KEY_FIELD_NAME).toString();
+ HoodieKey hoodieKey = new HoodieKey(rowKey,
RECORD_INDEX.getPartitionPath());
+ return new HoodieAvroRecord<>(hoodieKey, payload);
+ });
+ }
+ };
+ }).collect(Collectors.toList());
+
+ HoodieData<HoodieRecord<HoodieMetadataPayload>> allRecords =
+ HoodieListData.lazy(new
LazyConcatenatingIterator<>(iteratorSuppliers)).filter(r ->
!r.getData().isDeleted());
+ // Map records to key-location pairs
+ return allRecords.mapToPair(record -> {
+ String recordKey = record.getRecordKey();
+ HoodieRecordGlobalLocation location =
record.getData().getRecordGlobalLocation();
+ return Pair.of(recordKey, location);
+ });
+ });
+ }
+
+ /**
+ * Helper method to read all records from a file slice without key filtering
+ */
+ private ClosableIterator<HoodieRecord<HoodieMetadataPayload>> scanRecordsItr(
+ FileSlice fileSlice,
+ SerializableFunctionUnchecked<GenericRecord,
HoodieRecord<HoodieMetadataPayload>> transformer) {
+ // Read all records from the file slice without any key filtering
+ // This bypasses the normal predicate building mechanism
+ try {
+ ClosableIterator<IndexedRecord> rawIterator =
readSliceWithFilter(Predicates.alwaysTrue(), fileSlice);
+ return new CloseableMappingIterator<>(rawIterator, record -> {
+ GenericRecord metadataRecord = (GenericRecord) record;
+ return transformer.apply(metadataRecord);
+ });
+ } catch (IOException e) {
+ throw new HoodieIOException("Error reading all records from metadata
table file slice", e);
+ }
+ }
+
/**
* Reads record keys from record-level index. Deleted records are filtered
out.
* <p>
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index 1da1f77400e6..98bd44914c72 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
@@ -258,6 +259,18 @@ public interface HoodieTableMetadata extends Serializable,
AutoCloseable {
return readRecordIndexLocationsWithKeys(recordKeys).values();
}
+ /**
+ * Returns the location of record keys for file slices filtered by the given
file slice filter.
+ * <p>
+ * If the Metadata Table is not enabled, an exception is thrown to
distinguish this from the absence of the key.
+ *
+ * @param fileSlicesFilter The file slices filter function
+ * @return Pairs of (record key, location of record) from the filtered
record index file slices.
+ */
+ default HoodiePairData<String, HoodieRecordGlobalLocation>
readRecordIndexLocations(SerializableFunctionUnchecked<List<FileSlice>,
List<FileSlice>> fileSlicesFilter) {
+ throw new
UnsupportedOperationException("readRecordIndexLocations(fileSlicesFilter) is
not supported by this implementation");
+ }
+
/**
* Returns pairs of (secondary key, location of secondary key) which the
provided secondary keys maps to.
* Records that are not found are ignored and won't be part of map object
that is returned.
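A hedged usage sketch of the new default method above, mirroring the round-robin file-slice filter that RLIBootstrapOperator applies later in this patch; the class and method names in the sketch are illustrative.

import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.function.SerializableFunctionUnchecked;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.metadata.HoodieTableMetadata;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class RecordIndexScanSketch {
  // Scan only the RLI file slices this subtask owns (round-robin on the file group index).
  static HoodiePairData<String, HoodieRecordGlobalLocation> scanOwnedLocations(
      HoodieTableMetadata metadata, int taskId, int parallelism) {
    SerializableFunctionUnchecked<List<FileSlice>, List<FileSlice>> filter = fileSlices ->
        IntStream.range(0, fileSlices.size())
            .filter(i -> i % parallelism == taskId)
            .mapToObj(fileSlices::get)
            .collect(Collectors.toList());
    return metadata.readRecordIndexLocations(filter);
  }
}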
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyConcatenatingIterator.java
b/hudi-common/src/main/java/org/apache/hudi/util/LazyConcatenatingIterator.java
similarity index 96%
rename from
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyConcatenatingIterator.java
rename to
hudi-common/src/main/java/org/apache/hudi/util/LazyConcatenatingIterator.java
index 048c315f276b..0d3358b9f02e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyConcatenatingIterator.java
+++
b/hudi-common/src/main/java/org/apache/hudi/util/LazyConcatenatingIterator.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hudi.client.utils;
+package org.apache.hudi.util;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -30,7 +30,7 @@ import java.util.function.Supplier;
* Provides iterator interface over List of iterators. Consumes all records
from first iterator element
* before moving to next iterator in the list. That is concatenating elements
across multiple iterators.
*
- * <p>Different with {@link ConcatenatingIterator}, the internal iterators are
instantiated lazily.
+ * <p>Different with {@code ConcatenatingIterator}, the internal iterators are
instantiated lazily.
*/
public class LazyConcatenatingIterator<T> implements ClosableIterator<T> {
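For context on the relocated iterator, a minimal sketch of the lazy-supplier pattern it supports; the iteratorOf helper below is purely illustrative.

import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.util.LazyConcatenatingIterator;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class LazyConcatSketch {
  // Minimal ClosableIterator over an in-memory list, used only for this sketch.
  static ClosableIterator<Integer> iteratorOf(List<Integer> values) {
    Iterator<Integer> it = values.iterator();
    return new ClosableIterator<Integer>() {
      @Override public boolean hasNext() { return it.hasNext(); }
      @Override public Integer next() { return it.next(); }
      @Override public void close() { /* nothing to release for an in-memory list */ }
    };
  }

  public static void main(String[] args) {
    // Each supplier is only invoked once the previous iterator is exhausted.
    List<Supplier<ClosableIterator<Integer>>> suppliers = Arrays.asList(
        () -> iteratorOf(Arrays.asList(1, 2)),
        () -> iteratorOf(Arrays.asList(3, 4)));
    try (LazyConcatenatingIterator<Integer> concat = new LazyConcatenatingIterator<>(suppliers)) {
      while (concat.hasNext()) {
        System.out.println(concat.next());
      }
    }
  }
}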
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
index 90e7873a2fef..d14465c4bc65 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
@@ -41,6 +41,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@@ -308,6 +309,23 @@ public class TestHoodieListDataPairData {
assertEquals(expected, result, "Join result does not match expected
output");
}
+ @Test
+ public void testForEachOnUnion() {
+ HoodiePairData<Integer, Integer> left = HoodieListPairData.lazy(
+ IntStream.range(0, 1000).mapToObj(i -> Pair.of(i,
i)).collect(Collectors.toList()));
+ HoodiePairData<Integer, Integer> right = HoodieListPairData.lazy(
+ IntStream.range(1000, 2000).mapToObj(i -> Pair.of(i,
i)).collect(Collectors.toList()));
+
+ HoodiePairData<Integer, Integer> unionData = left.union(right);
+ List<Pair<Integer, Integer>> actual = new ArrayList<>();
+ unionData.forEach(actual::add);
+
+ List<Pair<Integer, Integer>> expected = IntStream.range(0, 2000)
+ .mapToObj(i -> Pair.of(i, i))
+ .collect(Collectors.toList());
+ assertEquals(expected, actual);
+ }
+
@Test
public void testFilter() {
// Test filtering by key
diff --git
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataDataCleanup.java
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataDataCleanup.java
index d899f68685de..03d013f4c586 100644
---
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataDataCleanup.java
+++
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataDataCleanup.java
@@ -193,7 +193,7 @@ public class TestHoodieBackedTableMetadataDataCleanup {
// Setup mock behavior for V2 path
when(mockPairData.values()).thenReturn(mockHoodieData);
when(mockMetadata.readSecondaryIndexDataTableRecordKeysV2(any(),
anyString())).thenReturn(mockHoodieData);
-
when(mockMetadata.readRecordIndexLocations(any())).thenReturn(mockHoodieData);
+
when(mockMetadata.readRecordIndexLocations(any(HoodieData.class))).thenReturn(mockHoodieData);
// Call real method on the mock
when(mockMetadata.readSecondaryIndexLocations(secondaryKeys,
partitionName)).thenCallRealMethod();
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestLazyConcatenatingIterator.java
b/hudi-common/src/test/java/org/apache/hudi/util/TestLazyConcatenatingIterator.java
similarity index 97%
rename from
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestLazyConcatenatingIterator.java
rename to
hudi-common/src/test/java/org/apache/hudi/util/TestLazyConcatenatingIterator.java
index fa1a37d02776..dac89c15e229 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestLazyConcatenatingIterator.java
+++
b/hudi-common/src/test/java/org/apache/hudi/util/TestLazyConcatenatingIterator.java
@@ -16,9 +16,8 @@
* limitations under the License.
*/
-package org.apache.hudi.utils;
+package org.apache.hudi.util;
-import org.apache.hudi.client.utils.LazyConcatenatingIterator;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.junit.jupiter.api.Test;
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 93f11df956f9..f2907b06d66b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -38,6 +38,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.io.util.FileIOUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.sink.buffer.BufferMemoryType;
@@ -274,6 +275,15 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(0D)
.withDescription("Index state ttl in days, default stores the index
permanently");
+ @AdvancedConfig
+ public static final ConfigOption<String> INDEX_BOOTSTRAP_ROCKSDB_PATH =
ConfigOptions
+ .key("index.bootstrap.rocksdb.path")
+ .stringType()
+ .defaultValue(FileIOUtils.getDefaultSpillableMapBasePath())
+ .withDescription("Local directory path for RocksDB when "
+ + "bootstrap is enabled for record level index type."
+ + "Each task manager creates a unique subdirectory under this
path.");
+
@AdvancedConfig
public static final ConfigOption<Boolean> INDEX_GLOBAL_ENABLED =
ConfigOptions
.key("index.global.enabled")
@@ -898,6 +908,14 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(ClientIds.INIT_CLIENT_ID)
.withDescription("Unique identifier used to distinguish different writer
pipelines for concurrent mode");
+ // this is only for internal use
+ @AdvancedConfig
+ public static final ConfigOption<String> WRITE_OPERATOR_UID = ConfigOptions
+ .key("write.operator.uid")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The write operator uid used as the uid for hudi sink
transformation.");
+
// ------------------------------------------------------------------------
// Compaction Options
// ------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 57f031bbd85e..398ca329dbb4 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -205,6 +205,10 @@ public class OptionsResolver {
return indexType == HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX;
}
+ public static boolean isRLIWithBootstrap(Configuration conf) {
+ return isRecordLevelIndex(conf) &&
conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED);
+ }
+
/**
* Returns whether it is a MERGE_ON_READ table, and updates by bucket index.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 5e0677c586d8..88f48c1863ec 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -390,6 +390,9 @@ public class StreamWriteOperatorCoordinator
if (request instanceof Correspondent.InflightInstantsRequest) {
return
handleInFlightInstantsRequest((Correspondent.InflightInstantsRequest) request);
}
+ if (request instanceof Correspondent.AwaitPendingInstantsRequest) {
+ return
handleAwaitPendingInstantsRequest((Correspondent.AwaitPendingInstantsRequest)
request);
+ }
throw new HoodieException("Unexpected coordination request type: " +
request.getClass().getSimpleName());
}
@@ -417,6 +420,16 @@ public class StreamWriteOperatorCoordinator
return
CompletableFuture.completedFuture(CoordinationResponseSerDe.wrap(coordinationResponse));
}
+ private CompletableFuture<CoordinationResponse>
handleAwaitPendingInstantsRequest(Correspondent.AwaitPendingInstantsRequest
request) {
+ CompletableFuture<CoordinationResponse> response = new
CompletableFuture<>();
+ instantRequestExecutor.execute(() -> {
+ // wait for all pending instants before this checkpoint to be committed.
+ eventBuffers.awaitPrevInstantsToComplete(request.getCheckpointId());
+
response.complete(CoordinationResponseSerDe.wrap(Correspondent.AwaitPendingInstantsResponse.getInstance()));
+ }, "await pending instants to complete");
+ return response;
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/AbstractBootstrapOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/AbstractBootstrapOperator.java
new file mode 100644
index 000000000000..6e6e8436d748
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/AbstractBootstrapOperator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap;
+
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
+import org.apache.hudi.utils.RuntimeContextUtils;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base operator for bootstrap stages that emit preloaded index records.
+ */
+@Slf4j
+public abstract class AbstractBootstrapOperator
+ extends AbstractStreamOperator<HoodieFlinkInternalRow>
+ implements OneInputStreamOperator<HoodieFlinkInternalRow,
HoodieFlinkInternalRow> {
+
+ protected final Configuration conf;
+
+ protected AbstractBootstrapOperator(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * The modifier of this method was changed to `protected` since Flink 2.0; we override it here
+ * with a `public` modifier to keep it compatible with its usage in the hudi-flink module.
+ */
+ @Override
+ public void setup(StreamTask<?, ?> containingTask, StreamConfig config,
Output<StreamRecord<HoodieFlinkInternalRow>> output) {
+ super.setup(containingTask, config, output);
+ }
+
+ @Override
+ public void processElement(StreamRecord<HoodieFlinkInternalRow> element)
throws Exception {
+ output.collect(element);
+ }
+
+ protected void waitForBootstrapReady(int taskID) {
+ GlobalAggregateManager aggregateManager =
getRuntimeContext().getGlobalAggregateManager();
+ int taskNum =
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
+ int readyTaskNum = 1;
+ while (taskNum != readyTaskNum) {
+ try {
+ readyTaskNum = aggregateManager.updateGlobalAggregate(
+ BootstrapAggFunction.NAME + conf.get(FlinkOptions.TABLE_NAME),
+ taskID,
+ new BootstrapAggFunction());
+ log.info("Waiting for other bootstrap tasks to complete, taskId = {},
ready = {}/{}", taskID, readyTaskNum, taskNum);
+ TimeUnit.SECONDS.sleep(5);
+ } catch (Exception e) {
+ log.error("Updating global task bootstrap summary failed", e);
+ }
+ }
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index 5cbd581819d0..a3213ea59421 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -35,7 +35,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
-import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.format.FormatUtils;
@@ -54,20 +53,13 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.data.RowData;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import static java.util.stream.Collectors.toList;
@@ -84,18 +76,13 @@ import static
org.apache.hudi.util.StreamerUtil.metadataConfig;
*/
@Slf4j
public class BootstrapOperator
- extends AbstractStreamOperator<HoodieFlinkInternalRow>
- implements OneInputStreamOperator<HoodieFlinkInternalRow,
HoodieFlinkInternalRow> {
+ extends AbstractBootstrapOperator {
protected HoodieTable<?, ?, ?, ?> hoodieTable;
- protected final Configuration conf;
-
protected transient org.apache.hadoop.conf.Configuration hadoopConf;
protected transient HoodieWriteConfig writeConfig;
- private transient GlobalAggregateManager aggregateManager;
-
private transient ListState<String> instantState;
private transient HoodieTableMetaClient metaClient;
private transient InternalSchemaManager internalSchemaManager;
@@ -104,19 +91,10 @@ public class BootstrapOperator
private String lastInstantTime;
public BootstrapOperator(Configuration conf) {
- this.conf = conf;
+ super(conf);
this.pattern =
Pattern.compile(conf.get(FlinkOptions.INDEX_PARTITION_REGEX));
}
- /**
- * The modifier of this method is updated to `protected` sink Flink 2.0,
here we overwrite the method
- * with `public` modifier to make it compatible considering usage in
hudi-flink module.
- */
- @Override
- public void setup(StreamTask<?, ?> containingTask, StreamConfig config,
Output<StreamRecord<HoodieFlinkInternalRow>> output) {
- super.setup(containingTask, config, output);
- }
-
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
lastInstantTime =
StreamerUtil.getLastCompletedInstant(StreamerUtil.createMetaClient(this.conf));
@@ -146,7 +124,6 @@ public class BootstrapOperator
this.writeConfig = FlinkWriteClients.getHoodieClientConfig(
this.conf, false, !OptionsResolver.isIncrementalJobGraph(conf));
this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf,
getRuntimeContext());
- this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
this.metaClient = StreamerUtil.createMetaClient(conf, hadoopConf);
this.internalSchemaManager =
InternalSchemaManager.get(hoodieTable.getStorageConf(), metaClient);
@@ -173,29 +150,6 @@ public class BootstrapOperator
hoodieTable = null;
}
- /**
- * Wait for other bootstrap tasks to finish the index bootstrap.
- */
- private void waitForBootstrapReady(int taskID) {
- int taskNum =
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
- int readyTaskNum = 1;
- while (taskNum != readyTaskNum) {
- try {
- readyTaskNum =
aggregateManager.updateGlobalAggregate(BootstrapAggFunction.NAME +
conf.get(FlinkOptions.TABLE_NAME), taskID, new BootstrapAggFunction());
- log.info("Waiting for other bootstrap tasks to complete, taskId =
{}.", taskID);
-
- TimeUnit.SECONDS.sleep(5);
- } catch (Exception e) {
- log.error("Updating global task bootstrap summary failed", e);
- }
- }
- }
-
- @Override
- public void processElement(StreamRecord<HoodieFlinkInternalRow> element)
throws Exception {
- output.collect(element);
- }
-
/**
* Loads all the indices of give partition path into the backup state.
*
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
new file mode 100644
index 000000000000..55b5d65cf232
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.sink.event.Correspondent;
+import org.apache.hudi.sink.utils.OperatorIDGenerator;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.RuntimeContextUtils;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * Bootstrap operator that loads record level index (RLI) data from metadata
table.
+ *
+ * <p>This operator reads index data from the record_index partition of the
metadata table.
+ * Each subtask reads the RLI file groups (buckets) assigned to it by its task index,
+ * enabling parallel loading.
+ *
+ * <p>The loaded index records are emitted downstream to initialize the index
state in
+ * {@link org.apache.hudi.sink.partitioner.BucketAssignFunction}.
+ */
+@Slf4j
+public class RLIBootstrapOperator
+ extends AbstractBootstrapOperator {
+
+ private final OperatorID dataWriteOperatorId;
+
+ private transient HoodieTableMetaClient metaClient;
+ private transient HoodieBackedTableMetadata metadataTable;
+ private transient Correspondent correspondent;
+ private transient long loadedCnt;
+
+ /**
+ * The last checkpoint id, starts from -1.
+ */
+ private long checkpointId = -1;
+
+ /**
+ * List state of the JobID.
+ */
+ private transient ListState<JobID> jobIdState;
+
+ public RLIBootstrapOperator(Configuration conf) {
+ super(conf);
+ String writeOperatorUid = conf.get(FlinkOptions.WRITE_OPERATOR_UID);
+ ValidationUtils.checkArgument(writeOperatorUid != null,
+ "Write operator UID should not be null when index is Record Level
Index.");
+ this.dataWriteOperatorId = OperatorIDGenerator.fromUid(writeOperatorUid);
+ }
+
+ @Override
+ public void setup(StreamTask<?, ?> containingTask, StreamConfig config,
Output<StreamRecord<HoodieFlinkInternalRow>> output) {
+ super.setup(containingTask, config, output);
+ this.correspondent = Correspondent.getInstance(dataWriteOperatorId,
+
getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway());
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws
Exception {
+ this.jobIdState = context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>(
+ "job-id-state",
+ TypeInformation.of(JobID.class)
+ ));
+ loadedCnt = 0;
+
+ int attemptId = RuntimeContextUtils.getAttemptNumber(getRuntimeContext());
+ if (context.isRestored()) {
+ initCheckpointId(attemptId,
context.getRestoredCheckpointId().orElse(-1L));
+ }
+
+ if (context.isRestored()) {
+ // Wait for pending instants being committed successfully before loading
the record index
+ log.info("Waiting for pending instants committed before RLI bootstrap.");
+ correspondent.awaitPendingInstantsCommitted(checkpointId);
+ log.info("All pending instants are completed, continue RLI bootstrap.");
+ }
+
+ this.metaClient = StreamerUtil.createMetaClient(conf);
+ this.metadataTable = (HoodieBackedTableMetadata)
metaClient.getTableFormat().getMetadataFactory().create(
+ HoodieFlinkEngineContext.DEFAULT,
+ metaClient.getStorage(),
+ StreamerUtil.metadataConfig(conf),
+ conf.get(FlinkOptions.PATH));
+ // Load RLI records
+ preLoadRLIRecords();
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+ // Reload the job ID state
+ reloadJobIdState();
+ // Update checkpoint id
+ this.checkpointId = context.getCheckpointId();
+ }
+
+ @Override
+ public void close() throws Exception {
+ closeMetadataTable();
+ super.close();
+ }
+
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ /**
+ * Reload the job id state as the current job id.
+ */
+ private void reloadJobIdState() throws Exception {
+ this.jobIdState.clear();
+ this.jobIdState.add(RuntimeContextUtils.getJobId(getRuntimeContext()));
+ }
+
+ private void initCheckpointId(int attemptId, long restoredCheckpointId)
throws Exception {
+ if (attemptId <= 0) {
+ // returns early if the job/task is initially started.
+ return;
+ }
+ JobID currentJobId = RuntimeContextUtils.getJobId(getRuntimeContext());
+ if (StreamSupport.stream(this.jobIdState.get().spliterator(), false)
+ .noneMatch(currentJobId::equals)) {
+ // do not set up the checkpoint id if the state comes from the old job.
+ return;
+ }
+ this.checkpointId = restoredCheckpointId;
+ }
+
+ private void preLoadRLIRecords() {
+ int taskID =
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
+ int parallelism =
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
+
+ log.info("Start loading RLI records from metadata table, taskId = {},
parallelism = {}", taskID, parallelism);
+
+ SerializableFunctionUnchecked<List<FileSlice>, List<FileSlice>>
fileSlicesFilter = fileSlices -> {
+ List<FileSlice> filteredFileSlices = new ArrayList<>();
+ for (int i = 0; i < fileSlices.size(); i++) {
+ if (shouldLoadBucket(i, parallelism, taskID)) {
+ filteredFileSlices.add(fileSlices.get(i));
+ }
+ }
+ log.info("Subtask: {} will load record index records from file groups:
{}, total file groups: {}.",
+ taskID,
filteredFileSlices.stream().map(FileSlice::getFileId).collect(Collectors.joining(",")),
fileSlices.size());
+ return filteredFileSlices;
+ };
+
+ // Each subtask loads buckets assigned to it
+ long startTime = System.currentTimeMillis();
+ HoodiePairData<String, HoodieRecordGlobalLocation> rliData =
metadataTable.readRecordIndexLocations(fileSlicesFilter);
+ rliData.forEach(locationPair -> emitIndexRecord(locationPair.getLeft(),
locationPair.getRight()));
+ long costMs = System.currentTimeMillis() - startTime;
+ log.info("Finish loading RLI records, total records: {}, cost: {} ms,
taskId = {}", loadedCnt, costMs, taskID);
+
+ // Wait for other tasks to complete
+ waitForBootstrapReady(taskID);
+
+ // Cleanup resources
+ closeMetadataTable();
+ }
+
+ /**
+ * Determines if the given file group should be loaded by this task.
+ * Uses round-robin assignment: file group i is assigned to task (i %
parallelism).
+ */
+ private boolean shouldLoadBucket(int fileGroupIdx, int parallelism, int
taskID) {
+ return fileGroupIdx % parallelism == taskID;
+ }
+
+ private void emitIndexRecord(String recordKey, HoodieRecordGlobalLocation
location) {
+ output.collect(new StreamRecord<>(
+ new HoodieFlinkInternalRow(
+ recordKey,
+ location.getPartitionPath(),
+ location.getFileId(),
+ String.valueOf(location.getInstantTime()))));
+ loadedCnt += 1;
+ }
+
+ private void closeMetadataTable() {
+ if (metadataTable != null) {
+ try {
+ metadataTable.close();
+ } catch (Exception e) {
+ log.warn("Failed to close metadata table", e);
+ }
+ metadataTable = null;
+ }
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
index 9bc8f8242ec5..5d599514bb0b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java
@@ -100,6 +100,18 @@ public class Correspondent {
}
}
+ /**
+ * Requests coordinator to wait until all pending instants are committed if
necessary.
+ */
+ public void awaitPendingInstantsCommitted(long checkpointId) {
+ try {
+ this.gateway.sendRequestToCoordinator(this.operatorID,
+ new
SerializedValue<>(AwaitPendingInstantsRequest.getInstance(checkpointId))).get();
+ } catch (Exception e) {
+ throw new HoodieException("Error awaiting pending instants completion
from coordinator", e);
+ }
+ }
+
/**
* A request for instant time with a given checkpoint id.
*/
@@ -153,4 +165,30 @@ public class Correspondent {
return new InflightInstantsResponse(inflightInstants);
}
}
+
+ /**
+ * A request to wait until all pending instants are committed in the
coordinator.
+ */
+ @AllArgsConstructor(access = AccessLevel.PRIVATE)
+ @Getter
+ public static class AwaitPendingInstantsRequest implements
CoordinationRequest {
+
+ private final long checkpointId;
+
+ public static AwaitPendingInstantsRequest getInstance(long checkpointId) {
+ return new AwaitPendingInstantsRequest(checkpointId);
+ }
+ }
+
+ /**
+ * A response to indicate pending instants are all completed.
+ */
+ @AllArgsConstructor(access = AccessLevel.PRIVATE)
+ @Getter
+ public static class AwaitPendingInstantsResponse implements
CoordinationResponse {
+
+ public static AwaitPendingInstantsResponse getInstance() {
+ return new AwaitPendingInstantsResponse();
+ }
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
index b8410e2a2386..f3bdaad5c145 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexBackendFactory.java
@@ -58,20 +58,24 @@ public class IndexBackendFactory {
ValidationUtils.checkArgument(indexState != null, "indexState should
not be null when using FLINK_STATE index!");
return new FlinkStateIndexBackend(indexState);
case GLOBAL_RECORD_LEVEL_INDEX:
- ListState<JobID> jobIdState =
context.getOperatorStateStore().getListState(
- new ListStateDescriptor<>(
- "bucket-assign-job-id-state",
- TypeInformation.of(JobID.class)
- ));
- long initCheckpointId = -1;
- if (context.isRestored()) {
- int attemptId = RuntimeContextUtils.getAttemptNumber(runtimeContext);
- initCheckpointId = initCheckpointId(attemptId, jobIdState,
context.getRestoredCheckpointId().orElse(-1L), runtimeContext);
+ if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+ return new
RocksDBIndexBackend(conf.get(FlinkOptions.INDEX_BOOTSTRAP_ROCKSDB_PATH));
+ } else {
+ ListState<JobID> jobIdState =
context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>(
+ "bucket-assign-job-id-state",
+ TypeInformation.of(JobID.class)
+ ));
+ long initCheckpointId = -1;
+ if (context.isRestored()) {
+ int attemptId =
RuntimeContextUtils.getAttemptNumber(runtimeContext);
+ initCheckpointId = initCheckpointId(attemptId, jobIdState,
context.getRestoredCheckpointId().orElse(-1L), runtimeContext);
+ }
+ // set the jobId state with current job id.
+ jobIdState.clear();
+ jobIdState.add(RuntimeContextUtils.getJobId(runtimeContext));
+ return new RecordLevelIndexBackend(conf, initCheckpointId);
}
- // set the jobId state with current job id.
- jobIdState.clear();
- jobIdState.add(RuntimeContextUtils.getJobId(runtimeContext));
- return new RecordLevelIndexBackend(conf, initCheckpointId);
default:
throw new UnsupportedOperationException("Index type " + indexType + "
is not supported for bucket assigning yet.");
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
index ee06ac12ed5a..ad13e54bd86e 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
@@ -180,6 +180,7 @@ public class IndexWriteFunction extends
AbstractStreamWriteFunction<RowData> {
@Override
public void endInput() {
+ super.endInput();
// flush index data buffer
flushBuffer(true, true);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java
new file mode 100644
index 000000000000..88ae067adc4d
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.util.collection.RocksDBDAO;
+
+import java.io.IOException;
+
+/**
+ * An implementation of {@link IndexBackend} based on RocksDB.
+ */
+public class RocksDBIndexBackend implements IndexBackend {
+ private static final String COLUMN_FAMILY = "index_cache";
+
+ private final RocksDBDAO rocksDBDAO;
+
+ public RocksDBIndexBackend(String rocksDbBasePath) {
+ this.rocksDBDAO = new RocksDBDAO("hudi-index-backend", rocksDbBasePath);
+ this.rocksDBDAO.addColumnFamily(COLUMN_FAMILY);
+ }
+
+ @Override
+ public HoodieRecordGlobalLocation get(String recordKey) {
+ return this.rocksDBDAO.get(COLUMN_FAMILY, recordKey);
+ }
+
+ @Override
+ public void update(String recordKey, HoodieRecordGlobalLocation
recordGlobalLocation) {
+ this.rocksDBDAO.put(COLUMN_FAMILY, recordKey, recordGlobalLocation);
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.rocksDBDAO.close();
+ }
+}
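A minimal usage sketch for the new backend: the RocksDB path and record values are made up, and the HoodieRecordGlobalLocation constructor argument order shown (partition path, instant time, file id) is an assumption.

import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.sink.partitioner.index.RocksDBIndexBackend;

public class RocksDBIndexBackendSketch {
  public static void main(String[] args) throws Exception {
    // Path is illustrative; in the pipeline it comes from index.bootstrap.rocksdb.path.
    RocksDBIndexBackend backend = new RocksDBIndexBackend("/tmp/hudi-index-backend");
    try {
      // Assumed constructor order: (partitionPath, instantTime, fileId).
      HoodieRecordGlobalLocation location =
          new HoodieRecordGlobalLocation("2021/01/01", "20260324161837000", "file-id-1");
      backend.update("record-key-1", location);
      System.out.println(backend.get("record-key-1"));
    } finally {
      backend.close();
    }
  }
}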
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CommitGuard.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CommitGuard.java
index 0be33a05dffc..76836b6648b9 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CommitGuard.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CommitGuard.java
@@ -20,10 +20,12 @@ package org.apache.hudi.sink.utils;
import org.apache.hudi.exception.HoodieException;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
/**
* The commit guard used for blocking instant time generation.
@@ -66,6 +68,29 @@ public class CommitGuard {
}
}
+ /**
+ * Wait until the pending instants are committed.
+ *
+ * @param pendingInstants Supplier to get the pending instants
+ */
+ public void blockFor(Supplier<List<String>> pendingInstants) {
+ lock.lock();
+ long nanos = TimeUnit.MILLISECONDS.toNanos(commitAckTimeout);
+ try {
+ while (!pendingInstants.get().isEmpty()) {
+ if (nanos <= 0L) {
+ throw new HoodieException("Timeout(" + commitAckTimeout + "ms) while
waiting for instants ["
+ + pendingInstants.get() + "] to commit");
+ }
+ nanos = condition.awaitNanos(nanos);
+ }
+ } catch (InterruptedException e) {
+ throw new HoodieException("Blocking for instants completion is
interrupted.", e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
/**
* Signals all waited threads which are waiting on the condition.
*/
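A small sketch of how the new blockFor(Supplier) can be driven: the pending-instant list below stands in for EventBuffers#getPendingInstantsBefore, and the timeout value is illustrative (CommitGuard.create takes the commit-ack timeout in milliseconds).

import org.apache.hudi.sink.utils.CommitGuard;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CommitGuardBlockForSketch {
  public static void main(String[] args) throws Exception {
    CommitGuard guard = CommitGuard.create(60_000L);
    List<String> pending = new CopyOnWriteArrayList<>();
    pending.add("20260324161837000");

    Thread committer = new Thread(() -> {
      try {
        Thread.sleep(1000);      // simulate the commit finishing later
        pending.clear();         // instant committed
        guard.unblock();         // wake up the waiting bootstrap thread
      } catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
      }
    });
    committer.start();

    guard.blockFor(() -> pending);  // returns once the supplier yields an empty list
    committer.join();
  }
}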
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
index 701e0f23bb3d..060a74198f5f 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java
@@ -49,25 +49,30 @@ public class EventBuffers implements Serializable {
// {checkpointId -> (instant, data write events, index write events)}
private final Map<Long, Pair<String, EventBuffer>> eventBuffers;
private final Option<CommitGuard> commitGuardOption;
+ private final Option<CommitGuard> indexBootstrapGuardOption;
private final int dataWriteParallelism;
private final int indexWriteParallelism;
private EventBuffers(
Map<Long, Pair<String, EventBuffer>> eventBuffers,
Option<CommitGuard> commitGuardOption,
+ Option<CommitGuard> indexBootstrapGuardOption,
int dataWriteParallelism,
int indexWriteParallelism) {
this.eventBuffers = eventBuffers;
this.commitGuardOption = commitGuardOption;
this.dataWriteParallelism = dataWriteParallelism;
this.indexWriteParallelism = indexWriteParallelism;
+ this.indexBootstrapGuardOption = indexBootstrapGuardOption;
}
public static EventBuffers getInstance(Configuration conf, int
dataWriteParallelism) {
final Option<CommitGuard> commitGuardOpt =
OptionsResolver.isBlockingInstantGeneration(conf)
?
Option.of(CommitGuard.create(conf.get(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT)))
: Option.empty();
+ final Option<CommitGuard> indexBootstrapGuardOption =
OptionsResolver.isRLIWithBootstrap(conf)
+ ?
Option.of(CommitGuard.create(conf.get(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT)))
: Option.empty();
final int indexWriteParallelism =
OptionsResolver.indexWriteParallelism(conf);
- return new EventBuffers(new ConcurrentSkipListMap<>(), commitGuardOpt,
dataWriteParallelism, indexWriteParallelism);
+ return new EventBuffers(new ConcurrentSkipListMap<>(), commitGuardOpt,
indexBootstrapGuardOption, dataWriteParallelism, indexWriteParallelism);
}
public EventBuffer addEventToBuffer(WriteMetadataEvent event) {
@@ -143,9 +148,17 @@ public class EventBuffers implements Serializable {
}
}
+ public void awaitPrevInstantsToComplete(long checkpointId) {
+ List<String> pendingInstants = getPendingInstantsBefore(checkpointId);
+ if (!pendingInstants.isEmpty() && this.indexBootstrapGuardOption.isPresent()) {
+ this.indexBootstrapGuardOption.get().blockFor(() -> getPendingInstantsBefore(checkpointId));
+ }
+ }
+
public void reset(long checkpointId) {
this.eventBuffers.remove(checkpointId);
this.commitGuardOption.ifPresent(CommitGuard::unblock);
+ this.indexBootstrapGuardOption.ifPresent(CommitGuard::unblock);
}
public boolean nonEmpty() {
@@ -155,6 +168,13 @@ public class EventBuffers implements Serializable {
.anyMatch(Objects::nonNull);
}
+ public List<String> getPendingInstantsBefore(long checkpointId) {
+ return this.eventBuffers.entrySet().stream()
+ .filter(entry -> entry.getKey() < checkpointId)
+ .map(entry -> entry.getValue().getLeft())
+ .collect(Collectors.toList());
+ }
+
public String getPendingInstants() {
return this.eventBuffers.values().stream().map(Pair::getKey).collect(Collectors.joining(","));
}
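
Note that getPendingInstantsBefore uses a strict less-than on the checkpoint id, so the instant registered for the current checkpoint never blocks its own bootstrap; awaitPrevInstantsToComplete only gates on strictly earlier checkpoints and relies on reset(...) to remove committed buffers and signal the bootstrap guard. A condensed, hedged sketch of that interplay (it mirrors the new TestEventBuffers case further below; the configuration values are the ones that test uses, and the class name is illustrative):

import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.utils.EventBuffers;

import org.apache.flink.configuration.Configuration;

public class AwaitPrevInstantsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
    conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
    conf.set(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 5_000L);
    conf.set(FlinkOptions.INDEX_WRITE_TASKS, 4);
    EventBuffers eventBuffers = EventBuffers.getInstance(conf, 1);

    // Checkpoints 1 and 2 are strictly earlier than 3, so their instants gate checkpoint 3.
    eventBuffers.initNewEventBuffer(1L, "001");
    eventBuffers.initNewEventBuffer(2L, "002");
    eventBuffers.initNewEventBuffer(3L, "003");

    Thread committer = new Thread(() -> {
      eventBuffers.reset(1L); // "001" committed: buffer removed, guard signalled
      eventBuffers.reset(2L); // "002" committed: nothing earlier than checkpoint 3 is left
    });
    committer.start();

    eventBuffers.awaitPrevInstantsToComplete(3L); // returns once "001" and "002" are gone
    committer.join();
  }
}
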
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 34d016f94960..e7fc705d4b13 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -33,6 +33,7 @@ import org.apache.hudi.sink.StreamWriteOperator;
import org.apache.hudi.sink.append.AppendWriteFunctions;
import org.apache.hudi.sink.append.AppendWriteOperator;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
+import org.apache.hudi.sink.bootstrap.RLIBootstrapOperator;
import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
import org.apache.hudi.sink.bucket.BucketStreamWriteOperator;
@@ -69,9 +70,11 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -281,13 +284,18 @@ public class Pipelines {
DataStream<HoodieFlinkInternalRow> dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream);
if (conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) {
+ boolean isRliBootstrap = OptionsResolver.isRecordLevelIndex(conf);
+ if (isRliBootstrap) {
+ conf.set(FlinkOptions.WRITE_OPERATOR_UID, Pipelines.opUID("stream_write", conf));
+ }
dataStream1 = dataStream1
.transform(
"index_bootstrap",
new HoodieFlinkInternalRowTypeInfo(rowType),
- new BootstrapOperator(conf))
+ isRliBootstrap ? new RLIBootstrapOperator(conf) : new BootstrapOperator(conf))
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream1.getParallelism()))
.uid(opUID("index_bootstrap", conf));
+ ((OneInputTransformation<?, ?>) dataStream1.getTransformation()).setChainingStrategy(ChainingStrategy.ALWAYS);
}
return dataStream1;
@@ -351,8 +359,8 @@ public class Pipelines {
* @return the stream write data stream pipeline
*/
public static DataStream<RowData> hoodieStreamWrite(Configuration conf,
- RowType rowType,
-
DataStream<HoodieFlinkInternalRow> dataStream) {
+ RowType rowType,
+
DataStream<HoodieFlinkInternalRow> dataStream) {
if (OptionsResolver.isBucketIndexType(conf)) {
HoodieIndex.BucketIndexEngineType bucketIndexEngineType = OptionsResolver.getBucketEngineType(conf);
switch (bucketIndexEngineType) {
@@ -398,10 +406,12 @@ public class Pipelines {
throw new HoodieNotSupportedException("Unknown bucket index engine type: " + bucketIndexEngineType);
}
} else {
+ // if the index is RLI, the write operator UID will be pre-generated and set into the configuration.
+ String writeOperatorUid = conf.get(FlinkOptions.WRITE_OPERATOR_UID);
+ writeOperatorUid = writeOperatorUid == null ? opUID("stream_write", conf) : writeOperatorUid;
// uuid is used to generate operator id for the write operator, then the bucket assign operator can send
// operator event to the coordinator of the write operator based on the operator id.
// @see org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway.
- String writeOperatorUid = opUID("stream_write", conf);
DataStream<HoodieFlinkInternalRow> bucketAssignStream = createBucketAssignStream(dataStream, conf, rowType, writeOperatorUid);
boolean isStreamingIndexWriteEnabled = OptionsResolver.isStreamingIndexWriteEnabled(conf);
SingleOutputStreamOperator<RowData> writeDatastream =
@@ -444,7 +454,7 @@ public class Pipelines {
private static DataStream<HoodieFlinkInternalRow> createBucketAssignStream(
DataStream<HoodieFlinkInternalRow> inputStream, Configuration conf,
RowType rowType, String writeOperatorUid) {
String assignerOperatorName = "bucket_assigner";
- if (OptionsResolver.isRecordLevelIndex(conf)) {
+ if (OptionsResolver.isRecordLevelIndex(conf) && !conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
return inputStream
.partitionCustom(new RecordIndexPartitioner(conf), row -> new HoodieKey(row.getRecordKey(), row.getPartitionPath()))
.transform(
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 7580fd80ce56..37a57cdd6e78 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -413,6 +413,10 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(), "true");
conf.set(FlinkOptions.INDEX_GLOBAL_ENABLED, true);
conf.setString(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(), "true");
+ // set index bootstrap as true if not specified by user explicitly.
+ if (!conf.contains(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+ conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+ }
// generally size of index data is much smaller than data record, so set the buffer size of
// the index writer as 1/4 of that for data writer if it's not set by user explicitly.
if (!conf.contains(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE)) {
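
With the factory change above, choosing the global record level index implicitly turns on index bootstrap unless the option is already present in the table configuration. A job that wants to skip the RLI bootstrap (for instance, a freshly created table with no record index to scan yet) can therefore still opt out explicitly. A minimal sketch of that override, using only the option touched in this hunk (the class name is illustrative):

import org.apache.hudi.configuration.FlinkOptions;

import org.apache.flink.configuration.Configuration;

public class DisableRliBootstrapSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // HoodieTableFactory only applies the "true" default when this option is absent,
    // so an explicit user setting always wins.
    conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, false);
    System.out.println(conf.contains(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)); // prints: true
  }
}
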
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 499137acaf9b..a389d721908b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -234,6 +234,10 @@ public class FlinkWriteClients {
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.withEngineType(EngineType.FLINK) // this affects the default value inference
.enable(conf.get(FlinkOptions.METADATA_ENABLED))
+ .withRecordIndexFileGroupCount(
+ Integer.parseInt(conf.getString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(), "8")),
+ Integer.parseInt(conf.getString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(),
+ HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MAX_FILE_GROUP_COUNT_PROP.defaultValue() + "")))
.withMaxNumDeltaCommitsBeforeCompaction(conf.get(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
.build())
.withIndexConfig(StreamerUtil.getIndexConfig(conf))
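
The metadata config wiring above means the Flink writer falls back to a minimum of 8 record index file groups when nothing is set, while respecting explicit overrides of the min/max properties. A hedged sketch of such an override, using only the two property keys that appear in this hunk (the value "12" is illustrative and matches the new TestFlinkWriteClients case below; the class name is illustrative):

import org.apache.hudi.common.config.HoodieMetadataConfig;

import org.apache.flink.configuration.Configuration;

public class RliFileGroupCountOverrideSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Without this override the Flink client defaults the minimum file group count to "8",
    // as seen in the withRecordIndexFileGroupCount(...) call above.
    conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(), "12");
    System.out.println(conf.getString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(), "8"));
  }
}
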
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index a5ccb2ffb20f..0d6a375bf7a6 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -38,6 +38,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.sink.muttley.AthenaIngestionGateway;
import org.apache.hudi.sink.event.Correspondent;
@@ -671,6 +672,33 @@ public class TestStreamWriteOperatorCoordinator {
assertEquals(instant2, inflightInstants.get(2L));
}
+ @Test
+ void testHandleAwaitPendingInstantsRequest() throws Exception {
+ Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+ conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
+ conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+ conf.set(FlinkOptions.INDEX_WRITE_TASKS, 4);
+ coordinator = createCoordinator(conf, 1);
+
+ Thread t = new Thread(() -> {
+ try {
+ CompletableFuture<CoordinationResponse> responseFuture =
+ coordinator.handleCoordinationRequest(Correspondent.AwaitPendingInstantsRequest.getInstance(1));
+ Correspondent.AwaitPendingInstantsResponse response =
+ CoordinationResponseSerDe.unwrap(responseFuture.get());
+ assertNotNull(response);
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ }
+ });
+ t.start();
+ // send a bootstrap event to unblock the simulated request from bootstrap operator.
+ WriteMetadataEvent event1 = createBootstrapEvent(0, 0, coordinator.getInstant(), "par1");
+ coordinator.handleEventFromOperator(0, event1);
+ t.join();
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRocksDBIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRocksDBIndexBackend.java
new file mode 100644
index 000000000000..3a59193c86f5
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRocksDBIndexBackend.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Test cases for {@link RocksDBIndexBackend}.
+ */
+public class TestRocksDBIndexBackend {
+
+ @TempDir
+ File tempFile;
+
+ @Test
+ void testGetAndUpdate() throws Exception {
+ try (RocksDBIndexBackend rocksDBIndexBackend = new RocksDBIndexBackend(tempFile.getAbsolutePath())) {
+ assertNull(rocksDBIndexBackend.get("id1"));
+
+ HoodieRecordGlobalLocation location1 = new HoodieRecordGlobalLocation("par1", "001", "file1");
+ rocksDBIndexBackend.update("id1", location1);
+ assertEquals(location1, rocksDBIndexBackend.get("id1"));
+
+ HoodieRecordGlobalLocation location2 = new HoodieRecordGlobalLocation("par2", "002", "file2");
+ rocksDBIndexBackend.update("id2", location2);
+ assertEquals(location2, rocksDBIndexBackend.get("id2"));
+ }
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestCommitGuard.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestCommitGuard.java
new file mode 100644
index 000000000000..3fc00e76d9ff
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestCommitGuard.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.exception.HoodieException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link CommitGuard}.
+ */
+public class TestCommitGuard {
+
+ @Test
+ void testBlockForRechecksPendingInstantsAfterSignal() throws Exception {
+ CommitGuard guard = CommitGuard.create(5_000L);
+ AtomicBoolean pending = new AtomicBoolean(true);
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ CompletableFuture<Void> waitingFuture1 = CompletableFuture.runAsync(
+ () -> guard.blockFor(() -> pending.get() ? List.of("001", "002") : Collections.emptyList()), executor);
+ CompletableFuture<Void> waitingFuture2 = CompletableFuture.runAsync(
+ () -> guard.blockFor(() -> pending.get() ? List.of("001", "002") : Collections.emptyList()), executor);
+
+ Thread.sleep(100);
+ guard.unblock();
+ Thread.sleep(100);
+ assertFalse(waitingFuture1.isDone(), "Should still block because pending instants are not empty");
+ assertFalse(waitingFuture2.isDone(), "Should still block because pending instants are not empty");
+
+ pending.set(false);
+ guard.unblock();
+ waitingFuture1.get(2, TimeUnit.SECONDS);
+ assertTrue(waitingFuture1.isDone(), "Should finish when pending instants become empty");
+ waitingFuture2.get(2, TimeUnit.SECONDS);
+ assertTrue(waitingFuture2.isDone(), "Should finish when pending instants become empty");
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test
+ void testBlockForTimeoutWhenPendingInstantsRemain() {
+ CommitGuard guard = CommitGuard.create(100L);
+ HoodieException hoodieException = assertThrows(
+ HoodieException.class,
+ () -> guard.blockFor(() -> Collections.singletonList("timeout-instant")));
+ assertTrue(hoodieException.getMessage().contains("timeout-instant"));
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestEventBuffers.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestEventBuffers.java
new file mode 100644
index 000000000000..40fe4e58edd0
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestEventBuffers.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link EventBuffers}.
+ */
+public class TestEventBuffers {
+
+ @Test
+ void testAwaitAllInstantsWaitsUntilAllPendingInstantsReset() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 5_000L);
+ EventBuffers eventBuffers = EventBuffers.getInstance(conf, 1);
+
+ eventBuffers.initNewEventBuffer(1L, "001");
+ eventBuffers.addEventToBuffer(newWriteEvent(1L, "001"));
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ // two tasks block waiting on commit.
+ CompletableFuture<Void> waitingFuture1 = CompletableFuture.runAsync(
+ eventBuffers::awaitAllInstantsToCompleteIfNecessary, executor);
+ CompletableFuture<Void> waitingFuture2 = CompletableFuture.runAsync(
+ eventBuffers::awaitAllInstantsToCompleteIfNecessary, executor);
+
+ Thread.sleep(100);
+ assertFalse(waitingFuture1.isDone());
+ assertFalse(waitingFuture2.isDone());
+
+ eventBuffers.reset(1L);
+ Thread.sleep(100);
+ assertTrue(waitingFuture1.isDone());
+ assertTrue(waitingFuture2.isDone());
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test
+ void testAwaitPrevInstantsWaitsUntilAllPreviousCheckpointsReset() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
+ conf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+ conf.set(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, 5_000L);
+ conf.set(FlinkOptions.INDEX_WRITE_TASKS, 4);
+ EventBuffers eventBuffers = EventBuffers.getInstance(conf, 1);
+
+ eventBuffers.initNewEventBuffer(1L, "001");
+ eventBuffers.initNewEventBuffer(2L, "002");
+ eventBuffers.initNewEventBuffer(3L, "003");
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ // two tasks block waiting on commit.
+ CompletableFuture<Void> waitingFuture1 = CompletableFuture.runAsync(
+ () -> eventBuffers.awaitPrevInstantsToComplete(3L), executor);
+ CompletableFuture<Void> waitingFuture2 = CompletableFuture.runAsync(
+ () -> eventBuffers.awaitPrevInstantsToComplete(3L), executor);
+
+ Thread.sleep(100);
+ assertFalse(waitingFuture1.isDone());
+ assertFalse(waitingFuture2.isDone());
+
+ eventBuffers.reset(1L);
+ Thread.sleep(100);
+ assertFalse(waitingFuture1.isDone(), "Checkpoint 2 still pending, should continue blocking");
+ assertFalse(waitingFuture2.isDone(), "Checkpoint 2 still pending, should continue blocking");
+
+ eventBuffers.reset(2L);
+ waitingFuture1.get(2, TimeUnit.SECONDS);
+ assertTrue(waitingFuture1.isDone());
+ waitingFuture2.get(2, TimeUnit.SECONDS);
+ assertTrue(waitingFuture2.isDone());
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ private static WriteMetadataEvent newWriteEvent(long checkpointId, String instant) {
+ return WriteMetadataEvent.builder()
+ .taskID(0)
+ .checkpointId(checkpointId)
+ .instantTime(instant)
+ .writeStatus(Collections.emptyList())
+ .lastBatch(true)
+ .build();
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 03fd014a9b78..f577d4cfaf39 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -3069,6 +3069,31 @@ public class ITTestHoodieDataSource {
+ "+I[id3, id3, Julian, 43, 1970-01-01T00:00:03, par1, par1]]");
}
+ @Test
+ void testRLIBootstrap() {
+ TableEnvironment tableEnv = streamTableEnv;
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
+ .option(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name())
+ .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
+ .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ.name())
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+ execInsertSql(tableEnv, TestSQL.INSERT_T1);
+
+ List<Row> result1 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);
+
+ // insert another batch of records
+ execInsertSql(tableEnv, TestSQL.UPDATE_INSERT_T1);
+
+ result1 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ assertRowsEquals(result1, TestData.DATA_SET_SOURCE_MERGED);
+ }
+
@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testMiniBatchBucketAssign(HoodieTableType tableType) throws Exception {
@@ -3078,7 +3103,6 @@ public class ITTestHoodieDataSource {
.options(getDefaultKeys())
.option(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name())
.option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
- .option(FlinkOptions.TABLE_TYPE, COPY_ON_WRITE)
.option(FlinkOptions.TABLE_TYPE, tableType.name())
.end();
tableEnv.executeSql(hoodieTableDDL);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
index 245300445ca2..0762bd66838d 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
@@ -25,6 +25,7 @@ import org.apache.hudi.client.model.CommitTimeFlinkRecordMerger;
import org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
import org.apache.hudi.client.model.PartialUpdateFlinkRecordMerger;
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.EventTimeAvroPayload;
@@ -39,6 +40,7 @@ import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.io.FileGroupReaderBasedMergeHandle;
+import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.DirectWriteMarkers;
import org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers;
@@ -103,6 +105,21 @@ public class TestFlinkWriteClients {
}
}
+ @Test
+ void testDefaultGlobalRecordIndexMinFileGroupCountForFlink() {
+ conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
+ HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, false, false);
+ assertEquals(8, writeConfig.getGlobalRecordLevelIndexMinFileGroupCount());
+ }
+
+ @Test
+ void testUserConfiguredGlobalRecordIndexMinFileGroupCountIsNotOverridden() {
+ conf.set(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
+ conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(), "12");
+ HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, false, false);
+ assertEquals(12, writeConfig.getGlobalRecordLevelIndexMinFileGroupCount());
+ }
+
@ParameterizedTest
@ValueSource(strings = {"", "DIRECT", "TIMELINE_SERVER_BASED"})
void testMarkerType(String markerType) throws Exception {