Repository: spark
Updated Branches:
  refs/heads/master c7b29ae64 -> 03cca5dce


[SPARK-10770] [SQL] SparkPlan.executeCollect/executeTake should return InternalRow rather than external Row.

Author: Reynold Xin <[email protected]>

Closes #8900 from rxin/SPARK-10770-1.
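
The gist: SparkPlan's collect/take paths now stay in the Catalyst InternalRow
format end to end, and conversion to the public Row API happens once, at the
boundary, in a new executeCollectPublic() method. Condensed from the
SparkPlan.scala hunk below:

  def executeCollect(): Array[InternalRow] = {
    // operators may reuse a mutable row per partition, so copy before collecting
    execute().map(_.copy()).collect()
  }

  def executeCollectPublic(): Array[Row] = {
    // convert Catalyst rows to external Rows once, at the edge
    val converter = CatalystTypeConverters.createToScalaConverter(schema)
    executeCollect().map(converter(_).asInstanceOf[Row])
  }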


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03cca5dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03cca5dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03cca5dc

Branch: refs/heads/master
Commit: 03cca5dce2cd7618b5c0e33163efb8502415b06e
Parents: c7b29ae
Author: Reynold Xin <[email protected]>
Authored: Wed Sep 30 14:36:54 2015 -0400
Committer: Reynold Xin <[email protected]>
Committed: Wed Sep 30 14:36:54 2015 -0400

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/DataFrame.scala  |  2 +-
 .../spark/sql/execution/LocalTableScan.scala    | 10 ++++-----
 .../apache/spark/sql/execution/SparkPlan.scala  | 22 ++++++++++++--------
 .../spark/sql/execution/basicOperators.scala    |  7 +++----
 .../apache/spark/sql/execution/commands.scala   | 13 ++++++------
 .../org/apache/spark/sql/execution/python.scala | 10 ++++-----
 .../spark/sql/execution/SparkPlanTest.scala     |  2 +-
 .../org/apache/spark/sql/hive/HiveContext.scala |  6 +++---
 .../hive/execution/InsertIntoHiveTable.scala    |  6 +++---
 9 files changed, 39 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/03cca5dc/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 9c67ad1..01f60ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1415,7 +1415,7 @@ class DataFrame private[sql](
    * @since 1.3.0
    */
   def collect(): Array[Row] = withNewExecutionId {
-    queryExecution.executedPlan.executeCollect()
+    queryExecution.executedPlan.executeCollectPublic()
   }
 
   /**
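
DataFrame.collect() keeps its public contract (Array[Row]); only the plumbing
underneath changed. A quick sanity check (hypothetical snippet, assuming a
SQLContext named sqlContext):

  val df = sqlContext.range(3).toDF("id")
  val rows: Array[Row] = df.collect()  // still external Rows, now via executeCollectPublic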

http://git-wip-us.apache.org/repos/asf/spark/blob/03cca5dc/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index 34e926e..adb6bbc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -34,13 +34,11 @@ private[sql] case class LocalTableScan(
 
   protected override def doExecute(): RDD[InternalRow] = rdd
 
-  override def executeCollect(): Array[Row] = {
-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    rows.map(converter(_).asInstanceOf[Row]).toArray
+  override def executeCollect(): Array[InternalRow] = {
+    rows.toArray
   }
 
-  override def executeTake(limit: Int): Array[Row] = {
-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    rows.map(converter(_).asInstanceOf[Row]).take(limit).toArray
+  override def executeTake(limit: Int): Array[InternalRow] = {
+    rows.take(limit).toArray
   }
 }
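
With the base class contract now in InternalRow, LocalTableScan can hand back
its buffered rows directly; the per-row Catalyst-to-Scala conversion disappears.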

http://git-wip-us.apache.org/repos/asf/spark/blob/03cca5dc/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 72f5450..fcb4204 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -170,11 +170,16 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   /**
    * Runs this query returning the result as an array.
    */
-  def executeCollect(): Array[Row] = {
-    execute().mapPartitions { iter =>
-      val converter = CatalystTypeConverters.createToScalaConverter(schema)
-      iter.map(converter(_).asInstanceOf[Row])
-    }.collect()
+  def executeCollect(): Array[InternalRow] = {
+    execute().map(_.copy()).collect()
+  }
+
+  /**
+   * Runs this query returning the result as an array, using external Row format.
+   */
+  def executeCollectPublic(): Array[Row] = {
+    val converter = CatalystTypeConverters.createToScalaConverter(schema)
+    executeCollect().map(converter(_).asInstanceOf[Row])
   }
 
   /**
@@ -182,9 +187,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    *
   * This is modeled after RDD.take but never runs any job locally on the driver.
    */
-  def executeTake(n: Int): Array[Row] = {
+  def executeTake(n: Int): Array[InternalRow] = {
     if (n == 0) {
-      return new Array[Row](0)
+      return new Array[InternalRow](0)
     }
 
     val childRDD = execute().map(_.copy())
@@ -218,8 +223,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       partsScanned += numPartsToTry
     }
 
-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    buf.toArray.map(converter(_).asInstanceOf[Row])
+    buf.toArray
   }
 
   private[this] def isTesting: Boolean = sys.props.contains("spark.testing")
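
Note the map(_.copy()) in both executeCollect() and executeTake(): many
operators reuse a single mutable row object per partition, so rows must be
defensively copied before being buffered on the driver. Callers now pick the
representation explicitly (hypothetical caller, for any SparkPlan `plan`):

  val internal: Array[InternalRow] = plan.executeTake(10)  // stays in Catalyst format
  val public: Array[Row] = plan.executeCollectPublic()     // converted once at the edge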

http://git-wip-us.apache.org/repos/asf/spark/blob/03cca5dc/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index bf6d44c..3e49e0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -204,7 +204,7 @@ case class Limit(limit: Int, child: SparkPlan)
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = SinglePartition
 
-  override def executeCollect(): Array[Row] = child.executeTake(limit)
+  override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
 
   protected override def doExecute(): RDD[InternalRow] = {
     val rdd: RDD[_ <: Product2[Boolean, InternalRow]] = if (sortBasedShuffleOn) {
@@ -258,9 +258,8 @@ case class TakeOrderedAndProject(
     projection.map(data.map(_)).getOrElse(data)
   }
 
-  override def executeCollect(): Array[Row] = {
-    val converter = CatalystTypeConverters.createToScalaConverter(schema)
-    collectData().map(converter(_).asInstanceOf[Row])
+  override def executeCollect(): Array[InternalRow] = {
+    collectData()
   }
 
   // TODO: Terminal split should be implemented differently from non-terminal split.

http://git-wip-us.apache.org/repos/asf/spark/blob/03cca5dc/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index af28e2d..05ccc53 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -54,20 +54,21 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
   * The `execute()` method of all the physical command classes should reference `sideEffectResult`
   * so that the command can be executed eagerly right after the command query is created.
    */
-  protected[sql] lazy val sideEffectResult: Seq[Row] = cmd.run(sqlContext)
+  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
+    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
+    cmd.run(sqlContext).map(converter(_).asInstanceOf[InternalRow])
+  }
 
   override def output: Seq[Attribute] = cmd.output
 
   override def children: Seq[SparkPlan] = Nil
 
-  override def executeCollect(): Array[Row] = sideEffectResult.toArray
+  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
 
-  override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray
+  override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val convert = CatalystTypeConverters.createToCatalystConverter(schema)
-    val converted = sideEffectResult.map(convert(_).asInstanceOf[InternalRow])
-    sqlContext.sparkContext.parallelize(converted, 1)
+    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
   }
 
   override def argString: String = cmd.toString
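
The conversion direction flips here: RunnableCommand.run still produces
external Rows, so ExecutedCommand now converts them to Catalyst format once,
when the lazy sideEffectResult is first forced, and executeCollect(),
executeTake(), and doExecute() all reuse the cached Seq[InternalRow] instead
of re-converting on every call. The cached shape, as in the hunk above:

  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
    // external Row -> InternalRow, done once and memoized by the lazy val
    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
    cmd.run(sqlContext).map(converter(_).asInstanceOf[InternalRow])
  }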

http://git-wip-us.apache.org/repos/asf/spark/blob/03cca5dc/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
index d6aaf42..5dbe0fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python.scala
@@ -121,12 +121,10 @@ object EvaluatePython {
 
   def takeAndServe(df: DataFrame, n: Int): Int = {
     registerPicklers()
-    // This is an annoying hack - we should refactor the code so executeCollect and executeTake
-    // returns InternalRow rather than Row.
-    val converter = CatalystTypeConverters.createToCatalystConverter(df.schema)
-    val iter = new SerDeUtil.AutoBatchedPickler(df.take(n).iterator.map { row =>
-      EvaluatePython.toJava(converter(row).asInstanceOf[InternalRow], df.schema)
-    })
+    val iter = new SerDeUtil.AutoBatchedPickler(
+      df.queryExecution.executedPlan.executeTake(n).iterator.map { row =>
+        EvaluatePython.toJava(row, df.schema)
+      })
     PythonRDD.serveIterator(iter, s"serve-DataFrame")
   }
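
This deletes the round trip the old comment complained about: df.take(n)
converted Catalyst rows to external Rows, only for takeAndServe to convert
them straight back before pickling. Going through executeTake(n) keeps the
rows in InternalRow form all the way to EvaluatePython.toJava, as the hunk
above shows.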
 

http://git-wip-us.apache.org/repos/asf/spark/blob/03cca5dc/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
index 3d218f0..8549a6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
@@ -245,7 +245,7 @@ object SparkPlanTest {
           }
       }
     )
-    resolvedPlan.executeCollect().toSeq
+    resolvedPlan.executeCollectPublic().toSeq
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/03cca5dc/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index c75025e..17de8ef 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -563,7 +563,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
 
   /** Extends QueryExecution with hive specific features. */
   protected[sql] class QueryExecution(logicalPlan: LogicalPlan)
-    extends super.QueryExecution(logicalPlan) {
+    extends org.apache.spark.sql.execution.QueryExecution(this, logicalPlan) {
 
     /**
     * Returns the result as a hive compatible sequence of strings.  For native commands, the
@@ -581,10 +581,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
               .mkString("\t")
         }
       case command: ExecutedCommand =>
-        command.executeCollect().map(_(0).toString)
+        command.executeCollect().map(_.getString(0))
 
       case other =>
-        val result: Seq[Seq[Any]] = other.executeCollect().map(_.toSeq).toSeq
+        val result: Seq[Seq[Any]] = other.executeCollectPublic().map(_.toSeq).toSeq
         // We need the types so we can output struct field names
         val types = analyzed.output.map(_.dataType)
         // Reformat to match hive tab delimited output.
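
Two representation-specific touches in stringResult: the command case now
yields InternalRows, so the untyped _(0).toString lookup becomes the typed
getString(0) accessor, while the generic case goes through
executeCollectPublic() because the tab-delimited formatting needs values in
their external Scala form. Roughly (illustrative shape, mirroring the hunk):

  val cmdLines = command.executeCollect().map(_.getString(0))  // typed accessor on InternalRow
  val rowSeqs  = other.executeCollectPublic().map(_.toSeq)     // external Rows expose toSeq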

http://git-wip-us.apache.org/repos/asf/spark/blob/03cca5dc/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 0c700bd..f936cf5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -124,7 +124,7 @@ case class InsertIntoHiveTable(
    *
    * Note: this is run once and then kept to avoid double insertions.
    */
-  protected[sql] lazy val sideEffectResult: Seq[Row] = {
+  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
     // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
@@ -267,10 +267,10 @@ case class InsertIntoHiveTable(
     // however for now we return an empty list to simplify compatibility checks with hive, which
     // does not return anything for insert operations.
     // TODO: implement hive compatibility as rules.
-    Seq.empty[Row]
+    Seq.empty[InternalRow]
   }
 
-  override def executeCollect(): Array[Row] = sideEffectResult.toArray
+  override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
 
   protected override def doExecute(): RDD[InternalRow] = {
     sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1)

