bvaradar commented on a change in pull request #1752:
URL: https://github.com/apache/hudi/pull/1752#discussion_r465332053
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
##########
@@ -111,12 +143,64 @@ class HoodieStreamingSink(sqlContext: SQLContext,
@annotation.tailrec
private def retry[T](n: Int, waitInMillis: Long)(fn: => Try[T]): Try[T] = {
+ lastStartBatchTimeNanos = System.nanoTime()
fn match {
- case x: util.Success[T] => x
+ case x: Success[T] =>
+ lastEndBatchTimeNanos = System.nanoTime()
+ x
case _ if n > 1 =>
Thread.sleep(waitInMillis)
+ lastEndBatchTimeNanos = System.nanoTime()
retry(n - 1, waitInMillis * 2)(fn)
- case f => f
+ case f =>
+ lastEndBatchTimeNanos = System.nanoTime()
+ reset(false)
+ f
+ }
+ }
+
+ protected def triggerAsyncCompactor(client:
HoodieWriteClient[HoodieRecordPayload[Nothing]]): Unit = {
+ log.info("Triggering Async compaction !!")
+ if (null == asyncCompactorService) {
+ asyncCompactorService = new SparkStreamingAsyncCompactService(new
JavaSparkContext(sqlContext.sparkContext),
+ client, new SparkStreamingWriterActivityDetector(new
Supplier[lang.Long] {
+ override def get(): lang.Long = lastStartBatchTimeNanos
+ }, new Supplier[lang.Long] {
+ override def get(): lang.Long = lastEndBatchTimeNanos
+ }, 10))
+ asyncCompactorService.start(new Function[java.lang.Boolean,
java.lang.Boolean] {
+ override def apply(errored: lang.Boolean): lang.Boolean = {
+ log.info(s"Async Compactor shutdown. Errored ? $errored")
+ isAsyncCompactorServiceShutdownAbnormally = errored
+ reset(false)
+ log.info("Done resetting write client.")
+ true
+ }
+ })
+
+ // Add Shutdown Hook
+ Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
Review comment:
Yes, this and setting daemon mode were good enough
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]