This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 319b7a5 [HUDI-1427] Fix FileAlreadyExistsException when set HOODIE_AUTO_COMMIT_PROP to true (#2295)
319b7a5 is described below
commit 319b7a58e4773ca51c4ef9b82faf55d03d208e46
Author: pengzhiwei <[email protected]>
AuthorDate: Sat Dec 5 08:07:25 2020 +0800
[HUDI-1427] Fix FileAlreadyExistsException when set HOODIE_AUTO_COMMIT_PROP to true (#2295)
---
.../main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 5 +++--
.../org/apache/hudi/functional/TestCOWDataSource.scala | 13 +++++++++++++
2 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index e109501..b10a05b 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -165,7 +165,7 @@ private[hudi] object HoodieSparkSqlWriter {
// Create a HoodieWriteClient & issue the write.
val client =
hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
schema.toString, path.get,
- tblName, mapAsJavaMap(parameters)
+ tblName, mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP)
)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
if (isAsyncCompactionEnabled(client, tableConfig, parameters,
jsc.hadoopConfiguration())) {
@@ -205,7 +205,8 @@ private[hudi] object HoodieSparkSqlWriter {
// Create a HoodieWriteClient & issue the delete.
val client =
hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
Schema.create(Schema.Type.NULL).toString, path.get, tblName,
- mapAsJavaMap(parameters))).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
+ mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP)))
+ .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
if (isAsyncCompactionEnabled(client, tableConfig, parameters,
jsc.hadoopConfiguration())) {
asyncCompactionTriggerFn.get.apply(client)
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index bf71e68..3db8948 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -307,4 +307,17 @@ class TestCOWDataSource extends HoodieClientTestBase {
}
})
}
+
+ @Test def testWithAutoCommitOn(): Unit = {
+ val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+ inputDF1.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP, "true")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+ }
}