This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 36b3b6f  [HUDI-415] Get commit time when Spark start (#1113)
36b3b6f is described below

commit 36b3b6f5dd913d3f1c9aa116aff8daf6540fed65
Author: YanJia-Gary-Li <[email protected]>
AuthorDate: Thu Dec 19 22:19:06 2019 -0800

    [HUDI-415] Get commit time when Spark start (#1113)
---
 .../src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala      | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index dcd96a6..122aeab 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.util.{FSUtils, TypedProperties}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
@@ -74,11 +75,11 @@ private[hudi] object HoodieSparkSqlWriter {
       }
 
     var writeSuccessful: Boolean = false
-    var commitTime: String = null
     var writeStatuses: JavaRDD[WriteStatus] = null
 
     val jsc = new JavaSparkContext(sparkContext)
     val basePath = new Path(parameters("path"))
+    val commitTime = HoodieActiveTimeline.createNewInstantTime();
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
 
@@ -145,7 +146,7 @@ private[hudi] object HoodieSparkSqlWriter {
         log.info("new batch has no new records, skipping...")
         return (true, common.util.Option.empty())
       }
-      commitTime = client.startCommit()
+      client.startCommitWithTime(commitTime)
       writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
       // Check for errors and commit the write.
       val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
@@ -223,7 +224,7 @@ private[hudi] object HoodieSparkSqlWriter {
       )
 
       // Issue deletes
-      commitTime = client.startCommit()
+      client.startCommitWithTime(commitTime)
       writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime)
       val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
       writeSuccessful =

Reply via email to