This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9315b9c04704 [SPARK-49405][SQL] Restrict charsets in JsonOptions
9315b9c04704 is described below

commit 9315b9c047043fb8597e9601b827e098589e0578
Author: Kent Yao <[email protected]>
AuthorDate: Wed Aug 28 09:58:09 2024 +0800

    [SPARK-49405][SQL] Restrict charsets in JsonOptions
    
    ### What changes were proposed in this pull request?
    
    Following the work of SPARK-48857 for CSV, this PR brings it to JSON 
options for reading and writing json files.
    
    ### Why are the changes needed?
    
    To make the charset list consistent across different platforms/JDKs and 
different plain text file sources
    
    ### Does this PR introduce _any_ user-facing change?
    Yes,
    - For an invalid charset to both JDK and Spark definitions, 
UnsupportedCharsetException is replaced by INVALID_PARAMETER_VALUE.CHARSET error
    - For an invalid charset to Spark definitions only, 
INVALID_PARAMETER_VALUE.CHARSET will be raised instead of success. 
legacyJavaCharsets is provided to restore the old behavior
    
    ### How was this patch tested?
    Modified tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #47887 from yaooqinn/SPARK-49405.
    
    Authored-by: Kent Yao <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 .../sql/catalyst/json/CreateJacksonParser.scala    |   5 +-
 .../spark/sql/catalyst/json/JSONOptions.scala      |  11 ++-
 .../spark/sql/catalyst/util/CharsetProvider.scala  |   7 +-
 .../sql/execution/datasources/json/JsonSuite.scala | 102 ++++++++++++---------
 4 files changed, 69 insertions(+), 56 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
index ba7b54fc04e8..7190f6fff79f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
@@ -19,13 +19,13 @@ package org.apache.spark.sql.catalyst.json
 
 import java.io.{ByteArrayInputStream, InputStream, InputStreamReader, Reader}
 import java.nio.channels.Channels
