bvaradar commented on a change in pull request #1752:
URL: https://github.com/apache/hudi/pull/1752#discussion_r465333372



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
##########
@@ -38,46 +50,65 @@ class HoodieStreamingSink(sqlContext: SQLContext,
   private val retryIntervalMs = 
options(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS_OPT_KEY).toLong
   private val ignoreFailedBatch = 
options(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY).toBoolean
 
+  private var isAsyncCompactorServiceShutdownAbnormally = false
+
   private val mode =
     if (outputMode == OutputMode.Append()) {
       SaveMode.Append
     } else {
       SaveMode.Overwrite
     }
 
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+  private var asyncCompactorService : AsyncCompactService = _
+  private var writeClient : 
Option[HoodieWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty
+  private var lastStartBatchTimeNanos : lang.Long = System.nanoTime()
+  private var lastEndBatchTimeNanos : lang.Long = System.nanoTime()
+
+  override def addBatch(batchId: Long, data: DataFrame): Unit = 
this.synchronized {
+    if (isAsyncCompactorServiceShutdownAbnormally)  {
+      throw new IllegalStateException("Async Compaction shutdown unexpectedly")
+    }
+
+    lastStartBatchTimeNanos = System.nanoTime()
+
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
-          sqlContext,
-          mode,
-          options,
-          data)
+          sqlContext, mode, options, data, writeClient, 
Some(triggerAsyncCompactor))

Review comment:
       Yes, this worked fine. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to