Repository: spark
Updated Branches:
  refs/heads/master ed72badb0 -> 34c4b9c57


[SPARK-23765][SQL] Supports custom line separator for json datasource

## What changes were proposed in this pull request?

This PR proposes to add a `lineSep` option for a configurable line separator in 
the JSON datasource.
It supports this option by passing the separator to `LineRecordReader`'s 
constructor and relying on that reader's existing functionality.

The approach is similar to https://github.com/apache/spark/pull/20727; 
however, one main difference is that it uses the text datasource's `lineSep` 
option to parse line by line in JSON's schema inference.

## How was this patch tested?

It was tested manually, and unit tests were added.

Author: hyukjinkwon <[email protected]>
Author: hyukjinkwon <[email protected]>

Closes #20877 from HyukjinKwon/linesep-json.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34c4b9c5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34c4b9c5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34c4b9c5

Branch: refs/heads/master
Commit: 34c4b9c57e114cdb390e4dbc7383284d82fea317
Parents: ed72bad
Author: hyukjinkwon <[email protected]>
Authored: Wed Mar 28 19:49:27 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Wed Mar 28 19:49:27 2018 +0800

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                | 14 +++--
 python/pyspark/sql/streaming.py                 |  6 +-
 python/pyspark/sql/tests.py                     | 17 +++++
 .../spark/sql/catalyst/json/JSONOptions.scala   | 11 ++++
 .../sql/catalyst/json/JacksonGenerator.scala    |  8 ++-
 .../org/apache/spark/sql/DataFrameReader.scala  |  2 +
 .../org/apache/spark/sql/DataFrameWriter.scala  |  2 +
 .../datasources/json/JsonDataSource.scala       | 17 +++--
 .../datasources/text/TextOptions.scala          |  2 +-
 .../spark/sql/streaming/DataStreamReader.scala  |  2 +
 .../execution/datasources/json/JsonSuite.scala  | 66 +++++++++++++++++++-
 .../execution/datasources/text/TextSuite.scala  |  4 +-
 12 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/34c4b9c5/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 4f9b938..6bd79bc 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -176,7 +176,7 @@ class DataFrameReader(OptionUtils):
              allowComments=None, allowUnquotedFieldNames=None, 
allowSingleQuotes=None,
              allowNumericLeadingZero=None, 
allowBackslashEscapingAnyCharacter=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, 
timestampFormat=None,
-             multiLine=None, allowUnquotedControlChars=None):
+             multiLine=None, allowUnquotedControlChars=None, lineSep=None):
         """
         Loads JSON files and returns the results as a :class:`DataFrame`.
 
@@ -237,6 +237,8 @@ class DataFrameReader(OptionUtils):
         :param allowUnquotedControlChars: allows JSON Strings to contain 
unquoted control
                                           characters (ASCII characters with 
value less than 32,
                                           including tab and line feed 
characters) or not.
+        :param lineSep: defines the line separator that should be used for 
parsing. If None is
+                        set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
 
         >>> df1 = spark.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
@@ -254,7 +256,7 @@ class DataFrameReader(OptionUtils):
             
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, 
dateFormat=dateFormat,
             timestampFormat=timestampFormat, multiLine=multiLine,
-            allowUnquotedControlChars=allowUnquotedControlChars)
+            allowUnquotedControlChars=allowUnquotedControlChars, 
lineSep=lineSep)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
@@ -746,7 +748,8 @@ class DataFrameWriter(OptionUtils):
         self._jwrite.saveAsTable(name)
 
     @since(1.4)
-    def json(self, path, mode=None, compression=None, dateFormat=None, 
timestampFormat=None):
+    def json(self, path, mode=None, compression=None, dateFormat=None, 
timestampFormat=None,
+             lineSep=None):
         """Saves the content of the :class:`DataFrame` in JSON format
         (`JSON Lines text format or newline-delimited JSON 
<http://jsonlines.org/>`_) at the
         specified path.
@@ -770,12 +773,15 @@ class DataFrameWriter(OptionUtils):
                                 formats follow the formats at 
``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is 
set, it uses the
                                 default value, 
