nsivabalan commented on code in PR #18353:
URL: https://github.com/apache/hudi/pull/18353#discussion_r2988720082


##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java:
##########
@@ -2031,6 +1941,138 @@ public void testFailedBootstrap() throws Exception {
     }
   }
 
+  /**
+   * Test that partitioned RLI initialization is deferred for fresh tables.
+   * Partitioned RLI should NOT be initialized on the first commit but should be initialized
+   * on the second commit with a programmatically determined file group count (2 file groups
+   * per dataset partition, i.e. 6 in total for the 3 partitions used here).
+   */
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void 
testPartitionedRecordIndexDeferredInitializationForFreshTable(HoodieTableType 
tableType) throws Exception {
+    init(tableType);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // Config with partitioned record index enabled (not global)
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .withIndexType(HoodieIndex.IndexType.RECORD_INDEX)
+            .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordLevelIndex(true)  // Partitioned RLI
+            .withPartitionedRecordIndexFileGroupCount(2,2)
+            .withDeferRliInitializationForFreshTable(true)
+            .build())
+        .build();
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig)) {
+      // First commit - Partitioned RLI should NOT be initialized yet for a 
fresh table
+      String firstCommitTime = client.startCommit();
+      List<HoodieRecord> records = dataGen.generateInserts(firstCommitTime, 
1000);
+      List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 
2), firstCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.commit(firstCommitTime, jsc.parallelize(writeStatuses));
+
+      // Verify metadata table exists
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      assertTrue(metaClient.getTableConfig().isMetadataTableAvailable());
+
+      // Verify partitioned RLI partition is NOT initialized after first commit
+      
assertFalse(metaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX),
+          "Partitioned RLI should NOT be initialized on first commit for a 
fresh table");
+
+      // Files partition should be initialized
+      
assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES),
+          "Files partition should be initialized");
+
+      // Second commit - Partitioned RLI should NOW be initialized
+      String secondCommitTime = client.startCommit();
+      List<HoodieRecord> moreRecords = 
dataGen.generateInserts(secondCommitTime, 500);
+      writeStatuses = client.insert(jsc.parallelize(moreRecords, 2), 
secondCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.commit(secondCommitTime, jsc.parallelize(writeStatuses));
+
+      // Reload and verify partitioned RLI is now initialized
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      
assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX),
+          "Partitioned RLI should be initialized after second commit");
+
+      // Verify the file group count for partitioned RLI (1500 records total across two commits)
+      HoodieBackedTableMetadata metadataReader = (HoodieBackedTableMetadata) 
metadata(client, storage);
+      int fileGroupCount = 
HoodieTableMetadataUtil.getPartitionLatestFileSlices(
+          metadataReader.getMetadataMetaClient(), Option.empty(),
+          RECORD_INDEX.getPartitionPath()).size();
+
+      // For partitioned RLI, expect 2 file groups per dataset partition x 3 partitions = 6
+      assertEquals(6, fileGroupCount,
+          "File group count should be 6 for partitioned RLI table, but got: " 
+ fileGroupCount);
+
+      // Validate metadata integrity
+      validateMetadata(client);
+    }
+  }
+
+  /**
+   * Test that global RLI initialization is deferred for fresh tables and that the resulting
+   * file group count is determined programmatically based on data size,
+   * not using a hardcoded default.
+   */
+  @Test
+  public void testGlobalRecordIndexDeferredInitialization() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // Config with global record index enabled
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .withIndexType(HoodieIndex.IndexType.RECORD_INDEX)
+            .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableGlobalRecordLevelIndex(true)  // Global RLI
+            .withDeferRliInitializationForFreshTable(true)
+            .build())
+        .build();
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig)) {
+      // First commit with moderate data size (4000 records)
+      String firstCommitTime = client.startCommit();
+      List<HoodieRecord> records = dataGen.generateInserts(firstCommitTime, 
4000);
+      List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 
5), firstCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.commit(firstCommitTime, jsc.parallelize(writeStatuses));
+
+      // Verify RLI is NOT initialized after first commit
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      
assertFalse(metaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX),
+          "Partitioned RLI should NOT be initialized on first commit");
+
+      // Second commit to trigger RLI initialization
+      String secondCommitTime = client.startCommit();
+      List<HoodieRecord> moreRecords = 
dataGen.generateInserts(secondCommitTime, 2000);
+      writeStatuses = client.insert(jsc.parallelize(moreRecords, 3), 
secondCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.commit(secondCommitTime, jsc.parallelize(writeStatuses));
+
+      // Verify global RLI is now initialized
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      
assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX));
+
+      // Verify file group count is determined based on data size
+      HoodieBackedTableMetadata metadataReader = (HoodieBackedTableMetadata) 
metadata(client, storage);
+      int fileGroupCount = 
HoodieTableMetadataUtil.getPartitionLatestFileSlices(
+          metadataReader.getMetadataMetaClient(), Option.empty(),
+          RECORD_INDEX.getPartitionPath()).size();
+
+      // For 4000 records at initialization time with global RLI, the file group count should
+      // be between the min (1) and max (10); it should be > 1 for this data size based on the
+      // estimation logic
+      assertTrue(fileGroupCount > 1,

Review Comment:
   sure



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to