yihua commented on code in PR #18353:
URL: https://github.com/apache/hudi/pull/18353#discussion_r2989563852


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -450,6 +450,13 @@ private boolean initializeFromFilesystem(String 
dataTableInstantTime, List<Metad
       }
     }
 
+    // for a fresh table, lets defer RLI initialization
+    if (dataWriteConfig.getMetadataConfig().shouldDeferRliInitForFreshTable() 
&& this.enabledPartitionTypes.contains(RECORD_INDEX)
+        && 
dataMetaClient.getActiveTimeline().filterCompletedInstants().countInstants() == 
0) {
+      this.enabledPartitionTypes.remove(RECORD_INDEX);
+      partitionsToInit.remove(RECORD_INDEX);
+    }

Review Comment:
   I think this is OK for now.  Once we introduce the index abstraction from 
#12983, this code logic can be easily improved.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java:
##########
@@ -1941,6 +1941,138 @@ public void testFailedBootstrap() throws Exception {
     }
   }
 
+  /**
+   * Test that partitioned RLI initialization is deferred for fresh tables.
+   * Partitioned RLI should NOT be initialized on the first commit but should 
be initialized
+   * on the second commit with programmatically determined file group count 
(should be 1 for small tables).
+   */
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void 
testPartitionedRecordIndexDeferredInitializationForFreshTable(HoodieTableType 
tableType) throws Exception {
+    init(tableType);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // Config with partitioned record index enabled (not global)
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .withIndexType(HoodieIndex.IndexType.RECORD_INDEX)
+            .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordLevelIndex(true)  // Partitioned RLI
+            .withPartitionedRecordIndexFileGroupCount(2,2)
+            .withDeferRliInitializationForFreshTable(true)
+            .build())
+        .build();
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
writeConfig)) {
+      // First commit - Partitioned RLI should NOT be initialized yet for a 
fresh table
+      String firstCommitTime = client.startCommit();
+      List<HoodieRecord> records = dataGen.generateInserts(firstCommitTime, 
1000);
+      List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 
2), firstCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.commit(firstCommitTime, jsc.parallelize(writeStatuses));
+
+      // Verify metadata table exists
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      assertTrue(metaClient.getTableConfig().isMetadataTableAvailable());
+
+      // Verify partitioned RLI partition is NOT initialized after first commit
+      
assertFalse(metaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX),
+          "Partitioned RLI should NOT be initialized on first commit for a 
fresh table");
+
+      // Files partition should be initialized
+      
assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES),
+          "Files partition should be initialized");
+
+      // Second commit - Partitioned RLI should NOW be initialized
+      String secondCommitTime = client.startCommit();
+      List<HoodieRecord> moreRecords = 
dataGen.generateInserts(secondCommitTime, 500);
+      writeStatuses = client.insert(jsc.parallelize(moreRecords, 2), 
secondCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.commit(secondCommitTime, jsc.parallelize(writeStatuses));
+
+      // Reload and verify partitioned RLI is now initialized
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      
assertTrue(metaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX),
+          "Partitioned RLI should be initialized after second commit");
+
+      // Verify file group count is 6 (2 for each data table partition and we 
have 3 partitions)
+      HoodieBackedTableMetadata metadataReader = (HoodieBackedTableMetadata) 
metadata(client, storage);
+      int fileGroupCount = 
HoodieTableMetadataUtil.getPartitionLatestFileSlices(
+          metadataReader.getMetadataMetaClient(), Option.empty(),
+          RECORD_INDEX.getPartitionPath()).size();
+
+      // For partitioned RLI with small data, file group count should be 6 (2 
as default for 3 partitions)
+      assertEquals(6, fileGroupCount,
+          "File group count should be 6 for partitioned RLI table, but got: " 
+ fileGroupCount);
+
+      // Validate metadata integrity
+      validateMetadata(client);
+    }
+  }
+
+  /**
+   * Test that partitioned RLI with larger data results in appropriate file 
group count.
+   * This validates that the file group count is determined programmatically 
based on data size,
+   * not using a hardcoded default.
+   */
+  @Test
+  public void testGlobalRecordIndexDeferredInitialization() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    // Config with partitioned record index enabled
+    HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
+        .withIndexConfig(HoodieIndexConfig.newBuilder()

Review Comment:
   The Javadoc says "Test that partitioned RLI with larger data results in 
appropriate file group count" but this test is for *global* RLI (it uses 
`withEnableGlobalRecordLevelIndex(true)`). Also the inline comment on line 2041 
says `// Partitioned RLI` which adds to the confusion. Could you update the 
Javadoc and inline comments to reflect that this is testing global RLI deferred 
initialization?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -450,6 +450,13 @@ private boolean initializeFromFilesystem(String 
dataTableInstantTime, List<Metad
       }
     }
 
+    // for a fresh table, lets defer RLI initialization
+    if (dataWriteConfig.getMetadataConfig().shouldDeferRliInitForFreshTable() 
&& this.enabledPartitionTypes.contains(RECORD_INDEX)

Review Comment:
   nit: `countInstants() == 0` iterates through all instants just to check 
emptiness. Something like 
`!...filterCompletedInstants().getInstants().findAny().isPresent()` (or an 
`empty()` method if available on the timeline) would short-circuit immediately 
on non-fresh tables.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to