Repository: spark
Updated Branches:
  refs/heads/branch-1.0 3962abaf9 -> 57526e40a


[SPARK-2135][SQL] Use planner for in-memory scans

Author: Michael Armbrust <mich...@databricks.com>

Closes #1072 from marmbrus/cachedStars and squashes the following commits:

8757c8e [Michael Armbrust] Use planner for in-memory scans.

(cherry picked from commit 13f8cfdc04589b986554310965e83fe658085683)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/57526e40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/57526e40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/57526e40

Branch: refs/heads/branch-1.0
Commit: 57526e40a52323ffccfc79193c04eccdc60e4653
Parents: 3962aba
Author: Michael Armbrust <mich...@databricks.com>
Authored: Thu Jun 12 23:09:41 2014 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Thu Jun 12 23:10:08 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLContext.scala | 14 +++----
 .../columnar/InMemoryColumnarTableScan.scala    | 39 ++++++++++++++++----
 .../apache/spark/sql/execution/SparkPlan.scala  |  2 -
 .../spark/sql/execution/SparkStrategies.scala   | 13 +++++++
 .../org/apache/spark/sql/CachedTableSuite.scala | 15 +++++---
 .../columnar/InMemoryColumnarQuerySuite.scala   |  6 +--
 .../org/apache/spark/sql/hive/HiveContext.scala |  1 +
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  7 ++--
 .../apache/spark/sql/hive/HiveStrategies.scala  |  7 ++--
 .../spark/sql/hive/CachedTableSuite.scala       |  6 +--
 10 files changed, 75 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/57526e40/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 81c5006..38fc6b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.{SetCommand, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 
-import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.columnar.InMemoryRelation
 
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.SparkStrategies
@@ -166,10 +166,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
     val useCompression =
       sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false)
     val asInMemoryRelation =
-      InMemoryColumnarTableScan(
-        currentTable.output, executePlan(currentTable).executedPlan, useCompression)
+      InMemoryRelation(useCompression, executePlan(currentTable).executedPlan)
 
-    catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
+    catalog.registerTable(None, tableName, asInMemoryRelation)
   }
 
   /** Removes the specified table from the in-memory cache. */
@@ -177,11 +176,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
     EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match {
       // This is kind of a hack to make sure that if this was just an RDD registered as a table,
       // we reregister the RDD as a table.
-      case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd, _)) =>
+      case inMem @ InMemoryRelation(_, _, e: ExistingRdd) =>
         inMem.cachedColumnBuffers.unpersist()
         catalog.unregisterTable(None, tableName)
         catalog.registerTable(None, tableName, SparkLogicalPlan(e))
-      case SparkLogicalPlan(inMem: InMemoryColumnarTableScan) =>
+      case inMem: InMemoryRelation =>
         inMem.cachedColumnBuffers.unpersist()
         catalog.unregisterTable(None, tableName)
       case plan => throw new IllegalArgumentException(s"Table $tableName is not cached: $plan")
@@ -192,7 +191,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   def isCached(tableName: String): Boolean = {
     val relation = catalog.lookupRelation(None, tableName)
     EliminateAnalysisOperators(relation) match {
-      case SparkLogicalPlan(_: InMemoryColumnarTableScan) => true
+      case _: InMemoryRelation => true
       case _ => false
     }
   }
@@ -208,6 +207,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
       PartialAggregation ::
       LeftSemiJoin ::
       HashJoin ::
+      InMemoryScans ::
       ParquetOperations ::
       BasicOperators ::
       CartesianProduct ::

http://git-wip-us.apache.org/repos/asf/spark/blob/57526e40/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index fdf28e1..e1e4f24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -17,18 +17,29 @@
 
 package org.apache.spark.sql.columnar
 
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, LeafNode}
 import org.apache.spark.sql.Row
 import org.apache.spark.SparkConf
 
