Mikhail Kumachev created SPARK-30959:
----------------------------------------

             Summary: How to write to a column of BINARY type in SQL Server / Azure DWH using the JDBC driver?
                 Key: SPARK-30959
                 URL: https://issues.apache.org/jira/browse/SPARK-30959
             Project: Spark
          Issue Type: Question
          Components: Spark Core
    Affects Versions: 2.4.4
            Reporter: Mikhail Kumachev


My initial goal is to save UUID values to a BINARY(16) column in SQL Server / Azure DWH.

For example, I have demo table:
{code:java}
CREATE TABLE [Events] ([EventId] [binary](16) NOT NULL){code}
 

I want to write data to it using Spark like this:
{code:java}
// assumes an active SparkSession named `spark`, as in spark-shell
import java.nio.{ByteBuffer, ByteOrder}
import java.util.UUID

import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
import spark.implicits._

// Pack the UUID into a 16-byte array (most significant bits first)
val uuid = UUID.randomUUID()
val uuidBytes = Array.ofDim[Byte](16)
ByteBuffer.wrap(uuidBytes)
    .order(ByteOrder.BIG_ENDIAN)
    .putLong(uuid.getMostSignificantBits())
    .putLong(uuid.getLeastSignificantBits())

val schema = StructType(
    List(
        StructField("EventId", BinaryType, false)
    )
)
val data = Seq(uuidBytes).toDF("EventId").rdd
val df = spark.createDataFrame(data, schema)

df.write
    .format("jdbc")
    .option("url", "<DATABASE_CONNECTION_URL>")
    .option("dbtable", "Events")
    .mode(org.apache.spark.sql.SaveMode.Append)
    .save()
{code}
 

This code returns an error:

 
{noformat}
java.sql.BatchUpdateException: Conversion from variable or parameter type VARBINARY to target column type BINARY is not supported.{noformat}
 

 

My question is: how can I handle this situation and insert a UUID value into a BINARY(16) column?

 

My investigation:

Spark uses the concept of JdbcDialects and defines a mapping from each Catalyst type to a database type and vice versa. For example, here is [MsSqlServerDialect|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala], which is used when working against SQL Server or Azure DWH. In the method `getJDBCType` you can see the mapping:
{code:java}
case BinaryType => Some(JdbcType("VARBINARY(MAX)", java.sql.Types.VARBINARY)){code}
and this, I think, is the root of my problem.
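
For reference, the JDBC write path resolves the target column type roughly like this (a simplified sketch, not the actual Spark source; the hypothetical `resolveJdbcType` only illustrates that the dialect's mapping takes precedence over a generic fallback):
{code:java}
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
import org.apache.spark.sql.types.{BinaryType, DataType}

// Illustrative sketch only: the writer asks the resolved dialect for a type
// first and only falls back to a generic mapping, so overriding the dialect
// should, in principle, change the type Spark emits for BinaryType.
def resolveJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType = {
  val genericFallback: Option[JdbcType] = dt match {
    case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))
    case _          => None
  }
  dialect.getJDBCType(dt)
    .orElse(genericFallback)
    .getOrElse(throw new IllegalArgumentException(s"Can't get JDBC type for ${dt.simpleString}"))
}
{code}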

 

So I decided to implement my own JdbcDialect to override this behavior:
{code:java}
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{BinaryType, DataType}

class SqlServerDialect extends JdbcDialect {
    override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")

    override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
        case BinaryType => Option(JdbcType("BINARY(16)", java.sql.Types.BINARY))
        case _ => None
    }
}

val dialect = new SqlServerDialect
JdbcDialects.registerDialect(dialect)
{code}

With this modification I still get exactly the same error. It looks like Spark does not use the mapping from my custom dialect, even though I verified that the dialect is registered, which is strange.
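
One way to check which dialect and mapping the writer will actually resolve for the connection URL (a hedged sketch; it assumes `JdbcDialects.get` is accessible in this Spark version and that the URL prefix below matches the real one):
{code:java}
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.BinaryType

// Resolve the dialect the same way the writer would, using the real URL prefix.
// If both the built-in MsSqlServerDialect and the custom one match the URL,
// Spark combines them, so print what getJDBCType actually returns for BinaryType.
val resolved = JdbcDialects.get("jdbc:sqlserver://<HOST>;database=<DB>")
println(resolved.getClass.getName)
// Expect Some(JdbcType(BINARY(16),-2)) here if the custom mapping is picked up
println(resolved.getJDBCType(BinaryType))
{code}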


