This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
     new 47063d9264 [KYUUBI #7129] Support PARQUET hive table pushdown filter
47063d9264 is described below

commit 47063d926424757e75abd4addafb9a38220eefd0
Author: tian bao <2011xues...@gmail.com>
AuthorDate: Thu Jul 17 14:42:46 2025 +0800

    [KYUUBI #7129] Support PARQUET hive table pushdown filter

    ### Why are the changes needed?

    Previously, the `HiveScan` class was always used to read data. When a table is
    determined to be of PARQUET type, the `ParquetScan` from Spark DataSource V2 can
    be used instead. `ParquetScan` supports filter pushdown, while `HiveScan` does
    not yet support it.

    The conversion can be controlled by setting
    `spark.sql.kyuubi.hive.connector.read.convertMetastoreParquet`. When enabled,
    the data source PARQUET reader is used to process PARQUET tables created by
    using the HiveQL syntax, instead of Hive SerDe.

    close https://github.com/apache/kyuubi/issues/7129

    ### How was this patch tested?

    Added a unit test.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #7130 from flaming-archer/master_parquet_filterdown.

    Closes #7129

    d7059dca4 [tian bao] Support PARQUET hive table pushdown filter

    Authored-by: tian bao <2011xues...@gmail.com>
    Signed-off-by: Cheng Pan <cheng...@apache.org>
---
 .../kyuubi/spark/connector/hive/HiveTable.scala    |  6 +-
 .../connector/hive/KyuubiHiveConnectorConf.scala   |  8 ++
 .../spark/connector/hive/HiveCatalogSuite.scala    | 30 ++++++-
 .../spark/connector/hive/HiveQuerySuite.scala      | 93 ++++++++++++++++++++++
 4 files changed, 132 insertions(+), 5 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
index afa2527fd0..27749158b9 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
@@ -33,11 +33,12 @@ import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{BucketSpecHelper, LogicalExpressions}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.READ_CONVERT_METASTORE_ORC
+import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.{READ_CONVERT_METASTORE_ORC, READ_CONVERT_METASTORE_PARQUET}
 import org.apache.kyuubi.spark.connector.hive.read.{HiveCatalogFileIndex, HiveScanBuilder}
 import org.apache.kyuubi.spark.connector.hive.write.HiveWriteBuilder
 
@@ -97,6 +98,9 @@ case class HiveTable(
     convertedProvider match {
       case Some("ORC") if sparkSession.sessionState.conf.getConf(READ_CONVERT_METASTORE_ORC) =>
         OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+      case Some("PARQUET")
+        if sparkSession.sessionState.conf.getConf(READ_CONVERT_METASTORE_PARQUET) =>
+        ParquetScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
       case _ => HiveScanBuilder(sparkSession, fileIndex, dataSchema, catalogTable)
     }
   }
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
index 075f12060c..98968a7d41 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
@@ -49,4 +49,12 @@ object KyuubiHiveConnectorConf {
       .version("1.11.0")
       .booleanConf
       .createWithDefault(true)
+
+  val READ_CONVERT_METASTORE_PARQUET =
+    buildConf("spark.sql.kyuubi.hive.connector.read.convertMetastoreParquet")
+      .doc("When enabled, the data source PARQUET reader is used to process " +
+        "PARQUET tables created by using the HiveQL syntax, instead of Hive SerDe.")
+      .version("1.11.0")
+      .booleanConf
+      .createWithDefault(true)
 }
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
index c083d75503..da53b89892 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
@@ -32,12 +32,13 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.IdentifierHelper
-import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.READ_CONVERT_METASTORE_ORC
+import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.{READ_CONVERT_METASTORE_ORC, READ_CONVERT_METASTORE_PARQUET}
 import org.apache.kyuubi.spark.connector.hive.read.HiveScan
 
 class HiveCatalogSuite extends KyuubiHiveTest {
@@ -349,9 +350,30 @@ class HiveCatalogSuite extends KyuubiHiveTest {
     val parProps: util.Map[String, String] = new util.HashMap[String, String]()
     parProps.put(TableCatalog.PROP_PROVIDER, "parquet")
     val pt = catalog.createTable(parquet_table, schema, Array.empty[Transform], parProps)
-    val parScan = pt.asInstanceOf[HiveTable]
-      .newScanBuilder(CaseInsensitiveStringMap.empty()).build().asInstanceOf[HiveScan]
-    assert(parScan.isSplitable(new Path("empty")))
+
+    Seq("true", "false").foreach { value =>
+      withSparkSession(Map(READ_CONVERT_METASTORE_PARQUET.key -> value)) { _ =>
+        val scan = pt.asInstanceOf[HiveTable]
+          .newScanBuilder(CaseInsensitiveStringMap.empty()).build()
+
+        val parScan = value match {
+          case "true" =>
+            assert(
+              scan.isInstanceOf[ParquetScan],
+              s"Expected ParquetScan, got ${scan.getClass.getSimpleName}")
+            scan.asInstanceOf[ParquetScan]
+          case "false" =>
+            assert(
+              scan.isInstanceOf[HiveScan],
+              s"Expected HiveScan, got ${scan.getClass.getSimpleName}")
+            scan.asInstanceOf[HiveScan]
+          case _ =>
+            throw new IllegalArgumentException(
+              s"Unexpected value: '$value'. Only 'true' or 'false' are allowed.")
+        }
+        assert(parScan.isSplitable(new Path("empty")))
+      }
+    }
 
     val orc_table = Identifier.of(testNs, "orc_table")
     val orcProps: util.Map[String, String] = new util.HashMap[String, String]()
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
index 35b2f54766..1e0b1ea756 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveQuerySuite.scala
@@ -353,6 +353,99 @@ class HiveQuerySuite extends KyuubiHiveTest {
     }
   }
 
+  test("PARQUET filter pushdown") {
+    val table = "hive.default.parquet_filter_pushdown"
+    withTable(table) {
+      spark.sql(
+        s"""
+           | CREATE TABLE $table (
+           |   id INT,
+           |   data STRING,
+           |   value INT
+           | ) PARTITIONED BY (dt STRING, region STRING)
+           | STORED AS PARQUET
+           | """.stripMargin).collect()
+
+      // Insert test data with partitions
+      spark.sql(
+        s"""
+           | INSERT INTO $table PARTITION (dt='2024-01-01', region='east')
+           | VALUES (1, 'a', 100), (2, 'b', 200), (11, 'aa', 100), (22, 'b', 200)
+           |""".stripMargin)
+
+      spark.sql(
+        s"""
+           | INSERT INTO $table PARTITION (dt='2024-01-01', region='west')
+           | VALUES (3, 'c', 300), (4, 'd', 400), (33, 'cc', 300), (44, 'dd', 400)
+           |""".stripMargin)
+      spark.sql(
+        s"""
+           | INSERT INTO $table PARTITION (dt='2024-01-02', region='east')
+           | VALUES (5, 'e', 500), (6, 'f', 600), (55, 'ee', 500), (66, 'ff', 600)
+           | """.stripMargin)
+
+      // Test multiple partition filters
+      val df1 = spark.sql(
+        s"""
+           | SELECT * FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east' AND value > 1500
+           |""".stripMargin)
+      assert(df1.count() === 0)
+
+      // Test multiple partition filters
+      val df2 = spark.sql(
+        s"""
+           | SELECT * FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east' AND value > 150
+           |""".stripMargin)
+      assert(df2.count() === 2)
+      assert(df2.collect().map(_.getInt(0)).toSet === Set(2, 22))
+
+      // Test explain
+      val df3 = spark.sql(
+        s"""
+           | EXPLAIN SELECT count(*) as total_rows
+           | FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east' AND value > 1
+           |""".stripMargin)
+      assert(df3.count() === 1)
+      // contains like : PushedFilters: [IsNotNull(value), GreaterThan(value,1)]
+      assert(df3.collect().map(_.getString(0)).filter { s =>
+        s.contains("PushedFilters") && !s.contains("PushedFilters: []")
+      }.toSet.size == 1)
+
+      // Test aggregation pushdown partition filters
+      spark.conf.set("spark.sql.parquet.aggregatePushdown", true)
+
+      // Test aggregation pushdown partition filters
+      val df4 = spark.sql(
+        s"""
+           | SELECT count(*) as total_rows
+           | FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east'
+           | group by dt, region
+           | """.stripMargin)
+      assert(df4.count() === 1)
+      assert(df4.collect().map(_.getLong(0)).toSet === Set(4L))
+
+      val df5 = spark.sql(
+        s"""
+           | EXPLAIN SELECT count(*) as total_rows
+           | FROM $table
+           | WHERE dt = '2024-01-01' AND region = 'east'
+           | group by dt, region
+           | """.stripMargin)
+      assert(df5.count() === 1)
+      // contains like : PushedAggregation: [COUNT(*)],
+      assert(df5.collect().map(_.getString(0)).filter { s =>
+        s.contains("PushedAggregation") && !s.contains("PushedAggregation: []")
!s.contains("PushedAggregation: []") + }.toSet.size == 1) + + spark.conf.set("spark.sql.parquet.aggregatePushdown", false) + + } + } + private def readPartitionedTable(format: String, hiveTable: Boolean): Unit = { withSparkSession() { spark => val table = "hive.default.employee"