This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 824d67052c75 [SPARK-47537][SQL] Fix error data type mapping on MySQL
Connector/J
824d67052c75 is described below
commit 824d67052c7542594fe98405b5062593d90233ee
Author: Kent Yao <[email protected]>
AuthorDate: Sun Mar 24 23:04:44 2024 -0700
[SPARK-47537][SQL] Fix error data type mapping on MySQL Connector/J
### What changes were proposed in this pull request?
This PR fixes:
- BIT(n>1) is wrongly mapped to boolean instead of long for MySQL
Connector/J. This is because we only have a case branch for MariaDB Connector/J.
- The MySQL Docker integration tests were using MariaDB Connector/J, not MySQL
Connector/J.
### Why are the changes needed?
Bugfix
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
new tests
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #45689 from yaooqinn/SPARK-47537.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/jdbc/MySQLDatabaseOnDocker.scala | 4 +-
.../spark/sql/jdbc/MySQLIntegrationSuite.scala | 45 ++++++++++++++++++----
.../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala | 29 +++++++++++---
.../org/apache/spark/sql/jdbc/MySQLDialect.scala | 5 +++
4 files changed, 67 insertions(+), 16 deletions(-)
diff --git
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLDatabaseOnDocker.scala
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLDatabaseOnDocker.scala
index 87b13a06d965..568eb5f10973 100644
---
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLDatabaseOnDocker.scala
+++
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLDatabaseOnDocker.scala
@@ -26,6 +26,6 @@ class MySQLDatabaseOnDocker extends DatabaseOnDocker {
override val jdbcPort: Int = 3306
override def getJdbcUrl(ip: String, port: Int): String =
- s"jdbc:mysql://$ip:$port/" +
-
s"mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true&useSSL=false"
+
s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true"
+
+ s"&useSSL=false&disableMariaDbDriver"
}
diff --git
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index 921e63acf7e1..09eb99c25227 100644
---
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -22,9 +22,10 @@ import java.sql.{Connection, Date, Timestamp}
import java.time.LocalDateTime
import java.util.Properties
+import scala.util.Using
+
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
-import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StructType}
import org.apache.spark.tags.DockerTest
/**
@@ -85,6 +86,16 @@ class MySQLIntegrationSuite extends
DockerJDBCIntegrationSuite {
.executeUpdate()
}
+ def testConnection(): Unit = {
+ Using.resource(getConnection()) { conn =>
+ assert(conn.getClass.getName === "com.mysql.cj.jdbc.ConnectionImpl")
+ }
+ }
+
+ test("SPARK-47537: ensure use the right jdbc driver") {
+ testConnection()
+ }
+
test("Basic test") {
val df = sqlContext.read.jdbc(jdbcUrl, "tbl", new Properties)
val rows = df.collect()
@@ -246,13 +257,6 @@ class MySQLIntegrationSuite extends
DockerJDBCIntegrationSuite {
checkAnswer(df, Row(true, true, true))
df.write.mode("append").jdbc(jdbcUrl, "bools", new Properties)
checkAnswer(df, Seq(Row(true, true, true), Row(true, true, true)))
- val mb = new MetadataBuilder()
- .putBoolean("isTimestampNTZ", false)
- .putLong("scale", 0)
- assert(df.schema === new StructType()
- .add("b1", BooleanType, nullable = true, mb.putBoolean("isSigned",
true).build())
- .add("b2", BooleanType, nullable = true, mb.putBoolean("isSigned",
false).build())
- .add("b3", BooleanType, nullable = true, mb.putBoolean("isSigned",
true).build()))
}
test("SPARK-47515: Save TimestampNTZType as DATETIME in MySQL") {
@@ -272,3 +276,28 @@ class MySQLIntegrationSuite extends
DockerJDBCIntegrationSuite {
checkAnswer(df, Row(1.23f, 4.56f, 7.89d, 1.23d, 4.56d, 7.89d))
}
}
+
+
+/**
+ * To run this test suite for a specific version (e.g., mysql:8.3.0):
+ * {{{
+ * ENABLE_DOCKER_INTEGRATION_TESTS=1 MYSQL_DOCKER_IMAGE_NAME=mysql:8.3.0
+ * ./build/sbt -Pdocker-integration-tests
+ * "docker-integration-tests/testOnly
*MySQLOverMariaConnectorIntegrationSuite"
+ * }}}
+ */
+@DockerTest
+class MySQLOverMariaConnectorIntegrationSuite extends MySQLIntegrationSuite {
+
+ override val db = new MySQLDatabaseOnDocker {
+ override def getJdbcUrl(ip: String, port: Int): String =
+
s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true"
+
+ s"&useSSL=false"
+ }
+
+ override def testConnection(): Unit = {
+ Using.resource(getConnection()) { conn =>
+ assert(conn.getClass.getName === "org.mariadb.jdbc.MariaDbConnection")
+ }
+ }
+}
diff --git
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index 5a25509fd003..4997d335fda6 100644
---
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -72,12 +72,6 @@ class MySQLIntegrationSuite extends
DockerJDBCIntegrationV2Suite with V2JDBCTest
private var mySQLVersion = -1
- override def defaultMetadata(dataType: DataType = StringType): Metadata =
new MetadataBuilder()
- .putLong("scale", 0)
- .putBoolean("isTimestampNTZ", false)
- .putBoolean("isSigned", true)
- .build()
-
override def tablePreparation(connection: Connection): Unit = {
mySQLVersion = connection.getMetaData.getDatabaseMajorVersion
connection.prepareStatement(
@@ -162,3 +156,26 @@ class MySQLIntegrationSuite extends
DockerJDBCIntegrationV2Suite with V2JDBCTest
}
}
}
+
+/**
+ * To run this test suite for a specific version (e.g., mysql:8.3.0):
+ * {{{
+ * ENABLE_DOCKER_INTEGRATION_TESTS=1 MYSQL_DOCKER_IMAGE_NAME=mysql:8.3.0
+ * ./build/sbt -Pdocker-integration-tests
+ * "docker-integration-tests/testOnly
*MySQLOverMariaConnectorIntegrationSuite"
+ * }}}
+ */
+@DockerTest
+class MySQLOverMariaConnectorIntegrationSuite extends MySQLIntegrationSuite {
+ override def defaultMetadata(dataType: DataType = StringType): Metadata =
new MetadataBuilder()
+ .putLong("scale", 0)
+ .putBoolean("isTimestampNTZ", false)
+ .putBoolean("isSigned", true)
+ .build()
+
+ override val db = new MySQLDatabaseOnDocker {
+ override def getJdbcUrl(ip: String, port: Int): String =
+
s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass&allowPublicKeyRetrieval=true"
+
+ s"&useSSL=false"
+ }
+}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 5cd49961554b..2735abfe9c39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -94,10 +94,15 @@ private case class MySQLDialect() extends JdbcDialect with
SQLConfHelper {
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder):
Option[DataType] = {
sqlType match {
case Types.VARBINARY if "BIT".equalsIgnoreCase(typeName) && size != 1 =>
+ // MariaDB connector behaviour
// This could instead be a BinaryType if we'd rather return
bit-vectors of up to 64 bits as
// byte arrays instead of longs.
md.putLong("binarylong", 1)
Some(LongType)
+ case Types.BIT if size > 1 =>
+ // MySQL connector behaviour
+ md.putLong("binarylong", 1)
+ Some(LongType)
case Types.BIT if "TINYINT".equalsIgnoreCase(typeName) =>
Some(BooleanType)
case Types.VARCHAR if "TINYTEXT".equalsIgnoreCase(typeName) =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]