[ https://issues.apache.org/jira/browse/SPARK-26303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716494#comment-16716494 ]

ASF GitHub Bot commented on SPARK-26303:
----------------------------------------

asfgit closed pull request #23253: [SPARK-26303][SQL] Return partial results for bad JSON records
URL: https://github.com/apache/spark/pull/23253
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index ed2ff139bcc33..3c346be4c45d0 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -35,7 +35,9 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
 
-  - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.
+  - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, the returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.
+
+  - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, the returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully.
 
 ## Upgrading From Spark SQL 2.3 to 2.4
 
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 1d2dd4d808930..7b10512a43294 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -211,7 +211,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
                      set, it uses the default value, ``PERMISSIVE``.
 
                 * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
                   fields to ``null``. To keep corrupt records, an user can set a string type \
                   field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
                   schema does not have the field, it drops corrupt records during parsing. \
@@ -424,7 +424,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
                      set, it uses the default value, ``PERMISSIVE``.
 
                 * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
                   fields to ``null``. To keep corrupt records, an user can set a string type \
                   field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
                   schema does not have the field, it drops corrupt records during parsing. \
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index d92b0d5677e25..fc23b9d99c34a 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -441,7 +441,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
                      set, it uses the default value, ``PERMISSIVE``.
 
                 * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
                   fields to ``null``. To keep corrupt records, an user can set a string type \
                   field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
                   schema does not have the field, it drops corrupt records during parsing. \
@@ -648,7 +648,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
                      set, it uses the default value, ``PERMISSIVE``.
 
                 * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
                   fields to ``null``. To keep corrupt records, an user can set a string type \
                   field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
                   schema does not have the field, it drops corrupt records during parsing. \
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 2357595906b11..7e3bd4df51bb7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -22,6 +22,7 @@ import java.nio.charset.MalformedInputException
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
+import scala.util.control.NonFatal
 
 import com.fasterxml.jackson.core._
 
@@ -29,7 +30,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -347,17 +347,28 @@ class JacksonParser(
       schema: StructType,
       fieldConverters: Array[ValueConverter]): InternalRow = {
     val row = new GenericInternalRow(schema.length)
+    var badRecordException: Option[Throwable] = None
+
     while (nextUntil(parser, JsonToken.END_OBJECT)) {
       schema.getFieldIndex(parser.getCurrentName) match {
         case Some(index) =>
-          row.update(index, fieldConverters(index).apply(parser))
-
+          try {
+            row.update(index, fieldConverters(index).apply(parser))
+          } catch {
+            case NonFatal(e) =>
+              badRecordException = badRecordException.orElse(Some(e))
+              parser.skipChildren()
+          }
         case None =>
           parser.skipChildren()
       }
     }
 
-    row
+    if (badRecordException.isEmpty) {
+      row
+    } else {
+      throw PartialResultException(row, badRecordException.get)
+    }
   }
 
   /**
@@ -428,6 +439,11 @@ class JacksonParser(
         val wrappedCharException = new CharConversionException(msg)
         wrappedCharException.initCause(e)
         throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
+      case PartialResultException(row, cause) =>
+        throw BadRecordException(
+          record = () => recordLiteral(record),
+          partialResult = () => Some(row),
+          cause)
     }
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
index 985f0dc1cd60e..d719a33929fcc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -20,6 +20,16 @@ package org.apache.spark.sql.catalyst.util
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.unsafe.types.UTF8String
 
+/**
+ * Exception thrown when the underlying parser returns a partial result of parsing.
+ * @param partialResult the partial result of parsing a bad record.
+ * @param cause the actual exception about why the parser cannot return full result.
+ */
+case class PartialResultException(
+     partialResult: InternalRow,
+     cause: Throwable)
+  extends Exception(cause)
+
 /**
  * Exception thrown when the underlying parser meet a bad record and can't parse it.
  * @param record a function to return the record that cause the parser to fail
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 661fe98d8c901..9751528654ffb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -362,7 +362,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * during parsing.
    *   <ul>
    *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
-   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
+   *     field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`. To
    *     keep corrupt records, an user can set a string type field named
    *     `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have the
    *     field, it drops corrupt records during parsing. When inferring a schema, it implicitly
@@ -598,13 +598,13 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    *    during parsing. It supports the following case-insensitive modes.
    *   <ul>
    *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
-   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
-   *     corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
-   *     in an user-defined schema. If a schema does not have the field, it drops corrupt records
-   *     during parsing. A record with less/more tokens than schema is not a corrupted record to
-   *     CSV. When it meets a record having fewer tokens than the length of the schema, sets
-   *     `null` to extra fields. When the record has more tokens than the length of the schema,
-   *     it drops extra tokens.</li>
+   *     field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`.
+   *     To keep corrupt records, an user can set a string type field named
+   *     `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have
+   *     the field, it drops corrupt records during parsing. A record with less/more tokens
+   *     than schema is not a corrupted record to CSV. When it meets a record having fewer
+   *     tokens than the length of the schema, sets `null` to extra fields. When the record
+   *     has more tokens than the length of the schema, it drops extra tokens.</li>
    *     <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
   *     <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    *   </ul>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index c8e3e1c191044..914fa90ae7e14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -273,7 +273,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * during parsing.
    *   <ul>
    *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
-   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
+   *     field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`. To
    *     keep corrupt records, an user can set a string type field named
    *     `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have the
    *     field, it drops corrupt records during parsing. When inferring a schema, it implicitly
@@ -360,13 +360,13 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    *    during parsing. It supports the following case-insensitive modes.
    *   <ul>
    *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
-   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
-   *     corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
-   *     in an user-defined schema. If a schema does not have the field, it drops corrupt records
-   *     during parsing. A record with less/more tokens than schema is not a corrupted record to
-   *     CSV. When it meets a record having fewer tokens than the length of the schema, sets
-   *     `null` to extra fields. When the record has more tokens than the length of the schema,
-   *     it drops extra tokens.</li>
+   *     field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`.
+   *     To keep corrupt records, an user can set a string type field named
+   *     `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have
+   *     the field, it drops corrupt records during parsing. A record with less/more tokens
+   *     than schema is not a corrupted record to CSV. When it meets a record having fewer
+   *     tokens than the length of the schema, sets `null` to extra fields. When the record
+   *     has more tokens than the length of the schema, it drops extra tokens.</li>
    *     <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
   *     <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    *   </ul>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index dff37ca2d40f0..3330de3584ebb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -248,7 +248,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
     checkAnswer(
       sql("select nullstr, headers.Host from jsonTable"),
-      Seq(Row("", "1.abc.com"), Row("", null), Row(null, null), Row(null, null))
+      Seq(Row("", "1.abc.com"), Row("", null), Row("", null), Row(null, null))
     )
   }
 
@@ -2563,4 +2563,17 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       assert(!files.exists(_.getName.endsWith("json")))
     }
   }
+
+  test("return partial result for bad records") {
+    val schema = "a double, b array<int>, c string, _corrupt_record string"
+    val badRecords = Seq(
+      """{"a":"-","b":[0, 1, 2],"c":"abc"}""",
+      """{"a":0.1,"b":{},"c":"def"}""").toDS()
+    val df = spark.read.schema(schema).json(badRecords)
+
+    checkAnswer(
+      df,
+      Row(null, Array(0, 1, 2), "abc", """{"a":"-","b":[0, 1, 2],"c":"abc"}""") ::
+      Row(0.1, null, "def", """{"a":0.1,"b":{},"c":"def"}""") :: Nil)
+  }
 }


 


> Return partial results for bad JSON records
> -------------------------------------------
>
>                 Key: SPARK-26303
>                 URL: https://issues.apache.org/jira/browse/SPARK-26303
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Maxim Gekk
>            Assignee: Maxim Gekk
>            Priority: Minor
>             Fix For: 3.0.0
>
>
> Currently, JSON datasource and JSON functions return a row with all nulls for a 
> malformed JSON string in the PERMISSIVE mode when the specified schema has the 
> struct type. All nulls are returned even if some of the fields were parsed and 
> converted to the desired types successfully. This ticket aims to solve the 
> problem by returning the already parsed fields. The corrupted column specified 
> via the JSON option `columnNameOfCorruptRecord` or the SQL config should 
> contain the whole original JSON string. 
> For example, if the input has one JSON string:
> {code:json}
> {"a":0.1,"b":{},"c":"def"}
> {code}
> and the specified schema is:
> {code:sql}
> a DOUBLE, b ARRAY<INT>, c STRING, _corrupt_record STRING
> {code}
> the expected output of `from_json` in the PERMISSIVE mode is:
> {code}
> +---+----+---+--------------------------+
> |a  |b   |c  |_corrupt_record           |
> +---+----+---+--------------------------+
> |0.1|null|def|{"a":0.1,"b":{},"c":"def"}|
> +---+----+---+--------------------------+
> {code}
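>
> A minimal sketch that reproduces this end to end (it mirrors the new test added 
> to JsonSuite by the PR and assumes a Spark 3.0+ session available as `spark`):
> {code:scala}
> import spark.implicits._
>
> // Schema with an explicit corrupt-record column.
> val schema = "a DOUBLE, b ARRAY<INT>, c STRING, _corrupt_record STRING"
>
> // "b" has the wrong type, so the record is bad, but "a" and "c" are parseable.
> val badRecords = Seq("""{"a":0.1,"b":{},"c":"def"}""").toDS()
>
> // In PERMISSIVE mode (the default), the successfully parsed fields are kept
> // and the whole original string lands in _corrupt_record.
> spark.read.schema(schema).json(badRecords).show(false)
> {code}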


