This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 51edd5d  [GOBBLIN-816] Implement a workaround to abort Helix TaskDriver#getWorkflows() after a timeout[]
51edd5d is described below

commit 51edd5d82f4246db24b05de0f9e45f86bc7a7af2
Author: sv2000 <[email protected]>
AuthorDate: Fri Jul 19 14:12:00 2019 -0700

    [GOBBLIN-816] Implement a workaround to abort Helix TaskDriver#getWorkflows() after a timeout[]
    
    Closes #2680 from sv2000/getWorkflowsWorkaround
---
 .../cluster/GobblinClusterConfigurationKeys.java   |  3 +++
 .../gobblin/cluster/GobblinHelixJobScheduler.java  | 28 +++++++++++++++++++++-
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index d1267c5..4881a99 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -151,6 +151,9 @@ public class GobblinClusterConfigurationKeys {
   public static final String HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS = 
GOBBLIN_CLUSTER_PREFIX + "workflowDeleteTimeoutSeconds";
   public static final long DEFAULT_HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS = 300;
 
+  public static final String HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS = 
GOBBLIN_CLUSTER_PREFIX + "workflowListingTimeoutSeconds";
+  public static final long DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS = 
300;
+
   public static final String CLEAN_ALL_DIST_JOBS = GOBBLIN_CLUSTER_PREFIX + 
"bootup.clean.dist.jobs";
   public static final boolean DEFAULT_CLEAN_ALL_DIST_JOBS = false;
 
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 3054d2e..6349471 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -36,6 +37,11 @@ import org.apache.helix.task.TaskDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.rholder.retry.AttemptTimeLimiters;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
@@ -98,6 +104,7 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
   final GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics;
   final HelixJobsMapping jobsMapping;
   final Striped<Lock> locks = Striped.lazyWeakLock(256);
+  private final long helixWorkflowListingTimeoutMillis;
 
   private boolean startServicesCompleted;
   private final long helixJobStopTimeoutMillis;
@@ -150,6 +157,10 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
 
     this.helixJobStopTimeoutMillis = ConfigUtils.getLong(jobConfig, 
GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
         
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS) * 1000;
+
+    this.helixWorkflowListingTimeoutMillis = ConfigUtils.getLong(jobConfig, 
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS,
+        
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS) 
* 1000;
+
   }
 
   @Override
@@ -360,8 +371,22 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
     if (PropertiesUtils.getPropAsBoolean(jobConfig, 
GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
         GobblinClusterConfigurationKeys.DEFAULT_CANCEL_RUNNING_JOB_ON_DELETE)) 
{
       LOGGER.info("Cancelling workflow: {}", deleteJobArrival.getJobName());
-      Map<String, String> jobNameToWorkflowIdMap = 
HelixUtils.getWorkflowIdsFromJobNames(this.jobHelixManager,
+
+      //Workaround for preventing indefinite hangs observed in TaskDriver.getWorkflows() call.
+      Callable<Map<String, String>> workflowsCallable = () -> 
HelixUtils.getWorkflowIdsFromJobNames(this.jobHelixManager,
           Collections.singletonList(deleteJobArrival.getJobName()));
+      Retryer<Map<String, String>> retryer = RetryerBuilder.<Map<String, 
String>>newBuilder()
+          .retryIfException()
+          .withStopStrategy(StopStrategies.stopAfterAttempt(1))
+          
.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(this.helixWorkflowListingTimeoutMillis,
 TimeUnit.MILLISECONDS)).build();
+      Map<String, String> jobNameToWorkflowIdMap;
+      try {
+        jobNameToWorkflowIdMap = retryer.call(workflowsCallable);
+      } catch (ExecutionException | RetryException e) {
+        LOGGER.error("Exception encountered when getting workflows from Helix; 
likely a Helix/Zk issue.", e);
+        return;
+      }
+
       if (jobNameToWorkflowIdMap.containsKey(deleteJobArrival.getJobName())) {
         String workflowId = 
jobNameToWorkflowIdMap.get(deleteJobArrival.getJobName());
         TaskDriver taskDriver = new TaskDriver(this.jobHelixManager);
@@ -374,6 +399,7 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
       }
     }
   }
+
   /**
    * This class is responsible for running non-scheduled jobs.
    */

Reply via email to