Mikhail Kumachev created SPARK-30959:
----------------------------------------
Summary: How to write to a column of BINARY type in SQL Server / Azure DWH using the JDBC driver?
Key: SPARK-30959
URL: https://issues.apache.org/jira/browse/SPARK-30959
Project: Spark
Issue Type: Question
Components: Spark Core
Affects Versions: 2.4.4
Reporter: Mikhail Kumachev
My initial goal is to save UUID values to a BINARY(16) column in SQL Server / Azure DWH.
For example, I have a demo table:
{code:java}
CREATE TABLE [Events] ([EventId] [binary](16) NOT NULL){code}
I want to write data to it using Spark like this:
{code:java}
import java.nio.{ByteBuffer, ByteOrder}
import java.util.UUID

import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
import spark.implicits._

// Serialize the UUID into a 16-byte big-endian array
val uuid = UUID.randomUUID()
val uuidBytes = Array.ofDim[Byte](16)
ByteBuffer.wrap(uuidBytes)
  .order(ByteOrder.BIG_ENDIAN)
  .putLong(uuid.getMostSignificantBits())
  .putLong(uuid.getLeastSignificantBits())

val schema = StructType(
  List(
    StructField("EventId", BinaryType, nullable = false)
  )
)

val data = Seq(uuidBytes).toDF("EventId").rdd
val df = spark.createDataFrame(data, schema)

df.write
  .format("jdbc")
  .option("url", "<DATABASE_CONNECTION_URL>")
  .option("dbtable", "Events")
  .mode(org.apache.spark.sql.SaveMode.Append)
  .save()
{code}
This code returns an error:
{noformat}
java.sql.BatchUpdateException: Conversion from variable or parameter type
VARBINARY to target column type BINARY is not supported.{noformat}
My question is: how can I work around this and insert a UUID value into a
BINARY(16) column?
My investigation:
Spark uses the concept of JdbcDialects and has a mapping from each Catalyst type
to a database type and vice versa. For example, here is
[MsSqlServerDialect|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala],
which is used when we work with SQL Server or Azure DWH. In the method
`getJDBCType` you can see the mapping:
{code:java}
case BinaryType => Some(JdbcType("VARBINARY(MAX)", java.sql.Types.VARBINARY)){code}
and this is, I think, the root of my problem.
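This is easy to check from a spark-shell; a minimal sketch (the connection URL is
just a placeholder):
{code:java}
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.BinaryType

// Ask Spark which dialect handles a SQL Server URL and how it maps BinaryType
val dialect = JdbcDialects.get("jdbc:sqlserver://host;databaseName=db")
println(dialect.getJDBCType(BinaryType))
// prints Some(JdbcType(VARBINARY(MAX),-3)), i.e. java.sql.Types.VARBINARY
{code}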
So I decided to implement my own JdbcDialect to override this behavior:
{code:java}
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{BinaryType, DataType}

class SqlServerDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean =
    url.startsWith("jdbc:sqlserver")

  // Map Catalyst BinaryType to BINARY(16) instead of the default VARBINARY(MAX)
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case BinaryType => Option(JdbcType("BINARY(16)", java.sql.Types.BINARY))
    case _ => None
  }
}

val dialect = new SqlServerDialect
JdbcDialects.registerDialect(dialect)
{code}
With this modification I still get exactly the same error. It looks like Spark
does not use the mapping from my custom dialect, even though I have checked that
the dialect is registered. This is a strange situation.
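One way to double-check is to ask JdbcDialects which mapping it resolves for the
connection URL after registration; a registered dialect should take precedence
over the built-in one (a sketch, again with a placeholder URL):
{code:java}
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.BinaryType

// Both SqlServerDialect and the built-in MsSqlServerDialect match the URL;
// a freshly registered dialect is consulted first, so BinaryType should now
// resolve to BINARY(16) if the registration took effect
val resolved = JdbcDialects.get("jdbc:sqlserver://host;databaseName=db")
println(resolved.getJDBCType(BinaryType))
// expected: Some(JdbcType(BINARY(16),-2)), i.e. java.sql.Types.BINARY
{code}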