This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 60371b5dd5 [KYUUBI #7122] Support ORC hive table pushdown filter
60371b5dd5 is described below
commit 60371b5dd58592c66dcfb6883f9080abeeb8c2fa
Author: tian bao <[email protected]>
AuthorDate: Wed Jul 9 13:38:51 2025 +0800
[KYUUBI #7122] Support ORC hive table pushdown filter
### Why are the changes needed?
Previously, the `HiveScan` class was always used to read data. When a table
is determined to be ORC, the `OrcScan` from Spark DataSource V2 can be used
instead. `OrcScan` supports filter pushdown, but `HiveScan` does not yet.
In our testing, this yields approximately a 2x performance improvement.
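For example, the pushdown can be observed with `EXPLAIN` (a minimal sketch;
the table and column names are hypothetical):

```scala
// Sketch only: with the conversion enabled, the physical plan for an ORC
// Hive table should report non-empty PushedFilters.
val plan = spark.sql(
  "EXPLAIN SELECT * FROM hive.default.orc_tbl WHERE value > 150")
  .collect().map(_.getString(0)).mkString("\n")
assert(plan.contains("PushedFilters") && !plan.contains("PushedFilters: []"))
```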
The conversion can be controlled by setting
`spark.sql.kyuubi.hive.connector.read.convertMetastoreOrc`. When enabled, the
data source ORC reader is used to process ORC tables created by using the
HiveQL syntax, instead of Hive SerDe.
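Since this is a runtime SQL conf (built with `buildConf`, not
`buildStaticConf`), it can be toggled per session; a minimal sketch:

```scala
// Disable the conversion to fall back to HiveScan for ORC tables.
spark.sql("SET spark.sql.kyuubi.hive.connector.read.convertMetastoreOrc=false")
// Re-enable it (the default) to use the data source OrcScan with pushdown.
spark.sql("SET spark.sql.kyuubi.hive.connector.read.convertMetastoreOrc=true")
```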
Closes https://github.com/apache/kyuubi/issues/7122
### How was this patch tested?
Added unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #7123 from flaming-archer/master_scanbuilder_new.
Closes #7122
c3f412f90 [tian bao] add case _
2be48909f [tian bao] Merge branch 'master_scanbuilder_new' of github.com:flaming-archer/kyuubi into master_scanbuilder_new
c825d0f8c [tian bao] review change
8a26d6a8a [tian bao] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
68d41969f [tian bao] review change
bed007fea [tian bao] review change
b89e6e67a [tian bao] Optimize UT
5a8941b2d [tian bao] fix failed ut
dc1ba47e3 [tian bao] orc pushdown version 0
Authored-by: tian bao <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/spark/connector/hive/HiveTable.scala | 23 +++++-
.../connector/hive/KyuubiHiveConnectorConf.scala | 10 +++
.../spark/connector/hive/read/HiveFileIndex.scala | 7 ++
.../spark/connector/hive/HiveCatalogSuite.scala | 29 ++++++-
.../spark/connector/hive/HiveQuerySuite.scala | 93 ++++++++++++++++++++++
.../spark/connector/hive/KyuubiHiveTest.scala | 12 +++
.../hive/command/DDLCommandTestUtils.scala | 13 ---
7 files changed, 170 insertions(+), 17 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
index ee6d5fc234..afa2527fd0 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.spark.connector.hive
import java.util
+import java.util.Locale
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -31,10 +32,12 @@ import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{BucketSpecHelper, LogicalExpressions}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.READ_CONVERT_METASTORE_ORC
import org.apache.kyuubi.spark.connector.hive.read.{HiveCatalogFileIndex, HiveScanBuilder}
import org.apache.kyuubi.spark.connector.hive.write.HiveWriteBuilder
@@ -59,6 +62,20 @@ case class HiveTable(
catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
}
+ lazy val convertedProvider: Option[String] = {
+ val serde = catalogTable.storage.serde.getOrElse("").toUpperCase(Locale.ROOT)
+ val parquet = serde.contains("PARQUET")
+ val orc = serde.contains("ORC")
+ val provider = catalogTable.provider.map(_.toUpperCase(Locale.ROOT))
+ if (orc || provider.contains("ORC")) {
+ Some("ORC")
+ } else if (parquet || provider.contains("PARQUET")) {
+ Some("PARQUET")
+ } else {
+ None
+ }
+ }
+
override def name(): String = catalogTable.identifier.unquotedString
override def schema(): StructType = catalogTable.schema
@@ -77,7 +94,11 @@ case class HiveTable(
}
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
- HiveScanBuilder(sparkSession, fileIndex, dataSchema, catalogTable)
+ convertedProvider match {
+ case Some("ORC") if sparkSession.sessionState.conf.getConf(READ_CONVERT_METASTORE_ORC) =>
+ OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+ case _ => HiveScanBuilder(sparkSession, fileIndex, dataSchema, catalogTable)
+ }
}
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
index cc5ffde9ce..075f12060c 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
@@ -19,6 +19,8 @@ package org.apache.kyuubi.spark.connector.hive
import java.util.Locale
+import org.apache.spark.sql.internal.SQLConf.buildConf
+
object KyuubiHiveConnectorConf {
import org.apache.spark.sql.internal.SQLConf.buildStaticConf
@@ -39,4 +41,12 @@ object KyuubiHiveConnectorConf {
"Invalid value for 'spark.sql.kyuubi.hive.connector.externalCatalog.share.policy'." +
"Valid values are 'ONE_FOR_ONE', 'ONE_FOR_ALL'.")
.createWithDefault(OneForAllPolicy.name)
+
+ val READ_CONVERT_METASTORE_ORC =
+ buildConf("spark.sql.kyuubi.hive.connector.read.convertMetastoreOrc")
+ .doc("When enabled, the data source ORC reader is used to process " +
+ "ORC tables created by using the HiveQL syntax, instead of Hive SerDe.")
+ .version("1.11.0")
+ .booleanConf
+ .createWithDefault(true)
}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
index 55c9168ed9..0142c55619 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
@@ -54,6 +54,13 @@ class HiveCatalogFileIndex(
override def partitionSchema: StructType = table.partitionSchema
+ override def listFiles(
+ partitionFilters: Seq[Expression],
+ dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+ val fileIndex = filterPartitions(partitionFilters)
+ fileIndex.listFiles(partitionFilters, dataFilters)
+ }
+
private[hive] def listHiveFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression])
    : (Seq[PartitionDirectory], Map[PartitionDirectory, CatalogTablePartition]) = {
val fileIndex = filterPartitions(partitionFilters)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
index 0485087bf0..c083d75503 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
@@ -31,11 +31,13 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchT
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.IdentifierHelper
+import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.READ_CONVERT_METASTORE_ORC
import org.apache.kyuubi.spark.connector.hive.read.HiveScan
class HiveCatalogSuite extends KyuubiHiveTest {
@@ -355,8 +357,29 @@ class HiveCatalogSuite extends KyuubiHiveTest {
val orcProps: util.Map[String, String] = new util.HashMap[String, String]()
orcProps.put(TableCatalog.PROP_PROVIDER, "orc")
val ot = catalog.createTable(orc_table, schema, Array.empty[Transform], orcProps)
- val orcScan = ot.asInstanceOf[HiveTable]
-   .newScanBuilder(CaseInsensitiveStringMap.empty()).build().asInstanceOf[HiveScan]
- assert(orcScan.isSplitable(new Path("empty")))
+
+ Seq("true", "false").foreach { value =>
+ withSparkSession(Map(READ_CONVERT_METASTORE_ORC.key -> value)) { _ =>
+ val scan = ot.asInstanceOf[HiveTable]
+ .newScanBuilder(CaseInsensitiveStringMap.empty()).build()
+
+ val orcScan = value match {
+ case "true" =>
+ assert(
+ scan.isInstanceOf[OrcScan],
+ s"Expected OrcScan, got ${scan.getClass.getSimpleName}")
+ scan.asInstanceOf[OrcScan]
+ case "false" =>
+ assert(
+ scan.isInstanceOf[HiveScan],
+ s"Expected HiveScan, got ${scan.getClass.getSimpleName}")
+ scan.asInstanceOf[HiveScan]
+ case _ =>
+ throw new IllegalArgumentException(
+ s"Unexpected value: '$value'. Only 'true' or 'false' are allowed.")
+ }
+ assert(orcScan.isSplitable(new Path("empty")))
+ }
+ }
}
}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
index 65ad81d95c..35b2f54766 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
@@ -260,6 +260,99 @@ class HiveQuerySuite extends KyuubiHiveTest {
}
}
+ test("ORC filter pushdown") {
+ val table = "hive.default.orc_filter_pushdown"
+ withTable(table) {
+ spark.sql(
+ s"""
+ | CREATE TABLE $table (
+ | id INT,
+ | data STRING,
+ | value INT
+ | ) PARTITIONED BY (dt STRING, region STRING)
+ | STORED AS ORC
+ | """.stripMargin).collect()
+
+ // Insert test data with partitions
+ spark.sql(
+ s"""
+ | INSERT INTO $table PARTITION (dt='2024-01-01', region='east')
| VALUES (1, 'a', 100), (2, 'b', 200), (11, 'aa', 100), (22, 'b', 200)
+ |""".stripMargin)
+
+ spark.sql(
+ s"""
+ | INSERT INTO $table PARTITION (dt='2024-01-01', region='west')
| VALUES (3, 'c', 300), (4, 'd', 400), (33, 'cc', 300), (44, 'dd', 400)
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ | INSERT INTO $table PARTITION (dt='2024-01-02', region='east')
| VALUES (5, 'e', 500), (6, 'f', 600), (55, 'ee', 500), (66, 'ff', 600)
+ | """.stripMargin)
+
+ // Test partition filters plus a data filter that matches no rows
+ val df1 = spark.sql(
+ s"""
+ | SELECT * FROM $table
+ | WHERE dt = '2024-01-01' AND region = 'east' AND value > 1500
+ |""".stripMargin)
+ assert(df1.count() === 0)
+
+ // Test partition filters plus a data filter that matches some rows
+ val df2 = spark.sql(
+ s"""
+ | SELECT * FROM $table
+ | WHERE dt = '2024-01-01' AND region = 'east' AND value > 150
+ |""".stripMargin)
+ assert(df2.count() === 2)
+ assert(df2.collect().map(_.getInt(0)).toSet === Set(2, 22))
+
+ // Test explain
+ val df3 = spark.sql(
+ s"""
+ | EXPLAIN SELECT count(*) as total_rows
+ | FROM $table
+ | WHERE dt = '2024-01-01' AND region = 'east' AND value > 1
+ |""".stripMargin)
+ assert(df3.count() === 1)
+ // contains something like: PushedFilters: [IsNotNull(value), GreaterThan(value,1)]
+ assert(df3.collect().map(_.getString(0)).filter { s =>
+ s.contains("PushedFilters") && !s.contains("PushedFilters: []")
+ }.toSet.size == 1)
+
+ // Enable ORC aggregate pushdown
+ spark.conf.set("spark.sql.orc.aggregatePushdown", true)
+
+ // Test aggregation pushdown with partition filters
+ val df4 = spark.sql(
+ s"""
+ | SELECT count(*) as total_rows
+ | FROM $table
+ | WHERE dt = '2024-01-01' AND region = 'east'
+ | group by dt, region
+ | """.stripMargin)
+ assert(df4.count() === 1)
+ assert(df4.collect().map(_.getLong(0)).toSet === Set(4L))
+
+ val df5 = spark.sql(
+ s"""
+ | EXPLAIN SELECT count(*) as total_rows
+ | FROM $table
+ | WHERE dt = '2024-01-01' AND region = 'east'
+ | group by dt, region
+ | """.stripMargin)
+ assert(df5.count() === 1)
+ // contains something like: PushedAggregation: [COUNT(*)],
+ assert(df5.collect().map(_.getString(0)).filter { s =>
+ s.contains("PushedAggregation") && !s.contains("PushedAggregation: []")
+ }.toSet.size == 1)
+
+ spark.conf.set("spark.sql.orc.aggregatePushdown", false)
+
+ }
+ }
+
private def readPartitionedTable(format: String, hiveTable: Boolean): Unit = {
withSparkSession() { spark =>
val table = "hive.default.employee"
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
index 851659b15e..fb5bcd6218 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
@@ -21,6 +21,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{QueryTest, SparkSession}
import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog}
+import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.Utils
import org.apache.kyuubi.spark.connector.common.LocalSparkSession
@@ -77,5 +78,16 @@ abstract class KyuubiHiveTest extends QueryTest with Logging {
f(innerSpark)
}
+ /**
+ * Drops table `tableName` after calling `f`.
+ */
+ protected def withTable(tableNames: String*)(f: => Unit): Unit = {
+ Utils.tryWithSafeFinally(f) {
+ tableNames.foreach { name =>
+ spark.sql(s"DROP TABLE IF EXISTS $name")
+ }
+ }
+ }
+
override def spark: SparkSession = innerSpark
}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DDLCommandTestUtils.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DDLCommandTestUtils.scala
index 83fd95b6b3..2a56025540 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DDLCommandTestUtils.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/DDLCommandTestUtils.scala
@@ -78,19 +78,6 @@ trait DDLCommandTestUtils extends KyuubiHiveTest {
fs.makeQualified(hadoopPath).toUri
}
- /**
- * Drops table `tableName` after calling `f`.
- */
- protected def withTable(tableNames: String*)(f: => Unit): Unit = {
- try {
- f
- } finally {
- tableNames.foreach { name =>
- spark.sql(s"DROP TABLE IF EXISTS $name")
- }
- }
- }
-
protected def withNamespaceAndTable(
ns: String,
tableName: String,