This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b404e02  [SPARK-27476][SQL] Refactoring SchemaPruning rule to remove duplicate code
b404e02 is described below

commit b404e02574084c5ab550ce8716d4177464e7ce8c
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Tue Apr 16 14:50:37 2019 -0700

    [SPARK-27476][SQL] Refactoring SchemaPruning rule to remove duplicate code
    
    ## What changes were proposed in this pull request?
    
    In SchemaPruning rule, there is duplicate code for data source v1 and v2. Their logic is the same and we can refactor the rule to remove duplicate code.
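
    For readers skimming the diff below: both branches previously repeated the
    same normalize/identify/prune/compare steps and differed only in how the
    pruned relation is rebuilt, so the commit folds the shared steps into one
    helper that takes the differing step as a function argument. A minimal,
    self-contained sketch of that pattern (toy types, not the actual Spark
    classes):

        object DedupSketch {
          // Toy stand-ins for Spark's StructType and LogicalPlan.
          case class Schema(leafCount: Int)
          case class Plan(description: String)

          // Shared pruning logic, parameterized by a relation builder:
          // only rewrite when pruning actually removed leaf fields.
          def pruneWith(dataSchema: Schema, prunedSchema: Schema)
                       (buildRelation: Schema => Plan): Option[Plan] =
            if (dataSchema.leafCount > prunedSchema.leafCount) {
              Some(buildRelation(prunedSchema))
            } else {
              None
            }

          def main(args: Array[String]): Unit = {
            // The v1-style caller supplies one builder...
            val v1 = pruneWith(Schema(10), Schema(4))(s => Plan(s"v1 scan, ${s.leafCount} leaves"))
            // ...and the v2-style caller another; the shared logic lives in one place.
            val v2 = pruneWith(Schema(10), Schema(10))(s => Plan(s"v2 scan, ${s.leafCount} leaves"))
            println(v1) // Some(Plan(v1 scan, 4 leaves))
            println(v2) // None: nothing was pruned, keep the original operator
          }
        }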
    
    ## How was this patch tested?
    
    Existing tests.
    
    Closes #24383 from viirya/SPARK-27476.
    
    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |   2 +-
 .../sql/execution/datasources/SchemaPruning.scala  | 100 ++++++++++-----------
 2 files changed, 47 insertions(+), 55 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f33cc86..3f59fa1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1551,7 +1551,7 @@ object SQLConf {
       .internal()
       .doc("Prune nested fields from a logical relation's output which are unnecessary in " +
         "satisfying a query. This optimization allows columnar file format readers to avoid " +
-        "reading unnecessary nested column data. Currently Parquet and ORC v1 are the " +
+        "reading unnecessary nested column data. Currently Parquet and ORC are the " +
         "data sources that implement this optimization.")
       .booleanConf
       .createWithDefault(false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
index 15fdf65..463ee9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
@@ -50,73 +50,65 @@ object SchemaPruning extends Rule[LogicalPlan] {
       case op @ PhysicalOperation(projects, filters,
           l @ LogicalRelation(hadoopFsRelation: HadoopFsRelation, _, _, _))
         if canPruneRelation(hadoopFsRelation) =>
-        val (normalizedProjects, normalizedFilters) =
-          normalizeAttributeRefNames(l.output, projects, filters)
-        val requestedRootFields = identifyRootFields(normalizedProjects, normalizedFilters)
-
-        // If requestedRootFields includes a nested field, continue. Otherwise,
-        // return op
-        if (requestedRootFields.exists { root: RootField => !root.derivedFromAtt }) {
-          val dataSchema = hadoopFsRelation.dataSchema
-          val prunedDataSchema = pruneDataSchema(dataSchema, requestedRootFields)
-
-          // If the data schema is different from the pruned data schema, continue. Otherwise,
-          // return op. We effect this comparison by counting the number of "leaf" fields in
-          // each schemata, assuming the fields in prunedDataSchema are a subset of the fields
-          // in dataSchema.
-          if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+
+        prunePhysicalColumns(l.output, projects, filters, hadoopFsRelation.dataSchema,
+          prunedDataSchema => {
             val prunedHadoopRelation =
               hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession)
-
-            val prunedRelation = buildPrunedRelation(l, prunedHadoopRelation)
-            val projectionOverSchema = ProjectionOverSchema(prunedDataSchema)
-
-            buildNewProjection(normalizedProjects, normalizedFilters, prunedRelation,
-              projectionOverSchema)
-          } else {
-            op
-          }
-        } else {
-          op
-        }
+            buildPrunedRelation(l, prunedHadoopRelation)
+          }).getOrElse(op)
 
       case op @ PhysicalOperation(projects, filters,
            d @ DataSourceV2Relation(table: FileTable, output, _)) if canPruneTable(table) =>
-        val (normalizedProjects, normalizedFilters) =
-          normalizeAttributeRefNames(output, projects, filters)
-        val requestedRootFields = identifyRootFields(normalizedProjects, normalizedFilters)
-
-        // If requestedRootFields includes a nested field, continue. Otherwise,
-        // return op
-        if (requestedRootFields.exists { root: RootField => !root.derivedFromAtt }) {
-          val dataSchema = table.dataSchema
-          val prunedDataSchema = pruneDataSchema(dataSchema, requestedRootFields)
-
-          // If the data schema is different from the pruned data schema, continue. Otherwise,
-          // return op. We effect this comparison by counting the number of "leaf" fields in
-          // each schemata, assuming the fields in prunedDataSchema are a subset of the fields
-          // in dataSchema.
-          if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+
+        prunePhysicalColumns(output, projects, filters, table.dataSchema,
+          prunedDataSchema => {
             val prunedFileTable = table match {
              case o: OrcTable => o.copy(userSpecifiedSchema = Some(prunedDataSchema))
               case _ =>
                val message = s"${table.formatName} data source doesn't support schema pruning."
                 throw new AnalysisException(message)
             }
+            buildPrunedRelationV2(d, prunedFileTable)
+          }).getOrElse(op)
+    }
 
-
-            val prunedRelationV2 = buildPrunedRelationV2(d, prunedFileTable)
-            val projectionOverSchema = ProjectionOverSchema(prunedDataSchema)
-
-            buildNewProjection(normalizedProjects, normalizedFilters, prunedRelationV2,
-              projectionOverSchema)
-          } else {
-            op
-          }
-        } else {
-          op
-        }
+  /**
+   * This method returns optional logical plan. `None` is returned if no nested field is required or
+   * all nested fields are required.
+   */
+  private def prunePhysicalColumns(
+      output: Seq[AttributeReference],
+      projects: Seq[NamedExpression],
+      filters: Seq[Expression],
+      dataSchema: StructType,
+      leafNodeBuilder: StructType => LeafNode): Option[LogicalPlan] = {
+    val (normalizedProjects, normalizedFilters) =
+      normalizeAttributeRefNames(output, projects, filters)
+    val requestedRootFields = identifyRootFields(normalizedProjects, normalizedFilters)
+
+    // If requestedRootFields includes a nested field, continue. Otherwise,
+    // return op
+    if (requestedRootFields.exists { root: RootField => !root.derivedFromAtt }) {
+      val prunedDataSchema = pruneDataSchema(dataSchema, requestedRootFields)
+
+      // If the data schema is different from the pruned data schema, continue. Otherwise,
+      // return op. We effect this comparison by counting the number of "leaf" fields in
+      // each schemata, assuming the fields in prunedDataSchema are a subset of the fields
+      // in dataSchema.
+      if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+        val prunedRelation = leafNodeBuilder(prunedDataSchema)
+        val projectionOverSchema = ProjectionOverSchema(prunedDataSchema)
+
+        Some(buildNewProjection(normalizedProjects, normalizedFilters, prunedRelation,
+          projectionOverSchema))
+      } else {
+        None
+      }
+    } else {
+      None
     }
+  }
 
   /**
   * Checks to see if the given relation can be pruned. Currently we support Parquet and ORC v1.

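A note on the comparison preserved inside the new helper: the rule decides
whether pruning was effective by counting "leaf" fields in the original and
pruned schemas, rewriting the plan only when the count drops. A small sketch
of that idea with a toy schema type (simplified, not Spark's StructType):

    sealed trait FieldType
    case object Leaf extends FieldType
    case class Struct(fields: List[FieldType]) extends FieldType

    // Count the leaf (non-struct) fields in a nested schema.
    def countLeaves(t: FieldType): Int = t match {
      case Leaf           => 1
      case Struct(fields) => fields.map(countLeaves).sum
    }

    // Full schema: one top-level leaf plus a nested struct of two leaves (3 total).
    val full   = Struct(List(Leaf, Struct(List(Leaf, Leaf))))
    // Pruned schema keeps only one of the nested leaves (2 total).
    val pruned = Struct(List(Leaf, Struct(List(Leaf))))
    assert(countLeaves(full) > countLeaves(pruned)) // fields were pruned, so rewrite the plan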

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
