Davis Zhang created HUDI-9568:
---------------------------------
Summary: secondary index version 8 insert failure
Key: HUDI-9568
URL: https://issues.apache.org/jira/browse/HUDI-9568
Project: Apache Hudi
Issue Type: Bug
Reporter: Davis Zhang
test fail
- insert data
- create SI
- insert more data -> insertion is not successful
/**
* Test case to verify that secondary indexes are automatically dropped when a
table is upgraded
* from version 8 to version 9. This test:
* 1. Creates a table with version 8
* 2. Creates secondary indexes on 'name' and 'price' columns
* 3. Verifies the indexes are created successfully
*/
test("Auto upgrade/downgrade drops secondary index") {
defverifyIndexVersion(basePath: String, tblVersion: Int, idxVersion: Int):Unit=
{
// Verify the table version
valmetaClient=HoodieTableMetaClient.builder().setBasePath(basePath)
.setConf(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf())).build()
assertEquals(metaClient.getTableConfig.getTableVersion.versionCode(),
tblVersion)
valindexDefs= metaClient.getIndexMetadata.get().getIndexDefinitions
indexDefs.forEach((indexName, idxDef) => {
if (indexName =="column_stats") {
assertEquals(idxDef.getVersion, HoodieIndexVersion.V1)
} else if (indexName.startsWith("secondary_index_")) {
assertEquals(idxDef.getVersion.versionCode(), idxVersion)
}
})
}
defverifyData(tableName: String, expectedData: Seq[Seq[Any]]):Unit= {
// Verify data after insert
checkAnswer(s"select id, name, price, ts from $tableName order by
id")(expectedData: _*)
checkAnswer(s"select id, name, price, ts from $tableName where name = 'a1'")(
expectedData.head
)
}
/**
* Helper function to create and validate secondary indexes
*
* @param tableName Name of the table
* @param basePath Base path of the table
* @param tableVersion Expected table version
* @param indexVersion Expected index version
*/
defdropRecreateIdxAndValidate(tableName: String, basePath: String,
tableVersion: Int, indexVersion: Int, dropRecreate: Boolean, expectedData:
Seq[Seq[Any]]):Unit= {
// Drop and recreate secondary indexes on name and price columns
if (dropRecreate) {
spark.sql(s"drop index idx_name on $tableName")
spark.sql(s"drop index idx_price on $tableName")
}
spark.sql(s"create index idx_name on $tableName (name)")
spark.sql(s"create index idx_price on $tableName (price)")
// Both indexes should be shown
checkAnswer(s"show indexes from $tableName")(
Seq("column_stats", "column_stats", ""),
Seq("secondary_index_idx_name", "secondary_index", "name"),
Seq("secondary_index_idx_price", "secondary_index", "price"),
Seq("record_index", "record_index", "")
)
verifyData(tableName, expectedData)
verifyIndexVersion(basePath, tableVersion, indexVersion)
}
// Test for both Copy-on-Write (COW) and Merge-on-Read (MOR) table types
Seq("cow", "mor").foreach { tableType =>
withTempDir { tmp =>
valtableName= generateTableName
valbasePath=s"${tmp.getCanonicalPath}/$tableName"
spark.sql("set hoodie.embed.timeline.server=false")
// Create table with version 8
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| options (
| primaryKey ='id',
| type = '$tableType',
| preCombineField = 'ts',
| hoodie.metadata.enable = 'true',
| hoodie.metadata.record.index.enable = 'true',
| hoodie.metadata.index.column.stats.enable = 'true',
| hoodie.metadata.index.secondary.enable = 'true',
| hoodie.write.table.version = '8',
| hoodie.datasource.write.payload.class =
'org.apache.hudi.common.model.OverwriteWithLatestAvroPayload'
| )
| location '$basePath'
""".stripMargin)
// Insert initial test data
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 20, 1000)")
spark.sql(s"insert into $tableName values(3, 'a3', 30, 1000)")
// Secondary index is created by default for non record key column when index
type is not specified
// Before we create any index/after upgrade/downgrade, by default we should
only have indexes below.
checkAnswer(s"show indexes from $tableName")(
Seq("column_stats", "column_stats", ""),
Seq("record_index", "record_index", "")
)
// Create and validate secondary indexes for version 8
dropRecreateIdxAndValidate(tableName, basePath, 8, 1, dropRecreate = false, Seq(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 20.0, 1000),
Seq(3, "a3", 30.0, 1000)
))
// Upgrade table to version 9 and verify secondary indexes are dropped
withSparkSqlSessionConfig(s"hoodie.write.table.version" -> "9") {
// Update a record to trigger version upgrade
spark.sql(s"insert into $tableName values(1, 'a1', 11, 1001)")
// Both indexes should be shown
checkAnswer(s"show indexes from $tableName")(
Seq("column_stats", "column_stats", ""),
Seq("secondary_index_idx_name", "secondary_index", "name"),
Seq("secondary_index_idx_price", "secondary_index", "price"),
Seq("record_index", "record_index", "")
)
valexpected=Seq(
Seq(1, "a1", 11.0, 1001),
Seq(2, "a2", 20.0, 1000),
Seq(3, "a3", 30.0, 1000)
)
verifyData(tableName, expected)
verifyIndexVersion(basePath, 9, 1)
// Verify that secondary indexes are dropped after upgrade
dropRecreateIdxAndValidate(tableName, basePath, 9, 2, dropRecreate = true,
expected)
}
}
}
}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)