bvaradar commented on a change in pull request #1752:
URL: https://github.com/apache/hudi/pull/1752#discussion_r465333514



##########
File path: 
hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingWriterActivityDetector.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.async;
+
+import java.util.function.Supplier;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * This class is used to detect activity of spark streaming writer. This is 
used to decide if HoodieWriteClient
+ * and async compactor need to be closed. Spark Structured Streaming does not 
have an explicit API on the Sink side to
+ * determine if the stream is done. In this absence, async compactor 
proactively checks with the sink if it is
+ * active. If there is no activity for sufficient period, async compactor 
shuts down. If the sink was indeed active,
+ * a subsequent batch will re-trigger async compaction.
+ */
+public class SparkStreamingWriterActivityDetector {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = 
LogManager.getLogger(SparkStreamingWriterActivityDetector.class);
+
+  private final Supplier<Long> lastStartBatchNanoTimeSupplier;
+  private final Supplier<Long> lastEndBatchNanoTimeSupplier;
+  private final long sinkInactivityTimeoutSecs;
+
+  private static final long SECS_TO_NANOS = 1000000000L;
+
+  public SparkStreamingWriterActivityDetector(
+      Supplier<Long> lastStartBatchNanoTimeSupplier, Supplier<Long> 
lastEndBatchNanoTimeSupplier,
+      long sinkInactivityTimeoutSecs) {
+    this.lastStartBatchNanoTimeSupplier = lastStartBatchNanoTimeSupplier;
+    this.lastEndBatchNanoTimeSupplier = lastEndBatchNanoTimeSupplier;
+    this.sinkInactivityTimeoutSecs = sinkInactivityTimeoutSecs;
+  }
+
+  /**
+   * Detects if spark streaming write is still active based on time.
+   * @return
+   */
+  public boolean hasRecentlyWritten() {
+    long lastStartBatchTime = lastStartBatchNanoTimeSupplier.get();
+    long lastEndBatchTime = lastEndBatchNanoTimeSupplier.get();
+
+    LOG.info("Checking if compactor needs to be stopped. "
+        + "lastStartBatchTime=" + lastStartBatchTime + ", lastEndBatchTime=" + 
lastEndBatchTime
+        + ", CurrTime=" + System.nanoTime());
+
+    if (lastEndBatchTime - lastStartBatchTime < 0) {
+      LOG.info("End Batch Time (" + lastEndBatchTime + ") is less than Start 
Batch Time (" + lastStartBatchTime + ")"
+          + "Sink is running. So, no need to stop");
+      return true;
+    }
+
+    long currTime = System.nanoTime();
+    long elapsedTimeSecs = Double.valueOf(Math.ceil(1.0 * (currTime - 
lastEndBatchTime) / SECS_TO_NANOS)).longValue();
+    if (elapsedTimeSecs > sinkInactivityTimeoutSecs) {
+      LOG.warn("Streaming Sink has been idle for " + elapsedTimeSecs + " 
seconds");

Review comment:
       This code is deleted.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to