This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new ad29290a02f [SPARK-44079][SQL][3.4] Fix `ArrayIndexOutOfBoundsException` when parsing array as struct using PERMISSIVE mode with corrupt record
ad29290a02f is described below

commit ad29290a02fb94a958fd21e301100338c9f5b82a
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Thu Jun 29 16:38:02 2023 +0300

    [SPARK-44079][SQL][3.4] Fix `ArrayIndexOutOfBoundsException` when parsing array as struct using PERMISSIVE mode with corrupt record
    
    ### What changes were proposed in this pull request?
    Cherry-pick of #41662, fixing the parse-array-as-struct bug on branch-3.4.
    ### Why are the changes needed?
    Fixes a bug when parsing an array as a struct using PERMISSIVE mode with a corrupt record.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a new test.
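    For reference, a minimal, self-contained sketch of the scenario the new test covers (not part of the patch; it assumes a local `SparkSession`). Before this fix, reading a top-level JSON array as a struct in PERMISSIVE mode could fail with `ArrayIndexOutOfBoundsException` or return an incorrect result; after it, each array element yields its own row:

    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // A top-level JSON array whose elements do not fully match the schema:
    // field "a" is declared IntegerType but holds a string.
    val data = """[{"a": "incorrect", "b": "correct"}, {"a": "incorrect", "b": "correct"}]"""
    val schema = new StructType(Array(
      StructField("a", IntegerType),
      StructField("b", StringType),
      StructField("_corrupt_record", StringType)))

    val result = spark.read
      .option("mode", "PERMISSIVE")
      .option("multiline", "true")
      .schema(schema)
      .json(Seq(data).toDS())

    // Expected after the fix: two rows of Row(null, "correct", <original record>).
    result.show(truncate = false)
    ```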
    
    Closes #41784 from Hisoka-X/SPARK-44079_3.4_cherry_pick.
    
    Authored-by: Jia Fan <fanjiaemi...@qq.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../spark/sql/catalyst/csv/UnivocityParser.scala     |  4 ++--
 .../spark/sql/catalyst/json/JacksonParser.scala      | 20 +++++++++++++++-----
 .../spark/sql/catalyst/util/BadRecordException.scala | 14 ++++++++++++--
 .../spark/sql/catalyst/util/FailureSafeParser.scala  |  9 +++++++--
 .../sql/execution/datasources/json/JsonSuite.scala   | 15 +++++++++++++++
 5 files changed, 51 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 42e03630b14..b58649da61c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -318,7 +318,7 @@ class UnivocityParser(
     if (tokens == null) {
       throw BadRecordException(
         () => getCurrentInput,
-        () => None,
+        () => Array.empty,
         QueryExecutionErrors.malformedCSVRecordError(""))
     }
 
@@ -362,7 +362,7 @@ class UnivocityParser(
     } else {
       if (badRecordException.isDefined) {
         throw BadRecordException(
-          () => currentInput, () => requiredRow.headOption, badRecordException.get)
+          () => currentInput, () => Array(requiredRow.get), badRecordException.get)
       } else {
         requiredRow
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index bf07d65caa0..d9bff3dc7ec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -135,7 +135,7 @@ class JacksonParser(
         // List([str_a_2,null], [null,str_b_3])
         //
       case START_ARRAY if allowArrayAsStructs =>
-        val array = convertArray(parser, elementConverter, isRoot = true)
+        val array = convertArray(parser, elementConverter, isRoot = true, arrayAsStructs = true)
         // Here, as we support reading top level JSON arrays and take every element
         // in such an array as a row, this case is possible.
         if (array.numElements() == 0) {
@@ -517,7 +517,8 @@ class JacksonParser(
   private def convertArray(
       parser: JsonParser,
       fieldConverter: ValueConverter,
-      isRoot: Boolean = false): ArrayData = {
+      isRoot: Boolean = false,
+      arrayAsStructs: Boolean = false): ArrayData = {
     val values = ArrayBuffer.empty[Any]
     var badRecordException: Option[Throwable] = None
 
@@ -537,6 +538,9 @@ class JacksonParser(
 
     if (badRecordException.isEmpty) {
       arrayData
+    } else if (arrayAsStructs) {
+      throw PartialResultArrayException(arrayData.toArray[InternalRow](schema),
+        badRecordException.get)
     } else {
       throw PartialResultException(InternalRow(arrayData), badRecordException.get)
     }
@@ -570,7 +574,7 @@ class JacksonParser(
         // JSON parser currently doesn't support partial results for corrupted records.
         // For such records, all fields other than the field configured by
         // `columnNameOfCorruptRecord` are set to `null`.
-        throw BadRecordException(() => recordLiteral(record), () => None, e)
+        throw BadRecordException(() => recordLiteral(record), () => Array.empty, e)
       case e: CharConversionException if options.encoding.isEmpty =>
         val msg =
           """JSON parser cannot handle a character in its input.
@@ -578,11 +582,17 @@ class JacksonParser(
             |""".stripMargin + e.getMessage
         val wrappedCharException = new CharConversionException(msg)
         wrappedCharException.initCause(e)
-        throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
+        throw BadRecordException(() => recordLiteral(record), () => Array.empty,
+          wrappedCharException)
       case PartialResultException(row, cause) =>
         throw BadRecordException(
           record = () => recordLiteral(record),
-          partialResult = () => Some(row),
+          partialResults = () => Array(row),
+          cause)
+      case PartialResultArrayException(rows, cause) =>
+        throw BadRecordException(
+          record = () => recordLiteral(record),
+          partialResults = () => rows,
           cause)
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
index 67defe78a6c..005f32dd869 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -30,14 +30,24 @@ case class PartialResultException(
      cause: Throwable)
   extends Exception(cause)
 
+/**
+ * Exception thrown when the underlying parser returns a partial list of results while parsing.
+ * @param partialResults the partial results of parsing bad records.
+ * @param cause the actual exception about why the parser cannot return a full result.
+ */
+case class PartialResultArrayException(
+     partialResults: Array[InternalRow],
+     cause: Throwable)
+  extends Exception(cause)
+
 /**
  * Exception thrown when the underlying parser meet a bad record and can't parse it.
  * @param record a function to return the record that cause the parser to fail
- * @param partialResult a function that returns an optional row, which is the partial result of
+ * @param partialResults a function that returns a row array containing the partial results of
  *                      parsing this bad record.
  * @param cause the actual exception about why the record is bad and can't be parsed.
  */
 case class BadRecordException(
     @transient record: () => UTF8String,
-    @transient partialResult: () => Option[InternalRow],
+    @transient partialResults: () => Array[InternalRow] = () => Array.empty[InternalRow],
     cause: Throwable) extends Exception(cause)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
index fcdcd21b6dc..28e81845f25 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -61,12 +61,17 @@ class FailureSafeParser[IN](
     } catch {
       case e: BadRecordException => mode match {
         case PermissiveMode =>
-          Iterator(toResultRow(e.partialResult(), e.record))
+          val partialResults = e.partialResults()
+          if (partialResults.nonEmpty) {
+            partialResults.iterator.map(row => toResultRow(Some(row), e.record))
+          } else {
+            Iterator(toResultRow(None, e.record))
+          }
         case DropMalformedMode =>
           Iterator.empty
         case FailFastMode =>
           throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(
-            toResultRow(e.partialResult(), e.record).toString, e)
+            toResultRow(e.partialResults().headOption, e.record).toString, e)
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 1f9a2da5dd7..f34059b22a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -3225,6 +3225,21 @@ abstract class JsonSuite
       Row(null) :: Nil)
   }
 
+  test("SPARK-44079: fix incorrect result when parse array as struct " +
+    "using PERMISSIVE mode with corrupt record") {
+    val data = """[{"a": "incorrect", "b": "correct"}, {"a": "incorrect", "b": "correct"}]"""
+    val schema = new StructType(Array(StructField("a", IntegerType),
+      StructField("b", StringType), StructField("_corrupt_record", 
StringType)))
+
+    val result = spark.read
+      .option("mode", "PERMISSIVE")
+      .option("multiline", "true")
+      .schema(schema)
+      .json(Seq(data).toDS())
+
+    checkAnswer(result, Seq(Row(null, "correct", data), Row(null, "correct", data)))
+  }
+
   test("SPARK-36536: use casting when datetime pattern is not set") {
     withSQLConf(
       SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true",
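
For readers skimming the FailureSafeParser hunk above, here is a minimal, self-contained sketch of the behavior change (stand-in types for illustration, not the actual Spark classes): a bad record can now carry several partial rows, one per element of a top-level array, and PERMISSIVE mode emits one output row per partial row instead of collapsing everything into a single `Option[InternalRow]`.

```scala
// Sketch of the new PERMISSIVE-mode handling: zero partial results fall back
// to a single null row; otherwise each partial result becomes its own row.
object PermissiveSketch {
  type Row = Seq[Any]

  final case class BadRecord(
      record: () => String,
      partialResults: () => Array[Row],  // was: partialResult: () => Option[Row]
      cause: Throwable) extends Exception(cause)

  // Append the raw record, mimicking the _corrupt_record column.
  private def toResultRow(partial: Option[Row], record: () => String): Row =
    partial.getOrElse(Seq(null, null)) :+ record()

  def permissive(e: BadRecord): Iterator[Row] = {
    val partialResults = e.partialResults()
    if (partialResults.nonEmpty) {
      partialResults.iterator.map(row => toResultRow(Some(row), e.record))
    } else {
      Iterator(toResultRow(None, e.record))
    }
  }

  def main(args: Array[String]): Unit = {
    val corrupt = """[{"a": "incorrect", "b": "correct"}, {"a": "incorrect", "b": "correct"}]"""
    // Two partial rows, one per array element, as JacksonParser now reports
    // via PartialResultArrayException.
    val e = BadRecord(() => corrupt,
      () => Array(Seq(null, "correct"), Seq(null, "correct")),
      new RuntimeException("'a' is not an integer"))
    permissive(e).foreach(println)  // two rows, each ending with the raw record
  }
}
```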

