This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 12155e8e027c [SPARK-47647][SQL] Make MySQL data source able to read bit(n>1) as BinaryType like Postgres
12155e8e027c is described below

commit 12155e8e027c868537fda3796b8661188c306cc2
Author: Kent Yao <[email protected]>
AuthorDate: Fri Mar 29 09:08:34 2024 -0700

    [SPARK-47647][SQL] Make MySQL data source able to read bit(n>1) as BinaryType like Postgres
    
    ### What changes were proposed in this pull request?
    
    Make MySQL data source able to read bit(n>1) as BinaryType like Postgres. This appears to be unfinished work left by the original author:
    > // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
    > // byte arrays instead of longs.
    
    A property spark.sql.legacy.mysql.bitArrayMapping.enabled is added to restore the old behavior.
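
    A minimal usage sketch of the new flag (not part of this commit; it assumes an active SparkSession `spark`, and the JDBC URL and table name are placeholders):

    ```scala
    // Restore the pre-4.0 behavior: map MySQL BIT(n>1) to LongType instead of BinaryType.
    spark.conf.set("spark.sql.legacy.mysql.bitArrayMapping.enabled", "true")

    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/testdb") // placeholder URL
      .option("dbtable", "numbers")                         // placeholder table with a BIT(10) column
      .load()

    // With the flag set, the BIT(10) column is read as LongType; with the default (false),
    // it is read as BinaryType, matching the Postgres connector's mapping.
    df.printSchema()
    ```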
    
    ### Why are the changes needed?
    
    Make the behavior consistent among different JDBC data sources.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the default type mapping for MySQL BIT(n>1) changes from LongType to BinaryType.
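
    For illustration only (a sketch based on the BIT(10) test value 0x225 used in this patch, not code from this change), the value-level difference looks like this:

    ```scala
    import java.nio.charset.StandardCharsets

    // Legacy mapping (spark.sql.legacy.mysql.bitArrayMapping.enabled=true): BIT(10) -> LongType.
    val asLong: Long = 0x225L // 549, i.e. bit pattern 1000100101

    // New default mapping: BIT(10) -> BinaryType. MySQL returns the raw bytes 0x02 and 0x25,
    // and the "binarylong" read path renders each byte's binary-string digits as ASCII bytes
    // (note that Integer.toBinaryString drops each byte's leading zeros).
    val rawBytes = Array[Byte](0x02, 0x25)
    val asBinary: Array[Byte] =
      rawBytes.flatMap(Integer.toBinaryString(_).getBytes(StandardCharsets.US_ASCII))
    // asBinary == Array[Byte](49, 48, 49, 48, 48, 49, 48, 49), i.e. the ASCII text "10100101"
    ```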
    
    ### How was this patch tested?
    
    New and updated tests in MySQLIntegrationSuite and JDBCSuite.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Closes #45773 from yaooqinn/SPARK-47647.
    
    Authored-by: Kent Yao <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/sql/jdbc/MySQLIntegrationSuite.scala     | 53 ++++++++++++----------
 docs/sql-data-sources-jdbc.md                      |  7 ++-
 docs/sql-migration-guide.md                        |  1 +
 .../org/apache/spark/sql/internal/SQLConf.scala    | 13 +++++-
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 10 ++++
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   | 16 ++++---
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 10 +++-
 7 files changed, 75 insertions(+), 35 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index 1343f9af7e35..dd680e6bd4a8 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -117,32 +117,35 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
   }
 
   test("Numeric types") {
-    val df = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties)
-    val rows = df.collect()
-    assert(rows.length == 1)
-    val types = rows(0).toSeq.map(x => x.getClass.toString)
-    assert(types.length == 10)
-    assert(types(0).equals("class java.lang.Boolean"))
-    assert(types(1).equals("class java.lang.Long"))
-    assert(types(2).equals("class java.lang.Short"))
-    assert(types(3).equals("class java.lang.Integer"))
-    assert(types(4).equals("class java.lang.Integer"))
-    assert(types(5).equals("class java.lang.Long"))
-    assert(types(6).equals("class java.math.BigDecimal"))
-    assert(types(7).equals("class java.lang.Float"))
-    assert(types(8).equals("class java.lang.Double"))
-    assert(types(9).equals("class java.lang.Byte"))
-    assert(rows(0).getBoolean(0) == false)
-    assert(rows(0).getLong(1) == 0x225)
-    assert(rows(0).getShort(2) == 17)
-    assert(rows(0).getInt(3) == 77777)
-    assert(rows(0).getInt(4) == 123456789)
-    assert(rows(0).getLong(5) == 123456789012345L)
+    val row = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties).head()
+    assert(row.length === 10)
+    assert(row(0).isInstanceOf[Boolean])
+    assert(row(1).isInstanceOf[Array[Byte]])
+    assert(row(2).isInstanceOf[Short])
+    assert(row(3).isInstanceOf[Int])
+    assert(row(4).isInstanceOf[Int])
+    assert(row(5).isInstanceOf[Long])
+    assert(row(6).isInstanceOf[BigDecimal])
+    assert(row(7).isInstanceOf[Float])
+    assert(row(8).isInstanceOf[Double])
+    assert(row(9).isInstanceOf[Byte])
+    assert(!row.getBoolean(0))
+    assert(java.util.Arrays.equals(row.getAs[Array[Byte]](1),
+      Array[Byte](49, 48, 49, 48, 48, 49, 48, 49)))
+    assert(row.getShort(2) == 17)
+    assert(row.getInt(3) == 77777)
+    assert(row.getInt(4) == 123456789)
+    assert(row.getLong(5) == 123456789012345L)
     val bd = new BigDecimal("123456789012345.12345678901234500000")
-    assert(rows(0).getAs[BigDecimal](6).equals(bd))
-    assert(rows(0).getFloat(7) == 42.75)
-    assert(rows(0).getDouble(8) == 1.0000000000000002)
-    assert(rows(0).getByte(9) == 0x80.toByte)
+    assert(row.getAs[BigDecimal](6).equals(bd))
+    assert(row.getFloat(7) == 42.75)
+    assert(row.getDouble(8) == 1.0000000000000002)
+    assert(row.getByte(9) == 0x80.toByte)
+    withSQLConf(SQLConf.LEGACY_MYSQL_BIT_ARRAY_MAPPING_ENABLED.key -> "true") {
+      val row = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties).head()
+      assert(row(1).isInstanceOf[Long])
+      assert(row.getLong(1) == 0x225)
+    }
   }
 
   test("SPARK-47462: Unsigned numeric types") {
diff --git a/docs/sql-data-sources-jdbc.md b/docs/sql-data-sources-jdbc.md
index 17a61078cccf..3563088c600f 100644
--- a/docs/sql-data-sources-jdbc.md
+++ b/docs/sql-data-sources-jdbc.md
@@ -451,10 +451,15 @@ are also available to connect MySQL, may have different mapping rules.
       <td>BooleanType</td>
       <td></td>
     </tr>
+    <tr>
+      <td>BIT( &gt;1 )</td>
+      <td>BinaryType</td>
+      <td>(Default)</td>
+    </tr>
     <tr>
       <td>BIT( &gt;1 )</td>
       <td>LongType</td>
-      <td></td>
+      <td>spark.sql.legacy.mysql.bitArrayMapping.enabled=true</td>
     </tr>
     <tr>
       <td>TINYINT(1)</td>
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 9a898a8fdc62..d9187e951ae1 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -43,6 +43,7 @@ license: |
 - Since Spark 4.0, JDBC read option `preferTimestampNTZ=true` will not convert MySQL TIMESTAMP to TimestampNTZType, which is available in Spark 3.5. MySQL DATETIME is not affected.
 - Since Spark 4.0, MySQL JDBC datasource will read SMALLINT as ShortType, while in Spark 3.5 and previous, it was read as IntegerType. MEDIUMINT UNSIGNED is read as IntegerType, while in Spark 3.5 and previous, it was read as LongType. To restore the previous behavior, you can cast the column to the old type.
 - Since Spark 4.0, MySQL JDBC datasource will read FLOAT as FloatType, while in Spark 3.5 and previous, it was read as DoubleType. To restore the previous behavior, you can cast the column to the old type.
+- Since Spark 4.0, MySQL JDBC datasource will read BIT(n > 1) as BinaryType, while in Spark 3.5 and previous, it was read as LongType. To restore the previous behavior, set `spark.sql.legacy.mysql.bitArrayMapping.enabled` to `true`.
 
 ## Upgrading from Spark SQL 3.5.1 to 3.5.2
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fd1cb6491edb..fc7425ce2bea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4067,11 +4067,19 @@ object SQLConf {
   val LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED =
     buildConf("spark.sql.legacy.mssqlserver.numericMapping.enabled")
       .internal()
-      .doc("When true, use legacy MySqlServer SMALLINT and REAL type mapping.")
+      .doc("When true, use legacy MsSqlServer SMALLINT and REAL type mapping.")
       .version("2.4.5")
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_MYSQL_BIT_ARRAY_MAPPING_ENABLED =
+    buildConf("spark.sql.legacy.mysql.bitArrayMapping.enabled")
+      .internal()
+      .doc("When true, use LongType to represent MySQL BIT(n>1); otherwise, 
use BinaryType.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val CSV_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.csv.filterPushdown.enabled")
     .doc("When true, enable filter pushdown to CSV datasource.")
     .version("3.0.0")
@@ -5179,6 +5187,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
   def legacyMsSqlServerNumericMappingEnabled: Boolean =
     getConf(LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED)
 
+  def legacyMySqlBitArrayMappingEnabled: Boolean =
+    getConf(LEGACY_MYSQL_BIT_ARRAY_MAPPING_ENABLED)
+
   override def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = {
     LegacyBehaviorPolicy.withName(getConf(SQLConf.LEGACY_TIME_PARSER_POLICY))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index b2612e2975a3..27ef8bd75772 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -509,6 +509,16 @@ object JdbcUtils extends Logging with SQLConfHelper {
           row.update(pos, null)
         }
 
+    case BinaryType if metadata.contains("binarylong") =>
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        val bytes = rs.getBytes(pos + 1)
+        if (bytes != null) {
+          val binary = bytes.flatMap(Integer.toBinaryString(_).getBytes(StandardCharsets.US_ASCII))
+          row.update(pos, binary)
+        } else {
+          row.update(pos, null)
+        }
+
     case BinaryType =>
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.update(pos, rs.getBytes(pos + 1))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 3c238dc8a837..7e8598a147e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -92,17 +92,21 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper {
 
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+    def getCatalystTypeForBitArray: Option[DataType] = {
+      md.putLong("binarylong", 1)
+      if (conf.legacyMySqlBitArrayMappingEnabled) {
+        Some(LongType)
+      } else {
+        Some(BinaryType)
+      }
+    }
     sqlType match {
       case Types.VARBINARY if "BIT".equalsIgnoreCase(typeName) && size != 1 =>
         // MariaDB connector behaviour
-        // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
-        // byte arrays instead of longs.
-        md.putLong("binarylong", 1)
-        Some(LongType)
+        getCatalystTypeForBitArray
       case Types.BIT if size > 1 =>
         // MySQL connector behaviour
-        md.putLong("binarylong", 1)
-        Some(LongType)
+        getCatalystTypeForBitArray
       case Types.VARCHAR if "TINYTEXT".equalsIgnoreCase(typeName) =>
         // TINYTEXT is Types.VARCHAR(63) from mysql jdbc, but keep it AS-IS for historical reason
         Some(StringType)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 28e86e8ca1a5..9a8c0bb9df1b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -930,9 +930,15 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
   test("MySQLDialect catalyst type mapping") {
     val mySqlDialect = JdbcDialects.get("jdbc:mysql")
     val metadata = new MetadataBuilder().putBoolean("isSigned", value = true)
-    assert(mySqlDialect.getCatalystType(java.sql.Types.VARBINARY, "BIT", 2, metadata) ==
-      Some(LongType))
+    assert(mySqlDialect.getCatalystType(java.sql.Types.VARBINARY, "BIT", 2, metadata) ===
+      Some(BinaryType))
     assert(metadata.build().contains("binarylong"))
+    withSQLConf(SQLConf.LEGACY_MYSQL_BIT_ARRAY_MAPPING_ENABLED.key -> "true") {
+      metadata.remove("binarylong")
+      assert(mySqlDialect.getCatalystType(java.sql.Types.VARBINARY, "BIT", 2, metadata) ===
+        Some(LongType))
+      assert(metadata.build().contains("binarylong"))
+    }
     assert(mySqlDialect.getCatalystType(java.sql.Types.VARBINARY, "BIT", 1, metadata) == None)
     assert(mySqlDialect.getCatalystType(java.sql.Types.TINYINT, "TINYINT", 1, metadata) ==
       Some(ByteType))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
