Repository: incubator-gobblin
Updated Branches:
  refs/heads/master fa1932b63 -> e1716b527


[GOBBLIN-608] Allow user to configure the max waiting time for fork operation

Closes #2474 from yukuai518/maxWait


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/e1716b52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/e1716b52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/e1716b52

Branch: refs/heads/master
Commit: e1716b5279fd690fc27b6bcb0044ceb68fdc2ba1
Parents: fa1932b
Author: Kuai Yu <[email protected]>
Authored: Tue Oct 9 13:04:20 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Tue Oct 9 13:04:20 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/gobblin/configuration/ConfigurationKeys.java | 2 ++
 .../java/org/apache/gobblin/runtime/StreamModelTaskRunner.java   | 4 +++-
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e1716b52/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 1e70209..f2f58f1 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -330,6 +330,8 @@ public class ConfigurationKeys {
   public static final long DEFAULT_FORK_RECORD_QUEUE_TIMEOUT = 1000;
   public static final String FORK_RECORD_QUEUE_TIMEOUT_UNIT_KEY = 
"fork.record.queue.timeout.unit";
   public static final String DEFAULT_FORK_RECORD_QUEUE_TIMEOUT_UNIT = 
TimeUnit.MILLISECONDS.name();
+  public static final String FORK_MAX_WAIT_MININUTES = "fork.max.wait.minutes";
+  public static final long DEFAULT_FORK_MAX_WAIT_MININUTES = 60;
 
   /**
    * Writer configuration properties.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e1716b52/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
index 7713b79..b20aeda 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
@@ -75,6 +75,8 @@ public class StreamModelTaskRunner {
   private final String watermarkingStrategy;
 
   protected void run() throws Exception {
+    long maxWaitInMinute = 
taskState.getPropAsLong(ConfigurationKeys.FORK_MAX_WAIT_MININUTES, 
ConfigurationKeys.DEFAULT_FORK_MAX_WAIT_MININUTES);
+
     // Get the fork operator. By default IdentityForkOperator is used with a 
single branch.
     ForkOperator forkOperator = 
closer.register(this.taskContext.getForkOperator());
 
@@ -145,7 +147,7 @@ public class StreamModelTaskRunner {
     connectableStream.connect();
 
     if (!ExponentialBackoff.awaitCondition().callable(() -> 
this.forks.keySet().stream().map(Optional::get).allMatch(Fork::isDone)).
-        
initialDelay(1000L).maxDelay(1000L).maxWait(TimeUnit.MINUTES.toMillis(60)).await())
 {
+        
initialDelay(1000L).maxDelay(1000L).maxWait(TimeUnit.MINUTES.toMillis(maxWaitInMinute)).await())
 {
       throw new TimeoutException("Forks did not finish withing specified 
timeout.");
     }
   }

Reply via email to