Repository: spark
Updated Branches:
refs/heads/master d6795c7a2 -> cc1d2dcb6
[SPARK-16463][SQL] Support `truncate` option in Overwrite mode for JDBC DataFrameWriter
## What changes were proposed in this pull request?
This PR adds a boolean option, `truncate`, for `SaveMode.Overwrite` of the JDBC
DataFrameWriter. If this option is `true`, Spark tries to take advantage of
`TRUNCATE TABLE` instead of `DROP TABLE`. It is a simple option, but it provides
great **convenience** for BI tool users who work with RDBMS tables generated by
Spark.
**Goal**
- A DataFrame can be saved to a database without `CREATE/DROP` privileges, which
are sometimes disallowed for security reasons.
- The existing table definition is preserved, so users can add and keep
additional `INDEX`es and `CONSTRAINT`s on the table.
- `TRUNCATE` is sometimes faster than the combination of `DROP/CREATE`.
**Supported DBMS**
The table below shows `truncate` option support per dialect. Because the behavior
of `TRUNCATE TABLE` differs among DBMSs, it is not always safe to use
`TRUNCATE TABLE`. Spark ignores the `truncate` option for **unknown** DBMSs and
for DBMSs whose `TRUNCATE TABLE` **cascades by default**. A newly added
JdbcDialect should additionally implement the corresponding function to support
the `truncate` option; a minimal sketch follows the table.
Spark Dialects | `truncate` OPTION SUPPORT
---------------|-------------------------------
MySQLDialect | O
PostgresDialect | X
DB2Dialect | O
MsSqlServerDialect | O
DerbyDialect | O
OracleDialect | O
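For a dialect that is not listed above, support can be added by overriding
`isCascadingTruncateTable()`. The snippet below is only a minimal sketch;
`MyCustomDialect` and the `jdbc:mydb` URL prefix are illustrative, not part of
this patch.
```scala
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

// Hypothetical dialect: the object name and URL prefix are examples only.
object MyCustomDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

  // Some(false) tells Spark that TRUNCATE TABLE does not cascade here,
  // so the `truncate` option can be honored for this database.
  override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}

JdbcDialects.registerDialect(MyCustomDialect)
```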
**Before (TABLE with INDEX case)**: the Spark shell and MySQL CLI sessions are
interleaved intentionally.
```scala
scala> val (url, prop) = ("jdbc:mysql://localhost:3306/temp?useSSL=false", new java.util.Properties)
scala> prop.setProperty("user","root")
scala> df.write.mode("overwrite").jdbc(url, "table_with_index", prop)
scala> spark.range(10).write.mode("overwrite").jdbc(url, "table_with_index", prop)
mysql> DESC table_with_index;
+-------+------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+------------+------+-----+---------+-------+
| id | bigint(20) | NO | | NULL | |
+-------+------------+------+-----+---------+-------+
mysql> CREATE UNIQUE INDEX idx_id ON table_with_index(id);
mysql> DESC table_with_index;
+-------+------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+------------+------+-----+---------+-------+
| id | bigint(20) | NO | PRI | NULL | |
+-------+------------+------+-----+---------+-------+
scala> spark.range(10).write.mode("overwrite").jdbc(url, "table_with_index", prop)
mysql> DESC table_with_index;
+-------+------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+------------+------+-----+---------+-------+
| id | bigint(20) | NO | | NULL | |
+-------+------------+------+-----+---------+-------+
```
**After (TABLE with INDEX case)**
```scala
scala> spark.range(10).write.mode("overwrite").option("truncate", true).jdbc(url, "table_with_index", prop)
mysql> DESC table_with_index;
+-------+------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+------------+------+-----+---------+-------+
| id | bigint(20) | NO | PRI | NULL | |
+-------+------------+------+-----+---------+-------+
```
**Error Handling**
- In case of exceptions, Spark will not retry. Users should turn off the
`truncate` option (a sketch of the fallback follows this list).
- In case of a schema change:
  - If any column name changes, an exception is raised, as expected.
  - If only the column types differ, the write behaves like Append mode.
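The fallback mentioned above is simply the default behavior; the snippet below is
a sketch that reuses the `url`/`prop` values from the example session rather than
any new API.
```scala
// If the truncate-based overwrite fails (e.g. after a schema change), turning the
// option off falls back to the default DROP/CREATE behavior of Overwrite mode.
spark.range(10).write
  .mode("overwrite")
  .option("truncate", false)   // the default; drops and recreates the table
  .jdbc(url, "table_with_index", prop)
```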
## How was this patch tested?
Pass the Jenkins tests with an updated test case.
Author: Dongjoon Hyun <[email protected]>
Closes #14086 from dongjoon-hyun/SPARK-16410.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc1d2dcb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc1d2dcb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc1d2dcb
Branch: refs/heads/master
Commit: cc1d2dcb612fb5df39c9a9e57a3484ecad90c745
Parents: d6795c7
Author: Dongjoon Hyun <[email protected]>
Authored: Sun Jul 24 09:25:02 2016 +0100
Committer: Sean Owen <[email protected]>
Committed: Sun Jul 24 09:25:02 2016 +0100
----------------------------------------------------------------------
.../org/apache/spark/sql/DataFrameWriter.scala | 18 +++++++++++++--
.../execution/datasources/jdbc/JdbcUtils.scala | 16 ++++++++++++++
.../org/apache/spark/sql/jdbc/DB2Dialect.scala | 2 ++
.../apache/spark/sql/jdbc/JdbcDialects.scala | 7 ++++++
.../spark/sql/jdbc/MsSqlServerDialect.scala | 2 ++
.../apache/spark/sql/jdbc/MySQLDialect.scala | 2 ++
.../apache/spark/sql/jdbc/OracleDialect.scala | 2 ++
.../apache/spark/sql/jdbc/PostgresDialect.scala | 2 ++
.../apache/spark/sql/jdbc/JDBCWriteSuite.scala | 23 ++++++++++++++++++--
9 files changed, 70 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/cc1d2dcb/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index e6a8dfa..753b64b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -387,6 +387,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
* your external database systems.
*
+ * You can set the following JDBC-specific option(s) for storing data via JDBC:
+ * <li>`truncate` (default `false`): use `TRUNCATE TABLE` instead of `DROP TABLE`.</li>
+ *
+ * In case of failures, users should turn off the `truncate` option to use `DROP TABLE` again. Also,
+ * due to the different behavior of `TRUNCATE TABLE` among DBMSs, it's not always safe to use this.
+ * MySQLDialect, DB2Dialect, MsSqlServerDialect, DerbyDialect, and OracleDialect support this
+ * while PostgresDialect and the default JdbcDialect don't. For unknown and unsupported JdbcDialects,
+ * the user option `truncate` is ignored.
+ *
* @param url JDBC database url of the form `jdbc:subprotocol:subname`
* @param table Name of the table in the external database.
* @param connectionProperties JDBC database connection arguments, a list of arbitrary string
@@ -423,8 +432,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}
if (mode == SaveMode.Overwrite && tableExists) {
- JdbcUtils.dropTable(conn, table)
- tableExists = false
+ if (extraOptions.getOrElse("truncate", "false").toBoolean &&
+ JdbcUtils.isCascadingTruncateTable(url) == Some(false)) {
+ JdbcUtils.truncateTable(conn, table)
+ } else {
+ JdbcUtils.dropTable(conn, table)
+ tableExists = false
+ }
}
// Create the table if the table didn't exist.
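A condensed restatement of the decision above: the truncate path is taken only
when the user sets the option and the dialect reports a non-cascading
`TRUNCATE TABLE`. The helper name below is illustrative, not part of the patch.
```scala
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils

// Sketch only: `shouldTruncate` is a hypothetical name used for explanation.
def shouldTruncate(extraOptions: Map[String, String], url: String): Boolean =
  extraOptions.getOrElse("truncate", "false").toBoolean &&
    JdbcUtils.isCascadingTruncateTable(url) == Some(false)
// Any other answer (Some(true) or None) keeps the existing DROP TABLE path.
```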
http://git-wip-us.apache.org/repos/asf/spark/blob/cc1d2dcb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index ce71a7d..cb474cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -99,6 +99,22 @@ object JdbcUtils extends Logging {
}
/**
+ * Truncates a table from the JDBC database.
+ */
+ def truncateTable(conn: Connection, table: String): Unit = {
+ val statement = conn.createStatement
+ try {
+ statement.executeUpdate(s"TRUNCATE TABLE $table")
+ } finally {
+ statement.close()
+ }
+ }
+
+ def isCascadingTruncateTable(url: String): Option[Boolean] = {
+ JdbcDialects.get(url).isCascadingTruncateTable()
+ }
+
+ /**
* Returns a PreparedStatement that inserts a row into table via conn.
*/
def insertStatement(conn: Connection, table: String, rddSchema: StructType, dialect: JdbcDialect)
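Taken together, the two new helpers are meant to be used roughly as sketched
below (assuming an open `java.sql.Connection` named `conn`; the table name is
illustrative). The three possible answers from `isCascadingTruncateTable` are
`Some(false)` (safe to truncate), `Some(true)` (truncate cascades), and `None`
(unknown); only `Some(false)` enables the truncate path.
```scala
import java.sql.Connection
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils

// Sketch: prefer TRUNCATE only when the dialect is known not to cascade.
def overwriteTable(conn: Connection, url: String, table: String): Unit =
  JdbcUtils.isCascadingTruncateTable(url) match {
    case Some(false) => JdbcUtils.truncateTable(conn, table) // keeps indexes and constraints
    case _           => JdbcUtils.dropTable(conn, table)     // unknown or cascading: drop instead
  }
```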
http://git-wip-us.apache.org/repos/asf/spark/blob/cc1d2dcb/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index f12b6ca..190463d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -28,4 +28,6 @@ private object DB2Dialect extends JdbcDialect {
case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))
case _ => None
}
+
+ override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/cc1d2dcb/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 948106f..7810780 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -108,6 +108,13 @@ abstract class JdbcDialect extends Serializable {
def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = {
}
+ /**
+ * Return Some[true] iff `TRUNCATE TABLE` causes cascading by default.
+ * Some[true] : TRUNCATE TABLE causes cascading.
+ * Some[false] : TRUNCATE TABLE does not cause cascading.
+ * None: The behavior of TRUNCATE TABLE is unknown (default).
+ */
+ def isCascadingTruncateTable(): Option[Boolean] = None
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/cc1d2dcb/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 3eb722b..70122f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -38,4 +38,6 @@ private object MsSqlServerDialect extends JdbcDialect {
case TimestampType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP))
case _ => None
}
+
+ override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/cc1d2dcb/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index e171704..b2cff78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -44,4 +44,6 @@ private case object MySQLDialect extends JdbcDialect {
override def getTableExistsQuery(table: String): String = {
s"SELECT 1 FROM $table LIMIT 1"
}
+
+ override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/cc1d2dcb/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index b795e8b..ce8731e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -53,4 +53,6 @@ private case object OracleDialect extends JdbcDialect {
case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
case _ => None
}
+
+ override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/cc1d2dcb/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 6baf1b6..fb959d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -94,4 +94,6 @@ private object PostgresDialect extends JdbcDialect {
}
}
+
+ override def isCascadingTruncateTable(): Option[Boolean] = Some(true)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/cc1d2dcb/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 2c6449f..d99b3cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -40,6 +40,14 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
properties.setProperty("password", "testPass")
properties.setProperty("rowId", "false")
+ val testH2Dialect = new JdbcDialect {
+ override def canHandle(url: String) : Boolean = url.startsWith("jdbc:h2")
+ override def getCatalystType(
+ sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] =
+ Some(StringType)
+ override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
+ }
+
before {
Utils.classForName("org.h2.Driver")
conn = DriverManager.getConnection(url)
@@ -145,14 +153,25 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
assert(2 === spark.read.jdbc(url, "TEST.APPENDTEST", new Properties()).collect()(0).length)
}
- test("CREATE then INSERT to truncate") {
+ test("Truncate") {
+ JdbcDialects.registerDialect(testH2Dialect)
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
val df2 = spark.createDataFrame(sparkContext.parallelize(arr1x2), schema2)
+ val df3 = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)
df.write.jdbc(url1, "TEST.TRUNCATETEST", properties)
- df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.TRUNCATETEST", properties)
+ df2.write.mode(SaveMode.Overwrite).option("truncate", true)
+ .jdbc(url1, "TEST.TRUNCATETEST", properties)
assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count())
assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)
+
+ val m = intercept[SparkException] {
+ df3.write.mode(SaveMode.Overwrite).option("truncate", true)
+ .jdbc(url1, "TEST.TRUNCATETEST", properties)
+ }.getMessage
+ assert(m.contains("Column \"seq\" not found"))
+ assert(0 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count())
+ JdbcDialects.unregisterDialect(testH2Dialect)
}
test("Incompatible INSERT to append") {