Repository: spark
Updated Branches:
  refs/heads/master 0698e6c88 -> 65accb813


[SPARK-17029] make toJSON not go through rdd form but operate on dataset always

## What changes were proposed in this pull request?

Don't convert toRdd when doing toJSON
## How was this patch tested?

Existing unit tests

Author: Robert Kruszewski <robe...@palantir.com>

Closes #14615 from robert3005/robertk/correct-tojson.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/65accb81
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/65accb81
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/65accb81

Branch: refs/heads/master
Commit: 65accb813add9f58c1e9f1555863fe0bb1932ad8
Parents: 0698e6c
Author: Robert Kruszewski <robe...@palantir.com>
Authored: Thu May 11 15:26:48 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu May 11 15:26:48 2017 +0800

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/sql/Dataset.scala     |  8 +++-----
 .../spark/sql/execution/datasources/json/JsonSuite.scala  | 10 ++++++++++
 2 files changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/65accb81/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 61154e2..c75921e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2806,7 +2806,7 @@ class Dataset[T] private[sql](
   def toJSON: Dataset[String] = {
     val rowSchema = this.schema
     val sessionLocalTimeZone = 
sparkSession.sessionState.conf.sessionLocalTimeZone
-    val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
+    mapPartitions { iter =>
       val writer = new CharArrayWriter()
       // create the Generator without separator inserted between 2 records
       val gen = new JacksonGenerator(rowSchema, writer,
@@ -2815,7 +2815,7 @@ class Dataset[T] private[sql](
       new Iterator[String] {
         override def hasNext: Boolean = iter.hasNext
         override def next(): String = {
-          gen.write(iter.next())
+          gen.write(exprEnc.toRow(iter.next()))
           gen.flush()
 
           val json = writer.toString
@@ -2828,9 +2828,7 @@ class Dataset[T] private[sql](
           json
         }
       }
-    }
-    import sparkSession.implicits.newStringEncoder
-    sparkSession.createDataset(rdd)
+    } (Encoders.STRING)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/65accb81/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 2ab0381..5e7f794 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.{functions => F, _}
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, 
JSONOptions}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.ExternalRDD
 import org.apache.spark.sql.execution.datasources.DataSource
 import 
org.apache.spark.sql.execution.datasources.json.JsonInferSchema.compatibleType
 import org.apache.spark.sql.internal.SQLConf
@@ -1326,6 +1327,15 @@ class JsonSuite extends QueryTest with SharedSQLContext 
with TestJsonData {
     )
   }
 
+  test("Dataset toJSON doesn't construct rdd") {
+    val containsRDD = spark.emptyDataFrame.toJSON.queryExecution.logical.find {
+      case ExternalRDD(_, _) => true
+      case _ => false
+    }
+
+    assert(containsRDD.isEmpty, "Expected logical plan of toJSON to not 
contain an RDD")
+  }
+
   test("JSONRelation equality test") {
     withTempPath(dir => {
       val path = dir.getCanonicalFile.toURI.toString


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to