This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 63cb0939679 [SPARK-43362][SQL] Special handling of JSON type for MySQL 
connector
63cb0939679 is described below

commit 63cb09396798da2a16db4bae2b42e0f95bef831b
Author: tianhanhu <adrianh...@gmail.com>
AuthorDate: Mon May 8 10:25:37 2023 +0900

    [SPARK-43362][SQL] Special handling of JSON type for MySQL connector
    
    ### What changes were proposed in this pull request?
    
    MySQL JSON type is converted into JDBC VARCHAR type with a precision of -1 on 
some MariaDB drivers.
    When receiving VARCHAR with negative precision, Spark will throw an error.
    
    This PR special cases this scenario by directly converting JSON type into 
StringType in MySQLDialect.
    
    ### Why are the changes needed?
    
    Enables reading MySQL tables that have a JSON column.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Update existing integration test
    
    Closes #41040 from tianhanhu/SPARK-43362.
    
    Authored-by: tianhanhu <adrianh...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala   | 8 +++++---
 .../src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala   | 4 ++++
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index c5ca5a72a83..dc3acb66ff1 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -68,10 +68,10 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationSuite {
 
     // TODO: Test locale conversion for strings.
     conn.prepareStatement("CREATE TABLE strings (a CHAR(10), b VARCHAR(10), c 
TINYTEXT, "
-      + "d TEXT, e MEDIUMTEXT, f LONGTEXT, g BINARY(4), h VARBINARY(10), i 
BLOB)"
+      + "d TEXT, e MEDIUMTEXT, f LONGTEXT, g BINARY(4), h VARBINARY(10), i 
BLOB, j JSON)"
     ).executeUpdate()
     conn.prepareStatement("INSERT INTO strings VALUES ('the', 'quick', 
'brown', 'fox', " +
-      "'jumps', 'over', 'the', 'lazy', 'dog')").executeUpdate()
+      "'jumps', 'over', 'the', 'lazy', 'dog', '{\"status\": 
\"merrily\"}')").executeUpdate()
   }
 
   test("Basic test") {
@@ -137,7 +137,7 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationSuite {
     val rows = df.collect()
     assert(rows.length == 1)
     val types = rows(0).toSeq.map(x => x.getClass.toString)
-    assert(types.length == 9)
+    assert(types.length == 10)
     assert(types(0).equals("class java.lang.String"))
     assert(types(1).equals("class java.lang.String"))
     assert(types(2).equals("class java.lang.String"))
@@ -147,6 +147,7 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationSuite {
     assert(types(6).equals("class [B"))
     assert(types(7).equals("class [B"))
     assert(types(8).equals("class [B"))
+    assert(types(9).equals("class java.lang.String"))
     assert(rows(0).getString(0).equals("the".padTo(10, ' ')))
     assert(rows(0).getString(1).equals("quick"))
     assert(rows(0).getString(2).equals("brown"))
@@ -156,6 +157,7 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationSuite {
     assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](6), 
Array[Byte](116, 104, 101, 0)))
     assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](7), 
Array[Byte](108, 97, 122, 121)))
     assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](8), 
Array[Byte](100, 111, 103)))
+    assert(rows(0).getString(9).equals("{\"status\": \"merrily\"}"))
   }
 
   test("Basic write test") {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 5e85ff3ebf6..d6edb67e57e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -98,6 +98,10 @@ private case object MySQLDialect extends JdbcDialect with 
SQLConfHelper {
     } else if ("TINYTEXT".equalsIgnoreCase(typeName)) {
       // TINYTEXT is Types.VARCHAR(63) from mysql jdbc, but keep it AS-IS for 
historical reason
       Some(StringType)
+    } else if (sqlType == Types.VARCHAR && typeName.equals("JSON")) {
+      // Some MySQL JDBC drivers convert the JSON type into Types.VARCHAR with a 
precision of -1.
+      // Explicitly converts it into StringType here.
+      Some(StringType)
     } else None
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to