This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new cd7a3c2  [SPARK-30062][SQL] Add the IMMEDIATE statement to the DB2 
dialect truncate implementation
cd7a3c2 is described below

commit cd7a3c2e667600c722c86b3914d487394f711916
Author: Ivan Karol <[email protected]>
AuthorDate: Tue Jan 25 19:14:24 2022 -0800

    [SPARK-30062][SQL] Add the IMMEDIATE statement to the DB2 dialect truncate 
implementation
    
    ### What changes were proposed in this pull request?
    I've added a DB2 specific truncate implementation that adds an IMMEDIATE 
statement at the end of the query.
    
    ### Why are the changes needed?
    I've encountered this issue myself while working with DB2 and trying to use 
truncate functionality.
    A quick Google search shows that some people have also encountered this 
issue before:
    
https://stackoverflow.com/questions/70027567/overwrite-mode-does-not-work-in-spark-sql-while-adding-data-in-db2
    https://issues.apache.org/jira/browse/SPARK-30062
    
    By looking into DB2 docs it becomes apparent that the IMMEDIATE statement 
is only optional if the table is column organized (though I'm not sure if it 
applies to all DB2 versions). So for the cases (such as mine) where the table is 
not column organized, adding an IMMEDIATE statement becomes essential for the 
query to work.
    
https://www.ibm.com/support/knowledgecenter/en/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0053474.html
    
    Also, that might not be the best example, but I've found that DbVisualizer 
does add an IMMEDIATE statement at the end of the truncate command. Though, it 
does so only for versions that are >= 9.7.
    https://fossies.org/linux/dbvis/resources/profiles/db2.xml (please look at 
line number 473)
    
    ### Does this PR introduce _any_ user-facing change?
    It should not, as even though the docs mention that if the TRUNCATE 
statement is executed in conjunction with IMMEDIATE, it has to be the first 
statement in the transaction, the JDBC connection that is established to 
execute the TRUNCATE statement has the auto-commit mode turned on. This means 
that there won't be any other query/statement executed prior within the same 
transaction.
    https://www.ibm.com/docs/en/db2/11.5?topic=statements-truncate (see the 
description for IMMEDIATE)
    
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala#L49
    
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala#L57
    
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L108
    
    ### How was this patch tested?
    Existing test case with slightly adjusted logic.
    
    Closes #35283 from ikarol/SPARK-30062.
    
    Authored-by: Ivan Karol <[email protected]>
    Signed-off-by: huaxingao <[email protected]>
    (cherry picked from commit 7e5c3b216431b6a5e9a0786bf7cded694228cdee)
    Signed-off-by: huaxingao <[email protected]>
---
 .../apache/spark/sql/jdbc/DB2IntegrationSuite.scala | 21 ++++++++++++++++++++-
 .../org/apache/spark/sql/jdbc/DB2Dialect.scala      |  9 +++++++++
 .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala |  8 ++++++--
 3 files changed, 35 insertions(+), 3 deletions(-)

diff --git 
a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
 
b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
index 77d7254..fd4f2aa 100644
--- 
a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
+++ 
b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
@@ -23,7 +23,7 @@ import java.util.Properties
 
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
 import org.apache.spark.sql.types.{BooleanType, ByteType, ShortType, 
StructType}
 import org.apache.spark.tags.DockerTest
@@ -198,4 +198,23 @@ class DB2IntegrationSuite extends 
DockerJDBCIntegrationSuite {
        """.stripMargin.replaceAll("\n", " "))
     assert(sql("select x, y from queryOption").collect.toSet == expectedResult)
   }
+
+  test("SPARK-30062") {
+    val expectedResult = Set(
+      (42, "fred"),
+      (17, "dave")
+    ).map { case (x, y) =>
+      Row(Integer.valueOf(x), String.valueOf(y))
+    }
+    val df = sqlContext.read.jdbc(jdbcUrl, "tbl", new Properties)
+    for (_ <- 0 to 2) {
+      df.write.mode(SaveMode.Append).jdbc(jdbcUrl, "tblcopy", new Properties)
+    }
+    assert(sqlContext.read.jdbc(jdbcUrl, "tblcopy", new Properties).count === 
6)
+    df.write.mode(SaveMode.Overwrite).option("truncate", true)
+      .jdbc(jdbcUrl, "tblcopy", new Properties)
+    val actual = sqlContext.read.jdbc(jdbcUrl, "tblcopy", new 
Properties).collect
+    assert(actual.length === 2)
+    assert(actual.toSet === expectedResult)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index 0b394db..7eb5385 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -53,6 +53,15 @@ private object DB2Dialect extends JdbcDialect {
   override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
 
   // scalastyle:off line.size.limit
+  // See 
https://www.ibm.com/support/knowledgecenter/en/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0053474.html
+  // scalastyle:on line.size.limit
+  override def getTruncateQuery(
+      table: String,
+      cascade: Option[Boolean] = isCascadingTruncateTable): String = {
+    s"TRUNCATE TABLE $table IMMEDIATE"
+  }
+
+  // scalastyle:off line.size.limit
   // See 
https://www.ibm.com/support/knowledgecenter/en/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0000980.html
   // scalastyle:on line.size.limit
   override def renameTable(oldTable: String, newTable: String): String = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 8842db2..51837f8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1023,14 +1023,16 @@ class JDBCSuite extends QueryTest
     val defaultQuery = s"TRUNCATE TABLE $table"
     val postgresQuery = s"TRUNCATE TABLE ONLY $table"
     val teradataQuery = s"DELETE FROM $table ALL"
+    val db2Query = s"TRUNCATE TABLE $table IMMEDIATE"
 
-    Seq(mysql, db2, h2, derby).foreach{ dialect =>
+    Seq(mysql, h2, derby).foreach{ dialect =>
       assert(dialect.getTruncateQuery(table, Some(true)) == defaultQuery)
     }
 
     assert(postgres.getTruncateQuery(table) == postgresQuery)
     assert(oracle.getTruncateQuery(table) == defaultQuery)
     assert(teradata.getTruncateQuery(table) == teradataQuery)
+    assert(db2.getTruncateQuery(table) == db2Query)
   }
 
   test("SPARK-22880: Truncate table with CASCADE by jdbc dialect") {
@@ -1049,13 +1051,15 @@ class JDBCSuite extends QueryTest
     val postgresQuery = s"TRUNCATE TABLE ONLY $table CASCADE"
     val oracleQuery = s"TRUNCATE TABLE $table CASCADE"
     val teradataQuery = s"DELETE FROM $table ALL"
+    val db2Query = s"TRUNCATE TABLE $table IMMEDIATE"
 
-    Seq(mysql, db2, h2, derby).foreach{ dialect =>
+    Seq(mysql, h2, derby).foreach{ dialect =>
       assert(dialect.getTruncateQuery(table, Some(true)) == defaultQuery)
     }
     assert(postgres.getTruncateQuery(table, Some(true)) == postgresQuery)
     assert(oracle.getTruncateQuery(table, Some(true)) == oracleQuery)
     assert(teradata.getTruncateQuery(table, Some(true)) == teradataQuery)
+    assert(db2.getTruncateQuery(table, Some(true)) == db2Query)
   }
 
   test("Test DataFrame.where for Date and Timestamp") {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to