-import java.nio.charset.Charset
 import java.nio.charset.StandardCharsets
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import org.apache.hadoop.io.Text
 
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.CharsetProvider
 import org.apache.spark.unsafe.types.UTF8String
 
 object CreateJacksonParser extends Serializable {
@@ -61,8 +61,7 @@ object CreateJacksonParser extends Serializable {
     val bais = new ByteArrayInputStream(in, 0, length)
     val byteChannel = Channels.newChannel(bais)
     val decodingBufferSize = Math.min(length, 8192)
-    val decoder = Charset.forName(enc).newDecoder()
-
+    val decoder = CharsetProvider.newDecoder(enc, caller = "Jackson Parser")
     Channels.newReader(byteChannel, decoder, decodingBufferSize)
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 9d94ea8779a7..0e1dfdf366a8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -150,7 +150,8 @@ class JSONOptions(
     sep
   }
 
-  protected def checkedEncoding(enc: String): String = enc
+  protected def checkedEncoding(enc: String): String =
+    CharsetProvider.forName(enc, caller = "JSONOptions").name()
 
   /**
    * Standard encoding (charset) name. For example UTF-8, UTF-16LE and 
UTF-32BE.
@@ -233,16 +234,16 @@ class JSONOptionsInRead(
   }
 
   protected override def checkedEncoding(enc: String): String = {
-    val isDenied = JSONOptionsInRead.denyList.contains(Charset.forName(enc))
+    val charset = CharsetProvider.forName(enc, caller = "JSONOptionsInRead")
+    val isDenied = JSONOptionsInRead.denyList.contains(charset)
     require(multiLine || !isDenied,
       s"""The $enc encoding must not be included in the denyList when 
multiLine is disabled:
          |denylist: ${JSONOptionsInRead.denyList.mkString(", 
")}""".stripMargin)
 
-    val isLineSepRequired =
-        multiLine || Charset.forName(enc) == StandardCharsets.UTF_8 || 
lineSeparator.nonEmpty
+    val isLineSepRequired = multiLine || charset == StandardCharsets.UTF_8 || 
lineSeparator.nonEmpty
     require(isLineSepRequired, s"The lineSep option must be specified for the 
$enc encoding")
 
-    enc
+    charset.name()
   }
 }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharsetProvider.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharsetProvider.scala
index e879799763b7..0e7fca24e137 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharsetProvider.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharsetProvider.scala
@@ -19,6 +19,7 @@
  import java.util.Locale
 
  import org.apache.spark.sql.errors.QueryExecutionErrors
+ import org.apache.spark.sql.internal.SQLConf
 
 private[sql] object CharsetProvider {
 
@@ -27,7 +28,7 @@ private[sql] object CharsetProvider {
 
   def forName(
       charset: String,
-      legacyCharsets: Boolean,
+      legacyCharsets: Boolean = SQLConf.get.legacyJavaCharsets,
       caller: String = ""): Charset = {
     val lowercasedCharset = charset.toLowerCase(Locale.ROOT)
     if (legacyCharsets || VALID_CHARSETS.contains(lowercasedCharset)) {
@@ -61,8 +62,8 @@ private[sql] object CharsetProvider {
   }
 
   def newDecoder(charset: String,
-      legacyCharsets: Boolean,
-      legacyErrorAction: Boolean,
+      legacyCharsets: Boolean = SQLConf.get.legacyJavaCharsets,
+      legacyErrorAction: Boolean = SQLConf.get.legacyCodingErrorAction,
       caller: String = "decode"): CharsetDecoder = {
     val codingErrorAction = if (legacyErrorAction) {
       CodingErrorAction.REPLACE
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 530019ffc5ac..c31ecbc43749 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources.json
 
 import java.io._
-import java.nio.charset.{Charset, StandardCharsets, 
UnsupportedCharsetException}
+import java.nio.charset.{Charset, StandardCharsets}
 import java.nio.file.Files
 import java.sql.{Date, Timestamp}
 import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period, ZoneId}
@@ -32,13 +32,15 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 
 import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException, 
SparkUpgradeException, TestUtils}
+import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
 import org.apache.spark.sql.{functions => F, _}
 import org.apache.spark.sql.catalyst.json._
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, 
HadoopCompressionCodec}
+import org.apache.spark.sql.catalyst.util.{CharsetProvider, DateTimeTestUtils, 
DateTimeUtils, HadoopCompressionCodec}
 import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec.GZIP
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLType
+import org.apache.spark.sql.errors.QueryExecutionErrors.toSQLId
 import org.apache.spark.sql.execution.ExternalRDD
 import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, 
DataSource, InMemoryFileIndex, NoopCache}
 import org.apache.spark.sql.execution.datasources.v2.json.JsonScanBuilder
@@ -2368,14 +2370,20 @@ abstract class JsonSuite
 
   test("SPARK-23723: Unsupported encoding name") {
     val invalidCharset = "UTF-128"
-    val exception = intercept[UnsupportedCharsetException] {
-      spark.read
-        .options(Map("encoding" -> invalidCharset, "lineSep" -> "\n"))
-        .json(testFile("test-data/utf16LE.json"))
-        .count()
-    }
-
-    assert(exception.getMessage.contains(invalidCharset))
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        spark.read
+          .options(Map("encoding" -> invalidCharset, "lineSep" -> "\n"))
+          .json(testFile("test-data/utf16LE.json"))
+          .count()
+      },
+      errorClass = "INVALID_PARAMETER_VALUE.CHARSET",
+      parameters = Map(
+        "charset" -> invalidCharset,
+        "functionName" -> toSQLId("JSONOptionsInRead"),
+        "parameter" -> toSQLId("charset"),
+        "charsets" -> CharsetProvider.VALID_CHARSETS.mkString(", "))
+    )
   }
 
   test("SPARK-23723: checking that the encoding option is case agnostic") {
@@ -2426,17 +2434,19 @@ abstract class JsonSuite
   }
 
   test("SPARK-23723: save json in UTF-32BE") {
-    val encoding = "UTF-32BE"
-    withTempPath { path =>
-      val df = spark.createDataset(Seq(("Dog", 42)))
-      df.write
-        .options(Map("encoding" -> encoding))
-        .json(path.getCanonicalPath)
+    withSQLConf(SQLConf.LEGACY_JAVA_CHARSETS.key -> "true") {
+      val encoding = "UTF-32BE"
+      withTempPath { path =>
+        val df = spark.createDataset(Seq(("Dog", 42)))
+        df.write
+          .options(Map("encoding" -> encoding))
+          .json(path.getCanonicalPath)
 
-      checkEncoding(
-        expectedEncoding = encoding,
-        pathToJsonFiles = path.getCanonicalPath,
-        expectedContent = """{"_1":"Dog","_2":42}""")
+        checkEncoding(
+          expectedEncoding = encoding,
+          pathToJsonFiles = path.getCanonicalPath,
+          expectedContent = """{"_1":"Dog","_2":42}""")
+      }
     }
   }
 
@@ -2454,22 +2464,22 @@ abstract class JsonSuite
 
   test("SPARK-23723: wrong output encoding") {
     val encoding = "UTF-128"
-    val exception = intercept[SparkException] {
-      withTempPath { path =>
-        val df = spark.createDataset(Seq((0)))
-        df.write
-          .options(Map("encoding" -> encoding))
-          .json(path.getCanonicalPath)
-      }
-    }
-
-    val baos = new ByteArrayOutputStream()
-    val ps = new PrintStream(baos, true, StandardCharsets.UTF_8.name())
-    exception.printStackTrace(ps)
-    ps.flush()
-
-    assert(baos.toString.contains(
-      "java.nio.charset.UnsupportedCharsetException: UTF-128"))
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        withTempPath { path =>
+          val df = spark.createDataset(Seq((0)))
+          df.write
+            .options(Map("encoding" -> encoding))
+            .json(path.getCanonicalPath)
+        }
+      },
+      errorClass = "INVALID_PARAMETER_VALUE.CHARSET",
+      parameters = Map(
+        "charset" -> encoding,
+        "functionName" -> toSQLId("JSONOptions"),
+        "parameter" -> toSQLId("charset"),
+        "charsets" -> CharsetProvider.VALID_CHARSETS.mkString(", "))
+    )
   }
 
   test("SPARK-23723: read back json in UTF-16LE") {
@@ -2516,16 +2526,18 @@ abstract class JsonSuite
         val os = new FileOutputStream(path)
         os.write(data)
         os.close()
-        val reader = if (inferSchema) {
-          spark.read
-        } else {
-          spark.read.schema(schema)
+        withSQLConf(SQLConf.LEGACY_JAVA_CHARSETS.key -> "true") {
+          val reader = if (inferSchema) {
+            spark.read
+          } else {
+            spark.read.schema(schema)
+          }
+          val readBack = reader
+            .option("encoding", encoding)
+            .option("lineSep", lineSep)
+            .json(path.getCanonicalPath)
+          checkAnswer(readBack, records.map(rec => Row(rec._1, rec._2)))
         }
-        val readBack = reader
-          .option("encoding", encoding)
-          .option("lineSep", lineSep)
-          .json(path.getCanonicalPath)
-        checkAnswer(readBack, records.map(rec => Row(rec._1, rec._2)))
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to