This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bf05f95 [HUDI-269] Limit sync frequency (#921)
bf05f95 is described below
commit bf05f954133da5b760a66906557df0305dbd0eaf
Author: Xing Pan <[email protected]>
AuthorDate: Tue Sep 24 20:30:35 2019 +0800
[HUDI-269] Limit sync frequency (#921)
* [HUDI-269] Throttle DeltaStreamer sync runs
---
.../hudi/utilities/deltastreamer/HoodieDeltaStreamer.java | 11 +++++++++++
1 file changed, 11 insertions(+)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 4346388..ee826ac 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -223,6 +223,10 @@ public class HoodieDeltaStreamer implements Serializable {
+ " source-fetch -> Transform -> Hudi Write in loop")
public Boolean continuousMode = false;
+ @Parameter(names = {"--min-sync-interval-seconds"}, description = "the min
sync interval of each sync in "
+ + "continuous mode")
+ public Integer minSyncIntervalSeconds = 0;
+
@Parameter(names = {"--spark-master"}, description = "spark master to
use.")
public String sparkMaster = "local[2]";
@@ -384,6 +388,7 @@ public class HoodieDeltaStreamer implements Serializable {
try {
while (!isShutdownRequested()) {
try {
+ long start = System.currentTimeMillis();
Option<String> scheduledCompactionInstant = deltaSync.syncOnce();
if (scheduledCompactionInstant.isPresent()) {
log.info("Enqueuing new pending compaction instant (" +
scheduledCompactionInstant + ")");
@@ -392,6 +397,12 @@ public class HoodieDeltaStreamer implements Serializable {
scheduledCompactionInstant.get()));
asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
}
+ long toSleepMs = cfg.minSyncIntervalSeconds * 1000 -
(System.currentTimeMillis() - start);
+ if (toSleepMs > 0) {
+ log.info("Last sync ran less than min sync interval: " +
cfg.minSyncIntervalSeconds + " s, sleep: "
+ + toSleepMs + " ms.");
+ Thread.sleep(toSleepMs);
+ }
} catch (Exception e) {
log.error("Shutting down delta-sync due to exception", e);
error = true;