This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 223d975  [GOBBLIN-776] Add a utility method to return Helix WorkflowId 
given a Gobblin job name.
223d975 is described below

commit 223d975821798f2ffb4830f92240d808d34bcd24
Author: sv2000 <[email protected]>
AuthorDate: Sat Jun 1 22:00:16 2019 -0700

    [GOBBLIN-776] Add a utility method to return Helix WorkflowId given a 
Gobblin job name.
    
    Closes #2654 from sv2000/jobNameToWorkflowId
---
 .../gobblin/cluster/GobblinHelixJobScheduler.java  | 15 ++++++--
 .../org/apache/gobblin/cluster/HelixUtils.java     | 45 +++++++++++++++++++++-
 .../gobblin/cluster/ClusterIntegrationTest.java    |  4 +-
 .../suite/IntegrationJobRestartViaSpecSuite.java   |  3 +-
 4 files changed, 60 insertions(+), 7 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 659ff63..02022a8 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -18,7 +18,9 @@
 package org.apache.gobblin.cluster;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
@@ -361,9 +363,16 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
     if (PropertiesUtils.getPropAsBoolean(jobConfig, 
GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
         GobblinClusterConfigurationKeys.DEFAULT_CANCEL_RUNNING_JOB_ON_DELETE)) 
{
       LOGGER.info("Cancelling workflow: {}", deleteJobArrival.getJobName());
-      TaskDriver taskDriver = new TaskDriver(this.jobHelixManager);
-      taskDriver.waitToStop(deleteJobArrival.getJobName(), 
this.helixJobStopTimeoutMillis);
-      LOGGER.info("Stopped workflow: {}", deleteJobArrival.getJobName());
+      Map<String, String> jobNameToWorkflowIdMap = 
HelixUtils.getWorkflowIdsFromJobNames(this.jobHelixManager,
+          Collections.singletonList(deleteJobArrival.getJobName()));
+      if (jobNameToWorkflowIdMap.containsKey(deleteJobArrival.getJobName())) {
+        String workflowId = 
jobNameToWorkflowIdMap.get(deleteJobArrival.getJobName());
+        TaskDriver taskDriver = new TaskDriver(this.jobHelixManager);
+        taskDriver.waitToStop(workflowId, this.helixJobStopTimeoutMillis);
+        LOGGER.info("Stopped workflow: {}", deleteJobArrival.getJobName());
+      } else {
+        LOGGER.warn("Could not find Helix Workflow Id for job: {}", 
deleteJobArrival.getJobName());
+      }
     }
   }
   /**
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 8b587ef..b874ff3 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -17,7 +17,12 @@
 
 package org.apache.gobblin.cluster;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -25,6 +30,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskUtil;
@@ -35,7 +41,7 @@ import org.apache.helix.tools.ClusterSetup;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.listeners.JobListener;
 
@@ -251,4 +257,41 @@ public class HelixUtils {
     new TaskDriver(helixManager).deleteAndWaitForCompletion(workFlowName, 
10000L);
     log.info("Workflow deleted.");
   }
+  /**
+   * Returns the Helix Workflow Ids given {@link Iterable} of Gobblin job 
names. The method returns a
+   * {@link java.util.Map} from Gobblin job name to the corresponding Helix 
Workflow Id. This method iterates
+   * over all Helix workflows, and obtains the jobs of each workflow from its 
jobDag.
+   *
+   * NOTE: This call is expensive as it results in listing of znodes and 
subsequently, multiple ZK calls to get the job
+   * configuration for each HelixJob. Ideally, this method should be called 
infrequently e.g. when a job is deleted/cancelled.
+   *
+   * @param jobNames a list of Gobblin job names.
+   * @return a map from jobNames to their Helix Workflow Ids.
+   */
+  public static Map<String, String> getWorkflowIdsFromJobNames(HelixManager 
helixManager, Collection<String> jobNames) {
+    Map<String, String> jobNameToWorkflowId = new HashMap<>();
+    TaskDriver taskDriver = new TaskDriver(helixManager);
+    Map<String, WorkflowConfig> workflowConfigMap = taskDriver.getWorkflows();
+    for (String workflow : workflowConfigMap.keySet()) {
+      WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflow);
+      Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
+      for (String helixJob : helixJobs) {
+        Iterator<TaskConfig> taskConfigIterator = 
taskDriver.getJobConfig(helixJob).getTaskConfigMap().values().iterator();
+        if (taskConfigIterator.hasNext()) {
+          TaskConfig taskConfig = taskConfigIterator.next();
+          String jobName = 
taskConfig.getConfigMap().get(ConfigurationKeys.JOB_NAME_KEY);
+          if (jobNames.contains(jobName)) {
+            if (!jobNameToWorkflowId.containsKey(jobName)) {
+              jobNameToWorkflowId.put(jobName, workflow);
+            } else {
+              log.warn("JobName {} previously found to have WorkflowId {}; 
found " + " a different WorkflowId {} for the job; "
+                  + "Skipping this entry", jobName, 
jobNameToWorkflowId.get(jobName), workflow);
+            }
+            break;
+          }
+        }
+      }
+    }
+    return jobNameToWorkflowId;
+  }
 }
