codope commented on code in PR #12653:
URL: https://github.com/apache/hudi/pull/12653#discussion_r1922258294
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala:
##########
@@ -1524,4 +1526,59 @@ class TestMORDataSource extends
HoodieSparkClientTestBase with SparkDatasetMixin
// deleted record should still show in time travel view
assertEquals(1, timeTravelDF.where(s"_row_key = '$recordKey'").count())
}
+
+  /**
+   * Test Secondary Index creation through datasource and metadata write configs.
+   *
+   * 1. Insert a batch of data with record_index enabled.
+   * 2. Upsert another batch of data with secondary index configs.
+   * 3. Validate that secondary index is created.
+   */
+ @Test
+ def testSecondaryIndexCreation(): Unit = {
+    var (writeOpts, readOpts) = getWriterReaderOpts()
+    writeOpts = writeOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
+      HoodieCompactionConfig.INLINE_COMPACT.key -> "false",
+      HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key -> "0",
+      HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key -> "true",
+      HoodieIndexConfig.INDEX_TYPE.key -> IndexType.RECORD_INDEX.name()
+    )
+ readOpts = readOpts ++ Map(
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"
+ )
+ initMetaClient(HoodieTableType.MERGE_ON_READ)
+    // Create a MOR table and add 10 records to the table.
+    val records = recordsToStrings(dataGen.generateInserts("000", 10)).asScala.toSeq
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 10))
+ inputDF.write.format("org.apache.hudi")
+ .options(writeOpts)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+    val snapshotDF = spark.read.format("org.apache.hudi").options(readOpts).load(basePath)
+    assertEquals(10, snapshotDF.count())
+
+ // Upsert another batch with secondary index configs
+ val secondaryIndexName = "idx_name"
+ val secondaryIndexColumn = "name"
+ writeOpts = writeOpts ++ Map(
+ HoodieMetadataConfig.SECONDARY_INDEX_ENABLE_PROP.key -> "true",
+ HoodieMetadataConfig.SECONDARY_INDEX_NAME.key -> secondaryIndexName,
+ HoodieMetadataConfig.SECONDARY_INDEX_COLUMN.key -> secondaryIndexColumn
+ )
+    val records2 = recordsToStrings(dataGen.generateInserts("001", 3)).asScala.toSeq
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 10))
+ inputDF2.write.format("org.apache.hudi")
+ .options(writeOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+    // validate that the secondary index is created and built
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    assertTrue(metadataPartitionExists(basePath, context, SECONDARY_INDEX.getPartitionPath(metaClient, secondaryIndexName)))
+    assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(SECONDARY_INDEX.getPartitionPath(metaClient, secondaryIndexName)))
Review Comment:
Done and also checked for idempotency
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]