[
https://issues.apache.org/jira/browse/HUDI-9568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Davis Zhang updated HUDI-9568:
------------------------------
Priority: Blocker (was: Major)
> secondary index version 8 insert failure
> ----------------------------------------
>
> Key: HUDI-9568
> URL: https://issues.apache.org/jira/browse/HUDI-9568
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Davis Zhang
> Priority: Blocker
>
> test fail
>
> - insert data
> - create SI
> - insert more data -> insertion is not successful
> /**
> * Test case to verify that secondary indexes are automatically dropped when a
> table is upgraded
> * from version 8 to version 9. This test:
> * 1. Creates a table with version 8
> * 2. Creates secondary indexes on 'name' and 'price' columns
> * 3. Verifies the indexes are created successfully
> */
> test("Auto upgrade/downgrade drops secondary index") {
> defverifyIndexVersion(basePath: String, tblVersion: Int, idxVersion:
> Int):Unit= {
> // Verify the table version
> valmetaClient=HoodieTableMetaClient.builder().setBasePath(basePath)
> .setConf(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf())).build()
> assertEquals(metaClient.getTableConfig.getTableVersion.versionCode(),
> tblVersion)
> valindexDefs= metaClient.getIndexMetadata.get().getIndexDefinitions
> indexDefs.forEach((indexName, idxDef) => {
> if (indexName =="column_stats") {
> assertEquals(idxDef.getVersion, HoodieIndexVersion.V1)
> } else if (indexName.startsWith("secondary_index_")) {
> assertEquals(idxDef.getVersion.versionCode(), idxVersion)
> }
> })
> }
> defverifyData(tableName: String, expectedData: Seq[Seq[Any]]):Unit= {
> // Verify data after insert
> checkAnswer(s"select id, name, price, ts from $tableName order by
> id")(expectedData: _*)
> checkAnswer(s"select id, name, price, ts from $tableName where name = 'a1'")(
> expectedData.head
> )
> }
> /**
> * Helper function to create and validate secondary indexes
> *
> * @param tableName Name of the table
> * @param basePath Base path of the table
> * @param tableVersion Expected table version
> * @param indexVersion Expected index version
> */
> defdropRecreateIdxAndValidate(tableName: String, basePath: String,
> tableVersion: Int, indexVersion: Int, dropRecreate: Boolean, expectedData:
> Seq[Seq[Any]]):Unit= {
> // Drop and recreate secondary indexes on name and price columns
> if (dropRecreate) {
> spark.sql(s"drop index idx_name on $tableName")
> spark.sql(s"drop index idx_price on $tableName")
> }
> spark.sql(s"create index idx_name on $tableName (name)")
> spark.sql(s"create index idx_price on $tableName (price)")
> // Both indexes should be shown
> checkAnswer(s"show indexes from $tableName")(
> Seq("column_stats", "column_stats", ""),
> Seq("secondary_index_idx_name", "secondary_index", "name"),
> Seq("secondary_index_idx_price", "secondary_index", "price"),
> Seq("record_index", "record_index", "")
> )
> verifyData(tableName, expectedData)
> verifyIndexVersion(basePath, tableVersion, indexVersion)
> }
> // Test for both Copy-on-Write (COW) and Merge-on-Read (MOR) table types
> Seq("cow", "mor").foreach { tableType =>
> withTempDir { tmp =>
> valtableName= generateTableName
> valbasePath=s"${tmp.getCanonicalPath}/$tableName"
> spark.sql("set hoodie.embed.timeline.server=false")
> // Create table with version 8
> spark.sql(
> s"""
> |create table $tableName (
> | id int,
> | name string,
> | price double,
> | ts long
> |) using hudi
> | options (
> | primaryKey ='id',
> | type = '$tableType',
> | preCombineField = 'ts',
> | hoodie.metadata.enable = 'true',
> | hoodie.metadata.record.index.enable = 'true',
> | hoodie.metadata.index.column.stats.enable = 'true',
> | hoodie.metadata.index.secondary.enable = 'true',
> | hoodie.write.table.version = '8',
> | hoodie.datasource.write.payload.class =
> 'org.apache.hudi.common.model.OverwriteWithLatestAvroPayload'
> | )
> | location '$basePath'
> """.stripMargin)
> // Insert initial test data
> spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
> spark.sql(s"insert into $tableName values(2, 'a2', 20, 1000)")
> spark.sql(s"insert into $tableName values(3, 'a3', 30, 1000)")
> // Secondary index is created by default for non record key column when index
> type is not specified
> // Before we create any index/after upgrade/downgrade, by default we should
> only have indexes below.
> checkAnswer(s"show indexes from $tableName")(
> Seq("column_stats", "column_stats", ""),
> Seq("record_index", "record_index", "")
> )
> // Create and validate secondary indexes for version 8
> dropRecreateIdxAndValidate(tableName, basePath, 8, 1, dropRecreate = false,
> Seq(
> Seq(1, "a1", 10.0, 1000),
> Seq(2, "a2", 20.0, 1000),
> Seq(3, "a3", 30.0, 1000)
> ))
> // Upgrade table to version 9 and verify secondary indexes are dropped
> withSparkSqlSessionConfig(s"hoodie.write.table.version" -> "9") {
> // Update a record to trigger version upgrade
> spark.sql(s"insert into $tableName values(1, 'a1', 11, 1001)")
> // Both indexes should be shown
> checkAnswer(s"show indexes from $tableName")(
> Seq("column_stats", "column_stats", ""),
> Seq("secondary_index_idx_name", "secondary_index", "name"),
> Seq("secondary_index_idx_price", "secondary_index", "price"),
> Seq("record_index", "record_index", "")
> )
> valexpected=Seq(
> Seq(1, "a1", 11.0, 1001),
> Seq(2, "a2", 20.0, 1000),
> Seq(3, "a3", 30.0, 1000)
> )
> verifyData(tableName, expected)
> verifyIndexVersion(basePath, 9, 1)
>
> // Verify that secondary indexes are dropped after upgrade
> dropRecreateIdxAndValidate(tableName, basePath, 9, 2, dropRecreate = true,
> expected)
> }
> }
> }
> }
--
This message was sent by Atlassian Jira
(v8.20.10#820010)