This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new dbd5a71a0e2 [HUDI-8791] Test secondary index on auto keygen tables (#12557)
dbd5a71a0e2 is described below
commit dbd5a71a0e274a7d1d28da29056cbcc31477bcc9
Author: Sagar Sumit <[email protected]>
AuthorDate: Fri Jan 3 08:23:57 2025 +0530
[HUDI-8791] Test secondary index on auto keygen tables (#12557)
---
.../hudi/command/index/TestSecondaryIndex.scala | 69 +++++++++++++++++++++-
1 file changed, 67 insertions(+), 2 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
index 43b03806261..460e4f4395b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
@@ -30,8 +30,7 @@ import org.apache.hudi.config.{HoodieClusteringConfig,
HoodieCompactionConfig, H
import
org.apache.hudi.metadata.HoodieMetadataPayload.SECONDARY_INDEX_RECORD_KEY_SEPARATOR
import org.apache.hudi.metadata.SecondaryIndexKeyUtils
import org.apache.hudi.storage.StoragePath
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
-
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions,
HoodieSparkUtils}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
@@ -507,6 +506,72 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
}
}
+ /**
+ * Tests a secondary index on `city` for a partitioned MOR table that declares no record key (per the test name, keys are auto-generated): the index keys read from the metadata table must equal the keys built from each row's (city, _hoodie_record_key), both right after CREATE INDEX and again after an UPDATE of `city`.
+ */
+ test("Test Secondary Index With Auto Record Key Generation") {
+ withTempDir { tmp =>
+ val tableName = generateTableName + s"_si_auto_keygen"
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+ spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+
+ spark.sql(
+ s"""
+ CREATE TABLE $tableName (
+ | ts LONG,
+ | id STRING,
+ | rider STRING,
+ | driver STRING,
+ | fare DOUBLE,
+ | dateDefault STRING,
+ | date STRING,
+ | city STRING,
+ | state STRING
+ |) USING HUDI
+ |options(
+ | type = 'mor',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.enable.data.skipping = 'true',
+ | hoodie.metadata.record.index.enable = 'true',
+ | hoodie.datasource.write.payload.class =
'org.apache.hudi.common.model.OverwriteWithLatestAvroPayload'
+ |)
+ |PARTITIONED BY (state)
+ |location '$basePath'
+ |""".stripMargin)
+
+ spark.sql("set hoodie.parquet.small.file.limit=0")
+ if (HoodieSparkUtils.gteqSpark3_4) {
+ spark.sql("set spark.sql.defaultColumn.enabled=false")
+ }
+
+ spark.sql(
+ s"""
+ |INSERT INTO $tableName(ts, id, rider, driver, fare, dateDefault,
date, city, state) VALUES
+ | (1695414520,'trip2','rider-C','driver-M',27.70,'2024-11-30
01:30:40', '2024-11-30', 'sunnyvale','california'),
+ | (1699349649,'trip5','rider-A','driver-Q',3.32, '2019-11-30
01:30:40', '2019-11-30', 'san_diego','texas')
+ |""".stripMargin)
+
+ // create a secondary index named idx_city on the city column
+ spark.sql(s"CREATE INDEX idx_city on $tableName (city)")
+ // validate: keys returned by the hudi_metadata query below must equal the keys built from each table row's (city, _hoodie_record_key)
+ var expectedSecondaryKeys = spark.sql(s"SELECT _hoodie_record_key, city
from $tableName")
+ .collect().map(row =>
SecondaryIndexKeyUtils.constructSecondaryIndexKey(row.getString(1),
row.getString(0)))
+ var actualSecondaryKeys = spark.sql(s"SELECT key FROM
hudi_metadata('$basePath') WHERE type=7 AND key LIKE
'%$SECONDARY_INDEX_RECORD_KEY_SEPARATOR%'")
+ .collect().map(indexKey => indexKey.getString(0))
+ assertEquals(expectedSecondaryKeys.toSet, actualSecondaryKeys.toSet)
+
+ // update the indexed column (city) for the row whose rider is 'rider-C'
+ spark.sql(s"UPDATE $tableName SET city = 'san_francisco' WHERE rider =
'rider-C'")
+ // validate again: recompute both key sets after the update and require them to be equal
+ expectedSecondaryKeys = spark.sql(s"SELECT _hoodie_record_key, city from
$tableName")
+ .collect().map(row =>
SecondaryIndexKeyUtils.constructSecondaryIndexKey(row.getString(1),
row.getString(0)))
+ actualSecondaryKeys = spark.sql(s"SELECT key FROM
hudi_metadata('$basePath') WHERE type=7 AND key LIKE
'%$SECONDARY_INDEX_RECORD_KEY_SEPARATOR%'")
+ .collect().map(indexKey => indexKey.getString(0))
+ assertEquals(expectedSecondaryKeys.toSet, actualSecondaryKeys.toSet)
+ }
+ }
+
private def loadInitialBatchAndCreateSecondaryIndex(tableName: String,
basePath: String, dataGen: HoodieTestDataGenerator, numInserts: Integer = 50) =
{
val initialRecords =
recordsToStrings(dataGen.generateInserts(getInstantTime, numInserts,
true)).asScala
val initialDf =
spark.read.json(spark.sparkContext.parallelize(initialRecords.toSeq, 2))