nsivabalan commented on code in PR #18378:
URL: https://github.com/apache/hudi/pull/18378#discussion_r3450034088
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala:
##########
@@ -676,6 +676,75 @@ class TestRecordLevelIndex extends
RecordLevelIndexTestBase with SparkDatasetMix
s"Failed to create empty replacement file $candidateBaseFile")
candidateBaseFile.getName
}
+
+ @Test
+ def
testPartitionedRecordLevelIndexWithHiveStylePartitioningAndDotInPartitionField():
Unit = {
+ initMetaClient(HoodieTableType.COPY_ON_WRITE)
+ val dataGen = new HoodieTestDataGenerator()
+ val inserts = dataGen.generateInserts("001", 10)
+ val insertDf = toDataset(spark, inserts)
+
+ // Use fare.currency as partition field to test dots in partition field
names with Hive-style partitioning
+ val options = Map(HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key ->
HoodieTableType.COPY_ON_WRITE.name(),
+ RECORDKEY_FIELD.key -> "_row_key",
+ PARTITIONPATH_FIELD.key -> "fare.currency",
+ HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp",
+ HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key() ->
"false",
+ HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> "true",
+ HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key() -> "false",
+ HoodieCompactionConfig.INLINE_COMPACT.key() -> "false",
+ HoodieIndexConfig.INDEX_TYPE.key() -> RECORD_LEVEL_INDEX.name(),
+ DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key() -> "true")
+
+ insertDf.write.format("hudi")
+ .options(options)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ assertEquals(10, spark.read.format("hudi").load(basePath).count())
+
+ val props =
TypedProperties.fromMap(JavaConverters.mapAsJavaMapConverter(options).asJava)
+ val writeConfig = HoodieWriteConfig.newBuilder()
+ .withProps(props)
+ .withPath(basePath)
+ .build()
+
+ val metadata = metadataWriter(writeConfig).getTableMetadata
+ val recordKeys = inserts.asScala.map(i =>
i.getRecordKey).asJava.stream().collect(Collectors.toList())
+
+ // Verify record index entries for both partitions
+ // When using fare.currency field, the actual partition paths will be like
"fare.currency=USD"
+ val usdPartitionLocations = readRecordIndex(metadata, recordKeys,
HOption.of("fare.currency=USD"))
+
+ // All records should be found
Review Comment:
Good question — verified. `HoodieTestDataGenerator.generateFareNestedValues`
hard-codes `currency = "USD"` (see hudi-common
HoodieTestDataGenerator.java:612), so every generated record has
`fare.currency=USD`. All 10 inserted records therefore land in the single
`fare.currency=USD` partition, which makes the assertion deterministic. Added
a comment next to the assertion explaining this.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala:
##########
@@ -676,6 +676,75 @@ class TestRecordLevelIndex extends
RecordLevelIndexTestBase with SparkDatasetMix
s"Failed to create empty replacement file $candidateBaseFile")
candidateBaseFile.getName
}
+
+ @Test
+ def
testPartitionedRecordLevelIndexWithHiveStylePartitioningAndDotInPartitionField():
Unit = {
+ initMetaClient(HoodieTableType.COPY_ON_WRITE)
+ val dataGen = new HoodieTestDataGenerator()
+ val inserts = dataGen.generateInserts("001", 10)
+ val insertDf = toDataset(spark, inserts)
+
+ // Use fare.currency as partition field to test dots in partition field
names with Hive-style partitioning
+ val options = Map(HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key ->
HoodieTableType.COPY_ON_WRITE.name(),
+ RECORDKEY_FIELD.key -> "_row_key",
+ PARTITIONPATH_FIELD.key -> "fare.currency",
+ HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp",
+ HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key() ->
"false",
+ HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key() -> "true",
+ HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key() -> "false",
+ HoodieCompactionConfig.INLINE_COMPACT.key() -> "false",
+ HoodieIndexConfig.INDEX_TYPE.key() -> RECORD_LEVEL_INDEX.name(),
+ DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key() -> "true")
+
+ insertDf.write.format("hudi")
+ .options(options)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ assertEquals(10, spark.read.format("hudi").load(basePath).count())
+
+ val props =
TypedProperties.fromMap(JavaConverters.mapAsJavaMapConverter(options).asJava)
+ val writeConfig = HoodieWriteConfig.newBuilder()
+ .withProps(props)
+ .withPath(basePath)
+ .build()
+
+ val metadata = metadataWriter(writeConfig).getTableMetadata
+ val recordKeys = inserts.asScala.map(i =>
i.getRecordKey).asJava.stream().collect(Collectors.toList())
+
+ // Verify record index entries for both partitions
+ // When using fare.currency field, the actual partition paths will be like
"fare.currency=USD"
+ val usdPartitionLocations = readRecordIndex(metadata, recordKeys,
HOption.of("fare.currency=USD"))
+
+ // All records should be found
+ assertEquals(10, usdPartitionLocations.size)
+
+ val df = spark.read.format("hudi").load(basePath).collect()
Review Comment:
Removed in 38d8abc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]