yihua commented on code in PR #11472:
URL: https://github.com/apache/hudi/pull/11472#discussion_r1662945410
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala:
##########
@@ -34,74 +43,294 @@ class TestPartitionStatsIndexWithSql extends
HoodieSparkSqlTestBase {
val sqlTempTable = "hudi_tbl"
test("Test partition stats index following insert, merge into, update and
delete") {
- withTempDir { tmp =>
- val tableName = generateTableName
- val tablePath = s"${tmp.getCanonicalPath}/$tableName"
- // Create table with date type partition
- spark.sql(
- s"""
- | create table $tableName using hudi
- | partitioned by (dt)
- | tblproperties(
- | primaryKey = 'id',
- | preCombineField = 'ts',
- | 'hoodie.metadata.index.partition.stats.enable' = 'true'
- | )
- | location '$tablePath'
- | AS
- | select 1 as id, 'a1' as name, 10 as price, 1000 as ts,
cast('2021-05-06' as date) as dt
+ Seq("cow", "mor").foreach { tableType =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ // Create table with date type partition
+ spark.sql(
+ s"""
+ | create table $tableName using hudi
+ | partitioned by (dt)
+ | tblproperties(
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | 'hoodie.metadata.index.partition.stats.enable' = 'true',
+ | 'hoodie.metadata.index.column.stats.column.list' = 'name'
+ | )
+ | location '$tablePath'
+ | AS
+ | select 1 as id, 'a1' as name, 10 as price, 1000 as ts,
cast('2021-05-06' as date) as dt
""".stripMargin
- )
+ )
+
+ assertResult(WriteOperationType.BULK_INSERT) {
+ HoodieSparkSqlTestBase.getLastCommitMetadata(spark,
tablePath).getOperationType
+ }
+ checkAnswer(s"select id, name, price, ts, cast(dt as string) from
$tableName")(
+ Seq(1, "a1", 10, 1000, "2021-05-06")
+ )
+
+ val partitionValue = "2021-05-06"
+
+ // Check the missing properties for spark sql
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(tablePath)
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .build()
+ val properties = metaClient.getTableConfig.getProps.asScala.toMap
+
assertResult(true)(properties.contains(HoodieTableConfig.CREATE_SCHEMA.key))
+ assertResult("dt")(properties(HoodieTableConfig.PARTITION_FIELDS.key))
+ assertResult("ts")(properties(HoodieTableConfig.PRECOMBINE_FIELD.key))
+ assertResult(tableName)(metaClient.getTableConfig.getTableName)
+ // Validate partition_stats index exists
+
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(PARTITION_STATS.getPartitionPath))
+
+ // Test insert into
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000,
cast('$partitionValue' as date))")
+ checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id,
name, price, ts, cast(dt as string) from $tableName order by id")(
+ Seq("1", s"dt=$partitionValue", 1, "a1", 10, 1000, partitionValue),
+ Seq("2", s"dt=$partitionValue", 2, "a2", 10, 1000, partitionValue)
+ )
+ // Test merge into
+ spark.sql(
+ s"""
+ |merge into $tableName h0
+ |using (select 1 as id, 'a1' as name, 11 as price, 1001 as ts,
'$partitionValue' as dt) s0
+ |on h0.id = s0.id
+ |when matched then update set *
+ |""".stripMargin)
+ checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id,
name, price, ts, cast(dt as string) from $tableName order by id")(
+ Seq("1", s"dt=$partitionValue", 1, "a1", 11, 1001, partitionValue),
+ Seq("2", s"dt=$partitionValue", 2, "a2", 10, 1000, partitionValue)
+ )
+ // Test update
+ spark.sql(s"update $tableName set price = price + 1 where id = 2")
+ checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id,
name, price, ts, cast(dt as string) from $tableName order by id")(
+ Seq("1", s"dt=$partitionValue", 1, "a1", 11, 1001, partitionValue),
+ Seq("2", s"dt=$partitionValue", 2, "a2", 11, 1000, partitionValue)
+ )
+ // Test delete
+ spark.sql(s"delete from $tableName where id = 1")
+ checkAnswer(s"select _hoodie_record_key, _hoodie_partition_path, id,
name, price, ts, cast(dt as string) from $tableName order by id")(
+ Seq("2", s"dt=$partitionValue", 2, "a2", 11, 1000, partitionValue)
+ )
+ }
+ }
+ }
+
+ test("Test partition stats index on string type field with insert and file
pruning") {
+ Seq("cow", "mor").foreach { tableType =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ // Create table with date type partition
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | ts BIGINT,
+ | uuid STRING,
+ | rider STRING,
+ | driver STRING,
+ | city STRING,
+ | state STRING
+ |) using hudi
+ | options(
+ | type = '$tableType',
+ | primaryKey ='uuid',
+ | preCombineField = 'ts',
+ | hoodie.metadata.index.partition.stats.enable = 'true',
+ | hoodie.metadata.index.column.stats.column.list = 'rider'
+ |)
+ |PARTITIONED BY (state)
+ |location '$tablePath'
+ """.stripMargin
+ )
+ // set small file limit to 0 so that each insert creates a new file
+ spark.sql("set hoodie.parquet.small.file.limit=0")
+ // insert data in below pattern so that multiple records for 'texas'
and 'california' partition are in same file
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | values
(1695159649,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K','san_francisco','california'),
(1695091554,'e96c4396-3fad-413a-a942-4cb36106d721','rider-F','driver-M','sunnyvale','california')
+ | """.stripMargin
+ )
+ spark.sql(s"INSERT INTO $tableName VALUES
(1695332066,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-B','driver-L','new
york city','new york')")
+ spark.sql(s"INSERT INTO $tableName VALUES
(1695516137,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-D','driver-M','princeton','new
jersey')")
+ spark.sql(
+ s"""
+ | insert into $tableName
+ | values
(1695516137,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-C','driver-P','houston','texas'),
(1695332066,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O','austin','texas')
+ | """.stripMargin
+ )
Review Comment:
> because fgId only has one in one commit, FileFormateUtils::mergeRanges can
not work.

I'm not following your point here. The `reduce` transformation should still work on a
list containing a single item.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]