``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``.
+        :param lineSep: defines the line separator that should be used for 
writing. If None is
+                        set, it uses the default value, ``\\n``.
 
         >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode)
         self._set_opts(
-            compression=compression, dateFormat=dateFormat, 
timestampFormat=timestampFormat)
+            compression=compression, dateFormat=dateFormat, 
timestampFormat=timestampFormat,
+            lineSep=lineSep)
         self._jwrite.json(path)
 
     @since(1.4)

http://git-wip-us.apache.org/repos/asf/spark/blob/34c4b9c5/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index c7907aa..15f9407 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -405,7 +405,7 @@ class DataStreamReader(OptionUtils):
              allowComments=None, allowUnquotedFieldNames=None, 
allowSingleQuotes=None,
              allowNumericLeadingZero=None, 
allowBackslashEscapingAnyCharacter=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, 
timestampFormat=None,
-             multiLine=None,  allowUnquotedControlChars=None):
+             multiLine=None,  allowUnquotedControlChars=None, lineSep=None):
         """
         Loads a JSON file stream and returns the results as a 
:class:`DataFrame`.
 
@@ -468,6 +468,8 @@ class DataStreamReader(OptionUtils):
         :param allowUnquotedControlChars: allows JSON Strings to contain 
unquoted control
                                           characters (ASCII characters with 
value less than 32,
                                           including tab and line feed 
characters) or not.
+        :param lineSep: defines the line separator that should be used for 
parsing. If None is
+                        set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
 
         >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = 
sdf_schema)
         >>> json_sdf.isStreaming
@@ -482,7 +484,7 @@ class DataStreamReader(OptionUtils):
             
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, 
dateFormat=dateFormat,
             timestampFormat=timestampFormat, multiLine=multiLine,
-            allowUnquotedControlChars=allowUnquotedControlChars)
+            allowUnquotedControlChars=allowUnquotedControlChars, 
lineSep=lineSep)
         if isinstance(path, basestring):
             return self._df(self._jreader.json(path))
         else:

http://git-wip-us.apache.org/repos/asf/spark/blob/34c4b9c5/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 01c5dd6..5181053 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -676,6 +676,23 @@ class SQLTests(ReusedSQLTestCase):
                                             multiLine=True)
         self.assertEqual(people1.collect(), people_array.collect())
 
+    def test_linesep_json(self):
+        df = self.spark.read.json("python/test_support/sql/people.json", 
lineSep=",")
+        expected = [Row(_corrupt_record=None, name=u'Michael'),
+                    Row(_corrupt_record=u' "age":30}\n{"name":"Justin"', 
name=None),
+                    Row(_corrupt_record=u' "age":19}\n', name=None)]
+        self.assertEqual(df.collect(), expected)
+
+        tpath = tempfile.mkdtemp()
+        shutil.rmtree(tpath)
+        try:
+            df = self.spark.read.json("python/test_support/sql/people.json")
+            df.write.json(tpath, lineSep="!!")
+            readback = self.spark.read.json(tpath, lineSep="!!")
+            self.assertEqual(readback.collect(), df.collect())
+        finally:
+            shutil.rmtree(tpath)
+
     def test_multiline_csv(self):
         ages_newlines = self.spark.read.csv(
             "python/test_support/sql/ages_newlines.csv", multiLine=True)

http://git-wip-us.apache.org/repos/asf/spark/blob/34c4b9c5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 652412b..5c9adc3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.json
 
+import java.nio.charset.StandardCharsets
 import java.util.{Locale, TimeZone}
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
@@ -85,6 +86,16 @@ private[sql] class JSONOptions(
 
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
+  val lineSeparator: Option[String] = parameters.get("lineSep").map { sep =>
+    require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
+    sep
+  }
+  // Note that the option 'lineSep' uses a different default value in read and 
write.
+  val lineSeparatorInRead: Option[Array[Byte]] =
+    lineSeparator.map(_.getBytes(StandardCharsets.UTF_8))
+  // Note that JSON uses writer with UTF-8 charset. This string will be 
written out as UTF-8.
+  val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n")
+
   /** Sets config options on a Jackson [[JsonFactory]]. */
   def setJacksonOptions(factory: JsonFactory): Unit = {
     factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)

http://git-wip-us.apache.org/repos/asf/spark/blob/34c4b9c5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index eb06e4f..9c413de 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.json
 
 import java.io.Writer
+import java.nio.charset.StandardCharsets
 
 import com.fasterxml.jackson.core._
 
@@ -74,6 +75,8 @@ private[sql] class JacksonGenerator(
 
   private val gen = new 
JsonFactory().createGenerator(writer).setRootValueSeparator(null)
 
+  private val lineSeparator: String = options.lineSeparatorInWrite
+
   private def makeWriter(dataType: DataType): ValueWriter = dataType match {
     case NullType =>
       (row: SpecializedGetters, ordinal: Int) =>
@@ -251,5 +254,8 @@ private[sql] class JacksonGenerator(
       mapType = dataType.asInstanceOf[MapType]))
   }
 
-  def writeLineEnding(): Unit = gen.writeRaw('\n')
+  def writeLineEnding(): Unit = {
+    // Note that JSON uses writer with UTF-8 charset. This string will be 
written out as UTF-8.
+    gen.writeRaw(lineSeparator)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/34c4b9c5/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 1a5e475..ae3ba16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -366,6 +366,8 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    * <li>`multiLine` (default `false`): parse one record, which may span 
multiple lines,
    * per file</li>
+   * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the 
line separator
+   * that should be used for parsing.</li>
    * </ul>
    *
    * @since 2.0.0

http://git-wip-us.apache.org/repos/asf/spark/blob/34c4b9c5/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index bb93889..bbc0631 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -518,6 +518,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
    * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`): sets the 
string that
    * indicates a timestamp format. Custom date formats follow the formats at
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+   * <li>`lineSep` (default `\n`): defines the line separator that should
+   * be used for writing.</li>
    * </ul>
    *
    * @since 1.4.0

http://git-wip-us.apache.org/repos/asf/spark/blob/34c4b9c5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 77e7edc..5769c09 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.{AnalysisException, Dataset, 
Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, 
JSONOptions}
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.text.TextFileFormat
+import org.apache.spark.sql.execution.datasources.text.{TextFileFormat, 
TextOptions}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -92,7 +92,8 @@ object TextInputJsonDataSource extends JsonDataSource {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): StructType = {
-    val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths)
+    val json: Dataset[String] = createBaseDataset(
+      sparkSession, inputPaths, parsedOptions.lineSeparator)
     inferFromDataset(json, parsedOptions)
   }
 
@@ -104,13 +105,19 @@ object TextInputJsonDataSource extends JsonDataSource {
 
   private def createBaseDataset(
       sparkSession: SparkSession,
-      inputPaths: Seq[FileStatus]): Dataset[String] = {
+      inputPaths: Seq[FileStatus],
+      lineSeparator: Option[String]): Dataset[String] = {
+    val textOptions = lineSeparator.map { lineSep =>
+      Map(TextOptions.LINE_SEPARATOR -> lineSep)
+    }.getOrElse(Map.empty[String, String])
+
     val paths = inputPaths.map(_.getPath.toString)
     sparkSession.baseRelationToDataFrame(
       DataSource.apply(
         sparkSession,
         paths = paths,
-        className = classOf[TextFileFormat].getName
+        className = classOf[TextFileFormat].getName,
+        options = textOptions
       ).resolveRelation(checkFilesExist = false))
       .select("value").as(Encoders.STRING)
   }
@@ -120,7 +127,7 @@ object TextInputJsonDataSource extends JsonDataSource {
       file: PartitionedFile,
       parser: JacksonParser,
       schema: StructType): Iterator[InternalRow] = {
-    val linesReader = new HadoopFileLinesReader(file, conf)
+    val linesReader = new HadoopFileLinesReader(file, 
parser.options.lineSeparatorInRead, conf)
     Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => 
linesReader.close()))
     val safeParser = new FailureSafeParser[Text](
       input => parser.parse(input, CreateJacksonParser.text, textToUTF8String),

http://git-wip-us.apache.org/repos/asf/spark/blob/34c4b9c5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
index 18698df9..5c1a354 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
@@ -52,7 +52,7 @@ private[text] class TextOptions(@transient private val 
parameters: CaseInsensiti
     lineSeparatorInRead.getOrElse("\n".getBytes(StandardCharsets.UTF_8))
 }
 
-private[text] object TextOptions {
+private[datasources] object TextOptions {
   val COMPRESSION = "compression"
   val WHOLETEXT = "wholetext"
   val LINE_SEPARATOR = "lineSep"

http://git-wip-us.apache.org/repos/asf/spark/blob/34c4b9c5/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 9b17406..ae93965 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -268,6 +268,8 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession) extends Lo
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    * <li>`multiLine` (default `false`): parse one record, which may span 
multiple lines,
    * per file</li>
+   * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the 
line separator
+   * that should be used for parsing.</li>
    * </ul>
    *
    * @since 2.0.0

http://git-wip-us.apache.org/repos/asf/spark/blob/34c4b9c5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 8c8d41e..10bac05 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.json
 
 import java.io.{File, StringWriter}
 import java.nio.charset.StandardCharsets
+import java.nio.file.Files
 import java.sql.{Date, Timestamp}
 import java.util.Locale
 
@@ -27,7 +28,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, TestUtils}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{functions => F, _}
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, 
JSONOptions}
@@ -2063,4 +2064,67 @@ class JsonSuite extends QueryTest with SharedSQLContext 
with TestJsonData {
       )
     }
   }
+
+  def testLineSeparator(lineSep: String): Unit = {
+    test(s"SPARK-21289: Support line separator - lineSep: '$lineSep'") {
+      // Read
+      val data =
+        s"""
+          |  {"f":
+          |"a", "f0": 1}$lineSep{"f":
+          |
+          |"c",  "f0": 2}$lineSep{"f": "d",  "f0": 3}
+        """.stripMargin
+      val dataWithTrailingLineSep = s"$data$lineSep"
+
+      Seq(data, dataWithTrailingLineSep).foreach { lines =>
+        withTempPath { path =>
+          Files.write(path.toPath, lines.getBytes(StandardCharsets.UTF_8))
+          val df = spark.read.option("lineSep", 
lineSep).json(path.getAbsolutePath)
+          val expectedSchema =
+            StructType(StructField("f", StringType) :: StructField("f0", 
LongType) :: Nil)
+          checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF())
+          assert(df.schema === expectedSchema)
+        }
+      }
+
+      // Write
+      withTempPath { path =>
+        Seq("a", "b", "c").toDF("value").coalesce(1)
+          .write.option("lineSep", lineSep).json(path.getAbsolutePath)
+        val partFile = TestUtils.recursiveList(path).filter(f => 
f.getName.startsWith("part-")).head
+        val readBack = new String(Files.readAllBytes(partFile.toPath), 
StandardCharsets.UTF_8)
+        assert(
+          readBack === 
s"""{"value":"a"}$lineSep{"value":"b"}$lineSep{"value":"c"}$lineSep""")
+      }
+
+      // Roundtrip
+      withTempPath { path =>
+        val df = Seq("a", "b", "c").toDF()
+        df.write.option("lineSep", lineSep).json(path.getAbsolutePath)
+        val readBack = spark.read.option("lineSep", 
lineSep).json(path.getAbsolutePath)
+        checkAnswer(df, readBack)
+      }
+    }
+  }
+
+  // scalastyle:off nonascii
+  Seq("|", "^", "::", "!!!@3", 0x1E.toChar.toString, "아").foreach { lineSep 
=>
+    testLineSeparator(lineSep)
+  }
+  // scalastyle:on nonascii
+
+  test("""SPARK-21289: Support line separator - default value \r, \r\n and 
\n""") {
+    val data =
+      "{\"f\": \"a\", \"f0\": 1}\r{\"f\": \"c\",  \"f0\": 2}\r\n{\"f\": \"d\", 
 \"f0\": 3}\n"
+
+    withTempPath { path =>
+      Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
+      val df = spark.read.json(path.getAbsolutePath)
+      val expectedSchema =
+        StructType(StructField("f", StringType) :: StructField("f0", LongType) 
:: Nil)
+      checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF())
+      assert(df.schema === expectedSchema)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/34c4b9c5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index e8a5299..0e7f3af 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -208,9 +208,11 @@ class TextSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  Seq("|", "^", "::", "!!!@3", 0x1E.toChar.toString).foreach { lineSep =>
+  // scalastyle:off nonascii
+  Seq("|", "^", "::", "!!!@3", 0x1E.toChar.toString, "아").foreach { lineSep 
=>
     testLineSeparator(lineSep)
   }
+  // scalastyle:on nonascii
 
   private def testFile: String = {
     
Thread.currentThread().getContextClassLoader.getResource("test-data/text-suite.txt").toString


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to