bvaradar commented on a change in pull request #1752:
URL: https://github.com/apache/hudi/pull/1752#discussion_r465332412



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
##########
@@ -111,12 +143,64 @@ class HoodieStreamingSink(sqlContext: SQLContext,
 
   @annotation.tailrec
   private def retry[T](n: Int, waitInMillis: Long)(fn: => Try[T]): Try[T] = {
+    lastStartBatchTimeNanos = System.nanoTime()
     fn match {
-      case x: util.Success[T] => x
+      case x: Success[T] =>
+        lastEndBatchTimeNanos = System.nanoTime()
+        x
       case _ if n > 1 =>
         Thread.sleep(waitInMillis)
+        lastEndBatchTimeNanos = System.nanoTime()
         retry(n - 1, waitInMillis * 2)(fn)
-      case f => f
+      case f =>
+        lastEndBatchTimeNanos = System.nanoTime()
+        reset(false)
+        f
+    }
+  }
+
+  protected def triggerAsyncCompactor(client: HoodieWriteClient[HoodieRecordPayload[Nothing]]): Unit = {
+    log.info("Triggering Async compaction !!")
+    if (null == asyncCompactorService) {
+      asyncCompactorService = new SparkStreamingAsyncCompactService(new JavaSparkContext(sqlContext.sparkContext),
+        client, new SparkStreamingWriterActivityDetector(new Supplier[lang.Long] {
+          override def get(): lang.Long = lastStartBatchTimeNanos
+        }, new Supplier[lang.Long] {
+          override def get(): lang.Long = lastEndBatchTimeNanos
+        }, 10))
+      asyncCompactorService.start(new Function[java.lang.Boolean, java.lang.Boolean] {
+        override def apply(errored: lang.Boolean): lang.Boolean = {
+          log.info(s"Async Compactor shutdown. Errored ? $errored")
+          isAsyncCompactorServiceShutdownAbnormally = errored
+          reset(false)
+          log.info("Done resetting write client.")
+          true
+        }
+      })
+
+      // Add Shutdown Hook
+      Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
+        override def run(): Unit = reset(true)
+      }))
+
+      // First time, scan .hoodie folder and get all pending compactions
+      val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration,

Review comment:
       Now, this runs only the first time, i.e. when the async compactor service is still null. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to