lokeshj1703 commented on code in PR #12558:
URL: https://github.com/apache/hudi/pull/12558#discussion_r1907313785


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala:
##########
@@ -875,6 +875,209 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
     }
   }
 
+  /**
+   * Test expression index partition pruning with partition stats.
+   */
+  @Test
+  def testPartitionPruningWithPartitionStats(): Unit = {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName + 
s"_pruning_partition_filters_$tableType"
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+        spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+
+        spark.sql(
+          s"""
+           CREATE TABLE $tableName (
+             |    ts LONG,
+             |    id STRING,
+             |    rider STRING,
+             |    driver STRING,
+             |    fare DOUBLE,
+             |    dateDefault STRING,
+             |    date STRING,
+             |    city STRING,
+             |    state STRING
+             |) USING HUDI
+             |options(
+             |    primaryKey ='id',
+             |    type = '$tableType',
+             |    hoodie.metadata.enable = 'true',
+             |    hoodie.datasource.write.recordkey.field = 'id',
+             |    hoodie.enable.data.skipping = 'true'
+             |)
+             |PARTITIONED BY (state)
+             |location '$basePath'
+             |""".stripMargin)
+
+        spark.sql("set hoodie.parquet.small.file.limit=0")
+        if (HoodieSparkUtils.gteqSpark3_4) {
+          spark.sql("set spark.sql.defaultColumn.enabled=false")
+        }
+
+        spark.sql(
+          s"""
+             |insert into $tableName(ts, id, rider, driver, fare, dateDefault, 
date, city, state) VALUES
+             |  (1695414527,'trip1','rider-A','driver-K',19.10, '2020-11-30 
01:30:40', '2020-11-30', 'san_francisco','california'),
+             |  (1695414531,'trip6','rider-C','driver-K',17.14, '2021-11-30 
01:30:40', '2021-11-30', 'san_diego','california'),
+             |  (1695332066,'trip3','rider-E','driver-O',93.50, '2022-11-30 
01:30:40', '2022-11-30', 'austin','texas'),
+             |  (1695516137,'trip4','rider-F','driver-P',34.15, '2023-11-30 
01:30:40', '2023-11-30', 'houston','texas')
+             |""".stripMargin)
+        spark.sql(
+          s"""
+             |insert into $tableName(ts, id, rider, driver, fare, dateDefault, 
date, city, state) VALUES
+             |  (1695414520,'trip2','rider-C','driver-M',27.70,'2024-11-30 
01:30:40', '2024-11-30', 'sunnyvale','california'),
+             |  (1699349649,'trip5','rider-A','driver-Q',3.32, '2019-11-30 
01:30:40', '2019-11-30', 'san_diego','texas')
+             |""".stripMargin)
+
+        val tableSchema: StructType =
+          StructType(
+            Seq(
+              StructField("ts", LongType),
+              StructField("id", StringType),
+              StructField("rider", StringType),
+              StructField("driver", StringType),
+              StructField("fare", DoubleType),
+              StructField("dateDefault", StringType),
+              StructField("date", StringType),
+              StructField("city", StringType),
+              StructField("state", StringType)
+            )
+          )
+        val opts = Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> 
"true", HoodieMetadataConfig.ENABLE.key -> "true")
+
+        spark.sql(s"create index idx_ts on $tableName using column_stats(ts) 
options(expr='from_unixtime', format='yyyy-MM-dd')")
+        var metaClient = createMetaClient(spark, basePath)
+        // validate skipping with both types of expression
+        val fromUnixTimeExpr = resolveExpr(spark, 
unapply(functions.from_unixtime(functions.col("ts"), "yyyy-MM-dd")).get, 
tableSchema)
+        var literal = Literal.create("2023-11-07")
+        var dataFilter = EqualTo(fromUnixTimeExpr, literal)
+        verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient, 
isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_ts on $tableName")
+
+        spark.sql(s"create index idx_unix on $tableName using 
column_stats(date) options(expr='unix_timestamp', format='yyyy-MM-dd')")
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        val unixTimestamp = resolveExpr(spark, 
unapply(functions.unix_timestamp(functions.col("date"), "yyyy-MM-dd")).get, 
tableSchema)
+        literal = Literal.create(1732924800L)
+        dataFilter = EqualTo(unixTimestamp, literal)
+        verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient, 
isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_unix on $tableName")
+
+        spark.sql(s"create index idx_to_date on $tableName using 
column_stats(date) options(expr='to_date', format='yyyy-MM-dd')")
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        val toDate = resolveExpr(spark, 
unapply(functions.to_date(functions.col("date"), "yyyy-MM-dd")).get, 
tableSchema)
+        dataFilter = EqualTo(toDate, lit(18230).expr)
+        verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient, 
isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_to_date on $tableName")
+      }
+    }
+  }
+
+  /**
+   * Test expression index pruning after update with partition stats.
+   */
+  @Test
+  def testPartitionPruningAfterUpdateWithPartitionStats(): Unit = {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val isTableMOR = tableType.equals("mor")
+        val tableName = generateTableName + 
s"_pruning_partition_filters_$tableType"
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+        spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+
+        spark.sql(
+          s"""
+           CREATE TABLE $tableName (
+             |    ts LONG,
+             |    id STRING,
+             |    rider STRING,
+             |    driver STRING,
+             |    fare DOUBLE,
+             |    dateDefault STRING,
+             |    date STRING,
+             |    city STRING,
+             |    state STRING
+             |) USING HUDI
+             |options(
+             |    primaryKey ='id',
+             |    type = '$tableType',
+             |    hoodie.metadata.enable = 'true',
+             |    hoodie.datasource.write.recordkey.field = 'id',
+             |    hoodie.enable.data.skipping = 'true'
+             |)
+             |PARTITIONED BY (state)
+             |location '$basePath'
+             |""".stripMargin)
+
+        spark.sql("set hoodie.parquet.small.file.limit=0")
+        if (HoodieSparkUtils.gteqSpark3_4) {
+          spark.sql("set spark.sql.defaultColumn.enabled=false")
+        }
+
+        spark.sql(
+          s"""
+             |insert into $tableName(ts, id, rider, driver, fare, dateDefault, 
date, city, state) VALUES
+             |  (1695414527,'trip1','rider-A','driver-K',19.10, '2020-11-30 
01:30:40', '2020-11-30', 'san_francisco','california'),
+             |  (1695414531,'trip6','rider-C','driver-K',17.14, '2021-11-30 
01:30:40', '2021-11-30', 'san_diego','california'),
+             |  (1695332066,'trip3','rider-E','driver-O',93.50, '2022-11-30 
01:30:40', '2022-11-30', 'austin','texas'),
+             |  (1695516137,'trip4','rider-F','driver-P',34.15, '2023-11-30 
01:30:40', '2023-11-30', 'houston','texas')
+             |""".stripMargin)
+        spark.sql(
+          s"""
+             |insert into $tableName(ts, id, rider, driver, fare, dateDefault, 
date, city, state) VALUES
+             |  (1695414520,'trip2','rider-B','driver-M',27.70,'2024-11-30 
01:30:40', '2024-11-30', 'sunnyvale','california'),
+             |  (1699349649,'trip5','rider-D','driver-Q',3.32, '2019-11-30 
01:30:40', '2019-11-30', 'san_diego','texas')
+             |""".stripMargin)
+
+        val tableSchema: StructType =
+          StructType(
+            Seq(
+              StructField("ts", LongType),
+              StructField("id", StringType),
+              StructField("rider", StringType),
+              StructField("driver", StringType),
+              StructField("fare", DoubleType),
+              StructField("dateDefault", StringType),
+              StructField("date", StringType),
+              StructField("city", StringType),
+              StructField("state", StringType)
+            )
+          )
+        val opts = Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> 
"true", HoodieMetadataConfig.ENABLE.key -> "true")
+
+        spark.sql(s"create index idx_rider on $tableName using 
column_stats(rider) options(expr='upper')")
+        var metaClient = createMetaClient(spark, basePath)
+        // validate skipping with both types of expression
+        val riderExpr = resolveExpr(spark, 
unapply(functions.upper(functions.col("rider"))).get, tableSchema)
+        var literal = Literal.create("RIDER-A")
+        var dataFilter = EqualTo(riderExpr, literal)
+        verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient, 
isDataSkippingExpected = true)

