This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 51edd5d [GOBBLIN-816] Implement a workaround to abort Helix
TaskDriver#getWorkflows() after a timeout[]
51edd5d is described below
commit 51edd5d82f4246db24b05de0f9e45f86bc7a7af2
Author: sv2000 <[email protected]>
AuthorDate: Fri Jul 19 14:12:00 2019 -0700
[GOBBLIN-816] Implement a workaround to abort Helix
TaskDriver#getWorkflows() after a timeout[]
Closes #2680 from sv2000/getWorkflowsWorkaround
---
.../cluster/GobblinClusterConfigurationKeys.java | 3 +++
.../gobblin/cluster/GobblinHelixJobScheduler.java | 28 +++++++++++++++++++++-
2 files changed, 30 insertions(+), 1 deletion(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index d1267c5..4881a99 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -151,6 +151,9 @@ public class GobblinClusterConfigurationKeys {
public static final String HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS =
GOBBLIN_CLUSTER_PREFIX + "workflowDeleteTimeoutSeconds";
public static final long DEFAULT_HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS = 300;
+ public static final String HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS =
GOBBLIN_CLUSTER_PREFIX + "workflowListingTimeoutSeconds";
+ public static final long DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS =
300;
+
public static final String CLEAN_ALL_DIST_JOBS = GOBBLIN_CLUSTER_PREFIX +
"bootup.clean.dist.jobs";
public static final boolean DEFAULT_CLEAN_ALL_DIST_JOBS = false;
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 3054d2e..6349471 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -36,6 +37,11 @@ import org.apache.helix.task.TaskDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.github.rholder.retry.AttemptTimeLimiters;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
@@ -98,6 +104,7 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
final GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics;
final HelixJobsMapping jobsMapping;
final Striped<Lock> locks = Striped.lazyWeakLock(256);
+ private final long helixWorkflowListingTimeoutMillis;
private boolean startServicesCompleted;
private final long helixJobStopTimeoutMillis;
@@ -150,6 +157,10 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
this.helixJobStopTimeoutMillis = ConfigUtils.getLong(jobConfig,
GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS) * 1000;
+
+ this.helixWorkflowListingTimeoutMillis = ConfigUtils.getLong(jobConfig,
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS,
+
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_LISTING_TIMEOUT_SECONDS)
* 1000;
+
}
@Override
@@ -360,8 +371,22 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
if (PropertiesUtils.getPropAsBoolean(jobConfig,
GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
GobblinClusterConfigurationKeys.DEFAULT_CANCEL_RUNNING_JOB_ON_DELETE))
{
LOGGER.info("Cancelling workflow: {}", deleteJobArrival.getJobName());
- Map<String, String> jobNameToWorkflowIdMap =
HelixUtils.getWorkflowIdsFromJobNames(this.jobHelixManager,
+
+ //Workaround for preventing indefinite hangs observed in
TaskDriver.getWorkflows() call.
+ Callable<Map<String, String>> workflowsCallable = () ->
HelixUtils.getWorkflowIdsFromJobNames(this.jobHelixManager,
Collections.singletonList(deleteJobArrival.getJobName()));
+ Retryer<Map<String, String>> retryer = RetryerBuilder.<Map<String,
String>>newBuilder()
+ .retryIfException()
+ .withStopStrategy(StopStrategies.stopAfterAttempt(1))
+
.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(this.helixWorkflowListingTimeoutMillis,
TimeUnit.MILLISECONDS)).build();
+ Map<String, String> jobNameToWorkflowIdMap;
+ try {
+ jobNameToWorkflowIdMap = retryer.call(workflowsCallable);
+ } catch (ExecutionException | RetryException e) {
+ LOGGER.error("Exception encountered when getting workflows from Helix;
likely a Helix/Zk issue.", e);
+ return;
+ }
+
if (jobNameToWorkflowIdMap.containsKey(deleteJobArrival.getJobName())) {
String workflowId =
jobNameToWorkflowIdMap.get(deleteJobArrival.getJobName());
TaskDriver taskDriver = new TaskDriver(this.jobHelixManager);
@@ -374,6 +399,7 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
}
}
}
+
/**
* This class is responsible for running non-scheduled jobs.
*/