-private[sql] case class InMemoryColumnarTableScan(
-    attributes: Seq[Attribute],
-    child: SparkPlan,
-    useCompression: Boolean)
-  extends LeafNode {
+object InMemoryRelation {
+  def apply(useCompression: Boolean, child: SparkPlan): InMemoryRelation =
+    new InMemoryRelation(child.output, useCompression, child)
+}
 
-  override def output: Seq[Attribute] = attributes
+private[sql] case class InMemoryRelation(
+    output: Seq[Attribute],
+    useCompression: Boolean,
+    child: SparkPlan)
+  extends LogicalPlan with MultiInstanceRelation {
+
+  override def children = Seq.empty
+  override def references = Set.empty
+
+  override def newInstance() =
+    new InMemoryRelation(output.map(_.newInstance), useCompression, child).asInstanceOf[this.type]
 
   lazy val cachedColumnBuffers = {
     val output = child.output
@@ -55,14 +66,26 @@ private[sql] case class InMemoryColumnarTableScan(
     cached.count()
     cached
   }
+}
+
+private[sql] case class InMemoryColumnarTableScan(
+    attributes: Seq[Attribute],
+    relation: InMemoryRelation)
+  extends LeafNode {
+
+  override def output: Seq[Attribute] = attributes
 
   override def execute() = {
-    cachedColumnBuffers.mapPartitions { iterator =>
+    relation.cachedColumnBuffers.mapPartitions { iterator =>
       val columnBuffers = iterator.next()
       assert(!iterator.hasNext)
 
       new Iterator[Row] {
-        val columnAccessors = columnBuffers.map(ColumnAccessor(_))
+        // Find the ordinals of the requested columns.  If none are requested, use the first.
+        val requestedColumns =
+          if (attributes.isEmpty) Seq(0) else attributes.map(relation.output.indexOf(_))
+
+        val columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_))
         val nextRow = new GenericMutableRow(columnAccessors.length)
 
         override def next() = {

http://git-wip-us.apache.org/repos/asf/spark/blob/57526e40/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 4613df1..07967fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -77,8 +77,6 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
     SparkLogicalPlan(
       alreadyPlanned match {
         case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
-        case scan @ InMemoryColumnarTableScan(output, _, _) =>
-          scan.copy(attributes = output.map(_.newInstance))
         case _ => sys.error("Multiple instance of the same relation detected.")
       }).asInstanceOf[this.type]
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/57526e40/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 579e50a..1039be5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.parquet._
+import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
 
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SQLContext#SparkPlanner =>
@@ -166,6 +167,18 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }
 
+  object InMemoryScans extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case PhysicalOperation(projectList, filters, mem: InMemoryRelation) =>
+        pruneFilterProject(
+          projectList,
+          filters,
+          identity[Seq[Expression]], // No filters are pushed down.
+          InMemoryColumnarTableScan(_, mem)) :: Nil
+      case _ => Nil
+    }
+  }
+
   // Can we automate these 'pass through' operations?
   object BasicOperators extends Strategy {
     def numPartitions = self.numPartitions

http://git-wip-us.apache.org/repos/asf/spark/blob/57526e40/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index ebca3ad..c794da4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -18,8 +18,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
-import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
 import org.apache.spark.sql.test.TestSQLContext
 
 class CachedTableSuite extends QueryTest {
@@ -34,7 +33,7 @@ class CachedTableSuite extends QueryTest {
     )
 
     TestSQLContext.table("testData").queryExecution.analyzed match {
-      case SparkLogicalPlan(_ : InMemoryColumnarTableScan) => // Found evidence of caching
+      case _ : InMemoryRelation => // Found evidence of caching
       case noCache => fail(s"No cache node found in plan $noCache")
     }
 
@@ -46,7 +45,7 @@ class CachedTableSuite extends QueryTest {
     )
 
     TestSQLContext.table("testData").queryExecution.analyzed match {
-      case cachePlan @ SparkLogicalPlan(_ : InMemoryColumnarTableScan) =>
+      case cachePlan: InMemoryRelation =>
         fail(s"Table still cached after uncache: $cachePlan")
       case noCache => // Table uncached successfully
     }
@@ -61,13 +60,17 @@ class CachedTableSuite extends QueryTest {
   test("SELECT Star Cached Table") {
     TestSQLContext.sql("SELECT * FROM testData").registerAsTable("selectStar")
     TestSQLContext.cacheTable("selectStar")
-    TestSQLContext.sql("SELECT * FROM selectStar")
+    TestSQLContext.sql("SELECT * FROM selectStar WHERE key = 1").collect()
     TestSQLContext.uncacheTable("selectStar")
   }
 
   test("Self-join cached") {
+    val unCachedAnswer =
+      TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key").collect()
     TestSQLContext.cacheTable("testData")
-    TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key")
+    checkAnswer(
+      TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key"),
+      unCachedAnswer.toSeq)
     TestSQLContext.uncacheTable("testData")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/57526e40/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index 31c5dfb..86727b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -28,14 +28,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {
 
   test("simple columnar query") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
+    val scan = InMemoryRelation(useCompression = true, plan)
 
     checkAnswer(scan, testData.collect().toSeq)
   }
 
   test("projection") {
     val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
+    val scan = InMemoryRelation(useCompression = true, plan)
 
     checkAnswer(scan, testData.collect().map {
       case Row(key: Int, value: String) => value -> key
@@ -44,7 +44,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
 
   test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
+    val scan = InMemoryRelation(useCompression = true, plan)
 
     checkAnswer(scan, testData.collect().toSeq)
     checkAnswer(scan, testData.collect().toSeq)

http://git-wip-us.apache.org/repos/asf/spark/blob/57526e40/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 6497821..9cd13f6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -230,6 +230,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       CommandStrategy(self),
       TakeOrdered,
       ParquetOperations,
+      InMemoryScans,
       HiveTableScans,
       DataSinks,
       Scripts,

http://git-wip-us.apache.org/repos/asf/spark/blob/57526e40/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index a91b520..e9e6497 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.execution.SparkLogicalPlan
 import org.apache.spark.sql.hive.execution.{HiveTableScan, InsertIntoHiveTable}
-import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
 
 /* Implicit conversions */
 import scala.collection.JavaConversions._
@@ -130,8 +130,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       case p @ InsertIntoTable(table: MetastoreRelation, _, child, _) =>
         castChildOutput(p, table, child)
 
-      case p @ logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
-        _, HiveTableScan(_, table, _), _)), _, child, _) =>
+      case p @ logical.InsertIntoTable(
+                 InMemoryRelation(_, _,
+                   HiveTableScan(_, table, _)), _, child, _) =>
         castChildOutput(p, table, child)
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/57526e40/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index b215707..d1aa8c8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.execution._
-import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.columnar.InMemoryRelation
 
 private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
@@ -44,8 +44,9 @@ private[hive] trait HiveStrategies {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
-      case logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
-        _, HiveTableScan(_, table, _), _)), partition, child, overwrite) =>
+      case logical.InsertIntoTable(
+             InMemoryRelation(_, _,
+               HiveTableScan(_, table, _)), partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case _ => Nil
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/57526e40/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 91ac03c..3132d01 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql.execution.SparkLogicalPlan
-import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
 import org.apache.spark.sql.hive.execution.HiveComparisonTest
 import org.apache.spark.sql.hive.test.TestHive
 
@@ -34,7 +34,7 @@ class CachedTableSuite extends HiveComparisonTest {
 
   test("check that table is cached and uncache") {
     TestHive.table("src").queryExecution.analyzed match {
-      case SparkLogicalPlan(_ : InMemoryColumnarTableScan) => // Found evidence of caching
+      case _ : InMemoryRelation => // Found evidence of caching
       case noCache => fail(s"No cache node found in plan $noCache")
     }
     TestHive.uncacheTable("src")
@@ -45,7 +45,7 @@ class CachedTableSuite extends HiveComparisonTest {
 
   test("make sure table is uncached") {
     TestHive.table("src").queryExecution.analyzed match {
-      case cachePlan @ SparkLogicalPlan(_ : InMemoryColumnarTableScan) =>
+      case cachePlan: InMemoryRelation =>
         fail(s"Table still cached after uncache: $cachePlan")
       case noCache => // Table uncached successfully
     }

Reply via email to