This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 08d61e8f8bc [SPARK-39073][SQL] Keep rowCount after hive table 
partition pruning if table only have hive statistics
08d61e8f8bc is described below

commit 08d61e8f8bcc7f5ea657b0580535688632198e74
Author: qiuliang988 <[email protected]>
AuthorDate: Mon May 16 13:27:16 2022 +0800

    [SPARK-39073][SQL] Keep rowCount after hive table partition pruning if 
table only have hive statistics
    
    ### What changes were proposed in this pull request?
    This PR keeps rowCount after Hive table partition pruning if the table only has 
Hive statistics.
    
    ### Why are the changes needed?
    Improve query performance. 
[SPARK-34119](https://issues.apache.org/jira/browse/SPARK-34119) keeps necessary 
stats after partition pruning, but if the table only has statistics generated 
by Hive, the HiveTableRelation does not have statistics such as rowCount.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit test
    
    Closes #36412 from qiuliang988/SPARK-39073.
    
    Authored-by: qiuliang988 <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../hive/execution/PruneHiveTablePartitions.scala  |  7 ++++++-
 .../execution/PruneHiveTablePartitionsSuite.scala  | 24 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
index 1bd47d7d7a7..395ee86579e 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
@@ -80,10 +80,15 @@ private[sql] class PruneHiveTablePartitions(session: 
SparkSession)
       val colStats = filteredStats.map(_.attributeStats.map { case (attr, 
colStat) =>
         (attr.name, colStat.toCatalogColumnStat(attr.name, attr.dataType))
       })
+      val rowCount = if 
(prunedPartitions.forall(_.stats.flatMap(_.rowCount).exists(_ > 0))) {
+        Option(prunedPartitions.map(_.stats.get.rowCount.get).sum)
+      } else {
+        filteredStats.flatMap(_.rowCount)
+      }
       relation.tableMeta.copy(
         stats = Some(CatalogStatistics(
           sizeInBytes = BigInt(sizeOfPartitions.sum),
-          rowCount = filteredStats.flatMap(_.rowCount),
+          rowCount = rowCount,
           colStats = colStats.getOrElse(Map.empty))))
     } else {
       relation.tableMeta
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
index 5b2a1d4e0c2..42601be08e1 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -138,6 +139,29 @@ class PruneHiveTablePartitionsSuite extends 
PrunePartitionSuiteBase with TestHiv
     }
   }
 
+  test("SPARK-39073: Keep rowCount after PruneHiveTablePartitions " +
+    "if table only has hive statistics") {
+    withTable("SPARK_39073") {
+      withSQLConf(
+        SQLConf.CBO_ENABLED.key -> "true",
+        "hive.exec.dynamic.partition.mode" -> "nonstrict") {
+        sql(s"CREATE TABLE SPARK_39073 PARTITIONED BY (p) STORED AS textfile 
AS " +
+          "(SELECT id, CAST(id % 5 AS STRING) AS p FROM range(20))")
+        val newPartitions = hiveClient.getPartitions("default", "SPARK_39073", 
None).map(p => {
+          val map = Map[String, String](
+            "numRows" -> "4", "rawDataSize" -> "6", "totalSize" -> "10")
+          CatalogTablePartition(
+            p.spec, p.storage, p.parameters ++ map, p.createTime, 
p.lastAccessTime, p.stats)
+        })
+        hiveClient.alterPartitions("default", "SPARK_39073", newPartitions)
+        checkOptimizedPlanStats(sql("SELECT id FROM SPARK_39073 WHERE p = 
'2'"),
+          64L,
+          Some(4),
+          Seq.empty)
+      }
+    }
+  }
+
   test("SPARK-36128: spark.sql.hive.metastorePartitionPruning should work for 
file data sources") {
     Seq(true, false).foreach { enablePruning =>
       withTable("tbl") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to