This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 5d32fa98043c [SPARK-48965][SQL] Use the correct schema in
`Dataset#toJSON`
5d32fa98043c is described below
commit 5d32fa98043ccd0d154d8631ba30c32005e05125
Author: Bruce Robbins <[email protected]>
AuthorDate: Wed Sep 4 21:43:28 2024 -0700
[SPARK-48965][SQL] Use the correct schema in `Dataset#toJSON`
In `Dataset#toJSON`, use the schema from `exprEnc`. This schema reflects
any changes (e.g., decimal precision, column ordering) that `exprEnc` might
make to input rows.
`Dataset#toJSON` currently uses the schema from the logical plan, but that
schema does not necessarily describe the rows passed to `JacksonGenerator`: the
function passed to `mapPartitions` uses `exprEnc` to serialize the input, and
this could potentially change the precision on decimals or rearrange columns.
Here's an example that tricks `UnsafeRow#getDecimal` (called from
`JacksonGenerator`) into mistakenly assuming the decimal is stored as a Long:
```
scala> case class Data(a: BigDecimal)
class Data
scala> sql("select 123.456bd as a").as[Data].toJSON.collect
warning: 1 deprecation (since 2.13.3); for details, enable `:setting
-deprecation` or `:replay -deprecation`
val res0: Array[String] = Array({"a":68719476.745})
scala>
```
Here's an example that tricks `JacksonGenerator` into asking for a string from
an array and an array from a string. This case actually crashes the JVM:
```
scala> case class Data(x: Array[Int], y: String)
class Data
scala> sql("select repeat('Hey there', 17) as y, array_repeat(22, 17) as
x").as[Data].toJSON.collect
warning: 1 deprecation (since 2.13.3); for details, enable `:setting
-deprecation` or `:replay -deprecation`
Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.InternalError: a fault occurred in a recent unsafe memory access
operation in compiled Java code
at
org.apache.spark.sql.catalyst.json.JacksonGenerator.$anonfun$makeWriter$5(JacksonGenerator.scala:129)
~[spark-catalyst_2.13-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
at
org.apache.spark.sql.catalyst.json.JacksonGenerator.$anonfun$makeWriter$5$adapted(JacksonGenerator.scala:128)
~[spark-catalyst_2.13-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
at
org.apache.spark.sql.catalyst.json.JacksonGenerator.writeArrayData(JacksonGenerator.scala:258)
~[spark-catalyst_2.13-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
at
org.apache.spark.sql.catalyst.json.JacksonGenerator.$anonfun$makeWriter$23(JacksonGenerator.scala:201)
~[spark-catalyst_2.13-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
at
org.apache.spark.sql.catalyst.json.JacksonGenerator.writeArray(JacksonGenerator.scala:249)
~[spark-catalyst_2.13-4.0.0-SNAPSHOT.jar:4.0.0-SNAPSHOT]
...
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
bash-3.2$
```
Both these cases work correctly without `toJSON`.
Before the PR, converting the dataframe to a dataset of Tuple would
preserve the column names in the JSON strings:
```
scala> sql("select 123.456d as a, 12 as b").as[(Double, Int)].toJSON.collect
warning: 1 deprecation (since 2.13.3); for details, enable `:setting
-deprecation` or `:replay -deprecation`
val res0: Array[String] = Array({"a":123.456,"b":12})
scala>
```
After the PR, the JSON strings use the field name from the Tuple class:
```
scala> sql("select 123.456d as a, 12 as b").as[(Double, Int)].toJSON.collect
warning: 1 deprecation (since 2.13.3); for details, enable `:setting
-deprecation` or `:replay -deprecation`
val res1: Array[String] = Array({"_1":123.456,"_2":12})
scala>
```
How was this patch tested? New tests.
Does this PR introduce any user-facing change? No.
Closes #47982 from bersprockets/to_json_issue.
Authored-by: Bruce Robbins <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 5375ce2acfe206eb64fb8bede44fe47c643fcd46)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../main/scala/org/apache/spark/sql/Dataset.scala | 2 +-
.../sql/execution/datasources/json/JsonSuite.scala | 29 ++++++++++++++++++++++
2 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 3f43bccda7ab..f611208e1cae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3926,7 +3926,7 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
def toJSON: Dataset[String] = {
- val rowSchema = this.schema
+ val rowSchema = exprEnc.schema
val sessionLocalTimeZone =
sparkSession.sessionState.conf.sessionLocalTimeZone
mapPartitions { iter =>
val writer = new CharArrayWriter()
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index bbcda1df0339..b1b9a065986a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -41,6 +41,7 @@ import
org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, Da
import org.apache.spark.sql.execution.datasources.v2.json.JsonScanBuilder
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.test.SQLTestData.{DecimalData, TestData}
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.StructType.fromDDL
import org.apache.spark.sql.types.TestUDT.{MyDenseVector, MyDenseVectorUDT}
@@ -3654,6 +3655,34 @@ abstract class JsonSuite
assert(JSONOptions.getAlternativeOption("charset").contains("encoding"))
assert(JSONOptions.getAlternativeOption("dateFormat").isEmpty)
}
+
+ test("SPARK-48965: Dataset#toJSON should use correct schema #1: decimals") {
+ val numString = "123.456"
+ val bd = BigDecimal(numString)
+ val ds1 = sql(s"select ${numString}bd as a, ${numString}bd as b").as[DecimalData]
+ checkDataset(
+ ds1,
+ DecimalData(bd, bd)
+ )
+ val ds2 = ds1.toJSON
+ checkDataset(
+ ds2,
+ "{\"a\":123.456000000000000000,\"b\":123.456000000000000000}"
+ )
+ }
+
+ test("SPARK-48965: Dataset#toJSON should use correct schema #2: misaligned columns") {
+ val ds1 = sql("select 'Hey there' as value, 90000001 as key").as[TestData]
+ checkDataset(
+ ds1,
+ TestData(90000001, "Hey there")
+ )
+ val ds2 = ds1.toJSON
+ checkDataset(
+ ds2,
+ "{\"key\":90000001,\"value\":\"Hey there\"}"
+ )
+ }
}
class JsonV1Suite extends JsonSuite {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]