This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 63cb0939679 [SPARK-43362][SQL] Special handling of JSON type for MySQL connector 63cb0939679 is described below commit 63cb09396798da2a16db4bae2b42e0f95bef831b Author: tianhanhu <adrianh...@gmail.com> AuthorDate: Mon May 8 10:25:37 2023 +0900 [SPARK-43362][SQL] Special handling of JSON type for MySQL connector ### What changes were proposed in this pull request? MySQL JSON type is converted into JDBC VARCHAR type with precision of -1 on some MariaDB drivers. When receiving VARCHAR with negative precision, Spark will throw an error. This PR special cases this scenario by directly converting JSON type into StringType in MySQLDialect. ### Why are the changes needed? Enable reading MySQL tables that have a JSON column. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Update existing integration test Closes #41040 from tianhanhu/SPARK-43362. Authored-by: tianhanhu <adrianh...@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala | 8 +++++--- .../src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala | 4 ++++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala index c5ca5a72a83..dc3acb66ff1 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala @@ -68,10 +68,10 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { // TODO: Test locale conversion for strings. 
conn.prepareStatement("CREATE TABLE strings (a CHAR(10), b VARCHAR(10), c TINYTEXT, " - + "d TEXT, e MEDIUMTEXT, f LONGTEXT, g BINARY(4), h VARBINARY(10), i BLOB)" + + "d TEXT, e MEDIUMTEXT, f LONGTEXT, g BINARY(4), h VARBINARY(10), i BLOB, j JSON)" ).executeUpdate() conn.prepareStatement("INSERT INTO strings VALUES ('the', 'quick', 'brown', 'fox', " + - "'jumps', 'over', 'the', 'lazy', 'dog')").executeUpdate() + "'jumps', 'over', 'the', 'lazy', 'dog', '{\"status\": \"merrily\"}')").executeUpdate() } test("Basic test") { @@ -137,7 +137,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { val rows = df.collect() assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) - assert(types.length == 9) + assert(types.length == 10) assert(types(0).equals("class java.lang.String")) assert(types(1).equals("class java.lang.String")) assert(types(2).equals("class java.lang.String")) @@ -147,6 +147,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { assert(types(6).equals("class [B")) assert(types(7).equals("class [B")) assert(types(8).equals("class [B")) + assert(types(9).equals("class java.lang.String")) assert(rows(0).getString(0).equals("the".padTo(10, ' '))) assert(rows(0).getString(1).equals("quick")) assert(rows(0).getString(2).equals("brown")) @@ -156,6 +157,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](6), Array[Byte](116, 104, 101, 0))) assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](7), Array[Byte](108, 97, 122, 121))) assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](8), Array[Byte](100, 111, 103))) + assert(rows(0).getString(9).equals("{\"status\": \"merrily\"}")) } test("Basic write test") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index 5e85ff3ebf6..d6edb67e57e 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -98,6 +98,10 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { } else if ("TINYTEXT".equalsIgnoreCase(typeName)) { // TINYTEXT is Types.VARCHAR(63) from mysql jdbc, but keep it AS-IS for historical reason Some(StringType) + } else if (sqlType == Types.VARCHAR && typeName.equals("JSON")) { + // Some MySQL JDBC drivers converts JSON type into Types.VARCHAR with a precision of -1. + // Explicitly converts it into StringType here. + Some(StringType) } else None } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org