This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bd42cbdb6bf [SPARK-47435][SQL] Fix overflow issue of MySQL UNSIGNED TINYINT caused by SPARK-45561
8bd42cbdb6bf is described below

commit 8bd42cbdb6bfa40aead94570b06e926f8e8aa9e1
Author: Kent Yao <y...@apache.org>
AuthorDate: Mon Mar 18 08:56:55 2024 -0700

    [SPARK-47435][SQL] Fix overflow issue of MySQL UNSIGNED TINYINT caused by SPARK-45561
    
    ### What changes were proposed in this pull request?
    
    SPARK-45561 mapped java.sql.Types.TINYINT to ByteType in the MySQL dialect, which caused unsigned TINYINT values to overflow: MySQL reports both signed and unsigned TINYINT columns as java.sql.Types.TINYINT, but only the signed range fits in a ByteType.
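    
    A quick illustration of the overflow (plain Scala, not taken from the patch): the unsigned range 0..255 does not fit in Spark's signed ByteType.
    
    ```scala
    // UNSIGNED TINYINT spans 0..255, but ByteType is a signed 8-bit type (-128..127).
    val unsignedMax = 255
    val asByte = unsignedMax.toByte   // wraps to -1: the overflow this PR fixes
    val asShort = unsignedMax.toShort // 255 is preserved when mapped to ShortType
    ```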
    
    In this PR, we record the signedness in the column metadata so the dialect can map TINYINT to ByteType (signed) or ShortType (unsigned).
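    
    The core of the new mapping reduces to the following sketch (simplified from the MySQLDialect change below; `isSigned` is the metadata key now populated by JdbcUtils):
    
    ```scala
    import org.apache.spark.sql.types._
    
    // Signed TINYINT fits in a byte; unsigned TINYINT (0..255) needs a short.
    def tinyIntCatalystType(md: MetadataBuilder): Option[DataType] =
      if (md.build().getBoolean("isSigned")) Some(ByteType) else Some(ShortType)
    ```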
    
    ### Why are the changes needed?
    
    bugfix
    
    ### Does this PR introduce _any_ user-facing change?
    
    Users can read MySQL UNSIGNED TINYINT values again after this PR, as in versions before 3.5.0; this had been broken since 3.5.1.
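    
    As a hedged example of the user-visible effect (the connection options are placeholders; the `numbers`/`u_tiny` column mirrors the integration test below):
    
    ```scala
    // A MySQL column declared `u_tiny TINYINT UNSIGNED` holding 255 now reads
    // back as a java.lang.Short instead of overflowing a Byte.
    val df = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://host:3306/db") // placeholder URL
      .option("dbtable", "numbers")
      .load()
    // df.schema("u_tiny").dataType is ShortType; the value 255 round-trips intact.
    ```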
    
    ### How was this patch tested?
    
    new tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #45556 from yaooqinn/SPARK-47435.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/jdbc/MySQLIntegrationSuite.scala     |  9 ++--
 .../spark/sql/jdbc/v2/DB2IntegrationSuite.scala    |  9 ++--
 .../sql/jdbc/v2/MsSqlServerIntegrationSuite.scala  |  6 ++-
 .../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala  | 15 ++++--
 .../spark/sql/jdbc/v2/OracleIntegrationSuite.scala |  9 ++--
 .../sql/jdbc/v2/PostgresIntegrationSuite.scala     |  9 ++--
 .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala  | 26 ++++++----
 .../sql/execution/datasources/jdbc/JdbcUtils.scala |  5 +-
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   | 10 ++--
 .../v2/jdbc/JDBCTableCatalogSuite.scala            | 60 ++++++++++++----------
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 24 +++++----
 11 files changed, 114 insertions(+), 68 deletions(-)

diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index b1d239337aa0..79e88f109534 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -57,10 +57,11 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationSuite {
 
     conn.prepareStatement("CREATE TABLE numbers (onebit BIT(1), tenbits 
BIT(10), "
       + "small SMALLINT, med MEDIUMINT, nor INT, big BIGINT, deci 
DECIMAL(40,20), flt FLOAT, "
-      + "dbl DOUBLE, tiny TINYINT)").executeUpdate()
+      + "dbl DOUBLE, tiny TINYINT, u_tiny TINYINT UNSIGNED)").executeUpdate()
+
     conn.prepareStatement("INSERT INTO numbers VALUES (b'0', b'1000100101', "
       + "17, 77777, 123456789, 123456789012345, 
123456789012345.123456789012345, "
-      + "42.75, 1.0000000000000002, -128)").executeUpdate()
+      + "42.75, 1.0000000000000002, -128, 255)").executeUpdate()
 
     conn.prepareStatement("CREATE TABLE dates (d DATE, t TIME, dt DATETIME, ts 
TIMESTAMP, "
       + "yr YEAR)").executeUpdate()
@@ -90,7 +91,7 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationSuite {
     val rows = df.collect()
     assert(rows.length == 1)
     val types = rows(0).toSeq.map(x => x.getClass.toString)
-    assert(types.length == 10)
+    assert(types.length == 11)
     assert(types(0).equals("class java.lang.Boolean"))
     assert(types(1).equals("class java.lang.Long"))
     assert(types(2).equals("class java.lang.Integer"))
@@ -101,6 +102,7 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationSuite {
     assert(types(7).equals("class java.lang.Double"))
     assert(types(8).equals("class java.lang.Double"))
     assert(types(9).equals("class java.lang.Byte"))
+    assert(types(10).equals("class java.lang.Short"))
     assert(rows(0).getBoolean(0) == false)
     assert(rows(0).getLong(1) == 0x225)
     assert(rows(0).getInt(2) == 17)
@@ -112,6 +114,7 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationSuite {
     assert(rows(0).getDouble(7) == 42.75)
     assert(rows(0).getDouble(8) == 1.0000000000000002)
     assert(rows(0).getByte(9) == 0x80.toByte)
+    assert(rows(0).getShort(10) == 0xff.toShort)
   }
 
   test("Date types") {
diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
index c3ec7e1925fa..6c1b7fdd1be5 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
@@ -70,11 +70,13 @@ class DB2IntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCTest {
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType, true, 
defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE DOUBLE")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", DoubleType, true, 
defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", DoubleType, true, defaultMetadata(DoubleType))
     assert(t.schema === expectedSchema)
     // Update column type from DOUBLE to STRING
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE VARCHAR(10)"
@@ -97,7 +99,8 @@ class DB2IntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCTest {
     sql(s"CREATE TABLE $tbl (ID INT)" +
       s" TBLPROPERTIES('CCSID'='UNICODE')")
     val t = spark.table(tbl)
-    val expectedSchema = new StructType().add("ID", IntegerType, true, 
defaultMetadata)
+    val expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
   }
 
diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala
index fc93f5cba4c0..e451cc2b8c52 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala
@@ -93,11 +93,13 @@ class MsSqlServerIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JD
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType, true, 
defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", StringType, true, 
defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
     // Update column type from STRING to INTEGER
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index 5e340f135c85..2b189db2c1cb 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -83,6 +83,12 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCTest
 
   private var mySQLVersion = -1
 
+  override def defaultMetadata(dataType: DataType = StringType): Metadata = 
new MetadataBuilder()
+    .putLong("scale", 0)
+    .putBoolean("isTimestampNTZ", false)
+    .putBoolean("isSigned", true)
+    .build()
+
   override def tablePreparation(connection: Connection): Unit = {
     mySQLVersion = connection.getMetaData.getDatabaseMajorVersion
     connection.prepareStatement(
@@ -93,11 +99,13 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCTest
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType, true, 
defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", StringType, true, 
defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
     // Update column type from STRING to INTEGER
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
@@ -145,7 +153,8 @@ class MySQLIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCTest
     sql(s"CREATE TABLE $tbl (ID INT)" +
       s" TBLPROPERTIES('ENGINE'='InnoDB', 'DEFAULT CHARACTER SET'='utf8')")
     val t = spark.table(tbl)
-    val expectedSchema = new StructType().add("ID", IntegerType, true, 
defaultMetadata)
+    val expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
   }
 
diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
index 591147413486..0aa2905f93b8 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
@@ -77,9 +77,10 @@ class OracleIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCTes
   override val namespaceOpt: Option[String] = Some("SYSTEM")
   override val db = new OracleDatabaseOnDocker
 
-  override val defaultMetadata: Metadata = new MetadataBuilder()
+  override def defaultMetadata(dataType: DataType): Metadata = new 
MetadataBuilder()
     .putLong("scale", 0)
     .putBoolean("isTimestampNTZ", false)
+    .putBoolean("isSigned", dataType.isInstanceOf[NumericType] || 
dataType.isInstanceOf[StringType])
     .putString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY, "varchar(255)")
     .build()
 
@@ -101,11 +102,13 @@ class OracleIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCTes
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", DecimalType(10, 0), true, 
super.defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", DecimalType(10, 0), true, 
super.defaultMetadata(DecimalType(10, 0)))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE LONG")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", DecimalType(19, 0), true, 
super.defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", DecimalType(19, 0), true, 
super.defaultMetadata(DecimalType(19, 0)))
     assert(t.schema === expectedSchema)
     // Update column type from LONG to INTEGER
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
index 233a634cac67..1f09c2fd3fc5 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
@@ -64,11 +64,13 @@ class PostgresIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCT
   override def testUpdateColumnType(tbl: String): Unit = {
     sql(s"CREATE TABLE $tbl (ID INTEGER)")
     var t = spark.table(tbl)
-    var expectedSchema = new StructType().add("ID", IntegerType, true, 
defaultMetadata)
+    var expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE STRING")
     t = spark.table(tbl)
-    expectedSchema = new StructType().add("ID", StringType, true, 
defaultMetadata)
+    expectedSchema = new StructType()
+      .add("ID", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
     // Update column type from STRING to INTEGER
     val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
@@ -91,7 +93,8 @@ class PostgresIntegrationSuite extends 
DockerJDBCIntegrationV2Suite with V2JDBCT
     sql(s"CREATE TABLE $tbl (ID INT)" +
       s" TBLPROPERTIES('TABLESPACE'='pg_default')")
     val t = spark.table(tbl)
-    val expectedSchema = new StructType().add("ID", IntegerType, true, 
defaultMetadata)
+    val expectedSchema = new StructType()
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === expectedSchema)
   }
 
diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index 8dd377f4a35f..c80fbfc748dd 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -49,9 +49,10 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with 
DockerIntegrationFu
 
   def notSupportsTableComment: Boolean = false
 
-  def defaultMetadata: Metadata = new MetadataBuilder()
+  def defaultMetadata(dataType: DataType = StringType): Metadata = new 
MetadataBuilder()
     .putLong("scale", 0)
     .putBoolean("isTimestampNTZ", false)
+    .putBoolean("isSigned", dataType.isInstanceOf[NumericType])
     .build()
 
   def testUpdateColumnNullability(tbl: String): Unit = {
@@ -59,11 +60,11 @@ private[v2] trait V2JDBCTest extends SharedSparkSession 
with DockerIntegrationFu
     var t = spark.table(s"$catalogName.alt_table")
     // nullable is true in the expectedSchema because Spark always sets 
nullable to true
     // regardless of the JDBC metadata 
https://github.com/apache/spark/pull/18445
-    var expectedSchema = new StructType().add("ID", StringType, true, 
defaultMetadata)
+    var expectedSchema = new StructType().add("ID", StringType, true, 
defaultMetadata())
     assert(t.schema === expectedSchema)
     sql(s"ALTER TABLE $catalogName.alt_table ALTER COLUMN ID DROP NOT NULL")
     t = spark.table(s"$catalogName.alt_table")
-    expectedSchema = new StructType().add("ID", StringType, true, 
defaultMetadata)
+    expectedSchema = new StructType().add("ID", StringType, true, 
defaultMetadata())
     assert(t.schema === expectedSchema)
     // Update nullability of not existing column
     val msg = intercept[AnalysisException] {
@@ -75,8 +76,9 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with 
DockerIntegrationFu
   def testRenameColumn(tbl: String): Unit = {
     sql(s"ALTER TABLE $tbl RENAME COLUMN ID TO RENAMED")
     val t = spark.table(s"$tbl")
-    val expectedSchema = new StructType().add("RENAMED", StringType, true, 
defaultMetadata)
-      .add("ID1", StringType, true, defaultMetadata).add("ID2", StringType, 
true, defaultMetadata)
+    val expectedSchema = new StructType().add("RENAMED", StringType, true, 
defaultMetadata())
+      .add("ID1", StringType, true, defaultMetadata())
+      .add("ID2", StringType, true, defaultMetadata())
     assert(t.schema === expectedSchema)
   }
 
@@ -86,16 +88,19 @@ private[v2] trait V2JDBCTest extends SharedSparkSession 
with DockerIntegrationFu
     withTable(s"$catalogName.alt_table") {
       sql(s"CREATE TABLE $catalogName.alt_table (ID STRING)")
       var t = spark.table(s"$catalogName.alt_table")
-      var expectedSchema = new StructType().add("ID", StringType, true, 
defaultMetadata)
+      var expectedSchema = new StructType()
+        .add("ID", StringType, true, defaultMetadata())
       assert(t.schema === expectedSchema)
       sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C1 STRING, C2 
STRING)")
       t = spark.table(s"$catalogName.alt_table")
-      expectedSchema = expectedSchema.add("C1", StringType, true, 
defaultMetadata)
-        .add("C2", StringType, true, defaultMetadata)
+      expectedSchema = expectedSchema
+        .add("C1", StringType, true, defaultMetadata())
+        .add("C2", StringType, true, defaultMetadata())
       assert(t.schema === expectedSchema)
       sql(s"ALTER TABLE $catalogName.alt_table ADD COLUMNS (C3 STRING)")
       t = spark.table(s"$catalogName.alt_table")
-      expectedSchema = expectedSchema.add("C3", StringType, true, 
defaultMetadata)
+      expectedSchema = expectedSchema
+        .add("C3", StringType, true, defaultMetadata())
       assert(t.schema === expectedSchema)
       // Add already existing column
       checkError(
@@ -128,7 +133,8 @@ private[v2] trait V2JDBCTest extends SharedSparkSession 
with DockerIntegrationFu
       sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN C1")
       sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN c3")
       val t = spark.table(s"$catalogName.alt_table")
-      val expectedSchema = new StructType().add("C2", StringType, true, 
defaultMetadata)
+      val expectedSchema = new StructType()
+        .add("C2", StringType, true, defaultMetadata())
       assert(t.schema === expectedSchema)
       // Drop not existing column
       val msg = intercept[AnalysisException] {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index bc88ab9bfcae..84d87f008217 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -274,6 +274,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
     val fields = new Array[StructField](ncols)
     var i = 0
     while (i < ncols) {
+      val metadata = new MetadataBuilder()
       val columnName = rsmd.getColumnLabel(i + 1)
       val dataType = rsmd.getColumnType(i + 1)
       val typeName = rsmd.getColumnTypeName(i + 1)
@@ -294,8 +295,6 @@ object JdbcUtils extends Logging with SQLConfHelper {
       } else {
         rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
       }
-      val metadata = new MetadataBuilder()
-      metadata.putLong("scale", fieldScale)
 
       dataType match {
         case java.sql.Types.TIME =>
@@ -307,7 +306,9 @@ object JdbcUtils extends Logging with SQLConfHelper {
           metadata.putBoolean("rowid", true)
         case _ =>
       }
+      metadata.putBoolean("isSigned", isSigned)
       metadata.putBoolean("isTimestampNTZ", isTimestampNTZ)
+      metadata.putLong("scale", fieldScale)
       val columnType =
         dialect.getCatalystType(dataType, typeName, fieldSize, 
metadata).getOrElse(
           getCatalystType(dataType, typeName, fieldSize, fieldScale, isSigned, 
isTimestampNTZ))
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 42b1a3a2854e..4e5f092b193c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, 
NamedReference, NullOrdering, SortDirection}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
-import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, FloatType, 
LongType, MetadataBuilder, StringType, TimestampType}
+import org.apache.spark.sql.types._
 
 private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
 
@@ -107,8 +107,12 @@ private case object MySQLDialect extends JdbcDialect with 
SQLConfHelper {
         // Some MySQL JDBC drivers converts JSON type into Types.VARCHAR with 
a precision of -1.
         // Explicitly converts it into StringType here.
         Some(StringType)
-      case Types.TINYINT if "TINYINT".equalsIgnoreCase(typeName) =>
-        Some(ByteType)
+      case Types.TINYINT =>
+        if (md.build().getBoolean("isSigned")) {
+          Some(ByteType)
+        } else {
+          Some(ShortType)
+        }
       case Types.TIMESTAMP if "DATETIME".equalsIgnoreCase(typeName) =>
         // scalastyle:off line.size.limit
         // In MYSQL, DATETIME is TIMESTAMP WITHOUT TIME ZONE
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index fc313de6c8fe..f4e7921e88bc 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -37,9 +37,11 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
 
   val tempDir = Utils.createTempDir()
   val url = 
s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
-  val defaultMetadata = new MetadataBuilder()
+
+  def defaultMetadata(dataType: DataType): Metadata = new MetadataBuilder()
     .putLong("scale", 0)
     .putBoolean("isTimestampNTZ", false)
+    .putBoolean("isSigned", dataType.isInstanceOf[NumericType])
     .build()
 
   override def sparkConf: SparkConf = super.sparkConf
@@ -142,8 +144,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
   test("load a table") {
     val t = spark.table("h2.test.people")
     val expectedSchema = new StructType()
-      .add("NAME", VarcharType(32), true, defaultMetadata)
-      .add("ID", IntegerType, true, defaultMetadata)
+      .add("NAME", VarcharType(32), true, defaultMetadata(VarcharType(32)))
+      .add("ID", IntegerType, true, defaultMetadata(IntegerType))
     assert(t.schema === 
CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema))
     Seq(
       "h2.test.not_existing_table" -> "`h2`.`test`.`not_existing_table`",
@@ -185,13 +187,13 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
       sql(s"ALTER TABLE $tableName ADD COLUMNS (C1 INTEGER, C2 STRING)")
       var t = spark.table(tableName)
       var expectedSchema = new StructType()
-        .add("ID", IntegerType, true, defaultMetadata)
-        .add("C1", IntegerType, true, defaultMetadata)
-        .add("C2", StringType, true, defaultMetadata)
+        .add("ID", IntegerType, true, defaultMetadata(IntegerType))
+        .add("C1", IntegerType, true, defaultMetadata(IntegerType))
+        .add("C2", StringType, true, defaultMetadata(StringType))
       assert(t.schema === expectedSchema)
       sql(s"ALTER TABLE $tableName ADD COLUMNS (c3 DOUBLE)")
       t = spark.table(tableName)
-      expectedSchema = expectedSchema.add("c3", DoubleType, true, 
defaultMetadata)
+      expectedSchema = expectedSchema.add("c3", DoubleType, true, 
defaultMetadata(DoubleType))
       assert(t.schema === expectedSchema)
       // Add already existing column
       checkError(
@@ -229,8 +231,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
       sql(s"ALTER TABLE $tableName RENAME COLUMN id TO C")
       val t = spark.table(tableName)
       val expectedSchema = new StructType()
-        .add("C", IntegerType, true, defaultMetadata)
-        .add("C0", IntegerType, true, defaultMetadata)
+        .add("C", IntegerType, true, defaultMetadata(IntegerType))
+        .add("C0", IntegerType, true, defaultMetadata(IntegerType))
       assert(t.schema === expectedSchema)
       // Rename to already existing column
       checkError(
@@ -268,7 +270,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
       sql(s"ALTER TABLE $tableName DROP COLUMN C1")
       sql(s"ALTER TABLE $tableName DROP COLUMN c3")
       val t = spark.table(tableName)
-      val expectedSchema = new StructType().add("C2", IntegerType, true, 
defaultMetadata)
+      val expectedSchema = new StructType()
+        .add("C2", IntegerType, true, defaultMetadata(IntegerType))
       assert(t.schema === expectedSchema)
       // Drop not existing column
       val sqlText = s"ALTER TABLE $tableName DROP COLUMN bad_column"
@@ -307,8 +310,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
       sql(s"ALTER TABLE $tableName ALTER COLUMN deptno TYPE DOUBLE")
       val t = spark.table(tableName)
       val expectedSchema = new StructType()
-        .add("ID", DoubleType, true, defaultMetadata)
-        .add("deptno", DoubleType, true, defaultMetadata)
+        .add("ID", DoubleType, true, defaultMetadata(DoubleType))
+        .add("deptno", DoubleType, true, defaultMetadata(DoubleType))
       assert(t.schema === expectedSchema)
       // Update not existing column
       val sqlText = s"ALTER TABLE $tableName ALTER COLUMN bad_column TYPE 
DOUBLE"
@@ -356,8 +359,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
       sql(s"ALTER TABLE $tableName ALTER COLUMN deptno DROP NOT NULL")
       val t = spark.table(tableName)
       val expectedSchema = new StructType()
-        .add("ID", IntegerType, true, defaultMetadata)
-        .add("deptno", IntegerType, true, defaultMetadata)
+        .add("ID", IntegerType, true, defaultMetadata(IntegerType))
+        .add("deptno", IntegerType, true, defaultMetadata(IntegerType))
       assert(t.schema === expectedSchema)
       // Update nullability of not existing column
       val sqlText = s"ALTER TABLE $tableName ALTER COLUMN bad_column DROP NOT 
NULL"
@@ -491,8 +494,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
       sql(s"CREATE TABLE $tableName (c1 INTEGER NOT NULL, c2 INTEGER)")
       var t = spark.table(tableName)
       var expectedSchema = new StructType()
-        .add("c1", IntegerType, true, defaultMetadata)
-        .add("c2", IntegerType, true, defaultMetadata)
+        .add("c1", IntegerType, true, defaultMetadata(IntegerType))
+        .add("c2", IntegerType, true, defaultMetadata(IntegerType))
       assert(t.schema === expectedSchema)
 
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
@@ -516,8 +519,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
         sql(s"ALTER TABLE $tableName RENAME COLUMN C2 TO c3")
         expectedSchema = new StructType()
-          .add("c1", IntegerType, true, defaultMetadata)
-          .add("c3", IntegerType, true, defaultMetadata)
+          .add("c1", IntegerType, true, defaultMetadata(IntegerType))
+          .add("c3", IntegerType, true, defaultMetadata(IntegerType))
         t = spark.table(tableName)
         assert(t.schema === expectedSchema)
       }
@@ -542,7 +545,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
 
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
         sql(s"ALTER TABLE $tableName DROP COLUMN C3")
-        expectedSchema = new StructType().add("c1", IntegerType, true, 
defaultMetadata)
+        expectedSchema = new StructType()
+          .add("c1", IntegerType, true, defaultMetadata(IntegerType))
         t = spark.table(tableName)
         assert(t.schema === expectedSchema)
       }
@@ -566,7 +570,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
 
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
         sql(s"ALTER TABLE $tableName ALTER COLUMN C1 TYPE DOUBLE")
-        expectedSchema = new StructType().add("c1", DoubleType, true, 
defaultMetadata)
+        expectedSchema = new StructType()
+          .add("c1", DoubleType, true, defaultMetadata(DoubleType))
         t = spark.table(tableName)
         assert(t.schema === expectedSchema)
       }
@@ -590,7 +595,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
 
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
         sql(s"ALTER TABLE $tableName ALTER COLUMN C1 DROP NOT NULL")
-        expectedSchema = new StructType().add("c1", DoubleType, true, 
defaultMetadata)
+        expectedSchema = new StructType()
+          .add("c1", DoubleType, true, defaultMetadata(IntegerType))
         t = spark.table(tableName)
         assert(t.schema === expectedSchema)
       }
@@ -660,8 +666,8 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
       sql(s"ALTER TABLE $tableName ALTER COLUMN deptno TYPE VARCHAR(30)")
       val t = spark.table(tableName)
       val expected = new StructType()
-        .add("ID", CharType(10), true, defaultMetadata)
-        .add("deptno", VarcharType(30), true, defaultMetadata)
+        .add("ID", CharType(10), true, defaultMetadata(CharType(10)))
+        .add("deptno", VarcharType(30), true, defaultMetadata(VarcharType(30)))
       val replaced = 
CharVarcharUtils.replaceCharVarcharWithStringInSchema(expected)
       assert(t.schema === replaced)
     }
@@ -674,13 +680,13 @@ class JDBCTableCatalogSuite extends QueryTest with 
SharedSparkSession {
         .executeUpdate())
       withSQLConf(SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key -> "true") {
         val expected = new StructType()
-          .add("ID", StringType, true, defaultMetadata)
-          .add("DEPTNO", StringType, true, defaultMetadata)
+          .add("ID", StringType, true, defaultMetadata(StringType))
+          .add("DEPTNO", StringType, true, defaultMetadata(StringType))
         assert(sql(s"SELECT * FROM h2.test.char_tbl").schema === expected)
       }
       val expected = new StructType()
-        .add("ID", CharType(5), true, defaultMetadata)
-        .add("DEPTNO", VarcharType(10), true, defaultMetadata)
+        .add("ID", CharType(5), true, defaultMetadata(CharType(5)))
+        .add("DEPTNO", VarcharType(10), true, defaultMetadata(VarcharType(10)))
       val replaced = 
CharVarcharUtils.replaceCharVarcharWithStringInSchema(expected)
       assert(sql(s"SELECT * FROM h2.test.char_tbl").schema === replaced)
     } finally {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 9e8df6d733e0..a2dac5a9e1e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -78,9 +78,10 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  val defaultMetadata = new MetadataBuilder()
+  def defaultMetadata(dataType: DataType): Metadata = new MetadataBuilder()
     .putLong("scale", 0)
     .putBoolean("isTimestampNTZ", false)
+    .putBoolean("isSigned", dataType.isInstanceOf[NumericType])
     .build()
 
   override def beforeAll(): Unit = {
@@ -928,7 +929,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
 
   test("MySQLDialect catalyst type mapping") {
     val mySqlDialect = JdbcDialects.get("jdbc:mysql")
-    val metadata = new MetadataBuilder()
+    val metadata = new MetadataBuilder().putBoolean("isSigned", value = true)
     assert(mySqlDialect.getCatalystType(java.sql.Types.VARBINARY, "BIT", 2, 
metadata) ==
       Some(LongType))
     assert(metadata.build().contains("binarylong"))
@@ -937,6 +938,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
       Some(BooleanType))
     assert(mySqlDialect.getCatalystType(java.sql.Types.TINYINT, "TINYINT", 1, 
metadata) ==
       Some(ByteType))
+    metadata.putBoolean("isSigned", value = false)
+    assert(mySqlDialect.getCatalystType(java.sql.Types.TINYINT, "TINYINT", 1, 
metadata) ===
+      Some(ShortType))
   }
 
   test("SPARK-35446: MySQLDialect type mapping of float") {
@@ -1386,8 +1390,8 @@ class JDBCSuite extends QueryTest with SharedSparkSession 
{
   }
 
   test("SPARK-16848: jdbc API throws an exception for user specified schema") {
-    val schema = StructType(Seq(StructField("name", StringType, false, 
defaultMetadata),
-      StructField("theid", IntegerType, false, defaultMetadata)))
+    val schema = StructType(Seq(StructField("name", StringType, false, 
defaultMetadata(StringType)),
+      StructField("theid", IntegerType, false, defaultMetadata(IntegerType))))
     val parts = Array[String]("THEID < 2", "THEID >= 2")
     val e1 = intercept[AnalysisException] {
       spark.read.schema(schema).jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, 
new Properties())
@@ -1407,8 +1411,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession 
{
     props.put("customSchema", customSchema)
     val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, props)
     assert(df.schema.size === 2)
-    val expectedSchema = new 
StructType(CatalystSqlParser.parseTableSchema(customSchema).map(
-      f => StructField(f.name, f.dataType, f.nullable, 
defaultMetadata)).toArray)
+    val structType = CatalystSqlParser.parseTableSchema(customSchema)
+    val expectedSchema = new StructType(structType.map(
+      f => StructField(f.name, f.dataType, f.nullable, 
defaultMetadata(f.dataType))).toArray)
     assert(df.schema === 
CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema))
     assert(df.count() === 3)
   }
@@ -1426,7 +1431,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession 
{
       val df = sql("select * from people_view")
       assert(df.schema.length === 2)
       val expectedSchema = new 
StructType(CatalystSqlParser.parseTableSchema(customSchema)
-        .map(f => StructField(f.name, f.dataType, f.nullable, 
defaultMetadata)).toArray)
+        .map(f => StructField(f.name, f.dataType, f.nullable, 
defaultMetadata(f.dataType))).toArray)
 
       assert(df.schema === 
CharVarcharUtils.replaceCharVarcharWithStringInSchema(expectedSchema))
       assert(df.count() === 3)
@@ -1577,8 +1582,9 @@ class JDBCSuite extends QueryTest with SharedSparkSession 
{
     }
 
   test("jdbc data source shouldn't have unnecessary metadata in its schema") {
-    var schema = StructType(Seq(StructField("NAME", VarcharType(32), true, 
defaultMetadata),
-      StructField("THEID", IntegerType, true, defaultMetadata)))
+    var schema = StructType(
+      Seq(StructField("NAME", VarcharType(32), true, 
defaultMetadata(VarcharType(32))),
+      StructField("THEID", IntegerType, true, defaultMetadata(IntegerType))))
     schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
     val df = spark.read.format("jdbc")
       .option("Url", urlWithUserAndPass)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
