hudi-agent commented on code in PR #18865:
URL: https://github.com/apache/hudi/pull/18865#discussion_r3313537008


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala:
##########
@@ -146,7 +146,8 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase 
with SparkDatasetMix
       "Metadata files partition count should be lower than data table file 
count after rebootstrap")
   }

Review Comment:
   🤖 nit: `deferRLIInit` reads as an imperative verb rather than a boolean 
state — could you rename it to `rliInitDeferred` (or `isRliInitDeferred`) to 
match the past-participle style used by the sibling parameter 
`streamingWriteEnabled`?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala:
##########
@@ -301,6 +311,108 @@ class TestRecordLevelIndex extends 
RecordLevelIndexTestBase with SparkDatasetMix
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testPartitionedRecordLevelIndexDefer(streamingWriteEnabled: Boolean): 
Unit = {
+    val holder = new testRecordLevelIndexHolder
+    testRecordLevelIndex(HoodieTableType.MERGE_ON_READ, streamingWriteEnabled, 
holder, true)
+    assertEquals("deltacommit", 
metaClient.getActiveTimeline.lastInstant().get().getAction)
+    val writeConfig = getWriteConfig(holder.options)
+    var metadata = metadataWriter(writeConfig).getTableMetadata
+    doAllAssertions(holder, metadata)
+    val writeClient = new SparkRDDWriteClient(new 
HoodieSparkEngineContext(jsc), writeConfig)
+    val timeOpt = writeClient.scheduleCompaction(HOption.empty())
+    assertTrue(timeOpt.isPresent)
+    writeClient.compact(timeOpt.get())
+    metaClient.reloadActiveTimeline()
+    assertEquals("compaction", 
metaClient.getActiveTimeline.lastInstant().get().getAction)
+    metadata = metadataWriter(writeConfig).getTableMetadata
+    doAllAssertions(holder, metadata)
+    writeClient.close()
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testPartitionedRecordLevelIndexDeferWithBulkInsert(streamingWriteEnabled: 
Boolean): Unit = {
+    val tableType = HoodieTableType.MERGE_ON_READ
+    val dataGen = new HoodieTestDataGenerator()
+    val inserts1 = dataGen.generateInserts("001", 5)
+    val batch1Df = toDataset(spark, inserts1)
+    val insertDf1 = batch1Df.withColumn("data_partition_path", 
lit("partition1"))
+      .union(batch1Df.withColumn("data_partition_path", lit("partition2")))
+
+    val options = Map(
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+      RECORDKEY_FIELD.key -> "_row_key",
+      PARTITIONPATH_FIELD.key -> "data_partition_path",
+      HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp",
+      HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> 
"false",
+      HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> "true",
+      HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key() -> 
streamingWriteEnabled.toString,
+      HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key() -> "true",
+      HoodieCompactionConfig.INLINE_COMPACT.key() -> "false",
+      HoodieIndexConfig.INDEX_TYPE.key() -> RECORD_LEVEL_INDEX.name())
+
+    // Commit #1: bulk_insert on a fresh table with defer enabled.
+    insertDf1.write.format("hudi")
+      .options(options)
+      .option(DataSourceWriteOptions.OPERATION.key(), 
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    assertEquals(10, spark.read.format("hudi").load(basePath).count())
+
+    // Defer should have kicked in: RLI partition is not initialized after the 
first bulk_insert.
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    
assertFalse(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath),
+      "RLI partition should not be initialized after the first bulk_insert 
when defer is enabled")
+
+    // Commit #2: another bulk_insert into a new partition. New rows must use 
distinct record keys.
+    val inserts2 = dataGen.generateInserts("002", 5)
+    val batch2Df = toDataset(spark, inserts2)
+    val insertDf2 = batch2Df.withColumn("data_partition_path", 
lit("partition3"))
+
+    insertDf2.write.format("hudi")
+      .options(options)
+      .option(DataSourceWriteOptions.OPERATION.key(), 
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    assertEquals(15, spark.read.format("hudi").load(basePath).count())
+
+    // Build metadata writer/reader; this entry will initialize RLI now that 
there is a completed commit.
+    val writeConfig = getWriteConfig(options)
+    val metadata = metadataWriter(writeConfig).getTableMetadata
+
+    // RLI partition should now be present in the metadata table.
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath),
+      "RLI partition should be initialized once a completed commit exists on 
the data table")
+    assertTrue(HoodieRecordIndex.isPartitioned(
+      
metaClient.getIndexMetadata.get().getIndexDefinitions.get(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)),
+      "RLI should be initialized as partitioned RLI")
+
+    // Validate record key -> location mapping for both batches against the 
data.

Review Comment:
   🤖 nit: `df` is a bit misleading here since `.collect()` returns an 
`Array[Row]`, not a DataFrame — something like `allRows` or `tableRows` would 
make the type/intent clearer to anyone reading the `validateDFWithLocations` 
calls below.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to