hudi-agent commented on code in PR #18865:
URL: https://github.com/apache/hudi/pull/18865#discussion_r3313537008
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala:
##########
@@ -146,7 +146,8 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase
with SparkDatasetMix
"Metadata files partition count should be lower than data table file
count after rebootstrap")
}
Review Comment:
🤖 nit: `deferRLIInit` reads as an imperative verb rather than a boolean
state — could you rename it to `rliInitDeferred` (or `isRliInitDeferred`) to
match the past-participle style used by the sibling parameter
`streamingWriteEnabled`?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala:
##########
@@ -301,6 +311,108 @@ class TestRecordLevelIndex extends
RecordLevelIndexTestBase with SparkDatasetMix
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testPartitionedRecordLevelIndexDefer(streamingWriteEnabled: Boolean):
Unit = {
+ val holder = new testRecordLevelIndexHolder
+ testRecordLevelIndex(HoodieTableType.MERGE_ON_READ, streamingWriteEnabled,
holder, true)
+ assertEquals("deltacommit",
metaClient.getActiveTimeline.lastInstant().get().getAction)
+ val writeConfig = getWriteConfig(holder.options)
+ var metadata = metadataWriter(writeConfig).getTableMetadata
+ doAllAssertions(holder, metadata)
+ val writeClient = new SparkRDDWriteClient(new
HoodieSparkEngineContext(jsc), writeConfig)
+ val timeOpt = writeClient.scheduleCompaction(HOption.empty())
+ assertTrue(timeOpt.isPresent)
+ writeClient.compact(timeOpt.get())
+ metaClient.reloadActiveTimeline()
+ assertEquals("compaction",
metaClient.getActiveTimeline.lastInstant().get().getAction)
+ metadata = metadataWriter(writeConfig).getTableMetadata
+ doAllAssertions(holder, metadata)
+ writeClient.close()
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def
testPartitionedRecordLevelIndexDeferWithBulkInsert(streamingWriteEnabled:
Boolean): Unit = {
+ val tableType = HoodieTableType.MERGE_ON_READ
+ val dataGen = new HoodieTestDataGenerator()
+ val inserts1 = dataGen.generateInserts("001", 5)
+ val batch1Df = toDataset(spark, inserts1)
+ val insertDf1 = batch1Df.withColumn("data_partition_path",
lit("partition1"))
+ .union(batch1Df.withColumn("data_partition_path", lit("partition2")))
+
+ val options = Map(
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(),
+ RECORDKEY_FIELD.key -> "_row_key",
+ PARTITIONPATH_FIELD.key -> "data_partition_path",
+ HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp",
+ HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key() ->
"false",
+ HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> "true",
+ HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key() ->
streamingWriteEnabled.toString,
+ HoodieMetadataConfig.DEFER_RLI_INIT_FOR_FRESH_TABLE.key() -> "true",
+ HoodieCompactionConfig.INLINE_COMPACT.key() -> "false",
+ HoodieIndexConfig.INDEX_TYPE.key() -> RECORD_LEVEL_INDEX.name())
+
+ // Commit #1: bulk_insert on a fresh table with defer enabled.
+ insertDf1.write.format("hudi")
+ .options(options)
+ .option(DataSourceWriteOptions.OPERATION.key(),
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ assertEquals(10, spark.read.format("hudi").load(basePath).count())
+
+ // Defer should have kicked in: RLI partition is not initialized after the
first bulk_insert.
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
assertFalse(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath),
+ "RLI partition should not be initialized after the first bulk_insert
when defer is enabled")
+
+ // Commit #2: another bulk_insert into a new partition. New rows must use
distinct record keys.
+ val inserts2 = dataGen.generateInserts("002", 5)
+ val batch2Df = toDataset(spark, inserts2)
+ val insertDf2 = batch2Df.withColumn("data_partition_path",
lit("partition3"))
+
+ insertDf2.write.format("hudi")
+ .options(options)
+ .option(DataSourceWriteOptions.OPERATION.key(),
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ assertEquals(15, spark.read.format("hudi").load(basePath).count())
+
+ // Build metadata writer/reader; this entry will initialize RLI now that
there is a completed commit.
+ val writeConfig = getWriteConfig(options)
+ val metadata = metadataWriter(writeConfig).getTableMetadata
+
+ // RLI partition should now be present in the metadata table.
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath),
+ "RLI partition should be initialized once a completed commit exists on
the data table")
+ assertTrue(HoodieRecordIndex.isPartitioned(
+
metaClient.getIndexMetadata.get().getIndexDefinitions.get(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)),
+ "RLI should be initialized as partitioned RLI")
+
+ // Validate record key -> location mapping for both batches against the
data.
Review Comment:
🤖 nit: `df` is a bit misleading here since `.collect()` returns an
`Array[Row]`, not a DataFrame — a name like `allRows` or `tableRows` would
make the type and intent clearer to anyone reading the
`validateDFWithLocations` calls below.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]