This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2573914 [SPARK-26975][SQL] Support nested-column pruning over
limit/sample/repartition
2573914 is described below
commit 257391497bea51553d5a9e420c20a3fdfe4d36a9
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue Mar 19 20:24:22 2019 -0700
[SPARK-26975][SQL] Support nested-column pruning over
limit/sample/repartition
## What changes were proposed in this pull request?
As [SPARK-26958](https://github.com/apache/spark/pull/23862/files)
benchmark shows, nested-column pruning has limitations. This PR aims to remove
the limitations on `limit/repartition/sample`. Here, repartition means
`Repartition`, not `RepartitionByExpression`.
**PREPARATION**
```scala
scala> spark.range(100).map(x => (x, (x, s"$x" * 100))).toDF("col1",
"col2").write.mode("overwrite").save("/tmp/p")
scala> sql("set spark.sql.optimizer.nestedSchemaPruning.enabled=true")
scala> spark.read.parquet("/tmp/p").createOrReplaceTempView("t")
```
**BEFORE**
```scala
scala> sql("SELECT col2._1 FROM (SELECT col2 FROM t LIMIT 1000000)").explain
== Physical Plan ==
CollectLimit 1000000
+- *(1) Project [col2#22._1 AS _1#28L]
+- *(1) FileScan parquet [col2#22] Batched: false, DataFilters: [],
Format: Parquet, Location: InMemoryFileIndex[file:/tmp/p], PartitionFilters:
[], PushedFilters: [], ReadSchema: struct<col2:struct<_1:bigint>>
scala> sql("SELECT col2._1 FROM (SELECT /*+ REPARTITION(1) */ col2 FROM
t)").explain
== Physical Plan ==
*(2) Project [col2#22._1 AS _1#33L]
+- Exchange RoundRobinPartitioning(1)
+- *(1) Project [col2#22]
+- *(1) FileScan parquet [col2#22] Batched: false, DataFilters: [],
Format: Parquet, Location: InMemoryFileIndex[file:/tmp/p], PartitionFilters:
[], PushedFilters: [], ReadSchema: struct<col2:struct<_1:bigint,_2:string>>
```
**AFTER**
```scala
scala> sql("SELECT col2._1 FROM (SELECT /*+ REPARTITION(1) */ col2 FROM
t)").explain
== Physical Plan ==
Exchange RoundRobinPartitioning(1)
+- *(1) Project [col2#5._1 AS _1#11L]
+- *(1) FileScan parquet [col2#5] Batched: false, DataFilters: [],
Format: Parquet, Location: InMemoryFileIndex[file:/tmp/p], PartitionFilters:
[], PushedFilters: [], ReadSchema: struct<col2:struct<_1:bigint>>
```
This supersedes https://github.com/apache/spark/pull/23542 and
https://github.com/apache/spark/pull/23873 .
## How was this patch tested?
Pass the Jenkins with a newly added test suite.
Closes #23964 from dongjoon-hyun/SPARK-26975-ALIAS.
Lead-authored-by: Dongjoon Hyun <[email protected]>
Co-authored-by: DB Tsai <[email protected]>
Co-authored-by: Liang-Chi Hsieh <[email protected]>
Co-authored-by: Takeshi Yamamuro <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../catalyst/optimizer/NestedColumnAliasing.scala | 151 ++++++++++++++
.../spark/sql/catalyst/optimizer/Optimizer.scala | 3 +
.../optimizer/NestedColumnAliasingSuite.scala | 229 +++++++++++++++++++++
.../OrcNestedSchemaPruningBenchmark-results.txt | 27 ++-
.../OrcV2NestedSchemaPruningBenchmark-results.txt | 27 ++-
...ParquetNestedSchemaPruningBenchmark-results.txt | 27 ++-
.../benchmark/NestedSchemaPruningBenchmark.scala | 31 ++-
7 files changed, 460 insertions(+), 35 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
new file mode 100644
index 0000000..d9f2d36
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+/**
+ * This aims to handle a nested column aliasing pattern inside the
`ColumnPruning` optimizer rule.
+ * If a project or its child references nested fields, and not all the
fields
+ * in a nested attribute are used, we can substitute them by alias attributes;
then a project
+ * of the nested fields as aliases on the children of the child will be
created.
+ */
+object NestedColumnAliasing {
+
+ def unapply(plan: LogicalPlan)
+ : Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = plan
match {
+ case Project(projectList, child)
+ if SQLConf.get.nestedSchemaPruningEnabled &&
canProjectPushThrough(child) =>
+ getAliasSubMap(projectList)
+ case _ => None
+ }
+
+ /**
+ * Replace nested columns to prune unused nested columns later.
+ */
+ def replaceToAliases(
+ plan: LogicalPlan,
+ nestedFieldToAlias: Map[GetStructField, Alias],
+ attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = plan match {
+ case Project(projectList, child) =>
+ Project(
+ getNewProjectList(projectList, nestedFieldToAlias),
+ replaceChildrenWithAliases(child, attrToAliases))
+ }
+
+ /**
+ * Return a replaced project list.
+ */
+ private def getNewProjectList(
+ projectList: Seq[NamedExpression],
+ nestedFieldToAlias: Map[GetStructField, Alias]): Seq[NamedExpression] = {
+ projectList.map(_.transform {
+ case f: GetStructField if nestedFieldToAlias.contains(f) =>
+ nestedFieldToAlias(f).toAttribute
+ }.asInstanceOf[NamedExpression])
+ }
+
+ /**
+ * Return a plan with new children replaced with aliases.
+ */
+ private def replaceChildrenWithAliases(
+ plan: LogicalPlan,
+ attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
+ plan.withNewChildren(plan.children.map { plan =>
+ Project(plan.output.flatMap(a => attrToAliases.getOrElse(a.exprId,
Seq(a))), plan)
+ })
+ }
+
+ /**
+ * Returns true for those operators that project can be pushed through.
+ */
+ private def canProjectPushThrough(plan: LogicalPlan) = plan match {
+ case _: GlobalLimit => true
+ case _: LocalLimit => true
+ case _: Repartition => true
+ case _: Sample => true
+ case _ => false
+ }
+
+ /**
+ * Return root references that are individually accessed as a whole, and
`GetStructField`s.
+ */
+ private def collectRootReferenceAndGetStructField(e: Expression):
Seq[Expression] = e match {
+ case _: AttributeReference | _: GetStructField => Seq(e)
+ case es if es.children.nonEmpty =>
es.children.flatMap(collectRootReferenceAndGetStructField)
+ case _ => Seq.empty
+ }
+
+ /**
+ * Return two maps in order to replace nested fields to aliases.
+ *
+ * 1. GetStructField -> Alias: A new alias is created for each nested field.
+ * 2. ExprId -> Seq[Alias]: A reference attribute has multiple aliases
pointing to it.
+ */
+ private def getAliasSubMap(projectList: Seq[NamedExpression])
+ : Option[(Map[GetStructField, Alias], Map[ExprId, Seq[Alias]])] = {
+ val (nestedFieldReferences, otherRootReferences) =
+ projectList.flatMap(collectRootReferenceAndGetStructField).partition {
+ case _: GetStructField => true
+ case _ => false
+ }
+
+ val aliasSub = nestedFieldReferences.asInstanceOf[Seq[GetStructField]]
+ .filter(!_.references.subsetOf(AttributeSet(otherRootReferences)))
+ .groupBy(_.references.head)
+ .flatMap { case (attr, nestedFields: Seq[GetStructField]) =>
+ // Each expression can contain multiple nested fields.
+ // Note that we keep the original names to deliver to parquet in a
case-sensitive way.
+ val nestedFieldToAlias = nestedFields.distinct.map { f =>
+ val exprId = NamedExpression.newExprId
+ (f, Alias(f, s"_gen_alias_${exprId.id}")(exprId, Seq.empty, None))
+ }
+
+ // If all nested fields of `attr` are used, we don't need to introduce
new aliases.
+ // By default, ColumnPruning rule uses `attr` already.
+ if (nestedFieldToAlias.nonEmpty &&
+ nestedFieldToAlias.length < totalFieldNum(attr.dataType)) {
+ Some(attr.exprId -> nestedFieldToAlias)
+ } else {
+ None
+ }
+ }
+
+ if (aliasSub.isEmpty) {
+ None
+ } else {
+ Some((aliasSub.values.flatten.toMap, aliasSub.map(x => (x._1,
x._2.map(_._2)))))
+ }
+ }
+
+ /**
+ * Return total number of fields of this type. This is used as a threshold
to use nested column
pruning. It's okay to underestimate. If the number of references is bigger
than this, the parent
+ * reference is used instead of nested field references.
+ */
+ private def totalFieldNum(dataType: DataType): Int = dataType match {
+ case _: AtomicType => 1
+ case StructType(fields) => fields.map(f => totalFieldNum(f.dataType)).sum
+ case ArrayType(elementType, _) => totalFieldNum(elementType)
+ case MapType(keyType, valueType, _) => totalFieldNum(keyType) +
totalFieldNum(valueType)
+ case _ => 1 // UDT and others
+ }
+}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 4babd40..d4eb516 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -647,6 +647,9 @@ object ColumnPruning extends Rule[LogicalPlan] {
// Can't prune the columns on LeafNode
case p @ Project(_, _: LeafNode) => p
+ case p @ NestedColumnAliasing(nestedFieldToAlias, attrToAliases) =>
+ NestedColumnAliasing.replaceToAliases(p, nestedFieldToAlias,
attrToAliases)
+
// for all other logical plans that inherits the output from it's children
// Project over project is handled by the first case, skip it here.
case p @ Project(_, child) if !child.isInstanceOf[Project] =>
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
new file mode 100644
index 0000000..87f7d19
--- /dev/null
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.SchemaPruningTest
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types.{StringType, StructType}
+
+class NestedColumnAliasingSuite extends SchemaPruningTest {
+
+ object Optimize extends RuleExecutor[LogicalPlan] {
+ val batches = Batch("Nested column pruning", FixedPoint(100),
+ ColumnPruning,
+ CollapseProject,
+ RemoveNoopOperators) :: Nil
+ }
+
+ private val name = StructType.fromDDL("first string, middle string, last
string")
+ private val employer = StructType.fromDDL("id int, company
struct<name:string, address:string>")
+ private val contact = LocalRelation(
+ 'id.int,
+ 'name.struct(name),
+ 'address.string,
+ 'friends.array(name),
+ 'relatives.map(StringType, name),
+ 'employer.struct(employer))
+
+ test("Pushing a single nested field projection") {
+ def testSingleFieldPushDown(op: LogicalPlan => LogicalPlan): Unit = {
+ val middle = GetStructField('name, 1, Some("middle"))
+ val query = op(contact).select(middle).analyze
+ val optimized = Optimize.execute(query)
+ val expected = op(contact.select(middle)).analyze
+ comparePlans(optimized, expected)
+ }
+
+ testSingleFieldPushDown((input: LogicalPlan) => input.limit(5))
+ testSingleFieldPushDown((input: LogicalPlan) => input.repartition(1))
+ testSingleFieldPushDown((input: LogicalPlan) => Sample(0.0, 0.6, false,
11L, input))
+ }
+
+ test("Pushing multiple nested field projection") {
+ val first = GetStructField('name, 0, Some("first"))
+ val last = GetStructField('name, 2, Some("last"))
+
+ val query = contact
+ .limit(5)
+ .select('id, first, last)
+ .analyze
+
+ val optimized = Optimize.execute(query)
+
+ val expected = contact
+ .select('id, first, last)
+ .limit(5)
+ .analyze
+
+ comparePlans(optimized, expected)
+ }
+
+ test("function with nested field inputs") {
+ val first = GetStructField('name, 0, Some("first"))
+ val last = GetStructField('name, 2, Some("last"))
+
+ val query = contact
+ .limit(5)
+ .select('id, ConcatWs(Seq(first, last)))
+ .analyze
+
+ val optimized = Optimize.execute(query)
+
+ val aliases = collectGeneratedAliases(optimized)
+
+ val expected = contact
+ .select('id, first.as(aliases(0)), last.as(aliases(1)))
+ .limit(5)
+ .select(
+ 'id,
+ ConcatWs(Seq($"${aliases(0)}",
$"${aliases(1)}")).as("concat_ws(name.first, name.last)"))
+ .analyze
+ comparePlans(optimized, expected)
+ }
+
+ test("multi-level nested field") {
+ val field1 = GetStructField(GetStructField('employer, 1, Some("company")),
0, Some("name"))
+ val field2 = GetStructField('employer, 0, Some("id"))
+
+ val query = contact
+ .limit(5)
+ .select(field1, field2)
+ .analyze
+
+ val optimized = Optimize.execute(query)
+
+ val expected = contact
+ .select(field1, field2)
+ .limit(5)
+ .analyze
+ comparePlans(optimized, expected)
+ }
+
+ test("Push original case-sensitive names") {
+ val first1 = GetStructField('name, 0, Some("first"))
+ val first2 = GetStructField('name, 1, Some("FIRST"))
+
+ val query = contact
+ .limit(5)
+ .select('id, first1, first2)
+ .analyze
+
+ val optimized = Optimize.execute(query)
+
+ val expected = contact
+ .select('id, first1, first2)
+ .limit(5)
+ .analyze
+
+ comparePlans(optimized, expected)
+ }
+
+ test("Pushing a single nested field projection - negative") {
+ val ops = Array(
+ (input: LogicalPlan) => input.distribute('name)(1),
+ (input: LogicalPlan) => input.distribute($"name.middle")(1),
+ (input: LogicalPlan) => input.orderBy('name.asc),
+ (input: LogicalPlan) => input.orderBy($"name.middle".asc),
+ (input: LogicalPlan) => input.sortBy('name.asc),
+ (input: LogicalPlan) => input.sortBy($"name.middle".asc),
+ (input: LogicalPlan) => input.union(input)
+ )
+
+ val queries = ops.map { op =>
+ op(contact.select('name))
+ .select(GetStructField('name, 1, Some("middle")))
+ .analyze
+ }
+
+ val optimizedQueries = queries.map(Optimize.execute)
+ val expectedQueries = queries
+ optimizedQueries.zip(expectedQueries).foreach { case (optimized, expected)
=>
+ comparePlans(optimized, expected)
+ }
+ }
+
+ test("Pushing a single nested field projection through filters - negative") {
+ val ops = Array(
+ (input: LogicalPlan) => input.where('name.isNotNull),
+ (input: LogicalPlan) => input.where($"name.middle".isNotNull)
+ )
+
+ val queries = ops.map { op =>
+ op(contact)
+ .select(GetStructField('name, 1, Some("middle")))
+ .analyze
+ }
+
+ val optimizedQueries = queries.map(Optimize.execute)
+ val expectedQueries = queries
+
+ optimizedQueries.zip(expectedQueries).foreach { case (optimized, expected)
=>
+ comparePlans(optimized, expected)
+ }
+ }
+
+ test("Do not optimize when parent field is used") {
+ val query = contact
+ .limit(5)
+ .select('id, GetStructField('name, 0, Some("first")), 'name)
+ .analyze
+
+ val optimized = Optimize.execute(query)
+
+ val expected = contact
+ .select('id, 'name)
+ .limit(5)
+ .select('id, GetStructField('name, 0, Some("first")), 'name)
+ .analyze
+ comparePlans(optimized, expected)
+ }
+
+ test("Some nested column means the whole structure") {
+ val nestedRelation = LocalRelation('a.struct('b.struct('c.int, 'd.int,
'e.int)))
+
+ val query = nestedRelation
+ .limit(5)
+ .select(GetStructField('a, 0, Some("b")))
+ .analyze
+
+ val optimized = Optimize.execute(query)
+
+ val expected = nestedRelation
+ .select(GetStructField('a, 0, Some("b")))
+ .limit(5)
+ .analyze
+
+ comparePlans(optimized, expected)
+ }
+
+ private def collectGeneratedAliases(query: LogicalPlan): ArrayBuffer[String]
= {
+ val aliases = ArrayBuffer[String]()
+ query.transformAllExpressions {
+ case a @ Alias(_, name) if name.startsWith("_gen_alias_") =>
+ aliases += name
+ a
+ }
+ aliases
+ }
+}
diff --git a/sql/core/benchmarks/OrcNestedSchemaPruningBenchmark-results.txt
b/sql/core/benchmarks/OrcNestedSchemaPruningBenchmark-results.txt
index 078fbbb..53fbf14 100644
--- a/sql/core/benchmarks/OrcNestedSchemaPruningBenchmark-results.txt
+++ b/sql/core/benchmarks/OrcNestedSchemaPruningBenchmark-results.txt
@@ -6,35 +6,42 @@ OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux
3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Selection: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column 117 154
23 8.5 117.5 1.0X
-Nested column 1271 1295
26 0.8 1270.5 0.1X
+Top-level column 131 150
25 7.7 130.6 1.0X
+Nested column 922 954
21 1.1 922.2 0.1X
OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Limiting: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column 431 488
73 2.3 431.2 1.0X
-Nested column 1738 1777
24 0.6 1738.3 0.2X
+Top-level column 446 477
50 2.2 445.5 1.0X
+Nested column 1328 1366
44 0.8 1328.4 0.3X
OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Repartitioning: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column 349 381
87 2.9 348.7 1.0X
-Nested column 4374 4456
125 0.2 4373.6 0.1X
+Top-level column 357 386
33 2.8 356.8 1.0X
+Nested column 1266 1274
7 0.8 1266.3 0.3X
OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Repartitioning by exprs: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column 358 410
85 2.8 358.5 1.0X
-Nested column 4447 4517
125 0.2 4447.3 0.1X
+Top-level column 368 394
54 2.7 367.6 1.0X
+Nested column 3890 3954
80 0.3 3890.3 0.1X
+
+OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Sample: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+Top-level column 129 140
10 7.7 129.1 1.0X
+Nested column 966 999
26 1.0 966.2 0.1X
OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Sorting: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column 553 566
13 1.8 552.8 1.0X
-Nested column 5115 5248
84 0.2 5115.1 0.1X
+Top-level column 573 601
61 1.7 573.2 1.0X
+Nested column 4417 4598
149 0.2 4417.1 0.1X
diff --git a/sql/core/benchmarks/OrcV2NestedSchemaPruningBenchmark-results.txt
b/sql/core/benchmarks/OrcV2NestedSchemaPruningBenchmark-results.txt
index eaaf3b0..f259e14 100644
--- a/sql/core/benchmarks/OrcV2NestedSchemaPruningBenchmark-results.txt
+++ b/sql/core/benchmarks/OrcV2NestedSchemaPruningBenchmark-results.txt
@@ -6,35 +6,42 @@ OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux
3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Selection: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column 120 148
24 8.3 120.0 1.0X
-Nested column 2367 2415
43 0.4 2367.0 0.1X
+Top-level column 135 169
19 7.4 134.7 1.0X
+Nested column 2131 2216
95 0.5 2131.4 0.1X
OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Limiting: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column 129 153
16 7.8 128.5 1.0X
-Nested column 2368 2400
32 0.4 2367.7 0.1X
+Top-level column 147 158
10 6.8 146.9 1.0X
+Nested column 2149 2204
50 0.5 2148.9 0.1X
OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Repartitioning: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column 359 396
59 2.8 358.9 1.0X
-Nested column 4100 4147
59 0.2 4099.9 0.1X
+Top-level column 386 399
16 2.6 385.8 1.0X
+Nested column 2612 2666
57 0.4 2612.2 0.1X
OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Repartitioning by exprs: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column 367 394
56 2.7 366.8 1.0X
-Nested column 4182 4236
64 0.2 4181.5 0.1X
+Top-level column 392 454
119 2.5 392.2 1.0X
+Nested column 4106 4168
79 0.2 4106.1 0.1X
+
+OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Sample: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+Top-level column 146 157
13 6.9 145.9 1.0X
+Nested column 2294 2338
44 0.4 2293.6 0.1X
OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Sorting: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column 262 269
6 3.8 262.5 1.0X
-Nested column 2950 3021
65 0.3 2949.8 0.1X
+Top-level column 290 294
4 3.5 289.7 1.0X
+Nested column 2914 2997
87 0.3 2913.6 0.1X
diff --git
a/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt
b/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt
index 1f1296e..bd5b39a 100644
--- a/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt
+++ b/sql/core/benchmarks/ParquetNestedSchemaPruningBenchmark-results.txt
@@ -6,35 +6,42 @@ OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux
3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Selection: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column 145 174
23 6.9 145.1 1.0X
-Nested column 325 346
19 3.1 324.8 0.4X
+Top-level column 128 166
24 7.8 128.0 1.0X
+Nested column 308 325
10 3.2 308.3 0.4X
OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Limiting: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column 434 508
108 2.3 434.3 1.0X
-Nested column 625 647
23 1.6 624.8 0.7X
+Top-level column 447 496
91 2.2 447.0 1.0X
+Nested column 631 666
40 1.6 631.2 0.7X
OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Repartitioning: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column 357 368
9 2.8 356.9 1.0X
-Nested column 2897 2976
88 0.3 2897.4 0.1X
+Top-level column 360 394
84 2.8 360.0 1.0X
+Nested column 553 586
65 1.8 553.5 0.7X
OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Repartitioning by exprs: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column 365 413
77 2.7 364.9 1.0X
-Nested column 2902 2969
99 0.3 2902.4 0.1X
+Top-level column 368 393
50 2.7 368.3 1.0X
+Nested column 2942 3017
82 0.3 2942.0 0.1X
+
+OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Sample: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+Top-level column 124 143
10 8.1 124.1 1.0X
+Nested column 345 366
34 2.9 344.8 0.4X
OpenJDK 64-Bit Server VM 1.8.0_201-b09 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Sorting: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-Top-level column 555 600
80 1.8 555.4 1.0X
-Nested column 3448 3490
45 0.3 3447.9 0.2X
+Top-level column 577 618
55 1.7 576.8 1.0X
+Nested column 3473 3524
49 0.3 3473.0 0.2X
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/NestedSchemaPruningBenchmark.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/NestedSchemaPruningBenchmark.scala
index e852de1..9ffb3f9 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/NestedSchemaPruningBenchmark.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/NestedSchemaPruningBenchmark.scala
@@ -125,6 +125,26 @@ abstract class NestedSchemaPruningBenchmark extends
SqlBasedBenchmark {
}
}
+ protected def sampleBenchmark(numRows: Int, numIters: Int): Unit = {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ Seq(1, 2).foreach { i =>
+ df.write.format(dataSourceName).save(path + s"/$i")
+ spark.read.format(dataSourceName).load(path +
s"/$i").createOrReplaceTempView(s"t$i")
+ }
+
+ val benchmark = new Benchmark(s"Sample", numRows, numIters, output =
output)
+
+ addCase(benchmark, "Top-level column",
+ s"SELECT col1 FROM (SELECT col1 FROM t1 TABLESAMPLE(100 percent))")
+ addCase(benchmark, "Nested column",
+ s"SELECT col2._1 FROM (SELECT col2 FROM t2 TABLESAMPLE(100 percent))")
+
+ benchmark.run()
+ }
+ }
+
protected def sortBenchmark(numRows: Int, numIters: Int): Unit = {
withTempPath { dir =>
val path = dir.getCanonicalPath
@@ -146,11 +166,12 @@ abstract class NestedSchemaPruningBenchmark extends
SqlBasedBenchmark {
override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
runBenchmark(benchmarkName) {
withSQLConf(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key -> "true") {
- selectBenchmark (N, numIters)
- limitBenchmark (N, numIters)
- repartitionBenchmark (N, numIters)
- repartitionByExprBenchmark (N, numIters)
- sortBenchmark (N, numIters)
+ selectBenchmark(N, numIters)
+ limitBenchmark(N, numIters)
+ repartitionBenchmark(N, numIters)
+ repartitionByExprBenchmark(N, numIters)
+ sampleBenchmark(N, numIters)
+ sortBenchmark(N, numIters)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]