This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 63cb0939679 [SPARK-43362][SQL] Special handling of JSON type for MySQL
connector
63cb0939679 is described below
commit 63cb09396798da2a16db4bae2b42e0f95bef831b
Author: tianhanhu <[email protected]>
AuthorDate: Mon May 8 10:25:37 2023 +0900
[SPARK-43362][SQL] Special handling of JSON type for MySQL connector
### What changes were proposed in this pull request?
The MySQL JSON type is converted into the JDBC VARCHAR type with a precision of
-1 by some MariaDB drivers.
When receiving VARCHAR with negative precision, Spark will throw an error.
This PR special cases this scenario by directly converting JSON type into
StringType in MySQLDialect.
### Why are the changes needed?
Enables reading MySQL tables that have a JSON column.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Update existing integration test
Closes #41040 from tianhanhu/SPARK-43362.
Authored-by: tianhanhu <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala | 8 +++++---
.../src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala | 4 ++++
2 files changed, 9 insertions(+), 3 deletions(-)
diff --git
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index c5ca5a72a83..dc3acb66ff1 100644
---
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -68,10 +68,10 @@ class MySQLIntegrationSuite extends
DockerJDBCIntegrationSuite {
// TODO: Test locale conversion for strings.
conn.prepareStatement("CREATE TABLE strings (a CHAR(10), b VARCHAR(10), c
TINYTEXT, "
- + "d TEXT, e MEDIUMTEXT, f LONGTEXT, g BINARY(4), h VARBINARY(10), i
BLOB)"
+ + "d TEXT, e MEDIUMTEXT, f LONGTEXT, g BINARY(4), h VARBINARY(10), i
BLOB, j JSON)"
).executeUpdate()
conn.prepareStatement("INSERT INTO strings VALUES ('the', 'quick',
'brown', 'fox', " +
- "'jumps', 'over', 'the', 'lazy', 'dog')").executeUpdate()
+ "'jumps', 'over', 'the', 'lazy', 'dog', '{\"status\":
\"merrily\"}')").executeUpdate()
}
test("Basic test") {
@@ -137,7 +137,7 @@ class MySQLIntegrationSuite extends
DockerJDBCIntegrationSuite {
val rows = df.collect()
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
- assert(types.length == 9)
+ assert(types.length == 10)
assert(types(0).equals("class java.lang.String"))
assert(types(1).equals("class java.lang.String"))
assert(types(2).equals("class java.lang.String"))
@@ -147,6 +147,7 @@ class MySQLIntegrationSuite extends
DockerJDBCIntegrationSuite {
assert(types(6).equals("class [B"))
assert(types(7).equals("class [B"))
assert(types(8).equals("class [B"))
+ assert(types(9).equals("class java.lang.String"))
assert(rows(0).getString(0).equals("the".padTo(10, ' ')))
assert(rows(0).getString(1).equals("quick"))
assert(rows(0).getString(2).equals("brown"))
@@ -156,6 +157,7 @@ class MySQLIntegrationSuite extends
DockerJDBCIntegrationSuite {
assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](6),
Array[Byte](116, 104, 101, 0)))
assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](7),
Array[Byte](108, 97, 122, 121)))
assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](8),
Array[Byte](100, 111, 103)))
+ assert(rows(0).getString(9).equals("{\"status\": \"merrily\"}"))
}
test("Basic write test") {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 5e85ff3ebf6..d6edb67e57e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -98,6 +98,10 @@ private case object MySQLDialect extends JdbcDialect with
SQLConfHelper {
} else if ("TINYTEXT".equalsIgnoreCase(typeName)) {
// TINYTEXT is Types.VARCHAR(63) from mysql jdbc, but keep it AS-IS for
historical reason
Some(StringType)
+ } else if (sqlType == Types.VARCHAR && typeName.equals("JSON")) {
+ // Some MySQL JDBC drivers converts JSON type into Types.VARCHAR with a
precision of -1.
+ // Explicitly converts it into StringType here.
+ Some(StringType)
} else None
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]