Repository: spark
Updated Branches:
  refs/heads/master 8ea3f4eae -> 2877f1a52


[SPARK-16351][SQL] Avoid per-record type dispatch in JSON when writing

## What changes were proposed in this pull request?

Currently, `JacksonGenerator.apply` performs type-based dispatch for every row
to write the appropriate values.
This is unnecessary because the schema is already known.

Instead, the appropriate writers can be created once from the schema and then
applied to each row. This approach is similar to `CatalystWriteSupport`.

This PR changes `JacksonGenerator` so that it creates all writers for the
schema once and then applies them to each row, rather than dispatching on type
for every row.
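
As a rough illustration of the idea (not the code in this patch), the sketch below builds one writer function per field from a schema and reuses them for every row. The names `WriterSketch`, `RowWriter`, `IntType`, `StrType` and `BoolType` are hypothetical, a row is modelled as a plain `IndexedSeq[Any]` instead of Spark's `InternalRow`, and string escaping is left out:

```scala
// Hypothetical, simplified type model; these are NOT Spark's DataType classes.
sealed trait DataType
case object IntType extends DataType
case object StrType extends DataType
case object BoolType extends DataType

object WriterSketch {
  // Assumption: a row is a plain sequence of values, not an InternalRow.
  type Row = IndexedSeq[Any]
  // Writes the value at position `ordinal` of `row` into the output buffer.
  type ValueWriter = (Row, Int, StringBuilder) => Unit

  // The type dispatch happens here, once per schema field.
  def makeWriter(dataType: DataType): ValueWriter = dataType match {
    case IntType =>
      (row, ordinal, out) => out.append(row(ordinal).asInstanceOf[Int])
    case BoolType =>
      (row, ordinal, out) => out.append(row(ordinal).asInstanceOf[Boolean])
    case StrType =>
      // No JSON escaping here; a real writer would need it.
      (row, ordinal, out) =>
        out.append('"').append(row(ordinal).asInstanceOf[String]).append('"')
  }

  final class RowWriter(schema: Seq[(String, DataType)]) {
    private val names: Array[String] = schema.map(_._1).toArray
    // Writers are created once and reused for every row.
    private val writers: Array[ValueWriter] =
      schema.map(field => makeWriter(field._2)).toArray

    def write(row: Row): String = {
      val out = new StringBuilder("{")
      var first = true
      var i = 0
      while (i < writers.length) {
        // Null fields are skipped, mirroring how the patch omits null struct fields.
        if (row(i) != null) {
          if (!first) out.append(',')
          out.append('"').append(names(i)).append("\":")
          writers(i)(row, i, out)
          first = false
        }
        i += 1
      }
      out.append('}').toString
    }
  }
}
```

With a schema of `Seq("id" -> IntType, "name" -> StrType, "ok" -> BoolType)`, calling `write(Vector(1, "a", true))` yields `{"id":1,"name":"a","ok":true}`, and the per-row loop contains no pattern match on the data type.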

The benchmark was run with the code below:

```scala
test("Benchmark for JSON writer") {
  val N = 500 << 8
  val row =
    """{"struct":{"field1": true, "field2": 92233720368547758070},
      "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]},
      "arrayOfString":["str1", "str2"],
      "arrayOfInteger":[1, 2147483647, -2147483648],
      "arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808],
      "arrayOfBigInteger":[922337203685477580700, -922337203685477580800],
      "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308],
      "arrayOfBoolean":[true, false, true],
      "arrayOfNull":[null, null, null, null],
      "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
      "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]],
      "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]]
     }"""
  val df = spark.sqlContext.read.json(spark.sparkContext.parallelize(List.fill(N)(row)))
  val benchmark = new Benchmark("JSON writer", N)
  benchmark.addCase("writing JSON file", 10) { _ =>
    withTempPath { path =>
      df.write.format("json").save(path.getCanonicalPath)
    }
  }
  benchmark.run()
}
```

This produced the results below:

- **Before**

```
JSON writer:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
writing JSON file                             1675 / 1767          0.1       13087.5       1.0X
```

- **After**

```
JSON writer:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
writing JSON file                             1597 / 1686          0.1       12477.1       1.0X
```

In addition, I ran this benchmark 10 times for each version and calculated the
average elapsed time:

| **Before** | **After**|
|---------------|------------|
|17478ms  |16669ms |

This is an improvement of roughly 5%.
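
For reference, the relative reduction can be checked directly from the two averages in the table. The object name `Improvement` is only for illustration:

```scala
object Improvement {
  // Average elapsed times from the table above, in milliseconds.
  val beforeMs = 17478.0
  val afterMs = 16669.0

  // Relative reduction in elapsed time, as a percentage of the "before" value.
  val reductionPct: Double = (beforeMs - afterMs) / beforeMs * 100.0
}
```

This evaluates to about 4.6%, which is in line with the per-row numbers reported by the single benchmark run (13087.5 ns before, 12477.1 ns after).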

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #14028 from HyukjinKwon/SPARK-16351.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2877f1a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2877f1a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2877f1a5

Branch: refs/heads/master
Commit: 2877f1a5224c38c1fa0b85ef633ff935fae9dd83
Parents: 8ea3f4e
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Mon Jul 18 09:49:14 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Jul 18 09:49:14 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/Dataset.scala    |   4 +-
 .../datasources/json/JacksonGenerator.scala     | 218 ++++++++++++++-----
 .../datasources/json/JsonFileFormat.scala       |   5 +-
 .../execution/datasources/json/JsonSuite.scala  |   3 -
 4 files changed, 163 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2877f1a5/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index ed4ccdb..b28ecb7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2489,12 +2489,12 @@ class Dataset[T] private[sql](
     val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
       val writer = new CharArrayWriter()
       // create the Generator without separator inserted between 2 records
-      val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+      val gen = new JacksonGenerator(rowSchema, writer)
 
       new Iterator[String] {
         override def hasNext: Boolean = iter.hasNext
         override def next(): String = {
-          JacksonGenerator(rowSchema, gen)(iter.next())
+          gen.write(iter.next())
           gen.flush()
 
           val json = writer.toString

http://git-wip-us.apache.org/repos/asf/spark/blob/2877f1a5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index 8b920ec..23f4a55 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -17,74 +17,174 @@
 
 package org.apache.spark.sql.execution.datasources.json
 
+import java.io.Writer
+
 import com.fasterxml.jackson.core._
 
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
 import org.apache.spark.sql.types._
 
-private[sql] object JacksonGenerator {
-  /** Transforms a single InternalRow to JSON using Jackson
-   *
-   * TODO: make the code shared with the other apply method.
+private[sql] class JacksonGenerator(schema: StructType, writer: Writer) {
+  // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
+  // JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that
+  // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
+  private type ValueWriter = (SpecializedGetters, Int) => Unit
+
+  // `ValueWriter`s for all fields of the schema
+  private val rootFieldWriters: Array[ValueWriter] = schema.map(_.dataType).map(makeWriter).toArray
+
+  private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+
+  private def makeWriter(dataType: DataType): ValueWriter = dataType match {
+    case NullType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNull()
+
+    case BooleanType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeBoolean(row.getBoolean(ordinal))
+
+    case ByteType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getByte(ordinal))
+
+    case ShortType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getShort(ordinal))
+
+    case IntegerType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getInt(ordinal))
+
+    case LongType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getLong(ordinal))
+
+    case FloatType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getFloat(ordinal))
+
+    case DoubleType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getDouble(ordinal))
+
+    case StringType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeString(row.getUTF8String(ordinal).toString)
+
+    case TimestampType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeString(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)).toString)
+
+    case DateType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeString(DateTimeUtils.toJavaDate(row.getInt(ordinal)).toString)
+
+    case BinaryType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeBinary(row.getBinary(ordinal))
+
+    case dt: DecimalType =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        gen.writeNumber(row.getDecimal(ordinal, dt.precision, dt.scale).toJavaBigDecimal)
+
+    case st: StructType =>
+      val fieldWriters = st.map(_.dataType).map(makeWriter)
+      (row: SpecializedGetters, ordinal: Int) =>
+        writeObject(writeFields(row.getStruct(ordinal, st.length), st, fieldWriters))
+
+    case at: ArrayType =>
+      val elementWriter = makeWriter(at.elementType)
+      (row: SpecializedGetters, ordinal: Int) =>
+        writeArray(writeArrayData(row.getArray(ordinal), elementWriter))
+
+    case mt: MapType =>
+      val valueWriter = makeWriter(mt.valueType)
+      (row: SpecializedGetters, ordinal: Int) =>
+        writeObject(writeMapData(row.getMap(ordinal), mt, valueWriter))
+
+    // For UDT values, they should be in the SQL type's corresponding value type.
+    // We should not see values in the user-defined class at here.
+    // For example, VectorUDT's SQL type is an array of double. So, we should expect that v is
+    // an ArrayData at here, instead of a Vector.
+    case t: UserDefinedType[_] =>
+      makeWriter(t.sqlType)
+
+    case _ =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        val v = row.get(ordinal, dataType)
+        sys.error(s"Failed to convert value $v (class of ${v.getClass}}) " +
+          s"with the type of $dataType to JSON.")
+  }
+
+  private def writeObject(f: => Unit): Unit = {
+    gen.writeStartObject()
+    f
+    gen.writeEndObject()
+  }
+
+  private def writeFields(
+      row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = {
+    var i = 0
+    while (i < row.numFields) {
+      val field = schema(i)
+      if (!row.isNullAt(i)) {
+        gen.writeFieldName(field.name)
+        fieldWriters(i).apply(row, i)
+      }
+      i += 1
+    }
+  }
+
+  private def writeArray(f: => Unit): Unit = {
+    gen.writeStartArray()
+    f
+    gen.writeEndArray()
+  }
+
+  private def writeArrayData(
+      array: ArrayData, fieldWriter: ValueWriter): Unit = {
+    var i = 0
+    while (i < array.numElements()) {
+      if (!array.isNullAt(i)) {
+        fieldWriter.apply(array, i)
+      } else {
+        gen.writeNull()
+      }
+      i += 1
+    }
+  }
+
+  private def writeMapData(
+      map: MapData, mapType: MapType, fieldWriter: ValueWriter): Unit = {
+    val keyArray = map.keyArray()
+    val valueArray = map.valueArray()
+    var i = 0
+    while (i < map.numElements()) {
+      gen.writeFieldName(keyArray.get(i, mapType.keyType).toString)
+      if (!valueArray.isNullAt(i)) {
+        fieldWriter.apply(valueArray, i)
+      } else {
+        gen.writeNull()
+      }
+      i += 1
+    }
+  }
+
+  def close(): Unit = gen.close()
+
+  def flush(): Unit = gen.flush()
+
+  /**
+   * Transforms a single InternalRow to JSON using Jackson
    *
-   * @param rowSchema the schema object used for conversion
-   * @param gen a JsonGenerator object
    * @param row The row to convert
    */
-  def apply(rowSchema: StructType, gen: JsonGenerator)(row: InternalRow): Unit = {
-    def valWriter: (DataType, Any) => Unit = {
-      case (_, null) | (NullType, _) => gen.writeNull()
-      case (StringType, v) => gen.writeString(v.toString)
-      case (TimestampType, v: Long) => gen.writeString(DateTimeUtils.toJavaTimestamp(v).toString)
-      case (IntegerType, v: Int) => gen.writeNumber(v)
-      case (ShortType, v: Short) => gen.writeNumber(v)
-      case (FloatType, v: Float) => gen.writeNumber(v)
-      case (DoubleType, v: Double) => gen.writeNumber(v)
-      case (LongType, v: Long) => gen.writeNumber(v)
-      case (DecimalType(), v: Decimal) => gen.writeNumber(v.toJavaBigDecimal)
-      case (ByteType, v: Byte) => gen.writeNumber(v.toInt)
-      case (BinaryType, v: Array[Byte]) => gen.writeBinary(v)
-      case (BooleanType, v: Boolean) => gen.writeBoolean(v)
-      case (DateType, v: Int) => gen.writeString(DateTimeUtils.toJavaDate(v).toString)
-      // For UDT values, they should be in the SQL type's corresponding value type.
-      // We should not see values in the user-defined class at here.
-      // For example, VectorUDT's SQL type is an array of double. So, we should expect that v is
-      // an ArrayData at here, instead of a Vector.
-      case (udt: UserDefinedType[_], v) => valWriter(udt.sqlType, v)
-
-      case (ArrayType(ty, _), v: ArrayData) =>
-        gen.writeStartArray()
-        v.foreach(ty, (_, value) => valWriter(ty, value))
-        gen.writeEndArray()
-
-      case (MapType(kt, vt, _), v: MapData) =>
-        gen.writeStartObject()
-        v.foreach(kt, vt, { (k, v) =>
-          gen.writeFieldName(k.toString)
-          valWriter(vt, v)
-        })
-        gen.writeEndObject()
-
-      case (StructType(ty), v: InternalRow) =>
-        gen.writeStartObject()
-        var i = 0
-        while (i < ty.length) {
-          val field = ty(i)
-          val value = v.get(i, field.dataType)
-          if (value != null) {
-            gen.writeFieldName(field.name)
-            valWriter(field.dataType, value)
-          }
-          i += 1
-        }
-        gen.writeEndObject()
-
-      case (dt, v) =>
-        sys.error(
-          s"Failed to convert value $v (class of ${v.getClass}}) with the type of $dt to JSON.")
+  def write(row: InternalRow): Unit = {
+    writeObject {
+      writeFields(row, schema, rootFieldWriters)
     }
-
-    valWriter(rowSchema, row)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2877f1a5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 86aef1f..adca8d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources.json
 
 import java.io.CharArrayWriter
 
-import com.fasterxml.jackson.core.JsonFactory
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
@@ -162,7 +161,7 @@ private[json] class JsonOutputWriter(
 
   private[this] val writer = new CharArrayWriter()
   // create the Generator without separator inserted between 2 records
-  private[this] val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+  private[this] val gen = new JacksonGenerator(dataSchema, writer)
   private[this] val result = new Text()
 
   private val recordWriter: RecordWriter[NullWritable, Text] = {
@@ -181,7 +180,7 @@ private[json] class JsonOutputWriter(
   override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
 
   override protected[sql] def writeInternal(row: InternalRow): Unit = {
-    JacksonGenerator(dataSchema, gen)(row)
+    gen.write(row)
     gen.flush()
 
     result.set(writer.toString)

http://git-wip-us.apache.org/repos/asf/spark/blob/2877f1a5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 6c72019..a09f61a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -21,10 +21,7 @@ import java.io.{File, StringWriter}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 
-import scala.collection.JavaConverters._
-
 import com.fasterxml.jackson.core.JsonFactory
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec

