Repository: incubator-gobblin Updated Branches: refs/heads/master 4e9c8d5ed -> 1b9ec19f9
[GOBBLIN-372] Workaround helix workflow deletion bug that removes workflows with a matching prefix Closes #2248 from htran1/helix_job_queue_prefix_fix Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1b9ec19f Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1b9ec19f Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1b9ec19f Branch: refs/heads/master Commit: 1b9ec19f9d43ea0f0862ac9d6553c508646a3337 Parents: 4e9c8d5 Author: Hung Tran <[email protected]> Authored: Tue Jan 16 15:37:22 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Tue Jan 16 15:37:22 2018 -0800 ---------------------------------------------------------------------- .../gobblin/cluster/GobblinHelixTaskDriver.java | 70 +++++++++++++++++++- .../cluster/GobblinHelixJobLauncherTest.java | 16 +++++ 2 files changed, 83 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b9ec19f/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java index a39c5ca..6c29775 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java @@ -16,8 +16,7 @@ */ package org.apache.gobblin.cluster; -import com.google.common.base.Joiner; - +import java.util.List; import java.util.Map; import java.util.Set; @@ -28,6 +27,7 @@ import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; +import org.apache.helix.PropertyKey; import org.apache.helix.PropertyPathConfig; import org.apache.helix.PropertyType; import org.apache.helix.ZNRecord; @@ -35,9 +35,11 @@ import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.model.IdealState; import org.apache.helix.store.HelixPropertyStore; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.task.JobDag; +import org.apache.helix.task.TargetState; import org.apache.helix.task.TaskConstants; import org.apache.helix.task.TaskDriver; import org.apache.helix.task.TaskState; @@ -46,6 +48,9 @@ import org.apache.helix.task.WorkflowConfig; import org.apache.helix.task.WorkflowContext; import org.apache.log4j.Logger; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; + /** * #HELIX-0.6.7-WORKAROUND * Replacement TaskDriver methods to workaround bugs and changes in behavior for the 0.6.7 upgrade @@ -271,13 +276,72 @@ public class GobblinHelixTaskDriver { } /** + * Trigger a controller pipeline execution for a given resource. + * + * @param accessor Helix data accessor + * @param resource the name of the resource changed to triggering the execution + */ + private void invokeRebalance(HelixDataAccessor accessor, String resource) { + // The pipeline is idempotent, so touching an ideal state is enough to trigger a pipeline run + LOG.info("invoke rebalance for " + resource); + PropertyKey key = accessor.keyBuilder().idealStates(resource); + IdealState is = accessor.getProperty(key); + if (is != null && is.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) { + if (!accessor.updateProperty(key, is)) { + LOG.warn("Failed to invoke rebalance on resource " + resource); + } + } else { + LOG.warn("Can't find ideal state or ideal state is not for right type for " + resource); + } + } + + /** Helper function to change target state for a given workflow */ + private void setSingleWorkflowTargetState(String workflowName, final TargetState state) { + LOG.info("Set " + workflowName + " to target state " + state); + DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() { + @Override + public ZNRecord update(ZNRecord currentData) { + if (currentData != null) { + // Only update target state for non-completed workflows + String finishTime = currentData.getSimpleField(WorkflowContext.FINISH_TIME); + if (finishTime == null || finishTime.equals(WorkflowContext.UNFINISHED)) { + currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(), + state.name()); + } else { + LOG.info("TargetState DataUpdater: ignore to update target state " + finishTime); + } + } else { + LOG.error("TargetState DataUpdater: Fails to update target state " + currentData); + } + return currentData; + } + }; + List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList(); + List<String> paths = Lists.newArrayList(); + + PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(_accessor, workflowName); + if (_accessor.getProperty(cfgKey) != null) { + paths.add(_accessor.keyBuilder().resourceConfig(workflowName).getPath()); + updaters.add(updater); + _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT); + invokeRebalance(_accessor, workflowName); + } else { + LOG.error("Configuration path " + cfgKey + " not found!"); + } + } + + /** * Delete the workflow * * @param workflow The workflow name * @param timeout The timeout for deleting the workflow/queue in seconds */ public void deleteWorkflow(String workflow, long timeout) throws InterruptedException { - _taskDriver.delete(workflow); + // #HELIX-0.6.7-WORKAROUND + // Helix 0.6.7 has a bug where TaskDriver.delete(workflow) will delete all resources with a + // workflow as the prefix. Work around the bug by pulling in the code from TaskDriver and calling + // setSingleWorkflowTargetState directly to bypass the prefix matching code. + setSingleWorkflowTargetState(workflow, TargetState.DELETE); long endTime = System.currentTimeMillis() + (timeout * 1000); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b9ec19f/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java index cc327fc..77a33af 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java @@ -285,13 +285,19 @@ public class GobblinHelixJobLauncherTest { final GobblinHelixJobLauncher gobblinHelixJobLauncher = new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap); + final Properties properties2 = generateJobProperties(this.baseConfig, "33", "_1504201348474"); + final GobblinHelixJobLauncher gobblinHelixJobLauncher2 = + new GobblinHelixJobLauncher(properties2, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap); + gobblinHelixJobLauncher.launchJob(null); + gobblinHelixJobLauncher2.launchJob(null); final TaskDriver taskDriver = new TaskDriver(this.helixManager); final String jobName = properties.getProperty(ConfigurationKeys.JOB_NAME_KEY); final String jobIdKey = properties.getProperty(ConfigurationKeys.JOB_ID_KEY); final String jobContextName = jobName + "_" + jobIdKey; + final String jobName2 = properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY); org.apache.helix.task.JobContext jobContext = taskDriver.getJobContext(jobContextName); @@ -312,6 +318,16 @@ public class GobblinHelixJobLauncherTest { WorkflowContext workflowContext = taskDriver.getWorkflowContext(jobName); Assert.assertNull(workflowContext); + // second job queue with shared prefix should not be deleted when the first job queue is cleaned up + workflowConfig = taskDriver.getWorkflowConfig(jobName2); + Assert.assertNotNull(workflowConfig); + + gobblinHelixJobLauncher2.close(); + + // job queue deleted after close + workflowConfig = taskDriver.getWorkflowConfig(jobName2); + Assert.assertNull(workflowConfig); + // check that workunit and taskstate directory for the job are cleaned up final File workunitsDir = new File(this.appWorkDir + File.separator + GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME
