Timothy Zhang created SPARK-29185:
-------------------------------------
Summary: Add new SaveMode types for Spark SQL jdbc datasource
Key: SPARK-29185
URL: https://issues.apache.org/jira/browse/SPARK-29185
Project: Spark
Issue Type: New Feature
Components: Input/Output
Affects Versions: 2.4.4
Reporter: Timothy Zhang
It is necessary to add new SaveMode types for Delete, Update, and Upsert, such as:
* SaveMode.Delete
* SaveMode.Update
* SaveMode.Upsert
This would let Spark SQL support legacy RDBMSs such as Oracle, DB2, and MySQL
much better. The current implementation of SaveMode.Append is already flexible
enough for this: all the new types could share the same savePartition function,
and only new getStatement functions for Delete, Update, and Upsert would need to
be added, generating DELETE FROM, UPDATE, and MERGE INTO statements
respectively. We have an initial implementation of them:
{code:scala}
import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.types.StructType

// DELETE: every column of the row becomes an equality predicate.
def getDeleteStatement(table: String, rddSchema: StructType,
    dialect: JdbcDialect): String = {
  val columns = rddSchema.fields
    .map(x => dialect.quoteIdentifier(x.name) + "=?")
    .mkString(" AND ")
  s"DELETE FROM ${table.toUpperCase} WHERE $columns"
}

// UPDATE: non-key columns go into SET, primary-key columns into WHERE.
def getUpdateStatement(table: String, rddSchema: StructType,
    priKeys: Seq[String], dialect: JdbcDialect): String = {
  val fullCols = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name))
  val priCols = priKeys.map(dialect.quoteIdentifier(_))
  val columns = (fullCols diff priCols).map(_ + "=?").mkString(",")
  val cnditns = priCols.map(_ + "=?").mkString(" AND ")
  s"UPDATE ${table.toUpperCase} SET $columns WHERE $cnditns"
}

// MERGE (upsert): join on the primary keys, insert the row when no target
// row matches, otherwise update the non-key columns.
def getMergeStatement(table: String, rddSchema: StructType,
    priKeys: Seq[String], dialect: JdbcDialect): String = {
  val fullCols = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name))
  val priCols = priKeys.map(dialect.quoteIdentifier(_))
  val nrmCols = fullCols diff priCols
  val src = dialect.quoteIdentifier("SRC")
  val tgt = dialect.quoteIdentifier("TGT")
  // SRC.c1,SRC.c2,... used as the VALUES list of the INSERT branch
  val fullPart = fullCols.map(c => s"$src.$c").mkString(",")
  // TGT.k1=SRC.k1 AND ... used as the ON condition
  val priPart = priCols.map(c => s"$tgt.$c=$src.$c").mkString(" AND ")
  // c=SRC.c,... for the non-key columns in the UPDATE branch
  val nrmPart = nrmCols.map(c => s"$c=$src.$c").mkString(",")
  val columns = fullCols.mkString(",")
  val placeholders = fullCols.map(_ => "?").mkString(",")
  s"MERGE INTO ${table.toUpperCase} AS $tgt " +
    s"USING TABLE(VALUES($placeholders)) " +
    s"AS $src($columns) " +
    s"ON $priPart " +
    s"WHEN NOT MATCHED THEN INSERT ($columns) VALUES ($fullPart) " +
    s"WHEN MATCHED THEN UPDATE SET $nrmPart"
}
{code}
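The statement builders can be checked without a database or a Spark session. Below is a Spark-free sketch, assuming plain column-name lists in place of StructType and a hypothetical quote helper that mimics the default JdbcDialect.quoteIdentifier (which wraps a name in double quotes); UpsertSqlSketch, deleteSql, updateSql and updateBindOrder are illustrative names, not Spark API. It also points at one detail worth checking when reusing savePartition: assuming values are bound positionally in schema order, as the current append path does, the UPDATE placeholders list non-key columns first and key columns last, so the row positions would need to be reordered before binding.

```scala
// Spark-free sketch (hypothetical names): column-name lists stand in for
// StructType, and `quote` mimics the default JdbcDialect.quoteIdentifier.
object UpsertSqlSketch {
  def quote(name: String): String = "\"" + name + "\""

  // DELETE matching every column. Note: a NULL value never satisfies col=?,
  // so rows containing NULLs are not matched by this statement.
  def deleteSql(table: String, cols: Seq[String]): String = {
    val conds = cols.map(c => s"${quote(c)}=?").mkString(" AND ")
    s"DELETE FROM ${table.toUpperCase} WHERE $conds"
  }

  // UPDATE with non-key columns in SET and key columns in WHERE.
  def updateSql(table: String, cols: Seq[String], keys: Seq[String]): String = {
    val sets = cols.diff(keys).map(c => s"${quote(c)}=?").mkString(",")
    val conds = keys.map(c => s"${quote(c)}=?").mkString(" AND ")
    s"UPDATE ${table.toUpperCase} SET $sets WHERE $conds"
  }

  // Row positions in the order updateSql's placeholders expect them:
  // non-key columns (SET clause) first, then key columns in `keys` order.
  def updateBindOrder(cols: Seq[String], keys: Seq[String]): Seq[Int] = {
    require(keys.forall(cols.contains), "every key must be one of the columns")
    val nonKey = cols.indices.filterNot(i => keys.contains(cols(i)))
    val key = keys.map(k => cols.indexOf(k))
    nonKey ++ key
  }
}
```

For example, updateSql("emp", Seq("id", "name", "dept"), Seq("id")) yields UPDATE EMP SET "name"=?,"dept"=? WHERE "id"=?, and updateBindOrder for the same arguments returns positions 1, 2, 0, i.e. the key column is bound last. DELETE and MERGE need no reordering, because their placeholders follow the schema order.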
--
This message was sent by Atlassian Jira
(v8.3.4#803005)