[ 
https://issues.apache.org/jira/browse/SPARK-29185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-29185:
----------------------------------
    Affects Version/s:     (was: 3.0.0)
                       3.1.0

> Add new SaveMode types for Spark SQL jdbc datasource
> ----------------------------------------------------
>
>                 Key: SPARK-29185
>                 URL: https://issues.apache.org/jira/browse/SPARK-29185
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Timothy Zhang
>            Priority: Major
>
>  New SaveMode types are needed for Delete, Update, and Upsert:
>  * SaveMode.Delete
>  * SaveMode.Update
>  * SaveMode.Upsert
> These would let Spark SQL support legacy RDBMSs such as Oracle, DB2, and
> MySQL much better. The existing implementation of SaveMode.Append is already
> flexible enough: all modes could share the same savePartition function, and
> only new getStatement functions would need to be added for Delete, Update,
> and Upsert, generating DELETE FROM, UPDATE, and MERGE INTO statements
> respectively. We have an initial implementation of them:
> {code:java}
> def getDeleteStatement(
>     table: String, rddSchema: StructType, dialect: JdbcDialect): String = {
>   val columns = rddSchema.fields
>     .map(x => dialect.quoteIdentifier(x.name) + "=?").mkString(" AND ")
>   s"DELETE FROM ${table.toUpperCase} WHERE $columns"
> }
>
> def getUpdateStatement(
>     table: String, rddSchema: StructType, priKeys: Seq[String],
>     dialect: JdbcDialect): String = {
>   val fullCols = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name))
>   val priCols = priKeys.map(dialect.quoteIdentifier(_))
>   val columns = (fullCols diff priCols).map(_ + "=?").mkString(",")
>   val cnditns = priCols.map(_ + "=?").mkString(" AND ")
>   s"UPDATE ${table.toUpperCase} SET $columns WHERE $cnditns"
> }
>
> def getMergeStatement(
>     table: String, rddSchema: StructType, priKeys: Seq[String],
>     dialect: JdbcDialect): String = {
>   val fullCols = rddSchema.fields.map(x => dialect.quoteIdentifier(x.name))
>   val priCols = priKeys.map(dialect.quoteIdentifier(_))
>   val nrmCols = fullCols diff priCols
>   val fullPart = fullCols
>     .map(c => s"${dialect.quoteIdentifier("SRC")}.$c").mkString(",")
>   val priPart = priCols
>     .map(c => s"${dialect.quoteIdentifier("TGT")}.$c=${dialect.quoteIdentifier("SRC")}.$c")
>     .mkString(" AND ")
>   val nrmPart = nrmCols
>     .map(c => s"$c=${dialect.quoteIdentifier("SRC")}.$c").mkString(",")
>   val columns = fullCols.mkString(",")
>   val placeholders = fullCols.map(_ => "?").mkString(",")
>   s"MERGE INTO ${table.toUpperCase} AS ${dialect.quoteIdentifier("TGT")} " +
>     s"USING TABLE(VALUES($placeholders)) " +
>     s"AS ${dialect.quoteIdentifier("SRC")}($columns) " +
>     s"ON $priPart " +
>     s"WHEN NOT MATCHED THEN INSERT ($columns) VALUES ($fullPart) " +
>     s"WHEN MATCHED THEN UPDATE SET $nrmPart"
> }
> {code}
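
To make the generated UPDATE statement concrete, here is a minimal sketch that runs without Spark on the classpath. It is an illustration under stated assumptions, not the proposed patch: plain column names replace the StructType, the hypothetical quoteIdentifier helper stands in for JdbcDialect.quoteIdentifier and is assumed to wrap names in double quotes, and updateBindOrder is a hypothetical helper that does not appear in the description. The sketch also shows one point a shared savePartition would probably have to handle: the UPDATE placeholders follow the SET columns first and the key columns last, which is not necessarily schema order.

```scala
object UpdateSketch {
  // Hypothetical stand-in for JdbcDialect.quoteIdentifier; assumes the
  // dialect wraps identifiers in double quotes.
  def quoteIdentifier(name: String): String = "\"" + name + "\""

  // Same logic as getUpdateStatement in the description, but taking plain
  // column names instead of a StructType.
  def updateStatement(
      table: String, columns: Seq[String], priKeys: Seq[String]): String = {
    val fullCols = columns.map(quoteIdentifier)
    val priCols = priKeys.map(quoteIdentifier)
    val setPart = (fullCols diff priCols).map(_ + "=?").mkString(",")
    val wherePart = priCols.map(_ + "=?").mkString(" AND ")
    s"UPDATE ${table.toUpperCase} SET $setPart WHERE $wherePart"
  }

  // Hypothetical helper: for each '?' placeholder, the index of the schema
  // column whose value should be bound to it. Non-key columns come first
  // (SET clause), then key columns (WHERE clause).
  def updateBindOrder(columns: Seq[String], priKeys: Seq[String]): Seq[Int] = {
    val nonKeys = columns diff priKeys
    (nonKeys ++ priKeys).map(c => columns.indexOf(c))
  }

  def main(args: Array[String]): Unit = {
    val cols = Seq("id", "name", "age")
    val keys = Seq("id")
    println(updateStatement("people", cols, keys))
    // UPDATE PEOPLE SET "name"=?,"age"=? WHERE "id"=?
    println(updateBindOrder(cols, keys))
    // List(1, 2, 0)
  }
}
```

If values were bound in schema order, "id" would land in the first placeholder, which belongs to "name". A mapping like the one above, or reordering the columns before writing, would avoid that mismatch.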



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