Review Comment:
   Addressed



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala:
##########
@@ -875,6 +875,209 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
     }
   }
 
+  /**
+   * Test expression index partition pruning with partition stats.
+   */
+  @Test
+  def testPartitionPruningWithPartitionStats(): Unit = {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName + 
s"_pruning_partition_filters_$tableType"
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+        spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+
+        spark.sql(
+          s"""
+           CREATE TABLE $tableName (
+             |    ts LONG,
+             |    id STRING,
+             |    rider STRING,
+             |    driver STRING,
+             |    fare DOUBLE,
+             |    dateDefault STRING,
+             |    date STRING,
+             |    city STRING,
+             |    state STRING
+             |) USING HUDI
+             |options(
+             |    primaryKey ='id',
+             |    type = '$tableType',
+             |    hoodie.metadata.enable = 'true',
+             |    hoodie.datasource.write.recordkey.field = 'id',
+             |    hoodie.enable.data.skipping = 'true'
+             |)
+             |PARTITIONED BY (state)
+             |location '$basePath'
+             |""".stripMargin)
+
+        spark.sql("set hoodie.parquet.small.file.limit=0")
+        if (HoodieSparkUtils.gteqSpark3_4) {
+          spark.sql("set spark.sql.defaultColumn.enabled=false")
+        }
+
+        spark.sql(
+          s"""
+             |insert into $tableName(ts, id, rider, driver, fare, dateDefault, 
date, city, state) VALUES
+             |  (1695414527,'trip1','rider-A','driver-K',19.10, '2020-11-30 
01:30:40', '2020-11-30', 'san_francisco','california'),
+             |  (1695414531,'trip6','rider-C','driver-K',17.14, '2021-11-30 
01:30:40', '2021-11-30', 'san_diego','california'),
+             |  (1695332066,'trip3','rider-E','driver-O',93.50, '2022-11-30 
01:30:40', '2022-11-30', 'austin','texas'),
+             |  (1695516137,'trip4','rider-F','driver-P',34.15, '2023-11-30 
01:30:40', '2023-11-30', 'houston','texas')
+             |""".stripMargin)
+        spark.sql(
+          s"""
+             |insert into $tableName(ts, id, rider, driver, fare, dateDefault, 
date, city, state) VALUES
+             |  (1695414520,'trip2','rider-C','driver-M',27.70,'2024-11-30 
01:30:40', '2024-11-30', 'sunnyvale','california'),
+             |  (1699349649,'trip5','rider-A','driver-Q',3.32, '2019-11-30 
01:30:40', '2019-11-30', 'san_diego','texas')
+             |""".stripMargin)
+
+        val tableSchema: StructType =
+          StructType(
+            Seq(
+              StructField("ts", LongType),
+              StructField("id", StringType),
+              StructField("rider", StringType),
+              StructField("driver", StringType),
+              StructField("fare", DoubleType),
+              StructField("dateDefault", StringType),
+              StructField("date", StringType),
+              StructField("city", StringType),
+              StructField("state", StringType)
+            )
+          )
+        val opts = Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> 
"true", HoodieMetadataConfig.ENABLE.key -> "true")
+
+        spark.sql(s"create index idx_ts on $tableName using column_stats(ts) 
options(expr='from_unixtime', format='yyyy-MM-dd')")
+        var metaClient = createMetaClient(spark, basePath)
+        // validate skipping with both types of expression
+        val fromUnixTimeExpr = resolveExpr(spark, 
unapply(functions.from_unixtime(functions.col("ts"), "yyyy-MM-dd")).get, 
tableSchema)
+        var literal = Literal.create("2023-11-07")
+        var dataFilter = EqualTo(fromUnixTimeExpr, literal)
+        verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient, 
isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_ts on $tableName")
+
+        spark.sql(s"create index idx_unix on $tableName using 
column_stats(date) options(expr='unix_timestamp', format='yyyy-MM-dd')")
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        val unixTimestamp = resolveExpr(spark, 
unapply(functions.unix_timestamp(functions.col("date"), "yyyy-MM-dd")).get, 
tableSchema)
+        literal = Literal.create(1732924800L)
+        dataFilter = EqualTo(unixTimestamp, literal)
+        verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient, 
isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_unix on $tableName")
+
+        spark.sql(s"create index idx_to_date on $tableName using 
column_stats(date) options(expr='to_date', format='yyyy-MM-dd')")
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        val toDate = resolveExpr(spark, 
unapply(functions.to_date(functions.col("date"), "yyyy-MM-dd")).get, 
tableSchema)
+        dataFilter = EqualTo(toDate, lit(18230).expr)
+        verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient, 
isDataSkippingExpected = true)
+        spark.sql(s"drop index idx_to_date on $tableName")
+      }
+    }
+  }
+
+  /**
+   * Test expression index pruning after update with partition stats.
+   */
+  @Test
+  def testPartitionPruningAfterUpdateWithPartitionStats(): Unit = {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val isTableMOR = tableType.equals("mor")
+        val tableName = generateTableName + 
s"_pruning_partition_filters_$tableType"
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+        spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
+
+        spark.sql(
+          s"""
+           CREATE TABLE $tableName (
+             |    ts LONG,
+             |    id STRING,
+             |    rider STRING,
+             |    driver STRING,
+             |    fare DOUBLE,
+             |    dateDefault STRING,
+             |    date STRING,
+             |    city STRING,
+             |    state STRING
+             |) USING HUDI
+             |options(
+             |    primaryKey ='id',
+             |    type = '$tableType',
+             |    hoodie.metadata.enable = 'true',
+             |    hoodie.datasource.write.recordkey.field = 'id',
+             |    hoodie.enable.data.skipping = 'true'
+             |)
+             |PARTITIONED BY (state)
+             |location '$basePath'
+             |""".stripMargin)
+
+        spark.sql("set hoodie.parquet.small.file.limit=0")
+        if (HoodieSparkUtils.gteqSpark3_4) {
+          spark.sql("set spark.sql.defaultColumn.enabled=false")
+        }
+
+        spark.sql(
+          s"""
+             |insert into $tableName(ts, id, rider, driver, fare, dateDefault, 
date, city, state) VALUES
+             |  (1695414527,'trip1','rider-A','driver-K',19.10, '2020-11-30 
01:30:40', '2020-11-30', 'san_francisco','california'),
+             |  (1695414531,'trip6','rider-C','driver-K',17.14, '2021-11-30 
01:30:40', '2021-11-30', 'san_diego','california'),
+             |  (1695332066,'trip3','rider-E','driver-O',93.50, '2022-11-30 
01:30:40', '2022-11-30', 'austin','texas'),
+             |  (1695516137,'trip4','rider-F','driver-P',34.15, '2023-11-30 
01:30:40', '2023-11-30', 'houston','texas')
+             |""".stripMargin)
+        spark.sql(
+          s"""
+             |insert into $tableName(ts, id, rider, driver, fare, dateDefault, 
date, city, state) VALUES
+             |  (1695414520,'trip2','rider-B','driver-M',27.70,'2024-11-30 
01:30:40', '2024-11-30', 'sunnyvale','california'),
+             |  (1699349649,'trip5','rider-D','driver-Q',3.32, '2019-11-30 
01:30:40', '2019-11-30', 'san_diego','texas')
+             |""".stripMargin)
+
+        val tableSchema: StructType =
+          StructType(
+            Seq(
+              StructField("ts", LongType),
+              StructField("id", StringType),
+              StructField("rider", StringType),
+              StructField("driver", StringType),
+              StructField("fare", DoubleType),
+              StructField("dateDefault", StringType),
+              StructField("date", StringType),
+              StructField("city", StringType),
+              StructField("state", StringType)
+            )
+          )
+        val opts = Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> 
"true", HoodieMetadataConfig.ENABLE.key -> "true")
+
+        spark.sql(s"create index idx_rider on $tableName using 
column_stats(rider) options(expr='upper')")
+        var metaClient = createMetaClient(spark, basePath)
+        // validate skipping with both types of expression
+        val riderExpr = resolveExpr(spark, 
unapply(functions.upper(functions.col("rider"))).get, tableSchema)
+        var literal = Literal.create("RIDER-A")
+        var dataFilter = EqualTo(riderExpr, literal)
+        verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient, 
isDataSkippingExpected = true)
+
+        spark.sql(s"update $tableName set rider = 'rider-G' where id = 
'trip5'")
+        metaClient = createMetaClient(spark, basePath)
+        literal = Literal.create("RIDER-D")
+        dataFilter = EqualTo(riderExpr, literal)
+        verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient, 
isDataSkippingExpected = true, isNoScanExpected = !isTableMOR)
+
+        if (isTableMOR) {
+          spark.sql("set hoodie.compact.inline=true")
+          spark.sql("set hoodie.compact.inline.max.delta.commits=1")
+        }
+        spark.sql(s"update $tableName set rider = 'rider-H' where id = 
'trip5'")
+        metaClient = createMetaClient(spark, basePath)
+        literal = Literal.create("RIDER-D")
+        dataFilter = EqualTo(riderExpr, literal)
+        verifyPartitionPruning(opts, Seq(), Seq(dataFilter), metaClient, 
isDataSkippingExpected = true, isNoScanExpected = true)

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to