[
https://issues.apache.org/jira/browse/SPARK-30959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Mikhail Kumachev updated SPARK-30959:
-------------------------------------
Description:
My initial goal is to save UUID values to a BINARY(16) column in SQL Server/Azure DWH.
For example, I have a demo table:
{code:java}
CREATE TABLE [Events] ([EventId] [binary](16) NOT NULL){code}
I want to write data to it using Spark like this:
{code:java}
import java.nio.{ByteBuffer, ByteOrder}
import java.util.UUID

import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
import spark.implicits._

// Pack the UUID into a 16-byte array (big-endian: most significant bits first)
val uuid = UUID.randomUUID()
val uuidBytes = Array.ofDim[Byte](16)
ByteBuffer.wrap(uuidBytes)
  .order(ByteOrder.BIG_ENDIAN)
  .putLong(uuid.getMostSignificantBits())
  .putLong(uuid.getLeastSignificantBits())

val schema = StructType(
  List(
    StructField("EventId", BinaryType, false)
  )
)

val data = Seq(uuidBytes).toDF("EventId").rdd
val df = spark.createDataFrame(data, schema)

df.write
  .format("jdbc")
  .option("url", "<DATABASE_CONNECTION_URL>")
  .option("dbTable", "Events")
  .mode(org.apache.spark.sql.SaveMode.Append)
  .save()
{code}
This code returns an error:
{noformat}
java.sql.BatchUpdateException: Conversion from variable or parameter type
VARBINARY to target column type BINARY is not supported.{noformat}
My question is: how do I handle this situation and insert a UUID value into a
BINARY(16) column?
My investigation:
Spark uses the concept of JdbcDialects and has a mapping from each Catalyst type
to a database type and vice versa. For example, here is
[MsSqlServerDialect|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala],
which is used when working against SQL Server or Azure DWH. In the method
{{getJDBCType}} you can see the mapping:
{code:java}
case BinaryType => Some(JdbcType("VARBINARY(MAX)", java.sql.Types.VARBINARY)){code}
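(For illustration, this default mapping can also be observed from the spark-shell by resolving the dialect for a SQL Server connection URL; the URL below is only a placeholder, not the one from my environment.)
{code:java}
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.BinaryType

// Placeholder URL: only the "jdbc:sqlserver" prefix matters for dialect resolution.
val url = "jdbc:sqlserver://<HOST>;database=<DB>"

// Resolve the built-in dialect for this URL and print its mapping for Catalyst BinaryType.
val builtInDialect = JdbcDialects.get(url)
println(builtInDialect.getJDBCType(BinaryType)) // expected: the VARBINARY(MAX) mapping shown above
{code}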
That VARBINARY(MAX) mapping is, I think, the root of my problem.
So I decided to implement my own JdbcDialect to override this behavior:
{code:java}
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{BinaryType, DataType}

class SqlServerDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean =
    url.startsWith("jdbc:sqlserver")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case BinaryType => Option(JdbcType("BINARY(16)", java.sql.Types.BINARY))
    case _ => None
  }
}

val dialect = new SqlServerDialect
JdbcDialects.registerDialect(dialect)
{code}
With this modification I still get exactly the same error. It looks like Spark
does not use the mapping from my custom dialect, even though I checked that the
dialect is registered. This is a strange situation.
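For reference, here is a similar minimal sketch (again with a placeholder URL) of how one could check, after registerDialect, which dialect Spark resolves for the URL and which BinaryType mapping wins:
{code:java}
// Hypothetical re-check after JdbcDialects.registerDialect(dialect):
// resolve the dialect for the (placeholder) URL again and inspect the BinaryType mapping.
val resolved = org.apache.spark.sql.jdbc.JdbcDialects.get("jdbc:sqlserver://<HOST>;database=<DB>")
println(resolved.getClass.getName)                                   // which dialect (or aggregate) matched
println(resolved.getJDBCType(org.apache.spark.sql.types.BinaryType)) // mapping used for BinaryType
{code}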
> How to write using JDBC driver to SQL Server / Azure DWH to column of BINARY
> type?
> ----------------------------------------------------------------------------------
>
> Key: SPARK-30959
> URL: https://issues.apache.org/jira/browse/SPARK-30959
> Project: Spark
> Issue Type: Question
> Components: Spark Core
> Affects Versions: 2.4.4
> Reporter: Mikhail Kumachev
> Priority: Minor