\ No newline at end of file
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index 7e810c3..5d8d02b 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -124,7 +124,7 @@ public class ClusterIntegrationTest {
     IntegrationJobRestartViaSpecSuite restartViaSpecSuite = 
(IntegrationJobRestartViaSpecSuite) this.suite;
 
     //Add a new JobSpec to the path monitored by the SpecConsumer
-    restartViaSpecSuite.addJobSpec(IntegrationJobRestartViaSpecSuite.JOB_ID, 
SpecExecutor.Verb.ADD.name());
+    restartViaSpecSuite.addJobSpec(IntegrationJobRestartViaSpecSuite.JOB_NAME, 
SpecExecutor.Verb.ADD.name());
 
     //Start the cluster
     restartViaSpecSuite.startCluster();
@@ -150,7 +150,7 @@ public class ClusterIntegrationTest {
     Assert.assertEquals(targetState, TargetState.START.name());
 
     //Add a JobSpec with UPDATE verb signalling the Helix cluster to restart 
the workflow
-    restartViaSpecSuite.addJobSpec(IntegrationJobRestartViaSpecSuite.JOB_ID, 
SpecExecutor.Verb.UPDATE.name());
+    restartViaSpecSuite.addJobSpec(IntegrationJobRestartViaSpecSuite.JOB_NAME, 
SpecExecutor.Verb.UPDATE.name());
 
     
AssertWithBackoff.create().maxSleepMs(1000).timeoutMs(5000).backoffFactor(1).assertTrue(input
 -> {
       //Inspect the zNode at the path corresponding to the Workflow resource. 
Ensure the target state of the resource is in
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
index eb47536..1486e58 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
@@ -54,6 +54,7 @@ import org.apache.gobblin.util.ConfigUtils;
 
 public class IntegrationJobRestartViaSpecSuite extends 
IntegrationJobCancelSuite {
   public static final String JOB_ID = "job_HelloWorldTestJob_1235";
+  public static final String JOB_NAME = "HelloWorldTestJob";
   public static final String JOB_CATALOG_DIR = 
"/tmp/IntegrationJobCancelViaSpecSuite/jobCatalog";
   public static final String FS_SPEC_CONSUMER_DIR = 
"/tmp/IntegrationJobCancelViaSpecSuite/jobSpecs";
   public static final String TASK_STATE_FILE = 
"/tmp/IntegrationJobCancelViaSpecSuite/taskState/_RUNNING";
@@ -79,7 +80,7 @@ public class IntegrationJobRestartViaSpecSuite extends 
IntegrationJobCancelSuite
               ConfigurationKeys.JOB_ID_KEY, JOB_ID,
               GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, 
Boolean.TRUE,
               GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 100L,
-              ConfigurationKeys.JOB_NAME_KEY, JOB_ID));
+              ConfigurationKeys.JOB_NAME_KEY, JOB_NAME));
 
       newConfig = newConfig.withValue(SleepingTask.TASK_STATE_FILE_KEY, 
ConfigValueFactory.fromAnyRef(TASK_STATE_FILE));
       newConfig = newConfig.withFallback(rawJobConfig);

Reply via email to