This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new fa9ca56 [SPARK-30972][SQL] PruneHiveTablePartitions should be
executed as earlyScanPushDownRules
fa9ca56 is described below
commit fa9ca56ef03fee37f90edd4aae18728a4b58aea7
Author: yi.wu <[email protected]>
AuthorDate: Fri Feb 28 11:50:20 2020 +0800
[SPARK-30972][SQL] PruneHiveTablePartitions should be executed as
earlyScanPushDownRules
### What changes were proposed in this pull request?
Make the rule `PruneHiveTablePartitions` execute as one of the `earlyScanPushDownRules`.
### Why are the changes needed?
Similar to the rule `PruneFileSourcePartitions`, `PruneHiveTablePartitions`
should also be executed as one of the `earlyScanPushDownRules` to eliminate its
impact on later statistics computation.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Pass Jenkins.
Closes #27723 from Ngone51/early_hive_prune.
Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit a1d2ce90b0954a6ff2da630893d48ba0e7594b08)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/execution/OptimizeMetadataOnlyQuery.scala | 12 ++++++++----
.../spark/sql/internal/BaseSessionStateBuilder.scala | 11 +++++++++++
.../apache/spark/sql/hive/HiveSessionStateBuilder.scala | 14 ++------------
3 files changed, 21 insertions(+), 16 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 45e5f41..47284f3 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -133,10 +133,14 @@ case class OptimizeMetadataOnlyQuery(catalog:
SessionCatalog) extends Rule[Logic
CaseInsensitiveMap(relation.tableMeta.storage.properties)
val timeZoneId =
caseInsensitiveProperties.get(DateTimeUtils.TIMEZONE_OPTION)
.getOrElse(SQLConf.get.sessionLocalTimeZone)
- val partitions = if (partFilters.nonEmpty) {
- catalog.listPartitionsByFilter(relation.tableMeta.identifier,
normalizedFilters)
- } else {
- catalog.listPartitions(relation.tableMeta.identifier)
+ val partitions = relation.prunedPartitions match {
+ // for the case where partitions have already been pruned by
PruneHiveTablePartitions
+ case Some(parts) => parts
+ case None => if (partFilters.nonEmpty) {
+ catalog.listPartitionsByFilter(relation.tableMeta.identifier,
normalizedFilters)
+ } else {
+ catalog.listPartitions(relation.tableMeta.identifier)
+ }
}
val partitionData = partitions.map { p =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 20e1b56..9556d4d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -232,6 +232,9 @@ abstract class BaseSessionStateBuilder(
*/
protected def optimizer: Optimizer = {
new SparkOptimizer(catalogManager, catalog, experimentalMethods) {
+ override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
+ super.earlyScanPushDownRules ++ customEarlyScanPushDownRules
+
override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
super.extendedOperatorOptimizationRules ++
customOperatorOptimizationRules
}
@@ -248,6 +251,14 @@ abstract class BaseSessionStateBuilder(
}
/**
+ * Custom early scan push down rules to add to the Optimizer. Prefer
overriding this instead
+ * of creating your own Optimizer.
+ *
+ * Note that this may NOT depend on the `optimizer` function.
+ */
+ protected def customEarlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Nil
+
+ /**
* Planner that converts optimized logical plans to physical plans.
*
* Note: this depends on the `conf` and `experimentalMethods` fields.
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 7ec52ba..6472675 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -98,18 +98,8 @@ class HiveSessionStateBuilder(session: SparkSession,
parentState: Option[Session
customCheckRules
}
- /**
- * Logical query plan optimizer that takes into account Hive.
- */
- override protected def optimizer: Optimizer = {
- new SparkOptimizer(catalogManager, catalog, experimentalMethods) {
- override def postHocOptimizationBatches: Seq[Batch] =
super.postHocOptimizationBatches ++
- Seq(Batch("Prune Hive Table Partitions", Once, new
PruneHiveTablePartitions(session)))
-
- override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =
- super.extendedOperatorOptimizationRules ++
customOperatorOptimizationRules
- }
- }
+ override def customEarlyScanPushDownRules: Seq[Rule[LogicalPlan]] =
+ Seq(new PruneHiveTablePartitions(session))
/**
* Planner that takes into account Hive-specific strategies.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]