This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new ad29290a02f  [SPARK-44079][SQL][3.4] Fix `ArrayIndexOutOfBoundsException` when parsing an array as a struct using PERMISSIVE mode with a corrupt record
ad29290a02f is described below

commit ad29290a02fb94a958fd21e301100338c9f5b82a
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Thu Jun 29 16:38:02 2023 +0300

    [SPARK-44079][SQL][3.4] Fix `ArrayIndexOutOfBoundsException` when parsing an array as a struct using PERMISSIVE mode with a corrupt record

    ### What changes were proposed in this pull request?

    Cherry-pick of #41662: fix the parse-array-as-struct bug on branch-3.4.

    ### Why are the changes needed?

    Fix the bug where parsing an array as a struct using PERMISSIVE mode with a corrupt record threw `ArrayIndexOutOfBoundsException`.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Added a new test.

    Closes #41784 from Hisoka-X/SPARK-44079_3.4_cherry_pick.

    Authored-by: Jia Fan <fanjiaemi...@qq.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../spark/sql/catalyst/csv/UnivocityParser.scala     |  4 ++--
 .../spark/sql/catalyst/json/JacksonParser.scala      | 20 +++++++++++++++-----
 .../spark/sql/catalyst/util/BadRecordException.scala | 14 ++++++++++++--
 .../spark/sql/catalyst/util/FailureSafeParser.scala  |  9 +++++++--
 .../sql/execution/datasources/json/JsonSuite.scala   | 15 +++++++++++++++
 5 files changed, 51 insertions(+), 11 deletions(-)
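For context, the failure this patch addresses can be reproduced with a snippet equivalent to the new test added in JsonSuite.scala below. This is a sketch, assuming an active SparkSession named `spark`; before the fix the read threw `ArrayIndexOutOfBoundsException`, afterwards every array element becomes its own row with the corrupt-record column populated:

    import org.apache.spark.sql.types._
    import spark.implicits._

    // A top-level JSON array parsed against a struct schema; field "a" cannot be
    // cast to Int, so every element is a corrupt record under PERMISSIVE mode.
    val data = """[{"a": "incorrect", "b": "correct"}, {"a": "incorrect", "b": "correct"}]"""
    val schema = new StructType(Array(
      StructField("a", IntegerType),
      StructField("b", StringType),
      StructField("_corrupt_record", StringType)))

    val result = spark.read
      .option("mode", "PERMISSIVE")
      .option("multiline", "true")
      .schema(schema)
      .json(Seq(data).toDS())

    // Expected after the fix: two rows, each Row(null, "correct", data).
    result.show(false)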
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 42e03630b14..b58649da61c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -318,7 +318,7 @@ class UnivocityParser(
     if (tokens == null) {
       throw BadRecordException(
         () => getCurrentInput,
-        () => None,
+        () => Array.empty,
         QueryExecutionErrors.malformedCSVRecordError(""))
     }
@@ -362,7 +362,7 @@ class UnivocityParser(
     } else {
       if (badRecordException.isDefined) {
         throw BadRecordException(
-          () => currentInput, () => requiredRow.headOption, badRecordException.get)
+          () => currentInput, () => Array(requiredRow.get), badRecordException.get)
       } else {
         requiredRow
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index bf07d65caa0..d9bff3dc7ec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -135,7 +135,7 @@ class JacksonParser(
       // List([str_a_2,null], [null,str_b_3])
       //
       case START_ARRAY if allowArrayAsStructs =>
-        val array = convertArray(parser, elementConverter, isRoot = true)
+        val array = convertArray(parser, elementConverter, isRoot = true, arrayAsStructs = true)
         // Here, as we support reading top level JSON arrays and take every element
         // in such an array as a row, this case is possible.
         if (array.numElements() == 0) {
@@ -517,7 +517,8 @@ class JacksonParser(
   private def convertArray(
       parser: JsonParser,
       fieldConverter: ValueConverter,
-      isRoot: Boolean = false): ArrayData = {
+      isRoot: Boolean = false,
+      arrayAsStructs: Boolean = false): ArrayData = {
     val values = ArrayBuffer.empty[Any]
     var badRecordException: Option[Throwable] = None
@@ -537,6 +538,9 @@ class JacksonParser(
     if (badRecordException.isEmpty) {
       arrayData
+    } else if (arrayAsStructs) {
+      throw PartialResultArrayException(arrayData.toArray[InternalRow](schema),
+        badRecordException.get)
     } else {
       throw PartialResultException(InternalRow(arrayData), badRecordException.get)
     }
@@ -570,7 +574,7 @@ class JacksonParser(
         // JSON parser currently doesn't support partial results for corrupted records.
         // For such records, all fields other than the field configured by
         // `columnNameOfCorruptRecord` are set to `null`.
-        throw BadRecordException(() => recordLiteral(record), () => None, e)
+        throw BadRecordException(() => recordLiteral(record), () => Array.empty, e)
       case e: CharConversionException if options.encoding.isEmpty =>
         val msg =
           """JSON parser cannot handle a character in its input.
             |Specifying encoding as an input option explicitly might be helpful.
             |""".stripMargin + e.getMessage
         val wrappedCharException = new CharConversionException(msg)
         wrappedCharException.initCause(e)
-        throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
+        throw BadRecordException(() => recordLiteral(record), () => Array.empty,
+          wrappedCharException)
       case PartialResultException(row, cause) =>
         throw BadRecordException(
           record = () => recordLiteral(record),
-          partialResult = () => Some(row),
+          partialResults = () => Array(row),
           cause)
+      case PartialResultArrayException(rows, cause) =>
+        throw BadRecordException(
+          record = () => recordLiteral(record),
+          partialResults = () => rows,
+          cause)
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
index 67defe78a6c..005f32dd869 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -30,14 +30,24 @@ case class PartialResultException(
     cause: Throwable)
   extends Exception(cause)
 
+/**
+ * Exception thrown when the underlying parser returns a partial list of parsed results.
+ * @param partialResults the partial results from parsing the bad records.
+ * @param cause the actual exception about why the parser cannot return a full result.
+ */
+case class PartialResultArrayException(
+    partialResults: Array[InternalRow],
+    cause: Throwable)
+  extends Exception(cause)
+
 /**
  * Exception thrown when the underlying parser meet a bad record and can't parse it.
  * @param record a function to return the record that cause the parser to fail
- * @param partialResult a function that returns an optional row, which is the partial result of
+ * @param partialResults a function that returns a row array, which is the partial results of
  *                      parsing this bad record.
  * @param cause the actual exception about why the record is bad and can't be parsed.
  */
 case class BadRecordException(
     @transient record: () => UTF8String,
-    @transient partialResult: () => Option[InternalRow],
+    @transient partialResults: () => Array[InternalRow] = () => Array.empty[InternalRow],
     cause: Throwable) extends Exception(cause)
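To make the intended PERMISSIVE-mode semantics concrete before the FailureSafeParser diff below: a bad record that carries N partial rows now yields N output rows, while an empty partial-result array falls back to a single row carrying only the corrupt-record column. A minimal self-contained model of that expansion (not the actual Spark code; `toResultRow` stands in for the real row builder):

    // Simplified model of the PermissiveMode branch in the diff below.
    def permissiveRows[Row](
        partialResults: Array[Row],
        toResultRow: Option[Row] => Row): Iterator[Row] = {
      if (partialResults.nonEmpty) {
        // One output row per partially parsed element.
        partialResults.iterator.map(row => toResultRow(Some(row)))
      } else {
        // No partial data at all: emit a single row with only the corrupt column set.
        Iterator(toResultRow(None))
      }
    }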
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
index fcdcd21b6dc..28e81845f25 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -61,12 +61,17 @@ class FailureSafeParser[IN](
     } catch {
       case e: BadRecordException => mode match {
         case PermissiveMode =>
-          Iterator(toResultRow(e.partialResult(), e.record))
+          val partialResults = e.partialResults()
+          if (partialResults.nonEmpty) {
+            partialResults.iterator.map(row => toResultRow(Some(row), e.record))
+          } else {
+            Iterator(toResultRow(None, e.record))
+          }
         case DropMalformedMode =>
           Iterator.empty
         case FailFastMode =>
           throw QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(
-            toResultRow(e.partialResult(), e.record).toString, e)
+            toResultRow(e.partialResults().headOption, e.record).toString, e)
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 1f9a2da5dd7..f34059b22a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -3225,6 +3225,21 @@ abstract class JsonSuite
       Row(null) :: Nil)
   }
 
+  test("SPARK-44079: fix incorrect result when parse array as struct " +
+    "using PERMISSIVE mode with corrupt record") {
+    val data = """[{"a": "incorrect", "b": "correct"}, {"a": "incorrect", "b": "correct"}]"""
+    val schema = new StructType(Array(StructField("a", IntegerType),
+      StructField("b", StringType), StructField("_corrupt_record", StringType)))
+
+    val result = spark.read
+      .option("mode", "PERMISSIVE")
+      .option("multiline", "true")
+      .schema(schema)
+      .json(Seq(data).toDS())
+
+    checkAnswer(result, Seq(Row(null, "correct", data), Row(null, "correct", data)))
+  }
+
   test("SPARK-36536: use casting when datetime pattern is not set") {
     withSQLConf(
       SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true",
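As a usage note, the three parse modes handled by FailureSafeParser are selected per read via standard DataFrameReader options (these options predate this change; the path and the reuse of `schema` below are placeholders):

    // PERMISSIVE (default): keep bad records, filling columnNameOfCorruptRecord.
    val permissive = spark.read
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "_corrupt_record")
      .schema(schema)
      .json("/path/to/input.json")

    // DROPMALFORMED: silently drop bad records.
    val dropped = spark.read
      .option("mode", "DROPMALFORMED")
      .schema(schema)
      .json("/path/to/input.json")

    // FAILFAST: throw on the first bad record.
    val strict = spark.read
      .option("mode", "FAILFAST")
      .schema(schema)
      .json("/path/to/input.json")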