Repository: spark Updated Branches: refs/heads/master 7e26b5761 -> 1a7d7cc85
[SPARK-2405][SQL] Reuse same byte buffers when creating new instance of InMemoryRelation Reuse byte buffers when creating unique attributes for multiple instances of an InMemoryRelation in a single query plan. Author: Michael Armbrust <[email protected]> Closes #1332 from marmbrus/doubleCache and squashes the following commits: 4a19609 [Michael Armbrust] Clean up concurrency story by calculating buffers in the constructor. b39c931 [Michael Armbrust] Allocations are kind of a side effect. f67eff7 [Michael Armbrust] Reuse same byte buffers when creating new instance of InMemoryRelation Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a7d7cc8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a7d7cc8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a7d7cc8 Branch: refs/heads/master Commit: 1a7d7cc85fb24de21f1cde67d04467171b82e845 Parents: 7e26b57 Author: Michael Armbrust <[email protected]> Authored: Sat Jul 12 12:13:32 2014 -0700 Committer: Reynold Xin <[email protected]> Committed: Sat Jul 12 12:13:32 2014 -0700 ---------------------------------------------------------------------- .../analysis/MultiInstanceRelation.scala | 2 +- .../columnar/InMemoryColumnarTableScan.scala | 35 ++++++++++++++------ 2 files changed, 25 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1a7d7cc8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala index a6ce908..22941ed 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/MultiInstanceRelation.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan * of itself with globally unique expression ids. */ trait MultiInstanceRelation { - def newInstance: this.type + def newInstance(): this.type } /** http://git-wip-us.apache.org/repos/asf/spark/blob/1a7d7cc8/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index e1e4f24..ff7f664 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.columnar +import java.nio.ByteBuffer + +import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -26,22 +29,19 @@ import org.apache.spark.SparkConf object InMemoryRelation { def apply(useCompression: Boolean, child: SparkPlan): InMemoryRelation = - new InMemoryRelation(child.output, useCompression, child) + new InMemoryRelation(child.output, useCompression, child)() } private[sql] case class InMemoryRelation( output: Seq[Attribute], useCompression: Boolean, child: SparkPlan) + (private var _cachedColumnBuffers: RDD[Array[ByteBuffer]] = null) extends LogicalPlan with MultiInstanceRelation { - override def children = Seq.empty - override def references = Set.empty - - override def newInstance() = - new InMemoryRelation(output.map(_.newInstance), useCompression, child).asInstanceOf[this.type] - - lazy val 
cachedColumnBuffers = { + // If the cached column buffers were not passed in, we calculate them in the constructor. + // As in Spark, the actual work of caching is lazy. + if (_cachedColumnBuffers == null) { val output = child.output val cached = child.execute().mapPartitions { iterator => val columnBuilders = output.map { attribute => @@ -62,10 +62,23 @@ private[sql] case class InMemoryRelation( }.cache() cached.setName(child.toString) - // Force the materialization of the cached RDD. - cached.count() - cached + _cachedColumnBuffers = cached } + + + override def children = Seq.empty + + override def references = Set.empty + + override def newInstance() = { + new InMemoryRelation( + output.map(_.newInstance), + useCompression, + child)( + _cachedColumnBuffers).asInstanceOf[this.type] + } + + def cachedColumnBuffers = _cachedColumnBuffers } private[sql] case class InMemoryColumnarTableScan(
