Repository: incubator-gobblin Updated Branches: refs/heads/master fa1932b63 -> e1716b527
[GOBBLIN-608] Allow user to configure the max waiting time for fork operation Closes #2474 from yukuai518/maxWait Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/e1716b52 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/e1716b52 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/e1716b52 Branch: refs/heads/master Commit: e1716b5279fd690fc27b6bcb0044ceb68fdc2ba1 Parents: fa1932b Author: Kuai Yu <[email protected]> Authored: Tue Oct 9 13:04:20 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Tue Oct 9 13:04:20 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/gobblin/configuration/ConfigurationKeys.java | 2 ++ .../java/org/apache/gobblin/runtime/StreamModelTaskRunner.java | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e1716b52/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 1e70209..f2f58f1 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -330,6 +330,8 @@ public class ConfigurationKeys { public static final long DEFAULT_FORK_RECORD_QUEUE_TIMEOUT = 1000; public static final String FORK_RECORD_QUEUE_TIMEOUT_UNIT_KEY = "fork.record.queue.timeout.unit"; public static final String DEFAULT_FORK_RECORD_QUEUE_TIMEOUT_UNIT = TimeUnit.MILLISECONDS.name(); + public static final String FORK_MAX_WAIT_MININUTES = "fork.max.wait.minutes"; + public static final long DEFAULT_FORK_MAX_WAIT_MININUTES = 60; /** * Writer configuration properties. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e1716b52/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java index 7713b79..b20aeda 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java @@ -75,6 +75,8 @@ public class StreamModelTaskRunner { private final String watermarkingStrategy; protected void run() throws Exception { + long maxWaitInMinute = taskState.getPropAsLong(ConfigurationKeys.FORK_MAX_WAIT_MININUTES, ConfigurationKeys.DEFAULT_FORK_MAX_WAIT_MININUTES); + // Get the fork operator. By default IdentityForkOperator is used with a single branch. ForkOperator forkOperator = closer.register(this.taskContext.getForkOperator()); @@ -145,7 +147,7 @@ public class StreamModelTaskRunner { connectableStream.connect(); if (!ExponentialBackoff.awaitCondition().callable(() -> this.forks.keySet().stream().map(Optional::get).allMatch(Fork::isDone)). - initialDelay(1000L).maxDelay(1000L).maxWait(TimeUnit.MINUTES.toMillis(60)).await()) { + initialDelay(1000L).maxDelay(1000L).maxWait(TimeUnit.MINUTES.toMillis(maxWaitInMinute)).await()) { throw new TimeoutException("Forks did not finish withing specified timeout."); } }
