Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 4e9c8d5ed -> 1b9ec19f9


[GOBBLIN-372] Workaround helix workflow deletion bug that removes workflows with a matching prefix

Closes #2248 from htran1/helix_job_queue_prefix_fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1b9ec19f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1b9ec19f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1b9ec19f

Branch: refs/heads/master
Commit: 1b9ec19f9d43ea0f0862ac9d6553c508646a3337
Parents: 4e9c8d5
Author: Hung Tran <[email protected]>
Authored: Tue Jan 16 15:37:22 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Tue Jan 16 15:37:22 2018 -0800

----------------------------------------------------------------------
 .../gobblin/cluster/GobblinHelixTaskDriver.java | 70 +++++++++++++++++++-
 .../cluster/GobblinHelixJobLauncherTest.java    | 16 +++++
 2 files changed, 83 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b9ec19f/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
index a39c5ca..6c29775 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
@@ -16,8 +16,7 @@
  */
 package org.apache.gobblin.cluster;
 
-import com.google.common.base.Joiner;
-
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -28,6 +27,7 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
@@ -35,9 +35,11 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.task.JobDag;
+import org.apache.helix.task.TargetState;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskState;
@@ -46,6 +48,9 @@ import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
 /**
  * #HELIX-0.6.7-WORKAROUND
  * Replacement TaskDriver methods to workaround bugs and changes in behavior 
for the 0.6.7 upgrade
@@ -271,13 +276,72 @@ public class GobblinHelixTaskDriver {
   }
 
   /**
+   * Trigger a controller pipeline execution for a given resource.
+   *
+   * @param accessor Helix data accessor
+   * @param resource the name of the resource changed to triggering the 
execution
+   */
+  private void invokeRebalance(HelixDataAccessor accessor, String resource) {
+    // The pipeline is idempotent, so touching an ideal state is enough to 
trigger a pipeline run
+    LOG.info("invoke rebalance for " + resource);
+    PropertyKey key = accessor.keyBuilder().idealStates(resource);
+    IdealState is = accessor.getProperty(key);
+    if (is != null && 
is.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
+      if (!accessor.updateProperty(key, is)) {
+        LOG.warn("Failed to invoke rebalance on resource " + resource);
+      }
+    } else {
+      LOG.warn("Can't find ideal state or ideal state is not for right type 
for " + resource);
+    }
+  }
+
+  /** Helper function to change target state for a given workflow */
+  private void setSingleWorkflowTargetState(String workflowName, final 
TargetState state) {
+    LOG.info("Set " + workflowName + " to target state " + state);
+    DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        if (currentData != null) {
+          // Only update target state for non-completed workflows
+          String finishTime = 
currentData.getSimpleField(WorkflowContext.FINISH_TIME);
+          if (finishTime == null || 
finishTime.equals(WorkflowContext.UNFINISHED)) {
+            
currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(),
+                state.name());
+          } else {
+            LOG.info("TargetState DataUpdater: ignore to update target state " 
+ finishTime);
+          }
+        } else {
+          LOG.error("TargetState DataUpdater: Fails to update target state " + 
currentData);
+        }
+        return currentData;
+      }
+    };
+    List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList();
+    List<String> paths = Lists.newArrayList();
+
+    PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(_accessor, 
workflowName);
+    if (_accessor.getProperty(cfgKey) != null) {
+      paths.add(_accessor.keyBuilder().resourceConfig(workflowName).getPath());
+      updaters.add(updater);
+      _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
+      invokeRebalance(_accessor, workflowName);
+    } else {
+      LOG.error("Configuration path " + cfgKey + " not found!");
+    }
+  }
+
+  /**
    * Delete the workflow
    *
    * @param workflow  The workflow name
    * @param timeout   The timeout for deleting the workflow/queue in seconds
    */
   public void deleteWorkflow(String workflow, long timeout) throws 
InterruptedException {
-    _taskDriver.delete(workflow);
+    // #HELIX-0.6.7-WORKAROUND
+    // Helix 0.6.7 has a bug where TaskDriver.delete(workflow) will delete all resources with a
+    // workflow as the prefix. Work around the bug by pulling in the code from TaskDriver and calling
+    // setSingleWorkflowTargetState directly to bypass the prefix matching code.
+    setSingleWorkflowTargetState(workflow, TargetState.DELETE);
 
     long endTime = System.currentTimeMillis() + (timeout * 1000);
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b9ec19f/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index cc327fc..77a33af 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -285,13 +285,19 @@ public class GobblinHelixJobLauncherTest {
     final GobblinHelixJobLauncher gobblinHelixJobLauncher =
         new GobblinHelixJobLauncher(properties, this.helixManager, 
this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap);
 
+    final Properties properties2 = generateJobProperties(this.baseConfig, 
"33", "_1504201348474");
+    final GobblinHelixJobLauncher gobblinHelixJobLauncher2 =
+        new GobblinHelixJobLauncher(properties2, this.helixManager, 
this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap);
+
     gobblinHelixJobLauncher.launchJob(null);
+    gobblinHelixJobLauncher2.launchJob(null);
 
     final TaskDriver taskDriver = new TaskDriver(this.helixManager);
 
     final String jobName = 
properties.getProperty(ConfigurationKeys.JOB_NAME_KEY);
     final String jobIdKey = 
properties.getProperty(ConfigurationKeys.JOB_ID_KEY);
     final String jobContextName = jobName + "_" + jobIdKey;
+    final String jobName2 = 
properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY);
 
     org.apache.helix.task.JobContext jobContext = 
taskDriver.getJobContext(jobContextName);
 
@@ -312,6 +318,16 @@ public class GobblinHelixJobLauncherTest {
     WorkflowContext workflowContext = taskDriver.getWorkflowContext(jobName);
     Assert.assertNull(workflowContext);
 
+    // second job queue with shared prefix should not be deleted when the 
first job queue is cleaned up
+    workflowConfig  = taskDriver.getWorkflowConfig(jobName2);
+    Assert.assertNotNull(workflowConfig);
+
+    gobblinHelixJobLauncher2.close();
+
+    // job queue deleted after close
+    workflowConfig  = taskDriver.getWorkflowConfig(jobName2);
+    Assert.assertNull(workflowConfig);
+
     // check that workunit and taskstate directory for the job are cleaned up
     final File workunitsDir =
         new File(this.appWorkDir + File.separator + 
GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME

Reply via email to