This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2573914  [SPARK-26975][SQL] Support nested-column pruning over 
limit/sample/repartition
2573914 is described below

commit 257391497bea51553d5a9e420c20a3fdfe4d36a9
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue Mar 19 20:24:22 2019 -0700

    [SPARK-26975][SQL] Support nested-column pruning over 
limit/sample/repartition
    
    ## What changes were proposed in this pull request?
    
    As the [SPARK-26958](https://github.com/apache/spark/pull/23862/files) 
benchmark shows, nested-column pruning has limitations. This PR aims to remove 
those limitations for `limit/repartition/sample`. Here, repartition means 
`Repartition`, not `RepartitionByExpression`.
    
    **PREPARATION**
    ```scala
    scala> spark.range(100).map(x => (x, (x, s"$x" * 100))).toDF("col1", 
"col2").write.mode("overwrite").save("/tmp/p")
    scala> sql("set spark.sql.optimizer.nestedSchemaPruning.enabled=true")
    scala> spark.read.parquet("/tmp/p").createOrReplaceTempView("t")
    ```
    
    **BEFORE**
    ```scala
    scala> sql("SELECT col2._1 FROM (SELECT col2 FROM t LIMIT 1000000)").explain
    == Physical Plan ==
    CollectLimit 1000000
    +- *(1) Project [col2#22._1 AS _1#28L]
       +- *(1) FileScan parquet [col2#22] Batched: false, DataFilters: [], 
Format: Parquet, Location: InMemoryFileIndex[file:/tmp/p], PartitionFilters: 
[], PushedFilters: [], ReadSchema: struct<col2:struct<_1:bigint>>
    
    scala> sql("SELECT col2._1 FROM (SELECT /*+ REPARTITION(1) */ col2 FROM 
t)").explain
    == Physical Plan ==
    *(2) Project [col2#22._1 AS _1#33L]
    +- Exchange RoundRobinPartitioning(1)
       +- *(1) Project [col2#22]
          +- *(1) FileScan parquet [col2#22] Batched: false, DataFilters: [], 
Format: Parquet, Location: InMemoryFileIndex[file:/tmp/p], PartitionFilters: 
[], PushedFilters: [], ReadSchema: struct<col2:struct<_1:bigint,_2:string>>
    ```
    
    **AFTER**
    ```scala
    scala> sql("SELECT col2._1 FROM (SELECT /*+ REPARTITION(1) */ col2 FROM 
t)").explain
    == Physical Plan ==
    Exchange RoundRobinPartitioning(1)
    +- *(1) Project [col2#5._1 AS _1#11L]
       +- *(1) FileScan parquet [col2#5] Batched: false, DataFilters: [], 
Format: Parquet, Location: InMemoryFileIndex[file:/tmp/p], PartitionFilters: 
[], PushedFilters: [], ReadSchema: struct<col2:struct<_1:bigint>>
    ```
    
    This supersedes https://github.com/apache/spark/pull/23542 and 
https://github.com/apache/spark/pull/23873 .
    
    ## How was this patch tested?
    
    Passes Jenkins with a newly added test suite.
    
    Closes #23964 from dongjoon-hyun/SPARK-26975-ALIAS.
    
    Lead-authored-by: Dongjoon Hyun <[email protected]>
    Co-authored-by: DB Tsai <[email protected]>
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
    Co-authored-by: Takeshi Yamamuro <[email protected]>
    Co-authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../catalyst/optimizer/NestedColumnAliasing.scala  | 151 ++++++++++++++
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |   3 +
 .../optimizer/NestedColumnAliasingSuite.scala      | 229 +++++++++++++++++++++
 .../OrcNestedSchemaPruningBenchmark-results.txt    |  27 ++-
 .../OrcV2NestedSchemaPruningBenchmark-results.txt  |  27 ++-
 ...ParquetNestedSchemaPruningBenchmark-results.txt |  27 ++-
 .../benchmark/NestedSchemaPruningBenchmark.scala   |  31 ++-
 7 files changed, 460 insertions(+), 35 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
new file mode 100644
index 0000000..d9f2d36
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the 
`ColumnPruning` optimizer rule.
+ * If a project or its child references nested fields, and not all the 
fields
+ * in a nested attribute are used, we can substitute them by alias attributes; 
then a project
+ * of the nested fields as aliases on the children of the child will be 
created.
+ */
+object NestedColumnAliasing {
+
+  def unapply(plan: LogicalPlan)
+    : Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan 
match {
+    case Project(projectList, child)
+        if SQLConf.get.nestedSchemaPruningEnabled && 
canProjectPushThrough(child) =>
+      getAliasSubMap(projectList)
+    case _ => None
+  }
+
+  /**
+   * Replace nested columns to prune unused nested columns later.
+   */
+  def replaceToAliases(
+      plan: LogicalPlan,
+      nestedFieldToAlias: Map[GetStructField, Alias],
+      attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+    case Project(projectList, child) =>
+      Project(
+        getNewProjectList(projectList, nestedFieldToAlias),
+        replaceChildrenWithAliases(child, attrToAliases))
+  }
+
+  /**
+   * Return a replaced project list.
+   */
+  private def getNewProjectList(
+      projectList: Seq[NamedExpression],
+      nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+    projectList.map(_.transform {
+      case f: GetStructField if nestedFieldToAlias.contains(f) =>
+        nestedFieldToAlias(f).toAttribute
+    }.asInstanceOf[NamedExpression])
+  }
+
+  /**
+   * Return a plan with new children replaced with aliases.
+   */
+  private def replaceChildrenWithAliases(
+      plan: LogicalPlan,
+      attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+    plan.withNewChildren(plan.children.map { plan =>
+      Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId, 
Seq(a))), plan)
+    })
+  }
+
+  /**
+   * Returns true for those operators that project can be pushed through.
+   */
+  private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+    case _: GlobalLimit => true
+    case _: LocalLimit => true
+    case _: Repartition => true
+    case _: Sample => true
+    case _ => false
+  }
+
+  /**
+   * Return root references that are individually accessed as a whole, and 
`GetStructField`s.
+   */
+  private def collectRootReferenceAndGetStructField(e: Expression): 
Seq[Expression] = e match {
+    case _: AttributeReference | _: GetStructField => Seq(e)
+    case es if es.children.nonEmpty => 
es.children.flatMap(collectRootReferenceAndGetStructField)
+    case _ => Seq.empty
+  }
+
+  /**
+   * Return two maps in order to replace nested fields to aliases.
+   *
+   * 1. GetStructField -> Alias: A new alias is created for each nested field.
+   * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases 
pointing to it.
+   */
+  private def getAliasSubMap(projectList: Seq[NamedExpression])
+    : Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = {
+    val (nestedFieldReferences, otherRootReferences) =
+      projectList.flatMap(collectRootReferenceAndGetStructField).partition {
+        case _: GetStructField => true
+        case _ => false
+      }
+
+    val aliasSub = nestedFieldReferences.asInstanceOf[Seq[GetStructField]]
+      .filter(!_.references.subsetOf(AttributeSet(otherRootReferences)))
+      .groupBy(_.references.head)
+      .flatMap { case (attr, nestedFields: Seq[GetStructField]) =>
+        // Each expression can contain multiple nested fields.
+        // Note that we keep the original names to deliver to parquet in a 
case-sensitive way.
+        val nestedFieldToAlias = nestedFields.distinct.map { f =>
+          val exprId = NamedExpression.newExprId
+          (f, Alias(f, s"_gen_alias_${exprId.id}")(exprId, Seq.empty, None))
+        }
+
+        // If all nested fields of `attr` are used, we don't need to introduce 
new aliases.
+        // By default, ColumnPruning rule uses `attr` already.
+        if (nestedFieldToAlias.nonEmpty &&
+            nestedFieldToAlias.length < totalFieldNum(attr.dataType)) {
+          Some(attr.exprId -> nestedFieldToAlias)
+        } else {
+          None
+        }
+      }
+
+    if (aliasSub.isEmpty) {
+      None
+    } else {
+      Some((aliasSub.values.flatten.toMap, aliasSub.map(x => (x._1, 
x._2.map(_._2)))))
+    }
+  }
+
+  /**
+   * Return total number of fields of this type. This is used as a threshold 
to use nested column
+   * pruning. It's okay to underestimate. If the number of references is equal to or bigger 
than this, the parent
+   * reference is used instead of nested field references.
+   */
+  private def totalFieldNum(dataType: DataType): Int = dataType match {
+    case _: AtomicType => 1
+    case StructType(fields) => fields.map(f => totalFieldNum(f.dataType)).sum
+    case ArrayType(elementType, _) => totalFieldNum(elementType)
+    case MapType(keyType, valueType, _) => totalFieldNum(keyType) + 
totalFieldNum(valueType)
+    case _ => 1 // UDT and others
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 4babd40..d4eb516 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -647,6 +647,9 @@ object ColumnPruning extends Rule[LogicalPlan] {
     // Can't prune the columns on LeafNode
     case p @ Project(_, _: LeafNode) => p
 
+    case p @ NestedColumnAliasing(nestedFieldToAlias, attrToAliases) =>
+      NestedColumnAliasing.replaceToAliases(p, nestedFieldToAlias, 
attrToAliases)
+
     // for all other logical plans that inherits the output from it's children
     // Project over project is handled by the first case, skip it here.
     case p @ Project(_, child) if !child.isInstanceOf[Project] =>
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
new file mode 100644
index 0000000..87f7d19
--- /dev/null
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.SchemaPruningTest
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types.{StringType, StructType}
+
+class NestedColumnAliasingSuite extends SchemaPruningTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("Nested column pruning", FixedPoint(100),
+      ColumnPruning,
+      CollapseProject,
+      RemoveNoopOperators) :: Nil
+  }
+
+  private val name = StructType.fromDDL("first string, middle string, last 
string")
+  private val employer = StructType.fromDDL("id int, company 
struct<name:string, address:string>")
+  private val contact = LocalRelation(
+    'id.int,
+    'name.struct(name),
+    'address.string,
+    'friends.array(name),
+    'relatives.map(StringType, name),
+    'employer.struct(employer))
+
+  test("Pushing a single nested field projection") {
+    def testSingleFieldPushDown(op: LogicalPlan => LogicalPlan): Unit = {
+      val middle = GetStructField('name, 1, Some("middle"))
+      val query = op(contact).select(middle).analyze
+      val optimized = Optimize.execute(query)
+      val expected = op(contact.select(middle)).analyze
+      comparePlans(optimized, expected)
+    }
+
+    testSingleFieldPushDown((input: LogicalPlan) => input.limit(5))
+    testSingleFieldPushDown((input: LogicalPlan) => input.repartition(1))
+    testSingleFieldPushDown((input: LogicalPlan) => Sample(0.0, 0.6, false, 
11L, input))
+  }
+
+  test("Pushing multiple nested field projection") {
+    val first = GetStructField('name, 0, Some("first"))
+    val last = GetStructField('name, 2, Some("last"))
+
+    val query = contact
+      .limit(5)
+      .select('id, first, last)
+      .analyze
+
+    val optimized = Optimize.execute(query)
+
+    val expected = contact
+      .select('id, first, last)
+      .limit(5)
+      .analyze
+
+    comparePlans(optimized, expected)
+  }
+
+  test("function with nested field inputs") {
+    val first = GetStructField('name, 0, Some("first"))
+    val last = GetStructField('name, 2, Some("last"))
+
+    val query = contact
+      .limit(5)
+      .select('id, ConcatWs(Seq(first, last)))
+      .analyze
+
+    val optimized = Optimize.execute(query)
+
+    val aliases = collectGeneratedAliases(optimized)
+
+    val expected = contact
+      .select('id, first.as(aliases(0)), last.as(aliases(1)))
+      .limit(5)
+      .select(
+        'id,
+        ConcatWs(Seq($"${aliases(0)}", 
$"${aliases(1)}")).as("concat_ws(name.first, name.last)"))
+      .analyze
+    comparePlans(optimized, expected)
+  }
+
+  test("multi-level nested field") {
+    val field1 = GetStructField(GetStructField('employer, 1, Some("company")), 
0, Some("name"))
+    val field2 = GetStructField('employer, 0, Some("id"))
+
+    val query = contact
+      .limit(5)
+      .select(field1, field2)
+      .analyze
+
+    val optimized = Optimize.execute(query)
+
+    val expected = contact
+      .select(field1, field2)
+      .limit(5)
+      .analyze
+    comparePlans(optimized, expected)
+  }
+
+  test("Push original case-sensitive names") {
+    val first1 = GetStructField('name, 0, Some("first"))
+    val first2 = GetStructField('name, 1, Some("FIRST"))
+
+    val query = contact
+      .limit(5)
+      .select('id, first1, first2)
+      .analyze
+
+    val optimized = Optimize.execute(query)
+
+    val expected = contact
+      .select('id, first1, first2)
+      .limit(5)
+      .analyze
+
+    comparePlans(optimized, expected)
+  }
+
+  test("Pushing a single nested field projection - negative") {
+    val ops = Array(
+      (input: LogicalPlan) => input.distribute('name)(1),
+      (input: LogicalPlan) => input.distribute($"name.middle")(1),
+      (input: LogicalPlan) => input.orderBy('name.asc),
+      (input: LogicalPlan) => input.orderBy($"name.middle".asc),
+      (input: LogicalPlan) => input.sortBy('name.asc),
+      (input: LogicalPlan) => input.sortBy($"name.middle".asc),
+      (input: LogicalPlan) => input.union(input)
+    )
+
+    val queries = ops.map { op =>
+      op(contact.select('name))
+        .select(GetStructField('name, 1, Some("middle")))
+        .analyze
+    }
+
+    val optimizedQueries = queries.map(Optimize.execute)
+    val expectedQueries = queries
+    optimizedQueries.zip(expectedQueries).foreach { case (optimized, expected) 
=>
+      comparePlans(optimized, expected)
+    }
+  }
+
+  test("Pushing a single nested field projection through filters - negative") {
+    val ops = Array(
+      (input: LogicalPlan) => input.where('name.isNotNull),
+      (input: LogicalPlan) => input.where($"name.middle".isNotNull)
+    )
+
+    val queries = ops.map { op =>
+      op(contact)
+        .select(GetStructField('name, 1, Some("middle")))
+        .analyze
+    }
+
+    val optimizedQueries = queries.map(Optimize.execute)
+    val expectedQueries = queries
+
+    optimizedQueries.zip(expectedQueries).foreach { case (optimized, expected) 
=>
+      comparePlans(optimized, expected)
+    }
+  }
+
+  test("Do not optimize when parent field is used") {
+    val query = contact
+      .limit(5)
+      .select('id, GetStructField('name, 0, Some("first")), 'name)
+      .analyze
+
+    val optimized = Optimize.execute(query)
+
+    val expected = contact
+      .select('id, 'name)
+      .limit(5)
+      .select('id, GetStructField('name, 0, Some("first")), 'name)
+      .analyze
+    comparePlans(optimized, expected)
+  }
+
+  test("Some nested column means the whole structure") {
+    val nestedRelation = LocalRelation('a.struct('b.struct('c.int, 'd.int, 
'e.int)))
+
+    val query = nestedRelation
+      .limit(5)
+      .select(GetStructField('a, 0, Some("b")))
+      .analyze
+
+    val optimized = Optimize.execute(query)
+
+    val expected = nestedRelation
+      .select(GetStructField('a, 0, Some("b")))
+      .limit(5)
+      .analyze
+
+    comparePlans(optimized, expected)
+  }
+
+  private def collectGeneratedAliases(query: LogicalPlan): ArrayBuffer[String] 
= {
+    val aliases = ArrayBuffer[String]()
+    query.transformAllExpressions {
+      case a @ Alias(_, name) if name.startsWith("_gen_alias_") =>
+        aliases += name
+        a
+    }
+    aliases
+  }
+}
diff --git a/sql/core/benchmarks/OrcNestedSchemaPruningBenchmark-results.txt 
b/sql/core/benchmarks/OrcNestedSchemaPruningBenchmark-results.txt
index 078fbbb..53fbf14 100644
--- a/sql/core/benchmarks/OrcNestedSchemaPruningBenchmark-results.txt
+++ b/sql/core/benchmarks/OrcNestedSchemaPruningBenchmark-results.txt
@@ -6,35 +6,42 @@ OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 
3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Selection:                                Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    117            154         
 23          8.5         117.5       1.0X
-Nested column                                      1271           1295         
 26          0.8        1270.5       0.1X
+Top-level column                                    131            150         
 25          7.7         130.6       1.0X
+Nested column                                       922            954         
 21          1.1         922.2       0.1X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Limiting:                                 Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    431            488         
 73          2.3         431.2       1.0X
-Nested column                                      1738           1777         
 24          0.6        1738.3       0.2X
+Top-level column                                    446            477         
 50          2.2         445.5       1.0X
+Nested column                                      1328           1366         
 44          0.8        1328.4       0.3X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Repartitioning:                           Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    349            381         
 87          2.9         348.7       1.0X
-Nested column                                      4374           4456         
125          0.2        4373.6       0.1X
+Top-level column                                    357            386         
 33          2.8         356.8       1.0X
+Nested column                                      1266           1274         
  7          0.8        1266.3       0.3X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Repartitioning by exprs:                  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    358            410         
 85          2.8         358.5       1.0X
-Nested column                                      4447           4517         
125          0.2        4447.3       0.1X
+Top-level column                                    368            394         
 54          2.7         367.6       1.0X
+Nested column                                      3890           3954         
 80          0.3        3890.3       0.1X
+
+OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Sample:                                   Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Top-level column                                    129            140         
 10          7.7         129.1       1.0X
+Nested column                                       966            999         
 26          1.0         966.2       0.1X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Sorting:                                  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    553            566         
 13          1.8         552.8       1.0X
-Nested column                                      5115           5248         
 84          0.2        5115.1       0.1X
+Top-level column                                    573            601         
 61          1.7         573.2       1.0X
+Nested column                                      4417           4598         
149          0.2        4417.1       0.1X
 
 
diff --git a/sql/core/benchmarks/OrcV2NestedSchemaPruningBenchmark-results.txt 
b/sql/core/benchmarks/OrcV2NestedSchemaPruningBenchmark-results.txt
index eaaf3b0..f259e14 100644
--- a/sql/core/benchmarks/OrcV2NestedSchemaPruningBenchmark-results.txt
+++ b/sql/core/benchmarks/OrcV2NestedSchemaPruningBenchmark-results.txt
@@ -6,35 +6,42 @@ OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 
3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Selection:                                Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    120            148         
 24          8.3         120.0       1.0X
-Nested column                                      2367           2415         
 43          0.4        2367.0       0.1X
+Top-level column                                    135            169         
 19          7.4         134.7       1.0X
+Nested column                                      2131           2216         
 95          0.5        2131.4       0.1X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Limiting:                                 Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    129            153         
 16          7.8         128.5       1.0X
-Nested column                                      2368           2400         
 32          0.4        2367.7       0.1X
+Top-level column                                    147            158         
 10          6.8         146.9       1.0X
+Nested column                                      2149           2204         
 50          0.5        2148.9       0.1X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Repartitioning:                           Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    359            396         
 59          2.8         358.9       1.0X
-Nested column                                      4100           4147         
 59          0.2        4099.9       0.1X
+Top-level column                                    386            399         
 16          2.6         385.8       1.0X
+Nested column                                      2612           2666         
 57          0.4        2612.2       0.1X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Repartitioning by exprs:                  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    367            394         
 56          2.7         366.8       1.0X
-Nested column                                      4182           4236         
 64          0.2        4181.5       0.1X
+Top-level column                                    392            454         
119          2.5         392.2       1.0X
+Nested column                                      4106           4168         
 79          0.2        4106.1       0.1X
+
+OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Sample:                                   Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Top-level column                                    146            157         
 13          6.9         145.9       1.0X
+Nested column                                      2294           2338         
 44          0.4        2293.6       0.1X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Sorting:                                  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    262            269         
  6          3.8         262.5       1.0X
-Nested column                                      2950           3021         
 65          0.3        2949.8       0.1X
+Top-level column                                    290            294         
  4          3.5         289.7       1.0X
+Nested column                                      2914           2997         
 87          0.3        2913.6       0.1X
 
 
diff --git 
a/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt 
b/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt
index 1f1296e..bd5b39a 100644
--- a/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt
+++ b/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt
@@ -6,35 +6,42 @@ OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Selection:                                Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    145            174         
 23          6.9         145.1       1.0X
-Nested column                                       325            346         
 19          3.1         324.8       0.4X
+Top-level column                                    128            166         
 24          7.8         128.0       1.0X
+Nested column                                       308            325         
 10          3.2         308.3       0.4X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Limiting:                                 Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    434            508         
108          2.3         434.3       1.0X
-Nested column                                       625            647         
 23          1.6         624.8       0.7X
+Top-level column                                    447            496         
 91          2.2         447.0       1.0X
+Nested column                                       631            666         
 40          1.6         631.2       0.7X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Repartitioning:                           Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    357            368         
  9          2.8         356.9       1.0X
-Nested column                                      2897           2976         
 88          0.3        2897.4       0.1X
+Top-level column                                    360            394         
 84          2.8         360.0       1.0X
+Nested column                                       553            586         
 65          1.8         553.5       0.7X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Repartitioning by exprs:                  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    365            413         
 77          2.7         364.9       1.0X
-Nested column                                      2902           2969         
 99          0.3        2902.4       0.1X
+Top-level column                                    368            393         
 50          2.7         368.3       1.0X
+Nested column                                      2942           3017         
 82          0.3        2942.0       0.1X
+
+OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Sample:                                   Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Top-level column                                    124            143         
 10          8.1         124.1       1.0X
+Nested column                                       345            366         
 34          2.9         344.8       0.4X
 
 OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 Sorting:                                  Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 
------------------------------------------------------------------------------------------------------------------------
-Top-level column                                    555            600         
 80          1.8         555.4       1.0X
-Nested column                                      3448           3490         
 45          0.3        3447.9       0.2X
+Top-level column                                    577            618         
 55          1.7         576.8       1.0X
+Nested column                                      3473           3524         
 49          0.3        3473.0       0.2X
 
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/NestedSchemaPruningBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/NestedSchemaPruningBenchmark.scala
index e852de1..9ffb3f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/NestedSchemaPruningBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/NestedSchemaPruningBenchmark.scala
@@ -125,6 +125,26 @@ abstract class NestedSchemaPruningBenchmark extends SqlBasedBenchmark {
     }
   }
 
+  protected def sampleBenchmark(numRows: Int, numIters: Int): Unit = {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      Seq(1, 2).foreach { i =>
+        df.write.format(dataSourceName).save(path + s"/$i")
+        spark.read.format(dataSourceName).load(path + s"/$i").createOrReplaceTempView(s"t$i")
+      }
+
+      val benchmark = new Benchmark(s"Sample", numRows, numIters, output = output)
+
+      addCase(benchmark, "Top-level column",
+        s"SELECT col1 FROM (SELECT col1 FROM t1 TABLESAMPLE(100 percent))")
+      addCase(benchmark, "Nested column",
+        s"SELECT col2._1 FROM (SELECT col2 FROM t2 TABLESAMPLE(100 percent))")
+
+      benchmark.run()
+    }
+  }
+
   protected def sortBenchmark(numRows: Int, numIters: Int): Unit = {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
@@ -146,11 +166,12 @@ abstract class NestedSchemaPruningBenchmark extends SqlBasedBenchmark {
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
     runBenchmark(benchmarkName) {
       withSQLConf(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key -> "true") {
-        selectBenchmark (N, numIters)
-        limitBenchmark (N, numIters)
-        repartitionBenchmark (N, numIters)
-        repartitionByExprBenchmark (N, numIters)
-        sortBenchmark (N, numIters)
+        selectBenchmark(N, numIters)
+        limitBenchmark(N, numIters)
+        repartitionBenchmark(N, numIters)
+        repartitionByExprBenchmark(N, numIters)
+        sampleBenchmark(N, numIters)
+        sortBenchmark(N, numIters)
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to