This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9315b9c04704 [SPARK-49405][SQL] Restrict charsets in JsonOptions
9315b9c04704 is described below
commit 9315b9c047043fb8597e9601b827e098589e0578
Author: Kent Yao <[email protected]>
AuthorDate: Wed Aug 28 09:58:09 2024 +0800
[SPARK-49405][SQL] Restrict charsets in JsonOptions
### What changes were proposed in this pull request?
Following the work of SPARK-48857 for CSV, this PR applies the same charset restriction to the JSON options used for reading and writing JSON files.
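For reference, a minimal sketch of the reader/writer options this validation applies to (the file paths are illustrative, not part of this patch):

    // The "encoding" option must now name one of Spark's supported charsets.
    val df = spark.read
      .option("encoding", "UTF-16LE")
      .option("lineSep", "\n")
      .json("/tmp/utf16le-input.json")   // hypothetical input path

    df.write
      .option("encoding", "UTF-8")
      .json("/tmp/utf8-output")          // hypothetical output path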
### Why are the changes needed?
To make the list of supported charsets consistent across different platforms/JDKs and across the plain-text file sources.
### Does this PR introduce _any_ user-facing change?
Yes.
- For a charset that is invalid by both the JDK and Spark definitions, UnsupportedCharsetException is replaced by the INVALID_PARAMETER_VALUE.CHARSET error.
- For a charset that is valid to the JDK but invalid by Spark's definition, INVALID_PARAMETER_VALUE.CHARSET is now raised instead of succeeding.
The legacyJavaCharsets configuration is provided to restore the old behavior, as sketched below.
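A rough sketch of the behavior change (the config key spark.sql.legacy.javaCharsets is assumed here from SQLConf.LEGACY_JAVA_CHARSETS; the path is illustrative):

    // Previously this threw java.nio.charset.UnsupportedCharsetException; with this patch
    // it raises the INVALID_PARAMETER_VALUE.CHARSET error instead.
    spark.read
      .options(Map("encoding" -> "UTF-128", "lineSep" -> "\n"))
      .json("/tmp/some-input.json")      // hypothetical path
      .count()

    // Assumed key for SQLConf.LEGACY_JAVA_CHARSETS: falls back to plain JDK charset lookup,
    // so JDK-valid charsets such as UTF-32BE are accepted again.
    spark.conf.set("spark.sql.legacy.javaCharsets", "true")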
### How was this patch tested?
Modified tests
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #47887 from yaooqinn/SPARK-49405.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../sql/catalyst/json/CreateJacksonParser.scala | 5 +-
.../spark/sql/catalyst/json/JSONOptions.scala | 11 ++-
.../spark/sql/catalyst/util/CharsetProvider.scala | 7 +-
.../sql/execution/datasources/json/JsonSuite.scala | 102 ++++++++++++---------
4 files changed, 69 insertions(+), 56 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
index ba7b54fc04e8..7190f6fff79f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
@@ -19,13 +19,13 @@ package org.apache.spark.sql.catalyst.json
import java.io.{ByteArrayInputStream, InputStream, InputStreamReader, Reader}
import java.nio.channels.Channels
-import java.nio.charset.Charset
import java.nio.charset.StandardCharsets
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.hadoop.io.Text
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.CharsetProvider
import org.apache.spark.unsafe.types.UTF8String
object CreateJacksonParser extends Serializable {
@@ -61,8 +61,7 @@ object CreateJacksonParser extends Serializable {
val bais = new ByteArrayInputStream(in, 0, length)
val byteChannel = Channels.newChannel(bais)
val decodingBufferSize = Math.min(length, 8192)
- val decoder = Charset.forName(enc).newDecoder()
-
+ val decoder = CharsetProvider.newDecoder(enc, caller = "Jackson Parser")
Channels.newReader(byteChannel, decoder, decodingBufferSize)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 9d94ea8779a7..0e1dfdf366a8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -150,7 +150,8 @@ class JSONOptions(
sep
}
- protected def checkedEncoding(enc: String): String = enc
+ protected def checkedEncoding(enc: String): String =
+ CharsetProvider.forName(enc, caller = "JSONOptions").name()
/**
* Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE.
@@ -233,16 +234,16 @@ class JSONOptionsInRead(
}
protected override def checkedEncoding(enc: String): String = {
- val isDenied = JSONOptionsInRead.denyList.contains(Charset.forName(enc))
+ val charset = CharsetProvider.forName(enc, caller = "JSONOptionsInRead")
+ val isDenied = JSONOptionsInRead.denyList.contains(charset)
require(multiLine || !isDenied,
s"""The $enc encoding must not be included in the denyList when
multiLine is disabled:
|denylist: ${JSONOptionsInRead.denyList.mkString(",
")}""".stripMargin)
- val isLineSepRequired =
- multiLine || Charset.forName(enc) == StandardCharsets.UTF_8 || lineSeparator.nonEmpty
+ val isLineSepRequired = multiLine || charset == StandardCharsets.UTF_8 || lineSeparator.nonEmpty
require(isLineSepRequired, s"The lineSep option must be specified for the $enc encoding")
- enc
+ charset.name()
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharsetProvider.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharsetProvider.scala
index e879799763b7..0e7fca24e137 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharsetProvider.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharsetProvider.scala
@@ -19,6 +19,7 @@
import java.util.Locale
import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.internal.SQLConf
private[sql] object CharsetProvider {
@@ -27,7 +28,7 @@ private[sql] object CharsetProvider {
def forName(
charset: String,
- legacyCharsets: Boolean,
+ legacyCharsets: Boolean = SQLConf.get.legacyJavaCharsets,
caller: String = ""): Charset = {
val lowercasedCharset = charset.toLowerCase(Locale.ROOT)
if (legacyCharsets || VALID_CHARSETS.contains(lowercasedCharset)) {
@@ -61,8 +62,8 @@ private[sql] object CharsetProvider {
}
def newDecoder(charset: String,
- legacyCharsets: Boolean,
- legacyErrorAction: Boolean,
+ legacyCharsets: Boolean = SQLConf.get.legacyJavaCharsets,
+ legacyErrorAction: Boolean = SQLConf.get.legacyCodingErrorAction,
caller: String = "decode"): CharsetDecoder = {
val codingErrorAction = if (legacyErrorAction) {
CodingErrorAction.REPLACE
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 530019ffc5ac..c31ecbc43749 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.datasources.json
import java.io._
-import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException}
+import java.nio.charset.{Charset, StandardCharsets}
import java.nio.file.Files
import java.sql.{Date, Timestamp}
import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period, ZoneId}
@@ -32,13 +32,15 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException, SparkUpgradeException, TestUtils}
+import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
import org.apache.spark.sql.{functions => F, _}
import org.apache.spark.sql.catalyst.json._
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, HadoopCompressionCodec}
+import org.apache.spark.sql.catalyst.util.{CharsetProvider, DateTimeTestUtils, DateTimeUtils, HadoopCompressionCodec}
import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.GZIP
import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLType
+import org.apache.spark.sql.errors.QueryExecutionErrors.toSQLId
import org.apache.spark.sql.execution.ExternalRDD
import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, DataSource, InMemoryFileIndex, NoopCache}
import org.apache.spark.sql.execution.datasources.v2.json.JsonScanBuilder
@@ -2368,14 +2370,20 @@ abstract class JsonSuite
test("SPARK-23723: Unsupported encoding name") {
val invalidCharset = "UTF-128"
- val exception = intercept[UnsupportedCharsetException] {
- spark.read
- .options(Map("encoding" -> invalidCharset, "lineSep" -> "\n"))
- .json(testFile("test-data/utf16LE.json"))
- .count()
- }
-
- assert(exception.getMessage.contains(invalidCharset))
+ checkError(
+ exception = intercept[SparkIllegalArgumentException] {
+ spark.read
+ .options(Map("encoding" -> invalidCharset, "lineSep" -> "\n"))
+ .json(testFile("test-data/utf16LE.json"))
+ .count()
+ },
+ errorClass = "INVALID_PARAMETER_VALUE.CHARSET",
+ parameters = Map(
+ "charset" -> invalidCharset,
+ "functionName" -> toSQLId("JSONOptionsInRead"),
+ "parameter" -> toSQLId("charset"),
+ "charsets" -> CharsetProvider.VALID_CHARSETS.mkString(", "))
+ )
}
test("SPARK-23723: checking that the encoding option is case agnostic") {
@@ -2426,17 +2434,19 @@ abstract class JsonSuite
}
test("SPARK-23723: save json in UTF-32BE") {
- val encoding = "UTF-32BE"
- withTempPath { path =>
- val df = spark.createDataset(Seq(("Dog", 42)))
- df.write
- .options(Map("encoding" -> encoding))
- .json(path.getCanonicalPath)
+ withSQLConf(SQLConf.LEGACY_JAVA_CHARSETS.key -> "true") {
+ val encoding = "UTF-32BE"
+ withTempPath { path =>
+ val df = spark.createDataset(Seq(("Dog", 42)))
+ df.write
+ .options(Map("encoding" -> encoding))
+ .json(path.getCanonicalPath)
- checkEncoding(
- expectedEncoding = encoding,
- pathToJsonFiles = path.getCanonicalPath,
- expectedContent = """{"_1":"Dog","_2":42}""")
+ checkEncoding(
+ expectedEncoding = encoding,
+ pathToJsonFiles = path.getCanonicalPath,
+ expectedContent = """{"_1":"Dog","_2":42}""")
+ }
}
}
@@ -2454,22 +2464,22 @@ abstract class JsonSuite
test("SPARK-23723: wrong output encoding") {
val encoding = "UTF-128"
- val exception = intercept[SparkException] {
- withTempPath { path =>
- val df = spark.createDataset(Seq((0)))
- df.write
- .options(Map("encoding" -> encoding))
- .json(path.getCanonicalPath)
- }
- }
-
- val baos = new ByteArrayOutputStream()
- val ps = new PrintStream(baos, true, StandardCharsets.UTF_8.name())
- exception.printStackTrace(ps)
- ps.flush()
-
- assert(baos.toString.contains(
- "java.nio.charset.UnsupportedCharsetException: UTF-128"))
+ checkError(
+ exception = intercept[SparkIllegalArgumentException] {
+ withTempPath { path =>
+ val df = spark.createDataset(Seq((0)))
+ df.write
+ .options(Map("encoding" -> encoding))
+ .json(path.getCanonicalPath)
+ }
+ },
+ errorClass = "INVALID_PARAMETER_VALUE.CHARSET",
+ parameters = Map(
+ "charset" -> encoding,
+ "functionName" -> toSQLId("JSONOptions"),
+ "parameter" -> toSQLId("charset"),
+ "charsets" -> CharsetProvider.VALID_CHARSETS.mkString(", "))
+ )
}
test("SPARK-23723: read back json in UTF-16LE") {
@@ -2516,16 +2526,18 @@ abstract class JsonSuite
val os = new FileOutputStream(path)
os.write(data)
os.close()
- val reader = if (inferSchema) {
- spark.read
- } else {
- spark.read.schema(schema)
+ withSQLConf(SQLConf.LEGACY_JAVA_CHARSETS.key -> "true") {
+ val reader = if (inferSchema) {
+ spark.read
+ } else {
+ spark.read.schema(schema)
+ }
+ val readBack = reader
+ .option("encoding", encoding)
+ .option("lineSep", lineSep)
+ .json(path.getCanonicalPath)
+ checkAnswer(readBack, records.map(rec => Row(rec._1, rec._2)))
}
- val readBack = reader
- .option("encoding", encoding)
- .option("lineSep", lineSep)
- .json(path.getCanonicalPath)
- checkAnswer(readBack, records.map(rec => Row(rec._1, rec._2)))
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]