This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 47063d9264 [KYUUBI #7129] Support PARQUET hive table pushdown filter
47063d9264 is described below

commit 47063d926424757e75abd4addafb9a38220eefd0
Author: tian bao <2011xues...@gmail.com>
AuthorDate: Thu Jul 17 14:42:46 2025 +0800

    [KYUUBI #7129] Support PARQUET hive table pushdown filter
    
    ### Why are the changes needed?
    
    Previously, the `HiveScan` class was always used to read data. If a table is
    determined to be of PARQUET type, the `ParquetScan` from Spark DataSource V2
    can be used instead. `ParquetScan` supports filter pushdown, but `HiveScan`
    does not yet support it.
    
    The conversion can be controlled by setting
    `spark.sql.kyuubi.hive.connector.read.convertMetastoreParquet`. When enabled,
    the data source PARQUET reader is used to process PARQUET tables created by
    using the HiveQL syntax, instead of Hive SerDe.
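
    For illustration, a minimal usage sketch (not part of this patch), assuming the
    flag is a runtime-settable session conf, as the test suite's `withSparkSession`
    usage suggests; the table name is only an example:

        // Enable the conversion (default: true): Parquet Hive tables are scanned
        // via ParquetScan, so predicates such as `value > 150` can be pushed down
        // to the Parquet reader.
        spark.conf.set("spark.sql.kyuubi.hive.connector.read.convertMetastoreParquet", "true")
        spark.sql("SELECT * FROM hive.default.parquet_filter_pushdown WHERE value > 150").show()

        // Disable the conversion: scans fall back to HiveScan, which does not
        // push down data filters.
        spark.conf.set("spark.sql.kyuubi.hive.connector.read.convertMetastoreParquet", "false")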
    
    close https://github.com/apache/kyuubi/issues/7129
    
    ### How was this patch tested?
    
    Added unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #7130 from flaming-archer/master_parquet_filterdown.
    
    Closes #7129
    
    d7059dca4 [tian bao] Support PARQUET hive table pushdown filter
    
    Authored-by: tian bao <2011xues...@gmail.com>
    Signed-off-by: Cheng Pan <cheng...@apache.org>
---
 .../kyuubi/spark/connector/hive/HiveTable.scala    |  6 +-
 .../connector/hive/KyuubiHiveConnectorConf.scala   |  8 ++
 .../spark/connector/hive/HiveCatalogSuite.scala    | 30 ++++++-
 .../spark/connector/hive/HiveQuerySuite.scala      | 93 ++++++++++++++++++++++
 4 files changed, 132 insertions(+), 5 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
index afa2527fd0..27749158b9 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
@@ -33,11 +33,12 @@ import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{BucketSpecHelper, LogicalExpressions}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.READ_CONVERT_METASTORE_ORC
+import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.{READ_CONVERT_METASTORE_ORC, READ_CONVERT_METASTORE_PARQUET}
 import org.apache.kyuubi.spark.connector.hive.read.{HiveCatalogFileIndex, HiveScanBuilder}
 import org.apache.kyuubi.spark.connector.hive.write.HiveWriteBuilder
 
@@ -97,6 +98,9 @@ case class HiveTable(
     convertedProvider match {
       case Some("ORC") if sparkSession.sessionState.conf.getConf(READ_CONVERT_METASTORE_ORC) =>
         OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+      case Some("PARQUET")
+          if sparkSession.sessionState.conf.getConf(READ_CONVERT_METASTORE_PARQUET) =>
+        ParquetScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
       case _ => HiveScanBuilder(sparkSession, fileIndex, dataSchema, catalogTable)
     }
   }
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
index 075f12060c..98968a7d41 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
@@ -49,4 +49,12 @@ object KyuubiHiveConnectorConf {
       .version("1.11.0")
       .booleanConf
       .createWithDefault(true)
+
+  val READ_CONVERT_METASTORE_PARQUET =
+    buildConf("spark.sql.kyuubi.hive.connector.read.convertMetastoreParquet")
+      .doc("When enabled, the data source PARQUET reader is used to process " +
+        "PARQUET tables created by using the HiveQL syntax, instead of Hive 
SerDe.")
+      .version("1.11.0")
+      .booleanConf
+      .createWithDefault(true)
 }
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
index c083d75503..da53b89892 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
@@ -32,12 +32,13 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.IdentifierHelper
-import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.READ_CONVERT_METASTORE_ORC
+import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.{READ_CONVERT_METASTORE_ORC, READ_CONVERT_METASTORE_PARQUET}
 import org.apache.kyuubi.spark.connector.hive.read.HiveScan
 
 class HiveCatalogSuite extends KyuubiHiveTest {
@@ -349,9 +350,30 @@ class HiveCatalogSuite extends KyuubiHiveTest {
     val parProps: util.Map[String, String] = new util.HashMap[String, String]()
     parProps.put(TableCatalog.PROP_PROVIDER, "parquet")
     val pt = catalog.createTable(parquet_table, schema, Array.empty[Transform], parProps)
-    val parScan = pt.asInstanceOf[HiveTable]
-      .newScanBuilder(CaseInsensitiveStringMap.empty()).build().asInstanceOf[HiveScan]
-    assert(parScan.isSplitable(new Path("empty")))
+
+    Seq("true", "false").foreach { value =>
+      withSparkSession(Map(READ_CONVERT_METASTORE_PARQUET.key -> value)) { _ =>
+        val scan = pt.asInstanceOf[HiveTable]
+          .newScanBuilder(CaseInsensitiveStringMap.empty()).build()
+
+        val parScan = value match {
+          case "true" =>
+            assert(
+              scan.isInstanceOf[ParquetScan],
+              s"Expected ParquetScan, got ${scan.getClass.getSimpleName}")
+            scan.asInstanceOf[ParquetScan]
+          case "false" =>
+            assert(
+              scan.isInstanceOf[HiveScan],
+              s"Expected HiveScan, got ${scan.getClass.getSimpleName}")
+            scan.asInstanceOf[HiveScan]
+          case _ =>
+            throw new IllegalArgumentException(
+              s"Unexpected value: '$value'. Only 'true' or 'false' are 
allowed.")
+        }
+        assert(parScan.isSplitable(new Path("empty")))
+      }
+    }
 
     val orc_table = Identifier.of(testNs, "orc_table")
     val orcProps: util.Map[String, String] = new util.HashMap[String, String]()
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
index 35b2f54766..1e0b1ea756 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
@@ -353,6 +353,99 @@ class HiveQuerySuite extends KyuubiHiveTest {
     }
   }
 
+  test("PARQUET filter pushdown") {
+    val table = "hive.default.parquet_filter_pushdown"
+    withTable(table) {
+      spark.sql(
+        s"""
+           | CREATE TABLE $table (
+           |  id INT,
+           |  data STRING,
+           |  value INT
+           |  ) PARTITIONED BY (dt STRING, region STRING)
+           |  STORED AS PARQUET
+           | """.stripMargin).collect()
+
+      // Insert test data with partitions
+      spark.sql(
+        s"""
+           | INSERT INTO $table PARTITION (dt='2024-01-01', region='east')
+           | VALUES (1, 'a', 100), (2, 'b', 200), (11, 'aa', 100), (22, 'b', 200)
+           |""".stripMargin)
+
+      spark.sql(
+        s"""
+           | INSERT INTO $table PARTITION (dt='2024-01-01', region='west')
+           | VALUES (3, 'c', 300), (4, 'd', 400), (33, 'cc', 300), (44, 'dd', 400)
+           |""".stripMargin)
+      spark.sql(
+        s"""
+           | INSERT INTO $table PARTITION (dt='2024-01-02', region='east')
+           | VALUES (5, 'e', 500), (6, 'f', 600), (55, 'ee', 500), (66, 'ff', 600)
+           | """.stripMargin)
+
+      // Test multiple partition filters
+      val df1 = spark.sql(
+        s"""
+           | SELECT * FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east' AND value > 1500
+           |""".stripMargin)
+      assert(df1.count() === 0)
+
+      // Test multiple partition filters
+      val df2 = spark.sql(
+        s"""
+           | SELECT * FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east' AND value > 150
+           |""".stripMargin)
+      assert(df2.count() === 2)
+      assert(df2.collect().map(_.getInt(0)).toSet === Set(2, 22))
+
+      // Test explain
+      val df3 = spark.sql(
+        s"""
+           | EXPLAIN SELECT count(*) as total_rows
+           | FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east' AND value > 1
+           |""".stripMargin)
+      assert(df3.count() === 1)
+      // Expect something like: PushedFilters: [IsNotNull(value), GreaterThan(value,1)]
+      assert(df3.collect().map(_.getString(0)).filter { s =>
+        s.contains("PushedFilters") && !s.contains("PushedFilters: []")
+      }.toSet.size == 1)
+
+      // Test aggregation pushdown partition filters
+      spark.conf.set("spark.sql.parquet.aggregatePushdown", true)
+
+      // Test aggregation pushdown partition filters
+      val df4 = spark.sql(
+        s"""
+           | SELECT count(*) as total_rows
+           | FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east'
+           | group by dt, region
+           | """.stripMargin)
+      assert(df4.count() === 1)
+      assert(df4.collect().map(_.getLong(0)).toSet === Set(4L))
+
+      val df5 = spark.sql(
+        s"""
+           | EXPLAIN SELECT count(*) as total_rows
+           | FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east'
+           | group by dt, region
+           | """.stripMargin)
+      assert(df5.count() === 1)
+      // Expect something like: PushedAggregation: [COUNT(*)]
+      assert(df5.collect().map(_.getString(0)).filter { s =>
+        s.contains("PushedAggregation") && !s.contains("PushedAggregation: []")
+      }.toSet.size == 1)
+
+      spark.conf.set("spark.sql.parquet.aggregatePushdown", false)
+
+    }
+  }
+
   private def readPartitionedTable(format: String, hiveTable: Boolean): Unit = {
     withSparkSession() { spark =>
       val table = "hive.default.employee"
