This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fa9ca56  [SPARK-30972][SQL] PruneHiveTablePartitions should be 
executed as earlyScanPushDownRules
fa9ca56 is described below

commit fa9ca56ef03fee37f90edd4aae18728a4b58aea7
Author: yi.wu <[email protected]>
AuthorDate: Fri Feb 28 11:50:20 2020 +0800

    [SPARK-30972][SQL] PruneHiveTablePartitions should be executed as 
earlyScanPushDownRules
    
    ### What changes were proposed in this pull request?
    
    Make the rule `PruneHiveTablePartitions` execute as part of `earlyScanPushDownRules`.
    
    ### Why are the changes needed?
    
    Similar to the rule `PruneFileSourcePartitions`, `PruneHiveTablePartitions` 
should also be executed as part of `earlyScanPushDownRules` to eliminate its 
impact on later statistics computation.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass Jenkins.
    
    Closes #27723 from Ngone51/early_hive_prune.
    
    Authored-by: yi.wu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit a1d2ce90b0954a6ff2da630893d48ba0e7594b08)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/execution/OptimizeMetadataOnlyQuery.scala    | 12 ++++++++----
 .../spark/sql/internal/BaseSessionStateBuilder.scala       | 11 +++++++++++
 .../apache/spark/sql/hive/HiveSessionStateBuilder.scala    | 14 ++------------
 3 files changed, 21 insertions(+), 16 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 45e5f41..47284f3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -133,10 +133,14 @@ case class OptimizeMetadataOnlyQuery(catalog: 
SessionCatalog) extends Rule[Logic
               CaseInsensitiveMap(relation.tableMeta.storage.properties)
             val timeZoneId = 
caseInsensitiveProperties.get(DateTimeUtils.TIMEZONE_OPTION)
               .getOrElse(SQLConf.get.sessionLocalTimeZone)
-            val partitions = if (partFilters.nonEmpty) {
-              catalog.listPartitionsByFilter(relation.tableMeta.identifier, 
normalizedFilters)
-            } else {
-              catalog.listPartitions(relation.tableMeta.identifier)
+            val partitions = relation.prunedPartitions match {
+              // for the case where partitions have already been pruned by 
PruneHiveTablePartitions
+              case Some(parts) => parts
+              case None => if (partFilters.nonEmpty) {
+                catalog.listPartitionsByFilter(relation.tableMeta.identifier, 
normalizedFilters)
+              } else {
+                catalog.listPartitions(relation.tableMeta.identifier)
+              }
             }
 
             val partitionData = partitions.map { p =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 20e1b56..9556d4d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -232,6 +232,9 @@ abstract class BaseSessionStateBuilder(
    */
   protected def optimizer: Optimizer = {
     new SparkOptimizer(catalogManager, catalog, experimentalMethods) {
+      override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
+        super.earlyScanPushDownRules ++ customEarlyScanPushDownRules
+
       override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
         super.extendedOperatorOptimizationRules ++ 
customOperatorOptimizationRules
     }
@@ -248,6 +251,14 @@ abstract class BaseSessionStateBuilder(
   }
 
   /**
+   * Custom early scan push down rules to add to the Optimizer. Prefer 
overriding this instead
+   * of creating your own Optimizer.
+   *
+   * Note that this may NOT depend on the `optimizer` function.
+   */
+  protected def customEarlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil
+
+  /**
    * Planner that converts optimized logical plans to physical plans.
    *
    * Note: this depends on the `conf` and `experimentalMethods` fields.
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 7ec52ba..6472675 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -98,18 +98,8 @@ class HiveSessionStateBuilder(session: SparkSession, 
parentState: Option[Session
         customCheckRules
   }
 
-  /**
-   * Logical query plan optimizer that takes into account Hive.
-   */
-  override protected def optimizer: Optimizer = {
-    new SparkOptimizer(catalogManager, catalog, experimentalMethods) {
-      override def postHocOptimizationBatches: Seq[Batch] = 
super.postHocOptimizationBatches ++
-        Seq(Batch("Prune Hive Table Partitions", Once, new 
PruneHiveTablePartitions(session)))
-
-      override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
-        super.extendedOperatorOptimizationRules ++ 
customOperatorOptimizationRules
-    }
-  }
+  override def customEarlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
+    Seq(new PruneHiveTablePartitions(session))
 
   /**
    * Planner that takes into account Hive-specific strategies.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to