codope commented on code in PR #18353:
URL: https://github.com/apache/hudi/pull/18353#discussion_r2987935295
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java:
##########
@@ -2031,6 +1941,138 @@ public void testFailedBootstrap() throws Exception {
}
}
+  /**
+   * Test that partitioned RLI initialization is deferred for fresh tables.
+   * Partitioned RLI should NOT be initialized on the first commit but should be initialized
+   * on the second commit with programmatically determined file group count (should be 1 for small tables).
+   */
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testPartitionedRecordIndexDeferredInitializationForFreshTable(HoodieTableType tableType) throws Exception {
+    init(tableType);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // Config with partitioned record index enabled (not global)
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .withIndexType(HoodieIndex.IndexType.RECORD_INDEX)
+            .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordLevelIndex(true) // Partitioned RLI
+            .withPartitionedRecordIndexFileGroupCount(2, 2)
+            .withDeferRliInitializationForFreshTable(true)
+            .build())
+        .build();
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      // First commit - Partitioned RLI should NOT be initialized yet for a fresh table
+      String firstCommitTime = client.startCommit();
+      List<HoodieRecord> records = dataGen.generateInserts(firstCommitTime, 1000);
+      List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 2), firstCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.commit(firstCommitTime, jsc.parallelize(writeStatuses));
+
+      // Verify metadata table exists
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      assertTrue(metaClient.getTableConfig().isMetadataTableAvailable());
+
+      // Verify partitioned RLI partition is NOT initialized after first commit
+      assertFalse(metaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX),
+          "Partitioned RLI should NOT be initialized on first commit for a fresh table");
+
+      // Files partition should be initialized
+      assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES),
+          "Files partition should be initialized");
+
+      // Second commit - Partitioned RLI should NOW be initialized
+      String secondCommitTime = client.startCommit();
+      List<HoodieRecord> moreRecords = dataGen.generateInserts(secondCommitTime, 500);
+      writeStatuses = client.insert(jsc.parallelize(moreRecords, 2), secondCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.commit(secondCommitTime, jsc.parallelize(writeStatuses));
+
+      // Reload and verify partitioned RLI is now initialized
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX),
+          "Partitioned RLI should be initialized after second commit");
+
+      // Verify file group count is 1 for small tables (150 records total)
Review Comment:
typo: the test inserts 1000 + 500 records, right (1500 records total)? Also,
the assertion below checks for 6 file groups, not 1.
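For context on the arithmetic behind this comment: the expected total is the configured per-partition file group count times the number of data partitions (the test's own comment says "2 as default for 3 partitions"). A minimal self-contained sketch of that arithmetic, with illustrative names that are not Hudi APIs:

```java
// Minimal arithmetic sketch; class and method names are illustrative, not Hudi APIs.
class ExpectedFileGroups {
  static int expectedTotal(int fileGroupsPerDataPartition, int numDataPartitions) {
    return fileGroupsPerDataPartition * numDataPartitions;
  }

  public static void main(String[] args) {
    // 1000 + 500 inserts = 1500 records total, not 150;
    // with 2 file groups for each of the test's 3 data partitions,
    // the assertion expects 2 * 3 = 6 file groups, not 1.
    System.out.println(1000 + 500);          // prints 1500
    System.out.println(expectedTotal(2, 3)); // prints 6
  }
}
```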
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -371,6 +371,14 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       .withDocumentation("The current number of records are multiplied by this number when estimating the number of "
           + "file groups to create automatically. This helps account for growth in the number of records in the dataset.");
+  public static final ConfigProperty<Boolean> DEFER_RLI_INIT_FOR_FRESH_TABLE = ConfigProperty
+      .key(METADATA_PREFIX + ".record.level.index.defer.for.fresh.table")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("1.2.0")
+      .withDocumentation("When enabled, defers RLI initialization to 2nd commit for a fresh table. This should help with determining the file group "
+          + "count dynamically for RLI index (global and non non global RLI)");
Review Comment:
```suggestion
+ "count dynamically for RLI index (global and non-global RLI)");
```
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -450,6 +450,13 @@ private boolean initializeFromFilesystem(String dataTableInstantTime, List<Metad
     }
   }
+    // for a fresh table, lets defer RLI initialization
+    if (dataWriteConfig.getMetadataConfig().shouldDeferRliInitForFreshTable() && this.enabledPartitionTypes.contains(RECORD_INDEX)
+        && dataMetaClient.getActiveTimeline().filterCompletedInstants().countInstants() == 0) {
+      this.enabledPartitionTypes.remove(RECORD_INDEX);
+      partitionsToInit.remove(RECORD_INDEX);
+    }
Review Comment:
Why not remove it only from `partitionsToInit` (the local variable), and use a
separate flag or check to skip RLI during the first commit? One concern I have
is that mutating `enabledPartitionTypes` may break RLI updates within the same
writer lifecycle: it works when the writer is recreated per commit, but could
cause subtle bugs in streaming/long-lived writer scenarios.
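The alternative this comment describes can be sketched with plain collections. This is a hypothetical standalone illustration, not Hudi's actual code: `PartitionType`, `rliDeferredOnFirstCommit`, and the method shape are stand-ins. The point is that only the per-commit plan drops `RECORD_INDEX`, while the shared enabled set is never mutated:

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Self-contained sketch of the suggested approach; types and names are
// stand-ins for Hudi's real writer fields, not the actual implementation.
class DeferredInitSketch {
  enum PartitionType { FILES, RECORD_INDEX }

  // Mirrors enabledPartitionTypes in the writer; stays untouched across commits.
  final Set<PartitionType> enabledPartitionTypes =
      EnumSet.of(PartitionType.FILES, PartitionType.RECORD_INDEX);

  // Hypothetical flag the comment suggests, instead of mutating shared state.
  boolean rliDeferredOnFirstCommit = false;

  /** Build the per-commit init plan; only the local plan drops RECORD_INDEX. */
  List<PartitionType> partitionsToInit(boolean deferRliForFreshTable, int completedInstants) {
    List<PartitionType> toInit = new ArrayList<>(enabledPartitionTypes);
    if (deferRliForFreshTable
        && enabledPartitionTypes.contains(PartitionType.RECORD_INDEX)
        && completedInstants == 0) {
      toInit.remove(PartitionType.RECORD_INDEX);
      rliDeferredOnFirstCommit = true;
    }
    return toInit;
  }

  public static void main(String[] args) {
    DeferredInitSketch writer = new DeferredInitSketch();
    // First commit on a fresh table: RLI dropped from the local plan only.
    System.out.println("first commit inits: " + writer.partitionsToInit(true, 0));
    // enabledPartitionTypes still contains RECORD_INDEX, so RLI updates later
    // in the same writer lifecycle are unaffected.
    System.out.println("still enabled: " + writer.enabledPartitionTypes);
  }
}
```

Under this shape, a long-lived streaming writer that processes a second commit without being recreated would still see RLI in the enabled set and route updates to it.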
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -690,6 +687,10 @@ public boolean isRecordLevelIndexEnabled() {
return isEnabled() && getBooleanOrDefault(RECORD_LEVEL_INDEX_ENABLE_PROP);
}
+ public boolean shouldDeferRliInitForFreshTable() {
+ return isEnabled() && getBooleanOrDefault(DEFER_RLI_INIT_FOR_FRESH_TABLE);
Review Comment:
The call site already checks `this.enabledPartitionTypes.contains(RECORD_INDEX)`,
which implicitly means metadata is enabled. Is the `isEnabled()` check here more
for safety?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java:
##########
@@ -2031,6 +1941,138 @@ public void testFailedBootstrap() throws Exception {
}
}
+  /**
+   * Test that partitioned RLI initialization is deferred for fresh tables.
+   * Partitioned RLI should NOT be initialized on the first commit but should be initialized
+   * on the second commit with programmatically determined file group count (should be 1 for small tables).
+   */
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testPartitionedRecordIndexDeferredInitializationForFreshTable(HoodieTableType tableType) throws Exception {
+    init(tableType);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // Config with partitioned record index enabled (not global)
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .withIndexType(HoodieIndex.IndexType.RECORD_INDEX)
+            .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordLevelIndex(true) // Partitioned RLI
+            .withPartitionedRecordIndexFileGroupCount(2, 2)
+            .withDeferRliInitializationForFreshTable(true)
+            .build())
+        .build();
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      // First commit - Partitioned RLI should NOT be initialized yet for a fresh table
+      String firstCommitTime = client.startCommit();
+      List<HoodieRecord> records = dataGen.generateInserts(firstCommitTime, 1000);
+      List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 2), firstCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.commit(firstCommitTime, jsc.parallelize(writeStatuses));
+
+      // Verify metadata table exists
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      assertTrue(metaClient.getTableConfig().isMetadataTableAvailable());
+
+      // Verify partitioned RLI partition is NOT initialized after first commit
+      assertFalse(metaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX),
+          "Partitioned RLI should NOT be initialized on first commit for a fresh table");
+
+      // Files partition should be initialized
+      assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES),
+          "Files partition should be initialized");
+
+      // Second commit - Partitioned RLI should NOW be initialized
+      String secondCommitTime = client.startCommit();
+      List<HoodieRecord> moreRecords = dataGen.generateInserts(secondCommitTime, 500);
+      writeStatuses = client.insert(jsc.parallelize(moreRecords, 2), secondCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.commit(secondCommitTime, jsc.parallelize(writeStatuses));
+
+      // Reload and verify partitioned RLI is now initialized
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX),
+          "Partitioned RLI should be initialized after second commit");
+
+      // Verify file group count is 1 for small tables (150 records total)
+      HoodieBackedTableMetadata metadataReader = (HoodieBackedTableMetadata) metadata(client, storage);
+      int fileGroupCount = HoodieTableMetadataUtil.getPartitionLatestFileSlices(
+          metadataReader.getMetadataMetaClient(), Option.empty(),
+          RECORD_INDEX.getPartitionPath()).size();
+
+      // For partitioned RLI with small data, file group count should be 6 (2 as default for 3 partitions)
+      assertEquals(6, fileGroupCount,
+          "File group count should be 6 for partitioned RLI table, but got: " + fileGroupCount);
+
+      // Validate metadata integrity
+      validateMetadata(client);
+    }
+  }
+
+  /**
+   * Test that global RLI with larger data results in an appropriate file group count.
+   * This validates that the file group count is determined programmatically based on data size,
+   * not using a hardcoded default.
+   */
+  @Test
+  public void testGlobalRecordIndexDeferredInitialization() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // Config with global record index enabled
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .withIndexType(HoodieIndex.IndexType.RECORD_INDEX)
+            .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableGlobalRecordLevelIndex(true) // Global RLI
+            .withDeferRliInitializationForFreshTable(true)
+            .build())
+        .build();
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
+      // First commit with moderate data size (4000 records)
+      String firstCommitTime = client.startCommit();
+      List<HoodieRecord> records = dataGen.generateInserts(firstCommitTime, 4000);
+      List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 5), firstCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.commit(firstCommitTime, jsc.parallelize(writeStatuses));
+
+      // Verify RLI is NOT initialized after first commit
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      assertFalse(metaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX),
+          "Global RLI should NOT be initialized on first commit");
+
+      // Second commit to trigger RLI initialization
+      String secondCommitTime = client.startCommit();
+      List<HoodieRecord> moreRecords = dataGen.generateInserts(secondCommitTime, 2000);
+      writeStatuses = client.insert(jsc.parallelize(moreRecords, 3), secondCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.commit(secondCommitTime, jsc.parallelize(writeStatuses));
+
+      // Verify global RLI is now initialized
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX));
+
+      // Verify file group count is determined based on data size
+      HoodieBackedTableMetadata metadataReader = (HoodieBackedTableMetadata) metadata(client, storage);
+      int fileGroupCount = HoodieTableMetadataUtil.getPartitionLatestFileSlices(
+          metadataReader.getMetadataMetaClient(), Option.empty(),
+          RECORD_INDEX.getPartitionPath()).size();
+
+      // For 4000 records with global RLI, file group count should be between min (1) and max (10)
+      // It should be > 1 for this data size based on estimation logic
+      assertTrue(fileGroupCount > 1,
Review Comment:
let's also assert `fileGroupCount <= maxConfiguredValue` to catch
regressions where file group count estimation overshoots
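The two-sided check the comment asks for amounts to clamping the estimate to the configured range. A self-contained sketch of that estimation-and-clamp logic; the method name, the records-per-file-group heuristic, and the growth assumptions are illustrative, not Hudi's actual implementation:

```java
// Hypothetical sketch of file-group-count estimation with min/max clamping;
// names and the records-per-file-group heuristic are assumptions, not Hudi code.
class FileGroupEstimationSketch {

  /**
   * Estimate the number of RLI file groups from the observed record count,
   * assuming a fixed target of records per file group, clamped to [min, max].
   */
  static int estimateFileGroupCount(long recordCount, int minFileGroups,
                                    int maxFileGroups, long recordsPerFileGroup) {
    int estimated = (int) Math.ceil((double) recordCount / recordsPerFileGroup);
    // Clamp so the estimate can neither undershoot min nor overshoot max;
    // the upper bound is exactly what the review asks the test to assert.
    return Math.max(minFileGroups, Math.min(maxFileGroups, estimated));
  }

  public static void main(String[] args) {
    // 4000 records at an assumed ~1000 records per file group -> 4, within [1, 10].
    int count = estimateFileGroupCount(4000, 1, 10, 1000);
    System.out.println("estimated file groups: " + count); // prints 4
    if (count <= 1 || count > 10) {
      throw new AssertionError("estimation out of expected bounds: " + count);
    }
  }
}
```

In the test itself this would translate to pairing the existing `assertTrue(fileGroupCount > 1, ...)` with an `assertTrue(fileGroupCount <= maxConfiguredValue, ...)` on the same value.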