This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new cd7a3c2 [SPARK-30062][SQL] Add the IMMEDIATE statement to the DB2
dialect truncate implementation
cd7a3c2 is described below
commit cd7a3c2e667600c722c86b3914d487394f711916
Author: Ivan Karol <[email protected]>
AuthorDate: Tue Jan 25 19:14:24 2022 -0800
[SPARK-30062][SQL] Add the IMMEDIATE statement to the DB2 dialect truncate
implementation
### What changes were proposed in this pull request?
I've added a DB2 specific truncate implementation that adds an IMMEDIATE
statement at the end of the query.
### Why are the changes needed?
I've encountered this issue myself while working with DB2 and trying to use
truncate functionality.
A quick Google search shows that some people have also encountered this
issue before:
https://stackoverflow.com/questions/70027567/overwrite-mode-does-not-work-in-spark-sql-while-adding-data-in-db2
https://issues.apache.org/jira/browse/SPARK-30062
By looking into the DB2 docs it becomes apparent that the IMMEDIATE statement
is only optional if the table is column organized (though I'm not sure if it
applies to all DB2 versions). So for the cases (such as mine) where the table is
not column organized, adding an IMMEDIATE statement becomes essential for the
query to work.
https://www.ibm.com/support/knowledgecenter/en/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0053474.html
Also, that might not be the best example, but I've found that DbVisualizer
does add an IMMEDIATE statement at the end of the truncate command. Though,
it does so only for DB2 versions that are >= 9.7:
https://fossies.org/linux/dbvis/resources/profiles/db2.xml (please look at
line number 473)
### Does this PR introduce _any_ user-facing change?
It should not, as even though the docs mention that if the TRUNCATE
statement is executed in conjunction with IMMEDIATE, it has to be the first
statement in the transaction, the JDBC connection that is established to
execute the TRUNCATE statement has the auto-commit mode turned on. This means
that there won't be any other query/statement executed prior within the same
transaction.
https://www.ibm.com/docs/en/db2/11.5?topic=statements-truncate (see the
description for IMMEDIATE)
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala#L49
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala#L57
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L108
### How was this patch tested?
Existing test case with slightly adjusted logic.
Closes #35283 from ikarol/SPARK-30062.
Authored-by: Ivan Karol <[email protected]>
Signed-off-by: huaxingao <[email protected]>
(cherry picked from commit 7e5c3b216431b6a5e9a0786bf7cded694228cdee)
Signed-off-by: huaxingao <[email protected]>
---
.../apache/spark/sql/jdbc/DB2IntegrationSuite.scala | 21 ++++++++++++++++++++-
.../org/apache/spark/sql/jdbc/DB2Dialect.scala | 9 +++++++++
.../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 8 ++++++--
3 files changed, 35 insertions(+), 3 deletions(-)
diff --git
a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
index 77d7254..fd4f2aa 100644
---
a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
+++
b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
@@ -23,7 +23,7 @@ import java.util.Properties
import org.scalatest.time.SpanSugar._
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.types.{BooleanType, ByteType, ShortType,
StructType}
import org.apache.spark.tags.DockerTest
@@ -198,4 +198,23 @@ class DB2IntegrationSuite extends
DockerJDBCIntegrationSuite {
""".stripMargin.replaceAll("\n", " "))
assert(sql("select x, y from queryOption").collect.toSet == expectedResult)
}
+
+ test("SPARK-30062") {
+ val expectedResult = Set(
+ (42, "fred"),
+ (17, "dave")
+ ).map { case (x, y) =>
+ Row(Integer.valueOf(x), String.valueOf(y))
+ }
+ val df = sqlContext.read.jdbc(jdbcUrl, "tbl", new Properties)
+ for (_ <- 0 to 2) {
+ df.write.mode(SaveMode.Append).jdbc(jdbcUrl, "tblcopy", new Properties)
+ }
+ assert(sqlContext.read.jdbc(jdbcUrl, "tblcopy", new Properties).count ===
6)
+ df.write.mode(SaveMode.Overwrite).option("truncate", true)
+ .jdbc(jdbcUrl, "tblcopy", new Properties)
+ val actual = sqlContext.read.jdbc(jdbcUrl, "tblcopy", new
Properties).collect
+ assert(actual.length === 2)
+ assert(actual.toSet === expectedResult)
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index 0b394db..7eb5385 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -53,6 +53,15 @@ private object DB2Dialect extends JdbcDialect {
override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
// scalastyle:off line.size.limit
+ // See
https://www.ibm.com/support/knowledgecenter/en/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0053474.html
+ // scalastyle:on line.size.limit
+ override def getTruncateQuery(
+ table: String,
+ cascade: Option[Boolean] = isCascadingTruncateTable): String = {
+ s"TRUNCATE TABLE $table IMMEDIATE"
+ }
+
+ // scalastyle:off line.size.limit
// See
https://www.ibm.com/support/knowledgecenter/en/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0000980.html
// scalastyle:on line.size.limit
override def renameTable(oldTable: String, newTable: String): String = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 8842db2..51837f8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1023,14 +1023,16 @@ class JDBCSuite extends QueryTest
val defaultQuery = s"TRUNCATE TABLE $table"
val postgresQuery = s"TRUNCATE TABLE ONLY $table"
val teradataQuery = s"DELETE FROM $table ALL"
+ val db2Query = s"TRUNCATE TABLE $table IMMEDIATE"
- Seq(mysql, db2, h2, derby).foreach{ dialect =>
+ Seq(mysql, h2, derby).foreach{ dialect =>
assert(dialect.getTruncateQuery(table, Some(true)) == defaultQuery)
}
assert(postgres.getTruncateQuery(table) == postgresQuery)
assert(oracle.getTruncateQuery(table) == defaultQuery)
assert(teradata.getTruncateQuery(table) == teradataQuery)
+ assert(db2.getTruncateQuery(table) == db2Query)
}
test("SPARK-22880: Truncate table with CASCADE by jdbc dialect") {
@@ -1049,13 +1051,15 @@ class JDBCSuite extends QueryTest
val postgresQuery = s"TRUNCATE TABLE ONLY $table CASCADE"
val oracleQuery = s"TRUNCATE TABLE $table CASCADE"
val teradataQuery = s"DELETE FROM $table ALL"
+ val db2Query = s"TRUNCATE TABLE $table IMMEDIATE"
- Seq(mysql, db2, h2, derby).foreach{ dialect =>
+ Seq(mysql, h2, derby).foreach{ dialect =>
assert(dialect.getTruncateQuery(table, Some(true)) == defaultQuery)
}
assert(postgres.getTruncateQuery(table, Some(true)) == postgresQuery)
assert(oracle.getTruncateQuery(table, Some(true)) == oracleQuery)
assert(teradata.getTruncateQuery(table, Some(true)) == teradataQuery)
+ assert(db2.getTruncateQuery(table, Some(true)) == db2Query)
}
test("Test DataFrame.where for Date and Timestamp") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]