This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 60371b5dd5 [KYUUBI #7122] Support ORC hive table pushdown filter
60371b5dd5 is described below

commit 60371b5dd58592c66dcfb6883f9080abeeb8c2fa
Author: tian bao <[email protected]>
AuthorDate: Wed Jul 9 13:38:51 2025 +0800

    [KYUUBI #7122] Support ORC hive table pushdown filter
    
    ### Why are the changes needed?
    
    Previously, the `HiveScan` class was used to read data. If it is determined 
to be ORC type, the `ORCScan` from Spark datasourcev2 can be used. `ORCScan` 
supports pushfilter down, but `HiveScan` does not yet support it.
    
    In our testing, we are able to achieve approximately 2x performance 
improvement.
    
    The conversation can be controlled by setting 
`spark.sql.kyuubi.hive.connector.read.convertMetastoreOrc`. When enabled, the 
data source ORC reader is used to process ORC tables created by using the 
HiveQL syntax, instead of Hive SerDe.
    
    close https://github.com/apache/kyuubi/issues/7122
    
    ### How was this patch tested?
    
    added unit test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #7123 from flaming-archer/master_scanbuilder_new.
    
    Closes #7122
    
    c3f412f90 [tian bao] add case _
    2be48909f [tian bao] Merge branch 'master_scanbuilder_new' of 
github.com:flaming-archer/kyuubi into master_scanbuilder_new
    c825d0f8c [tian bao] review change
    8a26d6a8a [tian bao] Update 
extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
    68d41969f [tian bao] review change
    bed007fea [tian bao] review change
    b89e6e67a [tian bao] Optimize UT
    5a8941b2d [tian bao] fix failed ut
    dc1ba47e3 [tian bao] orc pushdown version 0
    
    Authored-by: tian bao <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/spark/connector/hive/HiveTable.scala    | 23 +++++-
 .../connector/hive/KyuubiHiveConnectorConf.scala   | 10 +++
 .../spark/connector/hive/read/HiveFileIndex.scala  |  7 ++
 .../spark/connector/hive/HiveCatalogSuite.scala    | 29 ++++++-
 .../spark/connector/hive/HiveQuerySuite.scala      | 93 ++++++++++++++++++++++
 .../spark/connector/hive/KyuubiHiveTest.scala      | 12 +++
 .../hive/command/DDLCommandTestUtils.scala         | 13 ---
 7 files changed, 170 insertions(+), 17 deletions(-)

diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
index ee6d5fc234..afa2527fd0 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
@@ -18,6 +18,7 @@
 package org.apache.kyuubi.spark.connector.hive
 
 import java.util
+import java.util.Locale
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -31,10 +32,12 @@ import 
org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
 import 
org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{BucketSpecHelper, 
LogicalExpressions}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
+import 
org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.READ_CONVERT_METASTORE_ORC
 import org.apache.kyuubi.spark.connector.hive.read.{HiveCatalogFileIndex, 
HiveScanBuilder}
 import org.apache.kyuubi.spark.connector.hive.write.HiveWriteBuilder
 
@@ -59,6 +62,20 @@ case class HiveTable(
       catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
   }
 
+  lazy val convertedProvider: Option[String] = {
+    val serde = 
catalogTable.storage.serde.getOrElse("").toUpperCase(Locale.ROOT)
+    val parquet = serde.contains("PARQUET")
+    val orc = serde.contains("ORC")
+    val provider = catalogTable.provider.map(_.toUpperCase(Locale.ROOT))
+    if (orc || provider.contains("ORC")) {
+      Some("ORC")
+    } else if (parquet || provider.contains("PARQUET")) {
+      Some("PARQUET")
+    } else {
+      None
+    }
+  }
+
   override def name(): String = catalogTable.identifier.unquotedString
 
   override def schema(): StructType = catalogTable.schema
@@ -77,7 +94,11 @@ case class HiveTable(
   }
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder 
= {
-    HiveScanBuilder(sparkSession, fileIndex, dataSchema, catalogTable)
+    convertedProvider match {
+      case Some("ORC") if 
sparkSession.sessionState.conf.getConf(READ_CONVERT_METASTORE_ORC) =>
+        OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+      case _ => HiveScanBuilder(sparkSession, fileIndex, dataSchema, 
catalogTable)
+    }
   }
 
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
index cc5ffde9ce..075f12060c 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
@@ -19,6 +19,8 @@ package org.apache.kyuubi.spark.connector.hive
 
 import java.util.Locale
 
+import org.apache.spark.sql.internal.SQLConf.buildConf
+
 object KyuubiHiveConnectorConf {
 
   import org.apache.spark.sql.internal.SQLConf.buildStaticConf
@@ -39,4 +41,12 @@ object KyuubiHiveConnectorConf {
         "Invalid value for 
'spark.sql.kyuubi.hive.connector.externalCatalog.share.policy'." +
           "Valid values are 'ONE_FOR_ONE', 'ONE_FOR_ALL'.")
       .createWithDefault(OneForAllPolicy.name)
+
+  val READ_CONVERT_METASTORE_ORC =
+    buildConf("spark.sql.kyuubi.hive.connector.read.convertMetastoreOrc")
+      .doc("When enabled, the data source ORC reader is used to process " +
+        "ORC tables created by using the HiveQL syntax, instead of Hive 
SerDe.")
+      .version("1.11.0")
+      .booleanConf
+      .createWithDefault(true)
 }
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
index 55c9168ed9..0142c55619 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
@@ -54,6 +54,13 @@ class HiveCatalogFileIndex(
 
   override def partitionSchema: StructType = table.partitionSchema
 
+  override def listFiles(
+      partitionFilters: Seq[Expression],
+      dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    val fileIndex = filterPartitions(partitionFilters)
+    fileIndex.listFiles(partitionFilters, dataFilters)
+  }
+
   private[hive] def listHiveFiles(partitionFilters: Seq[Expression], 
dataFilters: Seq[Expression])
       : (Seq[PartitionDirectory], Map[PartitionDirectory, 
CatalogTablePartition]) = {
     val fileIndex = filterPartitions(partitionFilters)
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
index 0485087bf0..c083d75503 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
@@ -31,11 +31,13 @@ import 
org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchT
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.IdentifierHelper
+import 
org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.READ_CONVERT_METASTORE_ORC
 import org.apache.kyuubi.spark.connector.hive.read.HiveScan
 
 class HiveCatalogSuite extends KyuubiHiveTest {
@@ -355,8 +357,29 @@ class HiveCatalogSuite extends KyuubiHiveTest {
     val orcProps: util.Map[String, String] = new util.HashMap[String, String]()
     orcProps.put(TableCatalog.PROP_PROVIDER, "orc")
     val ot = catalog.createTable(orc_table, schema, Array.empty[Transform], 
orcProps)
-    val orcScan = ot.asInstanceOf[HiveTable]
-      
.newScanBuilder(CaseInsensitiveStringMap.empty()).build().asInstanceOf[HiveScan]
-    assert(orcScan.isSplitable(new Path("empty")))
+
+    Seq("true", "false").foreach { value =>
+      withSparkSession(Map(READ_CONVERT_METASTORE_ORC.key -> value)) { _ =>
+        val scan = ot.asInstanceOf[HiveTable]
+          .newScanBuilder(CaseInsensitiveStringMap.empty()).build()
+
+        val orcScan = value match {
+          case "true" =>
+            assert(
+              scan.isInstanceOf[OrcScan],
+              s"Expected OrcScan, got ${scan.getClass.getSimpleName}")
+            scan.asInstanceOf[OrcScan]
+          case "false" =>
+            assert(
+              scan.isInstanceOf[HiveScan],
+              s"Expected HiveScan, got ${scan.getClass.getSimpleName}")
+            scan.asInstanceOf[HiveScan]
+          case _ =>
+            throw new IllegalArgumentException(
+              s"Unexpected value: '$value'. Only 'true' or 'false' are 
allowed.")
+        }
+        assert(orcScan.isSplitable(new Path("empty")))
+      }
+    }
   }
 }
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
index 65ad81d95c..35b2f54766 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
@@ -260,6 +260,99 @@ class HiveQuerySuite extends KyuubiHiveTest {
     }
   }
 
+  test("ORC filter pushdown") {
+    val table = "hive.default.orc_filter_pushdown"
+    withTable(table) {
+      spark.sql(
+        s"""
+           | CREATE TABLE $table (
+           |  id INT,
+           |  data STRING,
+           |  value INT
+           |  ) PARTITIONED BY (dt STRING, region STRING)
+           |  STORED AS ORC
+           | """.stripMargin).collect()
+
+      // Insert test data with partitions
+      spark.sql(
+        s"""
+           | INSERT INTO $table PARTITION (dt='2024-01-01', region='east')
+           | VALUES (1, 'a', 100), (2, 'b', 200), (11, 'aa', 100), (22, 'b', 
200)
+           |""".stripMargin)
+
+      spark.sql(
+        s"""
+           | INSERT INTO $table PARTITION (dt='2024-01-01', region='west')
+           | VALUES (3, 'c', 300), (4, 'd', 400), (33, 'cc', 300), (44, 'dd', 
400)
+           |""".stripMargin)
+      spark.sql(
+        s"""
+           | INSERT INTO $table PARTITION (dt='2024-01-02', region='east')
+           | VALUES (5, 'e', 500), (6, 'f', 600), (55, 'ee', 500), (66, 'ff', 
600)
+           | """.stripMargin)
+
+      // Test multiple partition filters
+      val df1 = spark.sql(
+        s"""
+           | SELECT * FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east' AND value > 1500
+           |""".stripMargin)
+      assert(df1.count() === 0)
+
+      // Test multiple partition filters
+      val df2 = spark.sql(
+        s"""
+           | SELECT * FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east' AND value > 150
+           |""".stripMargin)
+      assert(df2.count() === 2)
+      assert(df2.collect().map(_.getInt(0)).toSet === Set(2, 22))
+
+      // Test explain
+      val df3 = spark.sql(
+        s"""
+           | EXPLAIN SELECT count(*) as total_rows
+           | FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east' AND value > 1
+           |""".stripMargin)
+      assert(df3.count() === 1)
+      // contains like : PushedFilters: [IsNotNull(value), 
GreaterThan(value,1)]
+      assert(df3.collect().map(_.getString(0)).filter { s =>
+        s.contains("PushedFilters") && !s.contains("PushedFilters: []")
+      }.toSet.size == 1)
+
+      // Test aggregation pushdown partition filters
+      spark.conf.set("spark.sql.orc.aggregatePushdown", true)
+
+      // Test aggregation pushdown partition filters
+      val df4 = spark.sql(
+        s"""
+           | SELECT count(*) as total_rows
+           | FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east'
+           | group by dt, region
+           | """.stripMargin)
+      assert(df4.count() === 1)
+      assert(df4.collect().map(_.getLong(0)).toSet === Set(4L))
+
+      val df5 = spark.sql(
+        s"""
+           | EXPLAIN SELECT count(*) as total_rows
+           | FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east'
+           | group by dt, region
+           | """.stripMargin)
+      assert(df5.count() === 1)
+      // contains like :  PushedAggregation: [COUNT(*)],
+      assert(df5.collect().map(_.getString(0)).filter { s =>
+        s.contains("PushedAggregation") && !s.contains("PushedAggregation: []")
+      }.toSet.size == 1)
+
+      spark.conf.set("spark.sql.orc.aggregatePushdown", false)
+
+    }
+  }
+
   private def readPartitionedTable(format: String, hiveTable: Boolean): Unit = 
{
     withSparkSession() { spark =>
       val table = "hive.default.employee"
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
index 851659b15e..fb5bcd6218 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
@@ -21,6 +21,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{QueryTest, SparkSession}
 import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, 
TableCatalog}
+import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.Utils
 
 import org.apache.kyuubi.spark.connector.common.LocalSparkSession
 
@@ -77,5 +78,16 @@ abstract class KyuubiHiveTest extends QueryTest with Logging 
{
     f(innerSpark)
   }
 
+  /**
+   * Drops table `tableName` after calling `f`.
+   */
+  protected def withTable(tableNames: String*)(f: => Unit): Unit = {
+    Utils.tryWithSafeFinally(f) {
+      tableNames.foreach { name =>
+        spark.sql(s"DROP TABLE IF EXISTS $name")
+      }
+    }
+  }
+
   override def spark: SparkSession = innerSpark
 }
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DDLCommandTestUtils.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DDLCommandTestUtils.scala
index 83fd95b6b3..2a56025540 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DDLCommandTestUtils.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DDLCommandTestUtils.scala
@@ -78,19 +78,6 @@ trait DDLCommandTestUtils extends KyuubiHiveTest {
     fs.makeQualified(hadoopPath).toUri
   }
 
-  /**
-   * Drops table `tableName` after calling `f`.
-   */
-  protected def withTable(tableNames: String*)(f: => Unit): Unit = {
-    try {
-      f
-    } finally {
-      tableNames.foreach { name =>
-        spark.sql(s"DROP TABLE IF EXISTS $name")
-      }
-    }
-  }
-
   protected def withNamespaceAndTable(
       ns: String,
       tableName: String,

Reply via email to