Repository: crunch Updated Branches: refs/heads/master 6c0ae4131 -> 23a2da07d
CRUNCH-665: Add crunch.max.poll.interval property Signed-off-by: Josh Wills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/23a2da07 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/23a2da07 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/23a2da07 Branch: refs/heads/master Commit: 23a2da07d3a80b3650568dde8973acc941f4f25d Parents: 6c0ae41 Author: Clément MATHIEU <[email protected]> Authored: Wed Mar 7 10:13:51 2018 +0100 Committer: Josh Wills <[email protected]> Committed: Thu Mar 8 13:26:03 2018 -0800 ---------------------------------------------------------------------- .../apache/crunch/impl/mr/exec/MRExecutor.java | 20 +++++++++++++++++--- .../crunch/impl/mr/run/RuntimeParameters.java | 2 ++ 2 files changed, 19 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/23a2da07/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java index 87546e1..45084f2 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java @@ -31,6 +31,7 @@ import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl; import org.apache.crunch.impl.dist.collect.PCollectionImpl; import org.apache.crunch.impl.mr.MRJob; import org.apache.crunch.impl.mr.MRPipelineExecution; +import org.apache.crunch.impl.mr.run.RuntimeParameters; import org.apache.crunch.materialize.MaterializableIterable; import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; @@ -90,9 +91,7 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe monitorLoop(); } }); - this.pollInterval = isLocalMode() - ? new CappedExponentialCounter(50, 1000) - : new CappedExponentialCounter(500, 10000); + this.pollInterval = getPollInterval(conf); this.namedDotFiles = new ConcurrentHashMap<String, String>(); } @@ -255,6 +254,21 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe } } + private CappedExponentialCounter getPollInterval(Configuration conf) { + long maxPollInterval = conf.getLong(RuntimeParameters.MAX_POLL_INTERVAL, -1); + + if (maxPollInterval <= 0) { + if (isLocalMode()) { + maxPollInterval = 1_000; + } else { + maxPollInterval = 10_000; + } + } + + long minPollInterval = Math.max(maxPollInterval / 20, 1); + return new CappedExponentialCounter(minPollInterval, maxPollInterval); + } + private static boolean isLocalMode() { Configuration conf = new Configuration(); String frameworkName = conf.get("mapreduce.framework.name", ""); http://git-wip-us.apache.org/repos/asf/crunch/blob/23a2da07/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java index fe6f7ee..a36b910 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java @@ -45,6 +45,8 @@ public final class RuntimeParameters { public static final String FILE_TARGET_MAX_THREADS = "crunch.file.target.max.threads"; + public static final String MAX_POLL_INTERVAL = "crunch.max.poll.interval"; + // Not instantiated private RuntimeParameters() { }
