This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 319b7a5  [HUDI-1427] Fix FileAlreadyExistsException when set HOODIE_AUTO_COMMIT_PROP to true (#2295)
319b7a5 is described below

commit 319b7a58e4773ca51c4ef9b82faf55d03d208e46
Author: pengzhiwei <[email protected]>
AuthorDate: Sat Dec 5 08:07:25 2020 +0800

    [HUDI-1427] Fix FileAlreadyExistsException when set HOODIE_AUTO_COMMIT_PROP to true (#2295)
---
 .../main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala   |  5 +++--
 .../org/apache/hudi/functional/TestCOWDataSource.scala      | 13 +++++++++++++
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git 
a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala 
b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index e109501..b10a05b 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -165,7 +165,7 @@ private[hudi] object HoodieSparkSqlWriter {
 
           // Create a HoodieWriteClient & issue the write.
           val client = 
hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, 
schema.toString, path.get,
-            tblName, mapAsJavaMap(parameters)
+            tblName, mapAsJavaMap(parameters - 
HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP)
           )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
 
           if (isAsyncCompactionEnabled(client, tableConfig, parameters, 
jsc.hadoopConfiguration())) {
@@ -205,7 +205,8 @@ private[hudi] object HoodieSparkSqlWriter {
           // Create a HoodieWriteClient & issue the delete.
           val client = 
hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
             Schema.create(Schema.Type.NULL).toString, path.get, tblName,
-            
mapAsJavaMap(parameters))).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
+            mapAsJavaMap(parameters - 
HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP)))
+            .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
 
           if (isAsyncCompactionEnabled(client, tableConfig, parameters, 
jsc.hadoopConfiguration())) {
             asyncCompactionTriggerFn.get.apply(client)
diff --git 
a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala 
b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index bf71e68..3db8948 100644
--- 
a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -307,4 +307,17 @@ class TestCOWDataSource extends HoodieClientTestBase {
       }
     })
   }
+
+  @Test def testWithAutoCommitOn(): Unit = {
+    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP, "true")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+  }
 }

Reply via email to