This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bf05f95  [HUDI-269] Limit sync frequency (#921)
bf05f95 is described below

commit bf05f954133da5b760a66906557df0305dbd0eaf
Author: Xing Pan <[email protected]>
AuthorDate: Tue Sep 24 20:30:35 2019 +0800

    [HUDI-269] Limit sync frequency (#921)
    
    * [HUDI-269] Throttle DeltaStreamer sync runs
---
 .../hudi/utilities/deltastreamer/HoodieDeltaStreamer.java     | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 4346388..ee826ac 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -223,6 +223,10 @@ public class HoodieDeltaStreamer implements Serializable {
         + " source-fetch -> Transform -> Hudi Write in loop")
     public Boolean continuousMode = false;
 
+    @Parameter(names = {"--min-sync-interval-seconds"}, description = "the min sync interval of each sync in "
+        + "continuous mode")
+    public Integer minSyncIntervalSeconds = 0;
+
     @Parameter(names = {"--spark-master"}, description = "spark master to use.")
     public String sparkMaster = "local[2]";
 
@@ -384,6 +388,7 @@ public class HoodieDeltaStreamer implements Serializable {
         try {
           while (!isShutdownRequested()) {
             try {
+              long start = System.currentTimeMillis();
               Option<String> scheduledCompactionInstant = deltaSync.syncOnce();
               if (scheduledCompactionInstant.isPresent()) {
                 log.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstant + ")");
@@ -392,6 +397,12 @@ public class HoodieDeltaStreamer implements Serializable {
                     scheduledCompactionInstant.get()));
                 asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
               }
+              long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start);
+              if (toSleepMs > 0) {
+                log.info("Last sync ran less than min sync interval: " + cfg.minSyncIntervalSeconds + " s, sleep: "
+                    + toSleepMs + " ms.");
+                Thread.sleep(toSleepMs);
+              }
             } catch (Exception e) {
               log.error("Shutting down delta-sync due to exception", e);
               error = true;

Reply via email to