This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 74c1f02531e [SPARK-44940][SQL] Improve performance of JSON parsing when "spark.sql.json.enablePartialResults" is enabled
74c1f02531e is described below
commit 74c1f02531e78dde34cbd311c3ed8feed7aa7fe5
Author: Ivan Sadikov <[email protected]>
AuthorDate: Mon Sep 4 12:10:07 2023 +0900
[SPARK-44940][SQL] Improve performance of JSON parsing when "spark.sql.json.enablePartialResults" is enabled
### What changes were proposed in this pull request?
The PR improves JSON parsing when `spark.sql.json.enablePartialResults` is
enabled:
- Fixes a `ClassCastException` that occurred with nested arrays: `org.apache.spark.sql.catalyst.util.GenericArrayData cannot be cast to org.apache.spark.sql.catalyst.InternalRow`.
- Improves parsing of nested struct fields, e.g. `{"a1": "AAA", "a2": [{"f1": "", "f2": ""}], "a3": "id1", "a4": "XXX"}` used to be parsed as `|AAA|NULL |NULL|NULL|` and is now parsed as `|AAA|[{NULL, }]|id1|XXX|` (see the sketch after this list).
- Improves performance of nested JSON parsing. The initial implementation
would throw too many exceptions when multiple nested fields failed to parse.
When the config is disabled, it is not a problem because the entire record is
marked as NULL.
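
A minimal sketch of the new behavior for the example above (assuming a Spark shell where `spark.implicits._` is in scope; the schema below is derived from the example, not taken from the patch):
```
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// Illustration only: the "a1".."a4" record from the bullet above.
spark.conf.set("spark.sql.json.enablePartialResults", "true")

val schema = new StructType()
  .add("a1", StringType)
  .add("a2", ArrayType(new StructType().add("f1", IntegerType).add("f2", StringType)))
  .add("a3", StringType)
  .add("a4", StringType)

val df = Seq("""{"a1": "AAA", "a2": [{"f1": "", "f2": ""}], "a3": "id1", "a4": "XXX"}""").toDF("c0")
// Only "f1" fails to parse as an int, so the sibling fields survive:
// |AAA|[{NULL, }]|id1|XXX| rather than |AAA|NULL |NULL|NULL|
df.select(from_json($"c0", schema)).show(false)
```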
Internal benchmarks show that the change turns a slowdown of over 160% into an improvement of 7-8% relative to the master branch when the flag is enabled. I will create a follow-up ticket to add a benchmark for this regression.
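
Most of the win comes from throwing exceptions that skip stack-trace capture during control flow, instead of building a full `QueryExecutionErrors` exception per failed field. The essence of the pattern, condensed from `BadRecordException.scala` in this patch:
```
import org.apache.spark.sql.types.DataType

// A control-flow exception: fillInStackTrace() is overridden to a no-op, so
// constructing and throwing it avoids the usual (expensive) stack capture.
// It is converted to a user-friendly error only once per bad record.
case class EmptyJsonFieldValueException(dataType: DataType) extends RuntimeException() {
  override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0)
  override def fillInStackTrace(): Throwable = this
}
```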
### Why are the changes needed?
Fixes some corner cases in JSON parsing and improves performance when
`spark.sql.json.enablePartialResults` is enabled.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
I added tests to verify that nested structs, maps, and arrays can be parsed without affecting subsequent fields in the JSON record. I also updated the existing tests that run with `spark.sql.json.enablePartialResults` enabled, because we now parse more data.
I added a benchmark to check performance.
Before the change (master, a45a3a3d60cb97b107a177ad16bfe36372bc3e9b):
```
[info] OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
[info] Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
[info] Partial JSON results:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] parse invalid JSON                                 9537           9820         452          0.0      953651.6       1.0X
```
After the change (this PR):
```
OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
Partial JSON results:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
parse invalid JSON                                 3100           3106           6          0.0      309967.6       1.0X
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #42667 from sadikovi/SPARK-44940.
Authored-by: Ivan Sadikov <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../spark/sql/catalyst/json/JacksonParser.scala | 41 ++++-
.../sql/catalyst/util/BadRecordException.scala | 55 ++++++-
.../spark/sql/errors/QueryExecutionErrors.scala | 12 +-
sql/core/benchmarks/JsonBenchmark-results.txt | 152 +++++++++---------
.../org/apache/spark/sql/JsonFunctionsSuite.scala | 20 ++-
.../execution/datasources/json/JsonBenchmark.scala | 28 ++++
.../sql/execution/datasources/json/JsonSuite.scala | 170 ++++++++++++++++++++-
7 files changed, 384 insertions(+), 94 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 91c17a475cd..eace96ac872 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -420,17 +420,17 @@ class JacksonParser(
case VALUE_STRING if parser.getTextLength < 1 && allowEmptyString =>
dataType match {
case FloatType | DoubleType | TimestampType | DateType =>
- throw QueryExecutionErrors.emptyJsonFieldValueError(dataType)
+ throw EmptyJsonFieldValueException(dataType)
case _ => null
}
case VALUE_STRING if parser.getTextLength < 1 =>
- throw QueryExecutionErrors.emptyJsonFieldValueError(dataType)
+ throw EmptyJsonFieldValueException(dataType)
case token =>
// We cannot parse this token based on the given data type. So, we throw a
// RuntimeException and this exception will be caught by `parse` method.
- throw QueryExecutionErrors.cannotParseJSONFieldError(parser, token, dataType)
+ throw CannotParseJSONFieldException(parser.getCurrentName, parser.getText, token, dataType)
}
/**
@@ -459,6 +459,11 @@ class JacksonParser(
bitmask(index) = false
} catch {
case e: SparkUpgradeException => throw e
+ case err: PartialValueException if enablePartialResults =>
+ badRecordException = badRecordException.orElse(Some(err.cause))
+ row.update(index, err.partialResult)
+ skipRow = structFilters.skipRow(row, index)
+ bitmask(index) = false
case NonFatal(e) if isRoot || enablePartialResults =>
badRecordException = badRecordException.orElse(Some(e))
parser.skipChildren()
@@ -508,7 +513,7 @@ class JacksonParser(
if (badRecordException.isEmpty) {
mapData
} else {
- throw PartialResultException(InternalRow(mapData), badRecordException.get)
+ throw PartialMapDataResultException(mapData, badRecordException.get)
}
}
@@ -543,10 +548,21 @@ class JacksonParser(
throw PartialResultArrayException(arrayData.toArray[InternalRow](schema), badRecordException.get)
} else {
- throw PartialResultException(InternalRow(arrayData), badRecordException.get)
+ throw PartialArrayDataResultException(arrayData, badRecordException.get)
}
}
+ /**
+  * Converts the non-stacktrace exceptions to user-friendly QueryExecutionErrors.
+  */
+ private def convertCauseForPartialResult(err: Throwable): Throwable = err match {
+   case CannotParseJSONFieldException(fieldName, fieldValue, jsonType, dataType) =>
+     QueryExecutionErrors.cannotParseJSONFieldError(fieldName, fieldValue, jsonType, dataType)
+   case EmptyJsonFieldValueException(dataType) =>
+     QueryExecutionErrors.emptyJsonFieldValueError(dataType)
+   case _ => err
+ }
+
/**
* Parse the JSON input to the set of [[InternalRow]]s.
*
@@ -589,12 +605,25 @@ class JacksonParser(
throw BadRecordException(
record = () => recordLiteral(record),
partialResults = () => Array(row),
- cause)
+ convertCauseForPartialResult(cause))
case PartialResultArrayException(rows, cause) =>
throw BadRecordException(
record = () => recordLiteral(record),
partialResults = () => rows,
cause)
+ // These exceptions should never be thrown outside of JacksonParser.
+ // They are used for the control flow in the parser. We add them here for completeness
+ // since they also indicate a bad record.
+ case PartialArrayDataResultException(arrayData, cause) =>
+ throw BadRecordException(
+ record = () => recordLiteral(record),
+ partialResults = () => Array(InternalRow(arrayData)),
+ convertCauseForPartialResult(cause))
+ case PartialMapDataResultException(mapData, cause) =>
+ throw BadRecordException(
+ record = () => recordLiteral(record),
+ partialResults = () => Array(InternalRow(mapData)),
+ convertCauseForPartialResult(cause))
}
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
index 7bf01fba8cd..65a56c1064e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -17,19 +17,44 @@
package org.apache.spark.sql.catalyst.util
+import com.fasterxml.jackson.core.JsonToken
+
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.DataType
import org.apache.spark.unsafe.types.UTF8String
+abstract class PartialValueException(val cause: Throwable) extends Exception(cause) {
+  def partialResult: Serializable
+  override def getStackTrace(): Array[StackTraceElement] = cause.getStackTrace()
+  override def fillInStackTrace(): Throwable = this
+}
+
/**
- * Exception thrown when the underlying parser returns a partial result of parsing.
+ * Exception thrown when the underlying parser returns a partial result of parsing an object/row.
 * @param partialResult the partial result of parsing a bad record.
 * @param cause the actual exception about why the parser cannot return full result.
*/
case class PartialResultException(
- partialResult: InternalRow,
- cause: Throwable)
- extends Exception(cause)
+ override val partialResult: InternalRow,
+ override val cause: Throwable) extends PartialValueException(cause)
+
+/**
+ * Exception thrown when the underlying parser returns a partial array result.
+ * @param partialResult the partial array result.
+ * @param cause the actual exception about why the parser cannot return full result.
+ */
+case class PartialArrayDataResultException(
+ override val partialResult: ArrayData,
+ override val cause: Throwable) extends PartialValueException(cause)
+
+/**
+ * Exception thrown when the underlying parser returns a partial map result.
+ * @param partialResult the partial map result.
+ * @param cause the actual exception about why the parser cannot return full result.
+ */
+case class PartialMapDataResultException(
+ override val partialResult: MapData,
+ override val cause: Throwable) extends PartialValueException(cause)
/**
* Exception thrown when the underlying parser returns partial result list of parsing.
@@ -65,3 +90,25 @@ case class StringAsDataTypeException(
fieldName: String,
fieldValue: String,
dataType: DataType) extends RuntimeException()
+
+/**
+ * No-stacktrace equivalent of `QueryExecutionErrors.cannotParseJSONFieldError`.
+ * Used for code control flow in the parser without overhead of creating a full exception.
+ */
+case class CannotParseJSONFieldException(
+ fieldName: String,
+ fieldValue: String,
+ jsonType: JsonToken,
+ dataType: DataType) extends RuntimeException() {
+  override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0)
+ override def fillInStackTrace(): Throwable = this
+}
+
+/**
+ * No-stacktrace equivalent of `QueryExecutionErrors.emptyJsonFieldValueError`.
+ * Used for code control flow in the parser without overhead of creating a full exception.
+ */
+case class EmptyJsonFieldValueException(dataType: DataType) extends RuntimeException() {
+  override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0)
+ override def fillInStackTrace(): Throwable = this
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 8e80d6570c4..2d655be0e70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1279,11 +1279,19 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
def cannotParseJSONFieldError(parser: JsonParser, jsonType: JsonToken, dataType: DataType)
    : SparkRuntimeException = {
+   cannotParseJSONFieldError(parser.getCurrentName, parser.getText, jsonType, dataType)
+ }
+
+ def cannotParseJSONFieldError(
+ fieldName: String,
+ fieldValue: String,
+ jsonType: JsonToken,
+ dataType: DataType): SparkRuntimeException = {
new SparkRuntimeException(
errorClass = "CANNOT_PARSE_JSON_FIELD",
messageParameters = Map(
- "fieldName" -> toSQLValue(parser.getCurrentName, StringType),
- "fieldValue" -> parser.getText,
+ "fieldName" -> toSQLValue(fieldName, StringType),
+ "fieldValue" -> fieldValue,
"jsonType" -> jsonType.toString(),
"dataType" -> toSQLType(dataType)))
}
diff --git a/sql/core/benchmarks/JsonBenchmark-results.txt b/sql/core/benchmarks/JsonBenchmark-results.txt
index 55f66f7bb24..e53c7801141 100644
--- a/sql/core/benchmarks/JsonBenchmark-results.txt
+++ b/sql/core/benchmarks/JsonBenchmark-results.txt
@@ -3,121 +3,125 @@ Benchmark for performance of JSON parsing
================================================================================================
Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
JSON schema inferring:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-No encoding                                        3720           3843         121          1.3         743.9       1.0X
-UTF-8 is set                                       5412           5455          45          0.9        1082.4       0.7X
+No encoding                                        2084           2134          46          2.4         416.8       1.0X
+UTF-8 is set                                       3077           3093          14          1.6         615.3       0.7X
Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
count a short column:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-No encoding                                        3234           3254          33          1.5         646.7       1.0X
-UTF-8 is set                                       4847           4868          21          1.0         969.5       0.7X
+No encoding                                        2854           2863           8          1.8         570.8       1.0X
+UTF-8 is set                                       4066           4066           1          1.2         813.1       0.7X
Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
count a wide column:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-No encoding                                        5702           5794         101          0.2        5702.1       1.0X
-UTF-8 is set                                       9526           9607          73          0.1        9526.1       0.6X
+No encoding                                        3348           3368          26          0.3        3347.8       1.0X
+UTF-8 is set                                       5215           5239          22          0.2        5214.7       0.6X
Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
select wide row:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-No encoding                                       18318          18448         199          0.0      366367.7       1.0X
-UTF-8 is set                                      19791          19887          99          0.0      395817.1       0.9X
+No encoding                                       11046          11102          54          0.0      220928.4       1.0X
+UTF-8 is set                                      12135          12181          54          0.0      242697.4       0.9X
Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
Select a subset of 10 columns:            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-Select 10 columns                                  2531           2570          51          0.4        2531.3       1.0X
-Select 1 column                                    1867           1882          16          0.5        1867.0       1.4X
+Select 10 columns                                  2486           2488           2          0.4        2486.5       1.0X
+Select 1 column                                    1505           1506           2          0.7        1504.6       1.7X
Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
creation of JSON parser per line:         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-Short column without encoding                       868            875           7          1.2         868.4       1.0X
-Short column with UTF-8                            1151           1163          11          0.9        1150.9       0.8X
-Wide column without encoding                      12063          12299         205          0.1       12063.0       0.1X
-Wide column with UTF-8                            16095          16136          51          0.1       16095.3       0.1X
+Short column without encoding                       888            889           3          1.1         887.6       1.0X
+Short column with UTF-8                            1134           1136           2          0.9        1134.3       0.8X
+Wide column without encoding                       8012           8056          51          0.1        8012.4       0.1X
+Wide column with UTF-8                             9830           9844          22          0.1        9829.7       0.1X
Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
JSON functions:                           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-Text read                                           165            170           4          6.1         164.7       1.0X
-from_json                                          2339           2386          77          0.4        2338.9       0.1X
-json_tuple                                         2667           2730          55          0.4        2667.3       0.1X
-get_json_object                                    2627           2659          32          0.4        2627.1       0.1X
+Text read                                            85             87           2         11.7          85.4       1.0X
+from_json                                          1706           1711           4          0.6        1706.4       0.1X
+json_tuple                                         1528           1534           7          0.7        1528.2       0.1X
+get_json_object                                    1275           1286          17          0.8        1275.0       0.1X
Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
Dataset of json strings:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-Text read                                           700            715          20          7.1         140.1       1.0X
-schema inferring                                   3144           3166          20          1.6         628.7       0.2X
-parsing                                            3261           3271           9          1.5         652.1       0.2X
+Text read                                           369            370           1         13.6          73.8       1.0X
+schema inferring                                   1880           1883           4          2.7         376.0       0.2X
+parsing                                            3731           3737           8          1.3         746.1       0.1X
Preparing data for benchmarking ...
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
Json files in the per-line mode:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-Text read                                          1096           1105          12          4.6         219.1       1.0X
-Schema inferring                                   3818           3830          16          1.3         763.6       0.3X
-Parsing without charset                            4107           4137          32          1.2         821.4       0.3X
-Parsing with UTF-8                                 5717           5763          41          0.9        1143.3       0.2X
+Text read                                           553            579          32          9.0         110.6       1.0X
+Schema inferring                                   2195           2196           2          2.3         439.0       0.3X
+Parsing without charset                            4272           4274           3          1.2         854.3       0.1X
+Parsing with UTF-8                                 5459           5464           6          0.9        1091.7       0.1X
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
Write dates and timestamps:               Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-Create a dataset of timestamps                      199            202           3          5.0         198.9       1.0X
-to_json(timestamp)                                 1458           1487          26          0.7        1458.0       0.1X
-write timestamps to files                          1232           1262          26          0.8        1232.5       0.2X
-Create a dataset of dates                           231            237           5          4.3         230.8       0.9X
-to_json(date)                                       956            966          10          1.0         956.5       0.2X
-write dates to files                                785            793          10          1.3         785.4       0.3X
+Create a dataset of timestamps                      102            112          13          9.8         101.9       1.0X
+to_json(timestamp)                                  840            841           1          1.2         839.6       0.1X
+write timestamps to files                           692            696           4          1.4         692.0       0.1X
+Create a dataset of dates                           120            121           1          8.4         119.7       0.9X
+to_json(date)                                       589            591           2          1.7         589.3       0.2X
+write dates to files                                442            445           2          2.3         442.3       0.2X
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
Read dates and timestamps:                                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-----------------------------------------------------------------------------------------------------------------------------------------------------
-read timestamp text from files                                                   294            300           6          3.4         293.8       1.0X
-read timestamps from files                                                      3254           3283          49          0.3        3254.0       0.1X
-infer timestamps from files                                                     8390           8528         165          0.1        8389.8       0.0X
-read date text from files                                                        269            276           7          3.7         269.3       1.1X
-read date from files                                                            1178           1192          13          0.8        1177.8       0.2X
-timestamp strings                                                                406            418          15          2.5         406.2       0.7X
-parse timestamps from Dataset[String]                                           3700           3713          16          0.3        3699.5       0.1X
-infer timestamps from Dataset[String]                                           8604           8647          65          0.1        8604.0       0.0X
-date strings                                                                     464            479          14          2.2         463.7       0.6X
-parse dates from Dataset[String]                                                1528           1538          10          0.7        1527.7       0.2X
-from_json(timestamp)                                                            5402           5429          26          0.2        5401.8       0.1X
-from_json(date)                                                                 2948           2966          17          0.3        2947.5       0.1X
-infer error timestamps from Dataset[String] with default format                 2358           2434          70          0.4        2357.6       0.1X
-infer error timestamps from Dataset[String] with user-provided format           2363           2390          36          0.4        2362.9       0.1X
-infer error timestamps from Dataset[String] with legacy format                  2248           2287          35          0.4        2248.3       0.1X
+read timestamp text from files                                                   143            145           4          7.0         142.6       1.0X
+read timestamps from files                                                      2449           2469          17          0.4        2448.6       0.1X
+infer timestamps from files                                                     5579           5596          15          0.2        5578.8       0.0X
+read date text from files                                                        132            139           7          7.6         131.7       1.1X
+read date from files                                                            1017           1020           2          1.0        1017.5       0.1X
+timestamp strings                                                                227            230           3          4.4         227.2       0.6X
+parse timestamps from Dataset[String]                                           2827           2830           3          0.4        2826.5       0.1X
+infer timestamps from Dataset[String]                                           6001           6008           6          0.2        6001.2       0.0X
+date strings                                                                     259            261           2          3.9         259.0       0.6X
+parse dates from Dataset[String]                                                1382           1387           4          0.7        1382.3       0.1X
+from_json(timestamp)                                                            3557           3561           7          0.3        3556.8       0.0X
+from_json(date)                                                                 2146           2148           2          0.5        2146.4       0.1X
+infer error timestamps from Dataset[String] with default format                 1989           1993           4          0.5        1989.3       0.1X
+infer error timestamps from Dataset[String] with user-provided format           1922           1925           3          0.5        1922.1       0.1X
+infer error timestamps from Dataset[String] with legacy format                  1919           1923           4          0.5        1919.1       0.1X
-OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1037-azure
-Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
Filters pushdown:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
-w/o filters                                       22544          22661         109          0.0      225436.4       1.0X
-pushdown disabled                                 21045          21213         188          0.0      210452.6       1.1X
-w/ filters                                           893            904          10          0.1        8931.8      25.2X
-
+w/o filters                                       14387          14399          12          0.0      143871.9       1.0X
+pushdown disabled                                 13891          13899           7          0.0      138912.3       1.0X
+w/ filters                                           782            784           2          0.1        7820.5      18.4X
+OpenJDK 64-Bit Server VM 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10 on Linux 5.4.0-1045-aws
+Intel(R) Xeon(R) Platinum 8375C CPU @ 2.90GHz
+Partial JSON results:                     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+parse invalid JSON                                  3100           3106           6          0.0      309967.6       1.0X
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 187fab75f63..a76e102fe91 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -1021,16 +1021,23 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
.add("c2", ArrayType(new StructType().add("c3", LongType).add("c4",
StringType)))
val df1 = Seq("""{"c2": [19], "c1": 123456}""").toDF("c0")
checkAnswer(df1.select(from_json($"c0", st)), Row(Row(123456, null)))
+
val df2 = Seq("""{"data": {"c2": [19], "c1": 123456}}""").toDF("c0")
- checkAnswer(df2.select(from_json($"c0", new StructType().add("data", st))), Row(Row(null)))
+ withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+ checkAnswer(
+ df2.select(from_json($"c0", new StructType().add("data", st))),
+ Row(Row(Row(123456, null)))
+ )
+ }
+ withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+ checkAnswer(df2.select(from_json($"c0", new StructType().add("data", st))), Row(Row(null)))
+ }
+ val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
- val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(Array(Row(123456, null))))
}
-
withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
- val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(null))
}
@@ -1119,14 +1126,13 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
)
)
- // Value "a" cannot be parsed as an integer,
- // the error cascades to "c2", thus making its value null.
+ // Value "a" cannot be parsed as an integer, c2 value is null.
val df = Seq("""[{"c1": [{"c2": ["a"]}]}]""").toDF("c0")
withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
checkAnswer(
df.select(from_json($"c0", ArrayType(st))),
- Row(Array(Row(null)))
+ Row(Array(Row(Seq(Row(null)))))
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
index c522378a65d..5b86543648f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala
@@ -542,6 +542,33 @@ object JsonBenchmark extends SqlBasedBenchmark {
}
}
+ private def partialResultBenchmark(rowsNum: Int, numIters: Int): Unit = {
+ val benchmark = new Benchmark("Partial JSON results", rowsNum, output = output)
+ val colsNum = 1000
+
+ val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
+ val schema = StructType(fields)
+
+ def data: Dataset[String] = {
+ spark.range(0, rowsNum, 1, 1).mapPartitions { iter =>
+ iter.map { i =>
+ (0 until colsNum).map { j =>
+ // Only the last column has an integer value.
+ if (j < colsNum - 1) s""""col${i}":"foo_${j}"""" else
s""""col${i}":${j}"""
+ }.mkString("{", ", ", "}")
+ }
+ }.select($"value").as[String]
+ }
+
+ benchmark.addCase("parse invalid JSON", numIters) { _ =>
+ withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+ spark.read.schema(schema).json(data).noop()
+ }
+ }
+
+ benchmark.run()
+ }
+
override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
val numIters = 3
runBenchmark("Benchmark for performance of JSON parsing") {
@@ -558,6 +585,7 @@ object JsonBenchmark extends SqlBasedBenchmark {
// Benchmark pushdown filters that refer to top-level columns.
// TODO (SPARK-32325): Add benchmarks for filters with nested column attributes.
filtersPushdownBenchmark(rowsNum = 100 * 1000, numIters)
+ partialResultBenchmark(rowsNum = 10000, numIters)
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 95468a1f1d7..11779286ec2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -3437,7 +3437,7 @@ abstract class JsonSuite
if (enablePartialResults) {
checkAnswer(
df,
- Seq(Row(null, Row(1)), Row(Row(2, null), Row(2)))
+ Seq(Row(Row(1, null), Row(1)), Row(Row(2, null), Row(2)))
)
} else {
checkAnswer(
@@ -3450,6 +3450,174 @@ abstract class JsonSuite
}
}
+ test("SPARK-44940: fully parse the record except f1 if partial results are
enabled") {
+ withTempPath { path =>
+ Seq(
+ """{"a1": "AAA", "a2": [{"f1": "", "f2": ""}], "a3": "id1", "a4":
"XXX"}""",
+ """{"a1": "BBB", "a2": [{"f1": 12, "f2": ""}], "a3": "id2", "a4":
"YYY"}""").toDF()
+ .repartition(1)
+ .write.text(path.getAbsolutePath)
+
+ withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+ val df = spark.read.json(path.getAbsolutePath)
+ checkAnswer(
+ df,
+ Seq(
+ Row("AAA", Seq(Row(null, "")), "id1", "XXX"),
+ Row("BBB", Seq(Row(12, "")), "id2", "YYY")
+ )
+ )
+ }
+
+ withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+ val df = spark.read.json(path.getAbsolutePath)
+ checkAnswer(
+ df,
+ Seq(
+ Row("AAA", null, null, null),
+ Row("BBB", Seq(Row(12, "")), "id2", "YYY")
+ )
+ )
+ }
+ }
+ }
+
+ test("SPARK-44940: fully parse primitive map if partial results are
enabled") {
+ withTempPath { path =>
+ Seq(
+ """{"a1": "AAA", "a2": {"f1": "", "f2": ""}, "a3": "id1"}""",
+ """{"a1": "BBB", "a2": {"f1": 12, "f2": ""}, "a3": "id2"}""").toDF()
+ .repartition(1)
+ .write.text(path.getAbsolutePath)
+
+ val schema = "a1 string, a2 map<string, int>, a3 string"
+
+ withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+ val df = spark.read.schema(schema).json(path.getAbsolutePath)
+ // Although the keys match the string type and some values match the integer type, because
+ // some of the values do not match the type, we mark the entire map as null.
+ checkAnswer(
+ df,
+ Seq(
+ Row("AAA", null, "id1"),
+ Row("BBB", null, "id2")
+ )
+ )
+ }
+
+ withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+ val df = spark.read.schema(schema).json(path.getAbsolutePath)
+ checkAnswer(
+ df,
+ Seq(
+ Row("AAA", null, null),
+ Row("BBB", null, null)
+ )
+ )
+ }
+ }
+ }
+
+ test("SPARK-44940: fully parse map of structs if partial results are
enabled") {
+ withTempPath { path =>
+ Seq(
+ """{"a1": "AAA", "a2": {"key": {"f1": "", "f2": ""}}, "a3": "id1"}""",
+ """{"a1": "BBB", "a2": {"key": {"f1": 12, "f2": ""}}, "a3":
"id2"}""").toDF()
+ .repartition(1)
+ .write.text(path.getAbsolutePath)
+
+ val schema = "a1 string, a2 map<string, struct<f1: int, f2: string>>, a3
string"
+
+ withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+ val df = spark.read.schema(schema).json(path.getAbsolutePath)
+ checkAnswer(
+ df,
+ Seq(
+ Row("AAA", Map("key" -> Row(null, "")), "id1"),
+ Row("BBB", Map("key" -> Row(12, "")), "id2")
+ )
+ )
+ }
+
+ withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+ val df = spark.read.schema(schema).json(path.getAbsolutePath)
+ checkAnswer(
+ df,
+ Seq(
+ Row("AAA", null, null),
+ Row("BBB", Map("key" -> Row(12, "")), "id2")
+ )
+ )
+ }
+ }
+ }
+
+ test("SPARK-44940: fully parse primitive arrays if partial results are
enabled") {
+ withTempPath { path =>
+ Seq(
+ """{"a1": "AAA", "a2": {"f1": [""]}, "a3": "id1", "a4": "XXX"}""",
+ """{"a1": "BBB", "a2": {"f1": [12]}, "a3": "id2", "a4":
"YYY"}""").toDF()
+ .repartition(1)
+ .write.text(path.getAbsolutePath)
+
+ withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+ val df = spark.read.json(path.getAbsolutePath)
+ checkAnswer(
+ df,
+ Seq(
+ Row("AAA", Row(null), "id1", "XXX"),
+ Row("BBB", Row(Seq(12)), "id2", "YYY")
+ )
+ )
+ }
+
+ withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+ val df = spark.read.json(path.getAbsolutePath)
+ checkAnswer(
+ df,
+ Seq(
+ Row("AAA", null, null, null),
+ Row("BBB", Row(Seq(12)), "id2", "YYY")
+ )
+ )
+ }
+ }
+ }
+
+ test("SPARK-44940: fully parse array of arrays if partial results are
enabled") {
+ withTempPath { path =>
+ Seq(
+ """{"a1": "AAA", "a2": [[12, ""], [""]], "a3": "id1", "a4": "XXX"}""",
+ """{"a1": "BBB", "a2": [[12, 34], [""]], "a3": "id2", "a4":
"YYY"}""").toDF()
+ .repartition(1)
+ .write.text(path.getAbsolutePath)
+
+ // We cannot parse `array<array<int>>` type because one of the inner arrays contains a
+ // mismatched type.
+ withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "true") {
+ val df = spark.read.json(path.getAbsolutePath)
+ checkAnswer(
+ df,
+ Seq(
+ Row("AAA", null, "id1", "XXX"),
+ Row("BBB", null, "id2", "YYY")
+ )
+ )
+ }
+
+ withSQLConf(SQLConf.JSON_ENABLE_PARTIAL_RESULTS.key -> "false") {
+ val df = spark.read.json(path.getAbsolutePath)
+ checkAnswer(
+ df,
+ Seq(
+ Row("AAA", null, "id1", "XXX"),
+ Row("BBB", null, "id2", "YYY")
+ )
+ )
+ }
+ }
+ }
+
test("SPARK-40667: validate JSON Options") {
assert(JSONOptions.getAllOptions.size == 28)
// Please add validation on any new Json options here
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]