Repository: spark
Updated Branches:
  refs/heads/branch-1.3 748cdc15e -> fc0446f66


[SPARK-5683] [SQL] Avoid creating multiple JSON generators

Author: Cheng Hao <hao.ch...@intel.com>

Closes #4468 from chenghao-intel/json and squashes the following commits:

aeb7801 [Cheng Hao] avoid creating multiple JSON generators

(cherry picked from commit a60aea86b4d4b716b5ec3bff776b509fe0831342)
Signed-off-by: Michael Armbrust <mich...@databricks.com>
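
In short: before this patch, DataFrameImpl.toJSON built a fresh Jackson
JsonGenerator (plus a StringWriter) for every Row it serialized. The patch
hoists generator creation out of the per-row loop: each partition now
allocates a single generator backed by a reusable CharArrayWriter, and
rowToJSON writes each record into it, resetting the writer between records
and closing the generator after the last one. A standalone sketch of the
reuse pattern follows the DataFrameImpl hunk below.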


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc0446f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc0446f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc0446f6

Branch: refs/heads/branch-1.3
Commit: fc0446f66063ea38e50e8a1d22a64d11642bbc64
Parents: 748cdc1
Author: Cheng Hao <hao.ch...@intel.com>
Authored: Tue Feb 10 18:19:56 2015 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Tue Feb 10 18:20:23 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameImpl.scala    | 24 ++++++++++++++++++--
 .../org/apache/spark/sql/json/JsonRDD.scala     | 13 +++--------
 .../org/apache/spark/sql/json/JsonSuite.scala   |  8 +++----
 3 files changed, 29 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fc0446f6/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index 11f9334..0134b03 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import java.io.CharArrayWriter
+
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 import scala.collection.JavaConversions._
@@ -380,8 +382,26 @@ private[sql] class DataFrameImpl protected[sql](
   override def toJSON: RDD[String] = {
     val rowSchema = this.schema
     this.mapPartitions { iter =>
-      val jsonFactory = new JsonFactory()
-      iter.map(JsonRDD.rowToJSON(rowSchema, jsonFactory))
+      val writer = new CharArrayWriter()
+      // create the generator without a separator inserted between records
+      val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+
+      new Iterator[String] {
+        override def hasNext: Boolean = iter.hasNext
+        override def next(): String = {
+          JsonRDD.rowToJSON(rowSchema, gen)(iter.next())
+          gen.flush()
+
+          val json = writer.toString
+          if (hasNext) {
+            writer.reset()
+          } else {
+            gen.close()
+          }
+
+          json
+        }
+      }
     }
   }
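
The core of the change is this per-partition generator reuse. Below is a
minimal standalone sketch of the same pattern outside Spark; the sample
records and field names are illustrative, not part of the patch:

    import java.io.CharArrayWriter
    import com.fasterxml.jackson.core.JsonFactory

    val records = Seq("a" -> 1, "b" -> 2)
    val writer = new CharArrayWriter()
    // root value separator set to null so consecutive records are not
    // joined by the default space
    val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)

    val jsonStrings = records.map { case (k, v) =>
      gen.writeStartObject()
      gen.writeStringField("key", k)
      gen.writeNumberField("value", v)
      gen.writeEndObject()
      gen.flush()                // push buffered output into the writer
      val json = writer.toString
      writer.reset()             // reuse the buffer instead of allocating anew
      json
    }
    gen.close()
    // jsonStrings == Seq("""{"key":"a","value":1}""", """{"key":"b","value":2}""")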
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fc0446f6/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 33ce71b..1043eef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -23,8 +23,7 @@ import java.sql.{Date, Timestamp}
 import scala.collection.Map
 import scala.collection.convert.Wrappers.{JMapWrapper, JListWrapper}
 
-import com.fasterxml.jackson.core.JsonProcessingException
-import com.fasterxml.jackson.core.JsonFactory
+import com.fasterxml.jackson.core.{JsonGenerator, JsonProcessingException, JsonFactory}
 import com.fasterxml.jackson.databind.ObjectMapper
 
 import org.apache.spark.rdd.RDD
@@ -430,14 +429,11 @@ private[sql] object JsonRDD extends Logging {
 
   /** Transforms a single Row to JSON using Jackson
     *
-    * @param jsonFactory a JsonFactory object to construct a JsonGenerator
     * @param rowSchema the schema object used for conversion
+    * @param gen a JsonGenerator object
     * @param row The row to convert
     */
-  private[sql] def rowToJSON(rowSchema: StructType, jsonFactory: JsonFactory)(row: Row): String = {
-    val writer = new StringWriter()
-    val gen = jsonFactory.createGenerator(writer)
-
+  private[sql] def rowToJSON(rowSchema: StructType, gen: JsonGenerator)(row: Row) = {
     def valWriter: (DataType, Any) => Unit = {
       case (_, null) | (NullType, _)  => gen.writeNull()
       case (StringType, v: String) => gen.writeString(v)
@@ -479,8 +475,5 @@ private[sql] object JsonRDD extends Logging {
     }
 
     valWriter(rowSchema, row)
-    gen.close()
-    writer.toString
   }
-
 }
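
With the new signature, the caller owns the generator's life cycle and
rowToJSON only writes a single record. A hypothetical call site (schema,
row, and variable names are made up for illustration; rowToJSON is
private[sql], so this shows the contract rather than a public API):

    import java.io.CharArrayWriter
    import com.fasterxml.jackson.core.JsonFactory
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val schema = StructType(Seq(StructField("name", StringType)))
    val writer = new CharArrayWriter()
    val gen = new JsonFactory().createGenerator(writer)

    JsonRDD.rowToJSON(schema, gen)(Row("alice"))
    gen.flush()                   // rowToJSON no longer flushes or closes
    val json = writer.toString    // {"name":"alice"}
    gen.close()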

http://git-wip-us.apache.org/repos/asf/spark/blob/fc0446f6/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 7870cf9..4fc92e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -824,8 +824,8 @@ class JsonSuite extends QueryTest {
     df1.registerTempTable("applySchema1")
     val df2 = df1.toDataFrame
     val result = df2.toJSON.collect()
-    assert(result(0) == "{\"f1\":1,\"f2\":\"A1\",\"f3\":true,\"f4\":[\"1\",\" A1\",\" true\",\" null\"]}")
-    assert(result(3) == "{\"f1\":4,\"f2\":\"D4\",\"f3\":true,\"f4\":[\"4\",\" D4\",\" true\",\" 2147483644\"],\"f5\":2147483644}")
+    assert(result(0) === "{\"f1\":1,\"f2\":\"A1\",\"f3\":true,\"f4\":[\"1\",\" A1\",\" true\",\" null\"]}")
+    assert(result(3) === "{\"f1\":4,\"f2\":\"D4\",\"f3\":true,\"f4\":[\"4\",\" D4\",\" true\",\" 2147483644\"],\"f5\":2147483644}")
 
     val schema2 = StructType(
       StructField("f1", StructType(
@@ -846,8 +846,8 @@ class JsonSuite extends QueryTest {
     val df4 = df3.toDataFrame
     val result2 = df4.toJSON.collect()
 
-    assert(result2(1) == "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}")
-    assert(result2(3) == "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")
+    assert(result2(1) === "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}")
+    assert(result2(3) === "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")
 
     val jsonDF = jsonRDD(primitiveFieldAndType)
     val primTable = jsonRDD(jsonDF.toJSON)
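
A side note on the test change: swapping == for === replaces plain Boolean
equality with ScalaTest's checked equality, so a failing assertion reports
the two differing JSON strings instead of just a bare assertion failure.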

