This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new ddccf5add8f5 [SPARK-45561][SQL] Add proper conversions for TINYINT in
MySQLDialect
ddccf5add8f5 is described below
commit ddccf5add8f5aa35693c4120f0b161a74379aec9
Author: Michael Zhang <[email protected]>
AuthorDate: Tue Oct 24 14:51:45 2023 +0500
[SPARK-45561][SQL] Add proper conversions for TINYINT in MySQLDialect
### What changes were proposed in this pull request?
Change MySQLDialect to convert Catalyst TINYINT into MySQL TINYINT rather
than BYTE and INTEGER. BYTE does not exist in MySQL. The same applies to
MsSqlServerDialect.
### Why are the changes needed?
Since the BYTE type does not exist in MySQL, any pushed-down cast involving
the BYTE type would fail.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT pass.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43390 from michaelzhan-db/SPARK-45561.
Lead-authored-by: Michael Zhang <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
(cherry picked from commit 5092c8970246eb828a31154796c3b16f0b61bddd)
Signed-off-by: Max Gekk <[email protected]>
---
.../scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala | 8 +++++---
.../src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala | 5 ++++-
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 ++
3 files changed, 11 insertions(+), 4 deletions(-)
diff --git
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index dc3acb66ff1f..20fdc965874f 100644
---
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -56,10 +56,10 @@ class MySQLIntegrationSuite extends
DockerJDBCIntegrationSuite {
conn.prepareStatement("CREATE TABLE numbers (onebit BIT(1), tenbits
BIT(10), "
+ "small SMALLINT, med MEDIUMINT, nor INT, big BIGINT, deci
DECIMAL(40,20), flt FLOAT, "
- + "dbl DOUBLE)").executeUpdate()
+ + "dbl DOUBLE, tiny TINYINT)").executeUpdate()
conn.prepareStatement("INSERT INTO numbers VALUES (b'0', b'1000100101', "
+ "17, 77777, 123456789, 123456789012345,
123456789012345.123456789012345, "
- + "42.75, 1.0000000000000002)").executeUpdate()
+ + "42.75, 1.0000000000000002, -128)").executeUpdate()
conn.prepareStatement("CREATE TABLE dates (d DATE, t TIME, dt DATETIME, ts
TIMESTAMP, "
+ "yr YEAR)").executeUpdate()
@@ -89,7 +89,7 @@ class MySQLIntegrationSuite extends
DockerJDBCIntegrationSuite {
val rows = df.collect()
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
- assert(types.length == 9)
+ assert(types.length == 10)
assert(types(0).equals("class java.lang.Boolean"))
assert(types(1).equals("class java.lang.Long"))
assert(types(2).equals("class java.lang.Integer"))
@@ -99,6 +99,7 @@ class MySQLIntegrationSuite extends
DockerJDBCIntegrationSuite {
assert(types(6).equals("class java.math.BigDecimal"))
assert(types(7).equals("class java.lang.Double"))
assert(types(8).equals("class java.lang.Double"))
+ assert(types(9).equals("class java.lang.Byte"))
assert(rows(0).getBoolean(0) == false)
assert(rows(0).getLong(1) == 0x225)
assert(rows(0).getInt(2) == 17)
@@ -109,6 +110,7 @@ class MySQLIntegrationSuite extends
DockerJDBCIntegrationSuite {
assert(rows(0).getAs[BigDecimal](6).equals(bd))
assert(rows(0).getDouble(7) == 42.75)
assert(rows(0).getDouble(8) == 1.0000000000000002)
+ assert(rows(0).getByte(9) == 0x80.toByte)
}
test("Date types") {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index a08c89318b66..c7e14cc78d5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.connector.catalog.index.TableIndex
import org.apache.spark.sql.connector.expressions.{Expression, FieldReference,
NamedReference, NullOrdering, SortDirection}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
-import org.apache.spark.sql.types.{BooleanType, DataType, FloatType, LongType,
MetadataBuilder, StringType}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, FloatType,
LongType, MetadataBuilder, StringType}
private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
@@ -102,6 +102,8 @@ private case object MySQLDialect extends JdbcDialect with
SQLConfHelper {
// Some MySQL JDBC drivers converts JSON type into Types.VARCHAR with a
precision of -1.
// Explicitly converts it into StringType here.
Some(StringType)
+ } else if (sqlType == Types.TINYINT && typeName.equals("TINYINT")) {
+ Some(ByteType)
} else None
}
@@ -184,6 +186,7 @@ private case object MySQLDialect extends JdbcDialect with
SQLConfHelper {
// We override getJDBCType so that FloatType is mapped to FLOAT instead
case FloatType => Option(JdbcType("FLOAT", java.sql.Types.FLOAT))
case StringType => Option(JdbcType("LONGTEXT", java.sql.Types.LONGVARCHAR))
+ case ByteType => Option(JdbcType("TINYINT", java.sql.Types.TINYINT))
case _ => JdbcUtils.getCommonJDBCType(dt)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index eae171e20b70..71c7245b0609 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -914,6 +914,8 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
assert(mySqlDialect.getCatalystType(java.sql.Types.VARBINARY, "BIT", 1,
metadata) == None)
assert(mySqlDialect.getCatalystType(java.sql.Types.BIT, "TINYINT", 1,
metadata) ==
Some(BooleanType))
+ assert(mySqlDialect.getCatalystType(java.sql.Types.TINYINT, "TINYINT", 1,
metadata) ==
+ Some(ByteType))
}
test("SPARK-35446: MySQLDialect type mapping of float") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]