This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 12155e8e027c [SPARK-47647][SQL] Make MySQL data source able to read bit(n>1) as BinaryType like Postgres
12155e8e027c is described below
commit 12155e8e027c868537fda3796b8661188c306cc2
Author: Kent Yao <[email protected]>
AuthorDate: Fri Mar 29 09:08:34 2024 -0700
[SPARK-47647][SQL] Make MySQL data source able to read bit(n>1) as BinaryType like Postgres
### What changes were proposed in this pull request?
Make the MySQL data source able to read bit(n>1) as BinaryType, as the Postgres data source already does.
This appears to be work left unfinished by the original author:
>> // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
>> // byte arrays instead of longs.
A property spark.sql.legacy.mysql.bitArrayMapping.enabled is added to
restore the old behavior.
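As a minimal sketch (assuming a `spark` session in scope), the legacy mapping can be restored at the session level before the JDBC schema is resolved:

```scala
// Restore the pre-4.0 mapping so MySQL BIT(n>1) is read as LongType again.
// The property defaults to false; set it before the JDBC schema is resolved.
spark.conf.set("spark.sql.legacy.mysql.bitArrayMapping.enabled", "true")
```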
### Why are the changes needed?
Make the type mapping behavior consistent across different JDBC data sources.
### Does this PR introduce _any_ user-facing change?
Yes, the type mapping for MySQL BIT(n>1) changes from LongType to BinaryType.
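For illustration, assuming a hypothetical MySQL table `events` with a column declared as `b BIT(10)`, the inferred Spark type changes as follows:

```scala
// Hypothetical JDBC URL, table, and column; only the inferred type of `b` matters here.
val df = spark.read.jdbc(
  "jdbc:mysql://localhost:3306/test", "events", new java.util.Properties())
df.schema("b").dataType
// Spark 4.0 default: BinaryType
// Spark 3.5 and earlier (or with the legacy property set to true): LongType
```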
### How was this patch tested?
New tests.
### Was this patch authored or co-authored using generative AI tooling?
Closes #45773 from yaooqinn/SPARK-47647.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/jdbc/MySQLIntegrationSuite.scala | 53 ++++++++++++----------
docs/sql-data-sources-jdbc.md | 7 ++-
docs/sql-migration-guide.md | 1 +
.../org/apache/spark/sql/internal/SQLConf.scala | 13 +++++-
.../sql/execution/datasources/jdbc/JdbcUtils.scala | 10 ++++
.../org/apache/spark/sql/jdbc/MySQLDialect.scala | 16 ++++---
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 10 +++-
7 files changed, 75 insertions(+), 35 deletions(-)
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index 1343f9af7e35..dd680e6bd4a8 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -117,32 +117,35 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
}
test("Numeric types") {
- val df = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties)
- val rows = df.collect()
- assert(rows.length == 1)
- val types = rows(0).toSeq.map(x => x.getClass.toString)
- assert(types.length == 10)
- assert(types(0).equals("class java.lang.Boolean"))
- assert(types(1).equals("class java.lang.Long"))
- assert(types(2).equals("class java.lang.Short"))
- assert(types(3).equals("class java.lang.Integer"))
- assert(types(4).equals("class java.lang.Integer"))
- assert(types(5).equals("class java.lang.Long"))
- assert(types(6).equals("class java.math.BigDecimal"))
- assert(types(7).equals("class java.lang.Float"))
- assert(types(8).equals("class java.lang.Double"))
- assert(types(9).equals("class java.lang.Byte"))
- assert(rows(0).getBoolean(0) == false)
- assert(rows(0).getLong(1) == 0x225)
- assert(rows(0).getShort(2) == 17)
- assert(rows(0).getInt(3) == 77777)
- assert(rows(0).getInt(4) == 123456789)
- assert(rows(0).getLong(5) == 123456789012345L)
+ val row = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties).head()
+ assert(row.length === 10)
+ assert(row(0).isInstanceOf[Boolean])
+ assert(row(1).isInstanceOf[Array[Byte]])
+ assert(row(2).isInstanceOf[Short])
+ assert(row(3).isInstanceOf[Int])
+ assert(row(4).isInstanceOf[Int])
+ assert(row(5).isInstanceOf[Long])
+ assert(row(6).isInstanceOf[BigDecimal])
+ assert(row(7).isInstanceOf[Float])
+ assert(row(8).isInstanceOf[Double])
+ assert(row(9).isInstanceOf[Byte])
+ assert(!row.getBoolean(0))
+ assert(java.util.Arrays.equals(row.getAs[Array[Byte]](1),
+ Array[Byte](49, 48, 49, 48, 48, 49, 48, 49)))
+ assert(row.getShort(2) == 17)
+ assert(row.getInt(3) == 77777)
+ assert(row.getInt(4) == 123456789)
+ assert(row.getLong(5) == 123456789012345L)
val bd = new BigDecimal("123456789012345.12345678901234500000")
- assert(rows(0).getAs[BigDecimal](6).equals(bd))
- assert(rows(0).getFloat(7) == 42.75)
- assert(rows(0).getDouble(8) == 1.0000000000000002)
- assert(rows(0).getByte(9) == 0x80.toByte)
+ assert(row.getAs[BigDecimal](6).equals(bd))
+ assert(row.getFloat(7) == 42.75)
+ assert(row.getDouble(8) == 1.0000000000000002)
+ assert(row.getByte(9) == 0x80.toByte)
+ withSQLConf(SQLConf.LEGACY_MYSQL_BIT_ARRAY_MAPPING_ENABLED.key -> "true") {
+ val row = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties).head()
+ assert(row(1).isInstanceOf[Long])
+ assert(row.getLong(1) == 0x225)
+ }
}
test("SPARK-47462: Unsigned numeric types") {
diff --git a/docs/sql-data-sources-jdbc.md b/docs/sql-data-sources-jdbc.md
index 17a61078cccf..3563088c600f 100644
--- a/docs/sql-data-sources-jdbc.md
+++ b/docs/sql-data-sources-jdbc.md
@@ -451,10 +451,15 @@ are also available to connect MySQL, may have different mapping rules.
<td>BooleanType</td>
<td></td>
</tr>
+ <tr>
+ <td>BIT( >1 )</td>
+ <td>BinaryType</td>
+ <td>(Default)</td>
+ </tr>
<tr>
<td>BIT( >1 )</td>
<td>LongType</td>
- <td></td>
+ <td>spark.sql.legacy.mysql.bitArrayMapping.enabled=true</td>
</tr>
<tr>
<td>TINYINT(1)</td>
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 9a898a8fdc62..d9187e951ae1 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -43,6 +43,7 @@ license: |
- Since Spark 4.0, JDBC read option `preferTimestampNTZ=true` will not convert MySQL TIMESTAMP to TimestampNTZType, which is available in Spark 3.5. MySQL DATETIME is not affected.
- Since Spark 4.0, MySQL JDBC datasource will read SMALLINT as ShortType, while in Spark 3.5 and previous, it was read as IntegerType. MEDIUMINT UNSIGNED is read as IntegerType, while in Spark 3.5 and previous, it was read as LongType. To restore the previous behavior, you can cast the column to the old type.
- Since Spark 4.0, MySQL JDBC datasource will read FLOAT as FloatType, while in Spark 3.5 and previous, it was read as DoubleType. To restore the previous behavior, you can cast the column to the old type.
+- Since Spark 4.0, MySQL JDBC datasource will read BIT(n > 1) as BinaryType, while in Spark 3.5 and previous, it was read as LongType. To restore the previous behavior, set `spark.sql.legacy.mysql.bitArrayMapping.enabled` to `true`.
## Upgrading from Spark SQL 3.5.1 to 3.5.2
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fd1cb6491edb..fc7425ce2bea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4067,11 +4067,19 @@ object SQLConf {
val LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED =
buildConf("spark.sql.legacy.mssqlserver.numericMapping.enabled")
.internal()
- .doc("When true, use legacy MySqlServer SMALLINT and REAL type mapping.")
+ .doc("When true, use legacy MsSqlServer SMALLINT and REAL type mapping.")
.version("2.4.5")
.booleanConf
.createWithDefault(false)
+ val LEGACY_MYSQL_BIT_ARRAY_MAPPING_ENABLED =
+ buildConf("spark.sql.legacy.mysql.bitArrayMapping.enabled")
+ .internal()
+ .doc("When true, use LongType to represent MySQL BIT(n>1); otherwise,
use BinaryType.")
+ .version("4.0.0")
+ .booleanConf
+ .createWithDefault(false)
+
val CSV_FILTER_PUSHDOWN_ENABLED =
buildConf("spark.sql.csv.filterPushdown.enabled")
.doc("When true, enable filter pushdown to CSV datasource.")
.version("3.0.0")
@@ -5179,6 +5187,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def legacyMsSqlServerNumericMappingEnabled: Boolean =
getConf(LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED)
+ def legacyMySqlBitArrayMappingEnabled: Boolean =
+ getConf(LEGACY_MYSQL_BIT_ARRAY_MAPPING_ENABLED)
+
override def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = {
LegacyBehaviorPolicy.withName(getConf(SQLConf.LEGACY_TIME_PARSER_POLICY))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index b2612e2975a3..27ef8bd75772 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -509,6 +509,16 @@ object JdbcUtils extends Logging with SQLConfHelper {
row.update(pos, null)
}
+ case BinaryType if metadata.contains("binarylong") =>
+ (rs: ResultSet, row: InternalRow, pos: Int) =>
+ val bytes = rs.getBytes(pos + 1)
+ if (bytes != null) {
+ val binary = bytes.flatMap(Integer.toBinaryString(_).getBytes(StandardCharsets.US_ASCII))
+ row.update(pos, binary)
+ } else {
+ row.update(pos, null)
+ }
+
case BinaryType =>
(rs: ResultSet, row: InternalRow, pos: Int) =>
row.update(pos, rs.getBytes(pos + 1))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 3c238dc8a837..7e8598a147e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -92,17 +92,21 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper {
override def getCatalystType(
sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+ def getCatalystTypeForBitArray: Option[DataType] = {
+ md.putLong("binarylong", 1)
+ if (conf.legacyMySqlBitArrayMappingEnabled) {
+ Some(LongType)
+ } else {
+ Some(BinaryType)
+ }
+ }
sqlType match {
case Types.VARBINARY if "BIT".equalsIgnoreCase(typeName) && size != 1 =>
// MariaDB connector behaviour
- // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
- // byte arrays instead of longs.
- md.putLong("binarylong", 1)
- Some(LongType)
+ getCatalystTypeForBitArray
case Types.BIT if size > 1 =>
// MySQL connector behaviour
- md.putLong("binarylong", 1)
- Some(LongType)
+ getCatalystTypeForBitArray
case Types.VARCHAR if "TINYTEXT".equalsIgnoreCase(typeName) =>
// TINYTEXT is Types.VARCHAR(63) from mysql jdbc, but keep it AS-IS for historical reason
Some(StringType)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 28e86e8ca1a5..9a8c0bb9df1b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -930,9 +930,15 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
test("MySQLDialect catalyst type mapping") {
val mySqlDialect = JdbcDialects.get("jdbc:mysql")
val metadata = new MetadataBuilder().putBoolean("isSigned", value = true)
- assert(mySqlDialect.getCatalystType(java.sql.Types.VARBINARY, "BIT", 2, metadata) ==
- Some(LongType))
+ assert(mySqlDialect.getCatalystType(java.sql.Types.VARBINARY, "BIT", 2, metadata) ===
+ Some(BinaryType))
assert(metadata.build().contains("binarylong"))
+ withSQLConf(SQLConf.LEGACY_MYSQL_BIT_ARRAY_MAPPING_ENABLED.key -> "true") {
+ metadata.remove("binarylong")
+ assert(mySqlDialect.getCatalystType(java.sql.Types.VARBINARY, "BIT", 2, metadata) ===
+ Some(LongType))
+ assert(metadata.build().contains("binarylong"))
+ }
assert(mySqlDialect.getCatalystType(java.sql.Types.VARBINARY, "BIT", 1, metadata) == None)
assert(mySqlDialect.getCatalystType(java.sql.Types.TINYINT, "TINYINT", 1, metadata) ==
Some(ByteType))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]