This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 36b3b6f [HUDI-415] Get commit time when Spark starts (#1113)
36b3b6f is described below
commit 36b3b6f5dd913d3f1c9aa116aff8daf6540fed65
Author: YanJia-Gary-Li <[email protected]>
AuthorDate: Thu Dec 19 22:19:06 2019 -0800
[HUDI-415] Get commit time when Spark starts (#1113)
---
.../src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git
a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index dcd96a6..122aeab 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.util.{FSUtils, TypedProperties}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
@@ -74,11 +75,11 @@ private[hudi] object HoodieSparkSqlWriter {
}
var writeSuccessful: Boolean = false
- var commitTime: String = null
var writeStatuses: JavaRDD[WriteStatus] = null
val jsc = new JavaSparkContext(sparkContext)
val basePath = new Path(parameters("path"))
+ val commitTime = HoodieActiveTimeline.createNewInstantTime();
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
var exists = fs.exists(new Path(basePath,
HoodieTableMetaClient.METAFOLDER_NAME))
@@ -145,7 +146,7 @@ private[hudi] object HoodieSparkSqlWriter {
log.info("new batch has no new records, skipping...")
return (true, common.util.Option.empty())
}
- commitTime = client.startCommit()
+ client.startCommitWithTime(commitTime)
writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords,
commitTime, operation)
// Check for errors and commit the write.
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
@@ -223,7 +224,7 @@ private[hudi] object HoodieSparkSqlWriter {
)
// Issue deletes
- commitTime = client.startCommit()
+ client.startCommitWithTime(commitTime)
writeStatuses = DataSourceUtils.doDeleteOperation(client,
hoodieKeysToDelete, commitTime)
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
writeSuccessful =