This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 223d975 [GOBBLIN-776] Add a utility method to return Helix WorkflowId
given a Gobblin job name.
223d975 is described below
commit 223d975821798f2ffb4830f92240d808d34bcd24
Author: sv2000 <[email protected]>
AuthorDate: Sat Jun 1 22:00:16 2019 -0700
[GOBBLIN-776] Add a utility method to return Helix WorkflowId given a
Gobblin job name.
Closes #2654 from sv2000/jobNameToWorkflowId
---
.../gobblin/cluster/GobblinHelixJobScheduler.java | 15 ++++++--
.../org/apache/gobblin/cluster/HelixUtils.java | 45 +++++++++++++++++++++-
.../gobblin/cluster/ClusterIntegrationTest.java | 4 +-
.../suite/IntegrationJobRestartViaSpecSuite.java | 3 +-
4 files changed, 60 insertions(+), 7 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 659ff63..02022a8 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -18,7 +18,9 @@
package org.apache.gobblin.cluster;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
@@ -361,9 +363,16 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
if (PropertiesUtils.getPropAsBoolean(jobConfig,
GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
GobblinClusterConfigurationKeys.DEFAULT_CANCEL_RUNNING_JOB_ON_DELETE))
{
LOGGER.info("Cancelling workflow: {}", deleteJobArrival.getJobName());
- TaskDriver taskDriver = new TaskDriver(this.jobHelixManager);
- taskDriver.waitToStop(deleteJobArrival.getJobName(),
this.helixJobStopTimeoutMillis);
- LOGGER.info("Stopped workflow: {}", deleteJobArrival.getJobName());
+ Map<String, String> jobNameToWorkflowIdMap =
HelixUtils.getWorkflowIdsFromJobNames(this.jobHelixManager,
+ Collections.singletonList(deleteJobArrival.getJobName()));
+ if (jobNameToWorkflowIdMap.containsKey(deleteJobArrival.getJobName())) {
+ String workflowId =
jobNameToWorkflowIdMap.get(deleteJobArrival.getJobName());
+ TaskDriver taskDriver = new TaskDriver(this.jobHelixManager);
+ taskDriver.waitToStop(workflowId, this.helixJobStopTimeoutMillis);
+ LOGGER.info("Stopped workflow: {}", deleteJobArrival.getJobName());
+ } else {
+ LOGGER.warn("Could not find Helix Workflow Id for job: {}",
deleteJobArrival.getJobName());
+ }
}
}
/**
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 8b587ef..b874ff3 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -17,7 +17,12 @@
package org.apache.gobblin.cluster;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -25,6 +30,7 @@ import org.apache.helix.HelixManager;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskUtil;
@@ -35,7 +41,7 @@ import org.apache.helix.tools.ClusterSetup;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.listeners.JobListener;
@@ -251,4 +257,41 @@ public class HelixUtils {
new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName,
10000L);
log.info("Workflow deleted.");
}
+ /**
+ * Returns the Helix Workflow Ids given {@link Iterable} of Gobblin job
names. The method returns a
+ * {@link java.util.Map} from Gobblin job name to the corresponding Helix
Workflow Id. This method iterates
+ * over all Helix workflows, and obtains the jobs of each workflow from its
jobDag.
+ *
+ * NOTE: This call is expensive as it results in listing of znodes and
subsequently, multiple ZK calls to get the job
+ * configuration for each HelixJob. Ideally, this method should be called
infrequently e.g. when a job is deleted/cancelled.
+ *
+ * @param jobNames a list of Gobblin job names.
+ * @return a map from jobNames to their Helix Workflow Ids.
+ */
+ public static Map<String, String> getWorkflowIdsFromJobNames(HelixManager
helixManager, Collection<String> jobNames) {
+ Map<String, String> jobNameToWorkflowId = new HashMap<>();
+ TaskDriver taskDriver = new TaskDriver(helixManager);
+ Map<String, WorkflowConfig> workflowConfigMap = taskDriver.getWorkflows();
+ for (String workflow : workflowConfigMap.keySet()) {
+ WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflow);
+ Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
+ for (String helixJob : helixJobs) {
+ Iterator<TaskConfig> taskConfigIterator =
taskDriver.getJobConfig(helixJob).getTaskConfigMap().values().iterator();
+ if (taskConfigIterator.hasNext()) {
+ TaskConfig taskConfig = taskConfigIterator.next();
+ String jobName =
taskConfig.getConfigMap().get(ConfigurationKeys.JOB_NAME_KEY);
+ if (jobNames.contains(jobName)) {
+ if (!jobNameToWorkflowId.containsKey(jobName)) {
+ jobNameToWorkflowId.put(jobName, workflow);
+ } else {
+ log.warn("JobName {} previously found to have WorkflowId {};
found " + " a different WorkflowId {} for the job; "
+ + "Skipping this entry", jobName,
jobNameToWorkflowId.get(jobName), workflow);
+ }
+ break;
+ }
+ }
+ }
+ }
+ return jobNameToWorkflowId;
+ }
}
\ No newline at end of file
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index 7e810c3..5d8d02b 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -124,7 +124,7 @@ public class ClusterIntegrationTest {
IntegrationJobRestartViaSpecSuite restartViaSpecSuite =
(IntegrationJobRestartViaSpecSuite) this.suite;
//Add a new JobSpec to the path monitored by the SpecConsumer
- restartViaSpecSuite.addJobSpec(IntegrationJobRestartViaSpecSuite.JOB_ID,
SpecExecutor.Verb.ADD.name());
+ restartViaSpecSuite.addJobSpec(IntegrationJobRestartViaSpecSuite.JOB_NAME,
SpecExecutor.Verb.ADD.name());
//Start the cluster
restartViaSpecSuite.startCluster();
@@ -150,7 +150,7 @@ public class ClusterIntegrationTest {
Assert.assertEquals(targetState, TargetState.START.name());
//Add a JobSpec with UPDATE verb signalling the Helix cluster to restart
the workflow
- restartViaSpecSuite.addJobSpec(IntegrationJobRestartViaSpecSuite.JOB_ID,
SpecExecutor.Verb.UPDATE.name());
+ restartViaSpecSuite.addJobSpec(IntegrationJobRestartViaSpecSuite.JOB_NAME,
SpecExecutor.Verb.UPDATE.name());
AssertWithBackoff.create().maxSleepMs(1000).timeoutMs(5000).backoffFactor(1).assertTrue(input
-> {
//Inspect the zNode at the path corresponding to the Workflow resource.
Ensure the target state of the resource is in
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
index eb47536..1486e58 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
@@ -54,6 +54,7 @@ import org.apache.gobblin.util.ConfigUtils;
public class IntegrationJobRestartViaSpecSuite extends
IntegrationJobCancelSuite {
public static final String JOB_ID = "job_HelloWorldTestJob_1235";
+ public static final String JOB_NAME = "HelloWorldTestJob";
public static final String JOB_CATALOG_DIR =
"/tmp/IntegrationJobCancelViaSpecSuite/jobCatalog";
public static final String FS_SPEC_CONSUMER_DIR =
"/tmp/IntegrationJobCancelViaSpecSuite/jobSpecs";
public static final String TASK_STATE_FILE =
"/tmp/IntegrationJobCancelViaSpecSuite/taskState/_RUNNING";
@@ -79,7 +80,7 @@ public class IntegrationJobRestartViaSpecSuite extends
IntegrationJobCancelSuite
ConfigurationKeys.JOB_ID_KEY, JOB_ID,
GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
Boolean.TRUE,
GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 100L,
- ConfigurationKeys.JOB_NAME_KEY, JOB_ID));
+ ConfigurationKeys.JOB_NAME_KEY, JOB_NAME));
newConfig = newConfig.withValue(SleepingTask.TASK_STATE_FILE_KEY,
ConfigValueFactory.fromAnyRef(TASK_STATE_FILE));
newConfig = newConfig.withFallback(rawJobConfig);