[GOBBLIN-398] Upgrade helix to 0.6.9

Closes #2272 from htran1/helix_069
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d29b72f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d29b72f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d29b72f4

Branch: refs/heads/0.12.0
Commit: d29b72f4997b4a435397353bb93a66ac4213d55e
Parents: ff13dde
Author: Hung Tran <[email protected]>
Authored: Wed Jan 31 17:29:21 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Wed Jan 31 17:29:21 2018 -0800

----------------------------------------------------------------------
 .../gobblin/cluster/GobblinClusterManager.java  |  15 +-
 .../cluster/GobblinHelixJobLauncher.java        |  72 +-
 .../gobblin/cluster/GobblinHelixTaskDriver.java | 296 +-------
 .../apache/helix/task/GobblinJobRebalancer.java | 713 -------------------
 .../cluster/GobblinHelixJobLauncherTest.java    |  22 +-
 gradle/scripts/computeVersions.gradle           |   2 +-
 gradle/scripts/dependencyDefinitions.gradle     |   2 +-
 7 files changed, 57 insertions(+), 1065 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java index 6b53c6c..77e511e 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java @@ -20,7 +20,6 @@ package org.apache.gobblin.cluster; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.net.URI; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -60,8 +59,8 @@ import org.apache.helix.messaging.handling.MessageHandler; import org.apache.helix.messaging.handling.MessageHandlerFactory; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; +import org.apache.helix.task.TargetState; import org.apache.helix.task.TaskDriver; -import org.apache.helix.task.TaskUtil; import org.apache.helix.task.WorkflowConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -261,21 +260,17 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri // Clean up existing jobs TaskDriver taskDriver = new TaskDriver(this.helixManager); - GobblinHelixTaskDriver gobblinHelixTaskDriver = new GobblinHelixTaskDriver(this.helixManager); Map<String, WorkflowConfig> workflows = taskDriver.getWorkflows(); for (Map.Entry<String, WorkflowConfig> entry : workflows.entrySet()) { String queueName = entry.getKey(); WorkflowConfig workflowConfig = entry.getValue(); - for (String namespacedJobName : workflowConfig.getJobDag().getAllNodes()) { - String jobName = TaskUtil.getDenamespacedJobName(queueName, namespacedJobName); - LOGGER.info("job {} found for queue {} ", jobName, queueName); + // request delete if not already requested + if (workflowConfig.getTargetState() != TargetState.DELETE) { + taskDriver.delete(queueName); - // #HELIX-0.6.7-WORKAROUND - // working around 0.6.7 delete job issue for queues with IN_PROGRESS state - gobblinHelixTaskDriver.deleteJob(queueName, jobName); - LOGGER.info("deleted job {} from 
queue {}", jobName, queueName); + LOGGER.info("Requested delete of queue {}", queueName); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index 1a39dfb..af15469 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -30,17 +30,14 @@ import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; -import org.apache.helix.IdealStateChangeListener; -import org.apache.helix.NotificationContext; -import org.apache.helix.model.IdealState; -import org.apache.helix.task.GobblinJobRebalancer; import org.apache.helix.task.JobConfig; import org.apache.helix.task.JobQueue; +import org.apache.helix.task.TargetState; import org.apache.helix.task.TaskConfig; import org.apache.helix.task.TaskDriver; import org.apache.helix.task.TaskUtil; +import org.apache.helix.task.WorkflowConfig; import org.apache.helix.task.WorkflowContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -170,36 +167,6 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { this.taskStateCollectorService = new TaskStateCollectorService(jobProps, this.jobContext.getJobState(), this.eventBus, this.stateStores.getTaskStateStore(), outputTaskStateDir); - - if (Task.getExecutionModel(ConfigUtils.configToState(jobConfig)).equals(ExecutionModel.STREAMING)) { - // Fix-up Ideal State with a custom rebalancer that will re-balance long-running jobs - final String clusterName = - ConfigUtils.getString(jobConfig, GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY, ""); - final String rebalancerToReplace = "org.apache.helix.task.JobRebalancer"; - final String rebalancerClassDesired = GobblinJobRebalancer.class.getName(); - final String jobResourceName = this.jobResourceName; - - if (!clusterName.isEmpty()) { - this.helixManager.addIdealStateChangeListener(new IdealStateChangeListener() { - @Override - public void onIdealStateChange(List<IdealState> list, NotificationContext notificationContext) { - HelixAdmin helixAdmin = helixManager.getClusterManagmentTool(); - for (String resource : helixAdmin.getResourcesInCluster(clusterName)) { - if (resource.equals(jobResourceName)) { - IdealState idealState = helixAdmin.getResourceIdealState(clusterName, resource); - if (idealState != null) { - String rebalancerClassFound = idealState.getRebalancerClassName(); - if (rebalancerToReplace.equals(rebalancerClassFound)) { - idealState.setRebalancerClassName(rebalancerClassDesired); - helixAdmin.setResourceIdealState(clusterName, resource, idealState); - } - } - } - } - } - }); - } - } } @Override @@ -240,15 +207,8 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { protected void executeCancellation() { if (this.jobSubmitted) { try { - // #HELIX-0.6.7-WORKAROUND - // working around helix 0.6.7 job delete issue with custom taskDriver - LOGGER.info("Cancelling job {} in Helix", this.jobContext.getJobId()); - GobblinHelixTaskDriver 
taskDriver = new GobblinHelixTaskDriver(this.helixManager); - taskDriver.deleteJob(this.helixQueueName, this.jobContext.getJobId()); - LOGGER.info("Job {} in cancelled Helix", this.jobContext.getJobId()); - - taskDriver.deleteWorkflow(this.helixQueueName, this.jobQueueDeleteTimeoutSeconds); - } catch (InterruptedException | IllegalArgumentException e) { + this.helixTaskDriver.delete(this.helixQueueName); + } catch (IllegalArgumentException e) { LOGGER.warn("Failed to cancel job {} in Helix", this.jobContext.getJobId(), e); } } @@ -293,6 +253,11 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { jobConfigBuilder.setNumConcurrentTasksPerInstance(ConfigUtils.getInt(jobConfig, GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY, GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT)); + + if (Task.getExecutionModel(ConfigUtils.configToState(jobConfig)).equals(ExecutionModel.STREAMING)) { + jobConfigBuilder.setRebalanceRunningTask(true); + } + return jobConfigBuilder; } @@ -300,18 +265,27 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { * Submit a job to run. */ private void submitJobToHelix(JobConfig.Builder jobConfigBuilder) throws Exception { + WorkflowConfig workflowConfig = this.helixTaskDriver.getWorkflowConfig(this.helixManager, this.helixQueueName); + + // If the queue is present, but in delete state then wait for cleanup before recreating the queue + if (workflowConfig != null && workflowConfig.getTargetState() == TargetState.DELETE) { + GobblinHelixTaskDriver gobblinHelixTaskDriver = new GobblinHelixTaskDriver(this.helixManager); + gobblinHelixTaskDriver.deleteWorkflow(this.helixQueueName, this.jobQueueDeleteTimeoutSeconds); + // if we get here then the workflow was successfully deleted + workflowConfig = null; + } + // Create one queue for each job with the job name being the queue name - if (null == this.helixTaskDriver.getWorkflowConfig(this.helixManager, this.helixQueueName)) { - JobQueue jobQueue = new JobQueue.Builder(this.helixQueueName).build(); - this.helixTaskDriver.createQueue(jobQueue); - LOGGER.info("Created job queue {}", this.helixQueueName); + if (workflowConfig == null) { + JobQueue jobQueue = new JobQueue.Builder(this.helixQueueName).build(); + this.helixTaskDriver.createQueue(jobQueue); + LOGGER.info("Created job queue {}", this.helixQueueName); } else { LOGGER.info("Job queue {} already exists", this.helixQueueName); } // Put the job into the queue this.helixTaskDriver.enqueueJob(this.jobContext.getJobName(), this.jobContext.getJobId(), jobConfigBuilder); - } public void launchJob(@Nullable JobListener jobListener) http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java index cedb111..ebe2b52 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java @@ -16,54 +16,23 @@ */ package org.apache.gobblin.cluster; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.I0Itec.zkclient.DataUpdater; -import org.apache.helix.AccessOption; import org.apache.helix.ConfigAccessor; import 
org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; -import org.apache.helix.PropertyKey; -import org.apache.helix.PropertyPathConfig; -import org.apache.helix.PropertyType; import org.apache.helix.ZNRecord; -import org.apache.helix.manager.zk.ZKHelixAdmin; -import org.apache.helix.manager.zk.ZKHelixDataAccessor; -import org.apache.helix.manager.zk.ZkBaseDataAccessor; -import org.apache.helix.manager.zk.ZkClient; -import org.apache.helix.model.IdealState; import org.apache.helix.store.HelixPropertyStore; -import org.apache.helix.store.zk.ZkHelixPropertyStore; -import org.apache.helix.task.JobDag; import org.apache.helix.task.TargetState; -import org.apache.helix.task.TaskConstants; import org.apache.helix.task.TaskDriver; -import org.apache.helix.task.TaskState; -import org.apache.helix.task.TaskUtil; import org.apache.helix.task.WorkflowConfig; import org.apache.helix.task.WorkflowContext; -import org.apache.log4j.Logger; - -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; /** * #HELIX-0.6.7-WORKAROUND * Replacement TaskDriver methods to workaround bugs and changes in behavior for the 0.6.7 upgrade */ public class GobblinHelixTaskDriver { - /** For logging */ - private static final Logger LOG = Logger.getLogger(GobblinHelixTaskDriver.class); - - private final HelixDataAccessor _accessor; - private final ConfigAccessor _cfgAccessor; - private final HelixPropertyStore<ZNRecord> _propertyStore; - private final HelixAdmin _admin; - private final String _clusterName; private final TaskDriver _taskDriver; public GobblinHelixTaskDriver(HelixManager manager) { @@ -71,277 +40,24 @@ public class GobblinHelixTaskDriver { .getConfigAccessor(), manager.getHelixPropertyStore(), manager.getClusterName()); } - public GobblinHelixTaskDriver(ZkClient client, String clusterName) { - this(client, new ZkBaseDataAccessor<ZNRecord>(client), clusterName); - } - - public GobblinHelixTaskDriver(ZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) { - this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor), - new ConfigAccessor(client), new ZkHelixPropertyStore<ZNRecord>(baseAccessor, - PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName), null), clusterName); - } - public GobblinHelixTaskDriver(HelixAdmin admin, HelixDataAccessor accessor, ConfigAccessor cfgAccessor, HelixPropertyStore<ZNRecord> propertyStore, String clusterName) { - _admin = admin; - _accessor = accessor; - _cfgAccessor = cfgAccessor; - _propertyStore = propertyStore; - _clusterName = clusterName; _taskDriver = new TaskDriver(admin, accessor, cfgAccessor, propertyStore, clusterName); } /** - * Delete a job from an existing named queue, - * the queue has to be stopped prior to this call - * - * @param queueName - * @param jobName - */ - public void deleteJob(final String queueName, final String jobName) { - WorkflowConfig workflowCfg = - _taskDriver.getWorkflowConfig(queueName); - - if (workflowCfg == null) { - throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!"); - } - if (workflowCfg.isTerminable()) { - throw new IllegalArgumentException(queueName + " is not a queue!"); - } - - boolean isRecurringWorkflow = - (workflowCfg.getScheduleConfig() != null && workflowCfg.getScheduleConfig().isRecurring()); - - if (isRecurringWorkflow) { - WorkflowContext wCtx = _taskDriver.getWorkflowContext(queueName); - - String 
lastScheduledQueue = wCtx.getLastScheduledSingleWorkflow(); - - // delete the current scheduled one - deleteJobFromScheduledQueue(lastScheduledQueue, jobName, true); - - // Remove the job from the original queue template's DAG - removeJobFromDag(queueName, jobName); - - // delete the ideal state and resource config for the template job - final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName); - _admin.dropResource(_clusterName, namespacedJobName); - - // Delete the job template from property store - String jobPropertyPath = - Joiner.on("/") - .join(TaskConstants.REBALANCER_CONTEXT_ROOT, namespacedJobName); - _propertyStore.remove(jobPropertyPath, AccessOption.PERSISTENT); - } else { - deleteJobFromScheduledQueue(queueName, jobName, false); - } - } - - /** - * delete a job from a scheduled (non-recurrent) queue. - * - * @param queueName - * @param jobName - */ - private void deleteJobFromScheduledQueue(final String queueName, final String jobName, - boolean isRecurrent) { - WorkflowConfig workflowCfg = _taskDriver.getWorkflowConfig(queueName); - - if (workflowCfg == null) { - // When try to delete recurrent job, it could be either not started or finished. So - // there may not be a workflow config. - if (isRecurrent) { - return; - } else { - throw new IllegalArgumentException("Queue " + queueName + " does not yet exist!"); - } - } - - WorkflowContext wCtx = _taskDriver.getWorkflowContext(queueName); - if (wCtx != null && wCtx.getWorkflowState() == null) { - throw new IllegalStateException("Queue " + queueName + " does not have a valid work state!"); - } - - // #HELIX-0.6.7-WORKAROUND - // This check is removed to get the same behavior as 0.6.6-SNAPSHOT until new APIs to support delete are provided - //String workflowState = - // (wCtx != null) ? wCtx.getWorkflowState().name() : TaskState.NOT_STARTED.name(); - //if (workflowState.equals(TaskState.IN_PROGRESS.name())) { - // throw new IllegalStateException("Queue " + queueName + " is still in progress!"); - //} - - removeJob(queueName, jobName); - } - - private boolean removeJobContext(HelixPropertyStore<ZNRecord> propertyStore, - String jobResource) { - return propertyStore.remove( - Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource), - AccessOption.PERSISTENT); - } - - private void removeJob(String queueName, String jobName) { - // Remove the job from the queue in the DAG - removeJobFromDag(queueName, jobName); - - // delete the ideal state and resource config for the job - final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName); - _admin.dropResource(_clusterName, namespacedJobName); - - // update queue's property to remove job from JOB_STATES if it is already started. 
- removeJobStateFromQueue(queueName, jobName); - - // Delete the job from property store - removeJobContext(_propertyStore, namespacedJobName); - } - - /** Remove the job name from the DAG from the queue configuration */ - private void removeJobFromDag(final String queueName, final String jobName) { - final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName); - - DataUpdater<ZNRecord> dagRemover = new DataUpdater<ZNRecord>() { - @Override - public ZNRecord update(ZNRecord currentData) { - if (currentData == null) { - LOG.error("Could not update DAG for queue: " + queueName + " ZNRecord is null."); - return null; - } - // Add the node to the existing DAG - JobDag jobDag = JobDag.fromJson( - currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name())); - Set<String> allNodes = jobDag.getAllNodes(); - if (!allNodes.contains(namespacedJobName)) { - LOG.warn( - "Could not delete job from queue " + queueName + ", job " + jobName + " not exists"); - return currentData; - } - String parent = null; - String child = null; - // remove the node from the queue - for (String node : allNodes) { - if (jobDag.getDirectChildren(node).contains(namespacedJobName)) { - parent = node; - jobDag.removeParentToChild(parent, namespacedJobName); - } else if (jobDag.getDirectParents(node).contains(namespacedJobName)) { - child = node; - jobDag.removeParentToChild(namespacedJobName, child); - } - } - if (parent != null && child != null) { - jobDag.addParentToChild(parent, child); - } - jobDag.removeNode(namespacedJobName); - - // Save the updated DAG - try { - currentData - .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson()); - } catch (Exception e) { - throw new IllegalStateException( - "Could not remove job " + jobName + " from DAG of queue " + queueName, e); - } - return currentData; - } - }; - - String path = _accessor.keyBuilder().resourceConfig(queueName).getPath(); - if (!_accessor.getBaseDataAccessor().update(path, dagRemover, AccessOption.PERSISTENT)) { - throw new IllegalArgumentException( - "Could not remove job " + jobName + " from DAG of queue " + queueName); - } - } - - /** update queue's property to remove job from JOB_STATES if it is already started. */ - private void removeJobStateFromQueue(final String queueName, final String jobName) { - final String namespacedJobName = TaskUtil.getNamespacedJobName(queueName, jobName); - String queuePropertyPath = - Joiner.on("/") - .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE); - - DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() { - @Override - public ZNRecord update(ZNRecord currentData) { - if (currentData != null) { - Map<String, String> states = currentData.getMapField(WorkflowContext.JOB_STATES); - if (states != null && states.containsKey(namespacedJobName)) { - states.keySet().remove(namespacedJobName); - } - } - return currentData; - } - }; - if (!_propertyStore.update(queuePropertyPath, updater, AccessOption.PERSISTENT)) { - LOG.warn("Fail to remove job state for job " + namespacedJobName + " from queue " + queueName); - } - } - - /** - * Trigger a controller pipeline execution for a given resource. 
- * - * @param accessor Helix data accessor - * @param resource the name of the resource changed to triggering the execution - */ - private void invokeRebalance(HelixDataAccessor accessor, String resource) { - // The pipeline is idempotent, so touching an ideal state is enough to trigger a pipeline run - LOG.info("invoke rebalance for " + resource); - PropertyKey key = accessor.keyBuilder().idealStates(resource); - IdealState is = accessor.getProperty(key); - if (is != null && is.getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) { - if (!accessor.updateProperty(key, is)) { - LOG.warn("Failed to invoke rebalance on resource " + resource); - } - } else { - LOG.warn("Can't find ideal state or ideal state is not for right type for " + resource); - } - } - - /** Helper function to change target state for a given workflow */ - private void setSingleWorkflowTargetState(String workflowName, final TargetState state) { - LOG.info("Set " + workflowName + " to target state " + state); - DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() { - @Override - public ZNRecord update(ZNRecord currentData) { - if (currentData != null) { - // Only update target state for non-completed workflows - String finishTime = currentData.getSimpleField(WorkflowContext.FINISH_TIME); - if (finishTime == null || finishTime.equals(String.valueOf(WorkflowContext.UNFINISHED))) { - currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(), - state.name()); - } else { - LOG.info("TargetState DataUpdater: ignore to update target state " + finishTime); - } - } else { - LOG.error("TargetState DataUpdater: Fails to update target state "); - } - return currentData; - } - }; - List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList(); - List<String> paths = Lists.newArrayList(); - - PropertyKey cfgKey = TaskUtil.getWorkflowConfigKey(_accessor, workflowName); - if (_accessor.getProperty(cfgKey) != null) { - paths.add(_accessor.keyBuilder().resourceConfig(workflowName).getPath()); - updaters.add(updater); - _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT); - invokeRebalance(_accessor, workflowName); - } else { - LOG.error("Configuration path " + cfgKey + " not found!"); - } - } - - /** * Delete the workflow * * @param workflow The workflow name * @param timeout The timeout for deleting the workflow/queue in seconds */ public void deleteWorkflow(String workflow, long timeout) throws InterruptedException { - // #HELIX-0.6.7-WORKAROUND - // Helix 0.6.7 has a bug where TaskDriver.delete(workflow) will delete all resources with a - // workflow as the prefix. Work around the bug by pulling in the code from TaskDriver and calling - // setSingleWorkflowTargetState directly to bypass the prefix matching code. 
- setSingleWorkflowTargetState(workflow, TargetState.DELETE); + WorkflowConfig workflowConfig = _taskDriver.getWorkflowConfig(workflow); + + // set the target state if not already set + if (workflowConfig != null && workflowConfig.getTargetState() != TargetState.DELETE) { + _taskDriver.delete(workflow); + } long endTime = System.currentTimeMillis() + (timeout * 1000); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gobblin-cluster/src/main/java/org/apache/helix/task/GobblinJobRebalancer.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/helix/task/GobblinJobRebalancer.java b/gobblin-cluster/src/main/java/org/apache/helix/task/GobblinJobRebalancer.java deleted file mode 100644 index 25241d5..0000000 --- a/gobblin-cluster/src/main/java/org/apache/helix/task/GobblinJobRebalancer.java +++ /dev/null @@ -1,713 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.helix.task; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; - -import org.apache.helix.AccessOption; -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.PropertyKey; -import org.apache.helix.ZNRecord; -import org.apache.helix.controller.stages.ClusterDataCache; -import org.apache.helix.controller.stages.CurrentStateOutput; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.Message; -import org.apache.helix.model.Partition; -import org.apache.helix.model.Resource; -import org.apache.helix.model.ResourceAssignment; -import org.apache.log4j.Logger; - -import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Sets; - -/** - * Custom rebalancer implementation for the {@code Job} in task model. 
- */ -public class GobblinJobRebalancer extends TaskRebalancer { - private static final Logger LOG = Logger.getLogger(GobblinJobRebalancer.class); - private static TaskAssignmentCalculator fixTaskAssignmentCal = - new FixedTargetTaskAssignmentCalculator(); - private static TaskAssignmentCalculator genericTaskAssignmentCal = - new GenericTaskAssignmentCalculator(); - - private static final String PREV_RA_NODE = "PreviousResourceAssignment"; - - @Override - public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterData, - IdealState taskIs, Resource resource, CurrentStateOutput currStateOutput) { - final String jobName = resource.getResourceName(); - LOG.debug("Computer Best Partition for job: " + jobName); - - // Fetch job configuration - JobConfig jobCfg = TaskUtil.getJobCfg(_manager, jobName); - if (jobCfg == null) { - LOG.error("Job configuration is NULL for " + jobName); - return buildEmptyAssignment(jobName, currStateOutput); - } - String workflowResource = jobCfg.getWorkflow(); - - // Fetch workflow configuration and context - WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource); - if (workflowCfg == null) { - LOG.error("Workflow configuration is NULL for " + jobName); - return buildEmptyAssignment(jobName, currStateOutput); - } - - WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource); - if (workflowCtx == null) { - LOG.error("Workflow context is NULL for " + jobName); - return buildEmptyAssignment(jobName, currStateOutput); - } - - TargetState targetState = workflowCfg.getTargetState(); - if (targetState != TargetState.START && targetState != TargetState.STOP) { - LOG.info("Target state is " + targetState.name() + " for workflow " + workflowResource - + ".Stop scheduling job " + jobName); - return buildEmptyAssignment(jobName, currStateOutput); - } - - // Stop current run of the job if workflow or job is already in final state (failed or completed) - TaskState workflowState = workflowCtx.getWorkflowState(); - TaskState jobState = workflowCtx.getJobState(jobName); - // The job is already in a final state (completed/failed). - if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED || - jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) { - LOG.info(String.format( - "Workflow %s or job %s is already failed or completed, workflow state (%s), job state (%s), clean up job IS.", - workflowResource, jobName, workflowState, jobState)); - cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobName); - _scheduledRebalancer.removeScheduledRebalance(jobName); - return buildEmptyAssignment(jobName, currStateOutput); - } - - if (!isWorkflowReadyForSchedule(workflowCfg)) { - LOG.info("Job is not ready to be run since workflow is not ready " + jobName); - return buildEmptyAssignment(jobName, currStateOutput); - } - - if (!isJobStarted(jobName, workflowCtx) && !isJobReadyToSchedule(jobName, workflowCfg, - workflowCtx)) { - LOG.info("Job is not ready to run " + jobName); - return buildEmptyAssignment(jobName, currStateOutput); - } - - // Fetch any existing context information from the property store. 
- JobContext jobCtx = TaskUtil.getJobContext(_manager, jobName); - if (jobCtx == null) { - jobCtx = new JobContext(new ZNRecord("TaskContext")); - jobCtx.setStartTime(System.currentTimeMillis()); - } - - // Grab the old assignment, or an empty one if it doesn't exist - ResourceAssignment prevAssignment = getPrevResourceAssignment(jobName); - if (prevAssignment == null) { - prevAssignment = new ResourceAssignment(jobName); - } - - // Will contain the list of partitions that must be explicitly dropped from the ideal state that - // is stored in zk. - // Fetch the previous resource assignment from the property store. This is required because of - // HELIX-230. - Set<String> liveInstances = jobCfg.getInstanceGroupTag() == null - ? clusterData.getAllEnabledLiveInstances() - : clusterData.getAllEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag()); - - if (liveInstances.isEmpty()) { - LOG.error("No available instance found for job!"); - } - - Set<Integer> partitionsToDrop = new TreeSet<Integer>(); - ResourceAssignment newAssignment = - computeResourceMapping(jobName, workflowCfg, jobCfg, prevAssignment, liveInstances, - currStateOutput, workflowCtx, jobCtx, partitionsToDrop, clusterData); - - if (!partitionsToDrop.isEmpty()) { - for (Integer pId : partitionsToDrop) { - taskIs.getRecord().getMapFields().remove(pName(jobName, pId)); - } - HelixDataAccessor accessor = _manager.getHelixDataAccessor(); - PropertyKey propertyKey = accessor.keyBuilder().idealStates(jobName); - accessor.setProperty(propertyKey, taskIs); - } - - // Update rebalancer context, previous ideal state. - TaskUtil.setJobContext(_manager, jobName, jobCtx); - TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx); - setPrevResourceAssignment(jobName, newAssignment); - - LOG.debug("Job " + jobName + " new assignment " + Arrays - .toString(newAssignment.getMappedPartitions().toArray())); - return newAssignment; - } - - private Set<String> getInstancesAssignedToOtherJobs(String currentJobName, - WorkflowConfig workflowCfg) { - Set<String> ret = new HashSet<String>(); - for (String jobName : workflowCfg.getJobDag().getAllNodes()) { - if (jobName.equals(currentJobName)) { - continue; - } - JobContext jobContext = TaskUtil.getJobContext(_manager, jobName); - if (jobContext == null) { - continue; - } - for (int partition : jobContext.getPartitionSet()) { - TaskPartitionState partitionState = jobContext.getPartitionState(partition); - if (partitionState == TaskPartitionState.INIT || - partitionState == TaskPartitionState.RUNNING) { - ret.add(jobContext.getAssignedParticipant(partition)); - } - } - } - - return ret; - } - - private ResourceAssignment computeResourceMapping(String jobResource, - WorkflowConfig workflowConfig, JobConfig jobCfg, ResourceAssignment prevAssignment, - Collection<String> liveInstances, CurrentStateOutput currStateOutput, - WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs, - ClusterDataCache cache) { - TargetState jobTgtState = workflowConfig.getTargetState(); - // Update running status in workflow context - if (jobTgtState == TargetState.STOP) { - workflowCtx.setJobState(jobResource, TaskState.STOPPED); - // Workflow has been stopped if all in progress jobs are stopped - if (isWorkflowStopped(workflowCtx, workflowConfig)) { - workflowCtx.setWorkflowState(TaskState.STOPPED); - } - } else { - workflowCtx.setJobState(jobResource, TaskState.IN_PROGRESS); - // Workflow is in progress if any task is in progress - workflowCtx.setWorkflowState(TaskState.IN_PROGRESS); 
- } - - // Used to keep track of tasks that have already been assigned to instances. - Set<Integer> assignedPartitions = new HashSet<Integer>(); - - // Used to keep track of tasks that have failed, but whose failure is acceptable - Set<Integer> skippedPartitions = new HashSet<Integer>(); - - // Keeps a mapping of (partition) -> (instance, state) - Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>(); - - // Keeps a mapping of (partition) -> (instance, state) of partitions have have been relocated - Map<Integer, PartitionAssignment> relocatedPaMap = new TreeMap<Integer, PartitionAssignment>(); - - Set<String> excludedInstances = getInstancesAssignedToOtherJobs(jobResource, workflowConfig); - - // Process all the current assignments of tasks. - TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalulator(jobCfg); - Set<Integer> allPartitions = taskAssignmentCal - .getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache.getIdealStates()); - - if (allPartitions == null || allPartitions.isEmpty()) { - // Empty target partitions, mark the job as FAILED. - String failureMsg = "Empty task partition mapping for job " + jobResource + ", marked the job as FAILED!"; - LOG.info(failureMsg); - jobCtx.setInfo(failureMsg); - markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx); - markAllPartitionsError(jobCtx, TaskPartitionState.ERROR, false); - _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.FAILED); - return new ResourceAssignment(jobResource); - } - - Map<String, SortedSet<Integer>> taskAssignments = - getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions); - long currentTime = System.currentTimeMillis(); - - LOG.debug("All partitions: " + allPartitions + " taskAssignment: " + taskAssignments - + " excludedInstances: " + excludedInstances); - - for (Map.Entry<String, SortedSet<Integer>> entryInstance : taskAssignments.entrySet()) { - String instance = entryInstance.getKey(); - if (excludedInstances.contains(instance)) { - continue; - } - - Set<Integer> pSet = entryInstance.getValue(); - // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT, - // TASK_ERROR, ERROR. - Set<Integer> donePartitions = new TreeSet<Integer>(); - for (int pId : pSet) { - final String pName = pName(jobResource, pId); - - // Check for pending state transitions on this (partition, instance). - Message pendingMessage = - currStateOutput.getPendingState(jobResource, new Partition(pName), instance); - if (pendingMessage != null) { - // There is a pending state transition for this (partition, instance). Just copy forward - // the state assignment from the previous ideal state. - Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName)); - if (stateMap != null) { - String prevState = stateMap.get(instance); - paMap.put(pId, new PartitionAssignment(instance, prevState)); - assignedPartitions.add(pId); - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "Task partition %s has a pending state transition on instance %s. 
Using the previous ideal state which was %s.", - pName, instance, prevState)); - } - } - - continue; - } - - TaskPartitionState currState = - TaskPartitionState.valueOf(currStateOutput.getCurrentState(jobResource, new Partition( - pName), instance)); - jobCtx.setPartitionState(pId, currState); - - String taskMsg = currStateOutput.getInfo(jobResource, new Partition( - pName), instance); - if (taskMsg != null) { - jobCtx.setPartitionInfo(pId, taskMsg); - } - - // Process any requested state transitions. - String requestedStateStr = - currStateOutput.getRequestedState(jobResource, new Partition(pName), instance); - if (requestedStateStr != null && !requestedStateStr.isEmpty()) { - TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr); - if (requestedState.equals(currState)) { - LOG.warn(String.format( - "Requested state %s is the same as the current state for instance %s.", - requestedState, instance)); - } - - paMap.put(pId, new PartitionAssignment(instance, requestedState.name())); - assignedPartitions.add(pId); - LOG.debug(String.format( - "Instance %s requested a state transition to %s for partition %s.", instance, - requestedState, pName)); - continue; - } - - switch (currState) { - case RUNNING: - case STOPPED: { - TaskPartitionState nextState; - if (jobTgtState == TargetState.START) { - nextState = TaskPartitionState.RUNNING; - } else { - nextState = TaskPartitionState.STOPPED; - } - - paMap.put(pId, new PartitionAssignment(instance, nextState.name())); - assignedPartitions.add(pId); - LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName, - nextState, instance)); - } - break; - case COMPLETED: { - // The task has completed on this partition. Mark as such in the context object. - donePartitions.add(pId); - LOG.debug(String - .format( - "Task partition %s has completed with state %s. Marking as such in rebalancer context.", - pName, currState)); - partitionsToDropFromIs.add(pId); - markPartitionCompleted(jobCtx, pId); - } - break; - case TIMED_OUT: - case TASK_ERROR: - case TASK_ABORTED: - case ERROR: { - donePartitions.add(pId); // The task may be rescheduled on a different instance. - LOG.debug(String.format( - "Task partition %s has error state %s with msg %s. Marking as such in rebalancer context.", pName, - currState, taskMsg)); - markPartitionError(jobCtx, pId, currState, true); - // The error policy is to fail the task as soon a single partition fails for a specified - // maximum number of attempts or task is in ABORTED state. 
- if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask() || - currState.equals(TaskPartitionState.TASK_ABORTED)) { - // If we have some leeway for how many tasks we can fail, then we don't have - // to fail the job immediately - if (skippedPartitions.size() >= jobCfg.getFailureThreshold()) { - markJobFailed(jobResource, jobCtx, workflowConfig, workflowCtx); - _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.FAILED); - markAllPartitionsError(jobCtx, currState, false); - addAllPartitions(allPartitions, partitionsToDropFromIs); - - // remove IdealState of this job - cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource); - return buildEmptyAssignment(jobResource, currStateOutput); - } else { - skippedPartitions.add(pId); - partitionsToDropFromIs.add(pId); - } - - LOG.debug("skippedPartitions:" + skippedPartitions); - } else { - // Mark the task to be started at some later time (if enabled) - markPartitionDelayed(jobCfg, jobCtx, pId); - } - } - break; - case INIT: - case DROPPED: { - // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned. - donePartitions.add(pId); - LOG.debug(String.format( - "Task partition %s has state %s. It will be dropped from the current ideal state.", - pName, currState)); - } - break; - default: - throw new AssertionError("Unknown enum symbol: " + currState); - } - } - - // Remove the set of task partitions that are completed or in one of the error states. - pSet.removeAll(donePartitions); - } - - // For delayed tasks, trigger a rebalance event for the closest upcoming ready time - scheduleForNextTask(jobResource, jobCtx, currentTime); - - if (isJobComplete(jobCtx, allPartitions, skippedPartitions, jobCfg)) { - markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx); - _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED); - // remove IdealState of this job - cleanupIdealStateExtView(_manager.getHelixDataAccessor(), jobResource); - } - - // Make additional task assignments if needed. - if (jobTgtState == TargetState.START) { - // Contains the set of task partitions that must be excluded from consideration when making - // any new assignments. - // This includes all completed, failed, delayed, and already assigned partitions. - //Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions); - //HACK: Modify excludeSet to be empty - Set<Integer> excludeSet = Sets.newTreeSet(); - addCompletedTasks(excludeSet, jobCtx, allPartitions); - addGiveupPartitions(excludeSet, jobCtx, allPartitions, jobCfg); - excludeSet.addAll(skippedPartitions); - excludeSet.addAll(getNonReadyPartitions(jobCtx, currentTime)); - // Get instance->[partition, ...] mappings for the target resource. - Map<String, SortedSet<Integer>> tgtPartitionAssignments = taskAssignmentCal - .getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx, - workflowConfig, workflowCtx, allPartitions, cache.getIdealStates()); - for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) { - String instance = entry.getKey(); - if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances - .contains(instance)) { - continue; - } - - // Contains the set of task partitions currently assigned to the instance. 
- Set<Integer> pSet = entry.getValue(); - int numToAssign = jobCfg.getNumConcurrentTasksPerInstance() - pSet.size(); - if (numToAssign > 0) { - List<Integer> nextPartitions = - getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign); - for (Integer pId : nextPartitions) { - // if partition is not currently assigned to instance then it may have been moved - if (!pSet.contains(pId)) { - // look at current assignment to see if task is running on another instance - for (Map.Entry<String, SortedSet<Integer>> currentEntry : taskAssignments.entrySet()) { - String currentInstance = currentEntry.getKey(); - Set<Integer> currentpSet = currentEntry.getValue(); - - // task is being moved, so transition to STOPPED state - if (!instance.equals(currentInstance) && currentpSet.contains(pId)) { - relocatedPaMap.put(pId, new PartitionAssignment(currentInstance, TaskPartitionState.STOPPED.name())); - break; - } - } - } - - String pName = pName(jobResource, pId); - paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name())); - excludeSet.add(pId); - jobCtx.setAssignedParticipant(pId, instance); - jobCtx.setPartitionState(pId, TaskPartitionState.INIT); - jobCtx.setPartitionStartTime(pId, System.currentTimeMillis()); - LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName, - TaskPartitionState.RUNNING, instance)); - } - } - } - } - - // Construct a ResourceAssignment object from the map of partition assignments. - ResourceAssignment ra = new ResourceAssignment(jobResource); - for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) { - PartitionAssignment pa = e.getValue(); - - if (relocatedPaMap.containsKey(e.getKey())) { - PartitionAssignment currentPa = relocatedPaMap.get(e.getKey()); - - ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())), - ImmutableMap.of(pa._instance, pa._state, currentPa._instance, currentPa._state)); - } else { - ra.addReplicaMap(new Partition(pName(jobResource, e.getKey())), ImmutableMap.of(pa._instance, pa._state)); - } - } - - return ra; - } - - private void markJobComplete(String jobName, JobContext jobContext, - WorkflowConfig workflowConfig, WorkflowContext workflowContext) { - long currentTime = System.currentTimeMillis(); - workflowContext.setJobState(jobName, TaskState.COMPLETED); - jobContext.setFinishTime(currentTime); - if (isWorkflowFinished(workflowContext, workflowConfig)) { - workflowContext.setFinishTime(currentTime); - } - } - - private void scheduleForNextTask(String job, JobContext jobCtx, long now) { - // Clear current entries if they exist and are expired - long currentTime = now; - long scheduledTime = _scheduledRebalancer.getRebalanceTime(job); - if (scheduledTime > 0 && currentTime > scheduledTime) { - _scheduledRebalancer.removeScheduledRebalance(job); - } - - // Figure out the earliest schedulable time in the future of a non-complete job - boolean shouldSchedule = false; - long earliestTime = Long.MAX_VALUE; - for (int p : jobCtx.getPartitionSet()) { - long retryTime = jobCtx.getNextRetryTime(p); - TaskPartitionState state = jobCtx.getPartitionState(p); - state = (state != null) ? 
state : TaskPartitionState.INIT; - Set<TaskPartitionState> errorStates = - Sets.newHashSet(TaskPartitionState.ERROR, TaskPartitionState.TASK_ERROR, - TaskPartitionState.TIMED_OUT); - if (errorStates.contains(state) && retryTime > currentTime && retryTime < earliestTime) { - earliestTime = retryTime; - shouldSchedule = true; - } - } - - // If any was found, then schedule it - if (shouldSchedule) { - _scheduledRebalancer.scheduleRebalance(_manager, job, earliestTime); - } - } - - /** - * Get the last task assignment for a given job - * - * @param resourceName the name of the job - * @return {@link ResourceAssignment} instance, or null if no assignment is available - */ - private ResourceAssignment getPrevResourceAssignment(String resourceName) { - ZNRecord r = _manager.getHelixPropertyStore() - .get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE), - null, AccessOption.PERSISTENT); - return r != null ? new ResourceAssignment(r) : null; - } - - /** - * Set the last task assignment for a given job - * - * @param resourceName the name of the job - * @param ra {@link ResourceAssignment} containing the task assignment - */ - private void setPrevResourceAssignment(String resourceName, - ResourceAssignment ra) { - _manager.getHelixPropertyStore() - .set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE), - ra.getRecord(), AccessOption.PERSISTENT); - } - - /** - * Checks if the job has completed. - * @param ctx The rebalancer context. - * @param allPartitions The set of partitions to check. - * @param skippedPartitions partitions that failed, but whose failure is acceptable - * @return true if all task partitions have been marked with status - * {@link TaskPartitionState#COMPLETED} in the rebalancer - * context, false otherwise. 
- */ - private static boolean isJobComplete(JobContext ctx, Set<Integer> allPartitions, - Set<Integer> skippedPartitions, JobConfig cfg) { - for (Integer pId : allPartitions) { - TaskPartitionState state = ctx.getPartitionState(pId); - if (!skippedPartitions.contains(pId) && state != TaskPartitionState.COMPLETED - && !isTaskGivenup(ctx, cfg, pId)) { - return false; - } - } - return true; - } - - - private static void addAllPartitions(Set<Integer> toAdd, Set<Integer> destination) { - for (Integer pId : toAdd) { - destination.add(pId); - } - } - - private static void addCompletedTasks(Set<Integer> set, JobContext ctx, - Iterable<Integer> pIds) { - for (Integer pId : pIds) { - TaskPartitionState state = ctx.getPartitionState(pId); - if (state == TaskPartitionState.COMPLETED) { - set.add(pId); - } - } - } - - private static boolean isTaskGivenup(JobContext ctx, JobConfig cfg, int pId) { - TaskPartitionState state = ctx.getPartitionState(pId); - if ((state != null) && (state.equals(TaskPartitionState.TASK_ABORTED) || state - .equals(TaskPartitionState.ERROR))) { - return true; - } - return ctx.getPartitionNumAttempts(pId) >= cfg.getMaxAttemptsPerTask(); - } - - // add all partitions that have been tried maxNumberAttempts - private static void addGiveupPartitions(Set<Integer> set, JobContext ctx, Iterable<Integer> pIds, - JobConfig cfg) { - for (Integer pId : pIds) { - if (isTaskGivenup(ctx, cfg, pId)) { - set.add(pId); - } - } - } - - private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions, - Set<Integer> excluded, int n) { - List<Integer> result = new ArrayList<Integer>(); - for (Integer pId : candidatePartitions) { - if (result.size() >= n) { - break; - } - - if (!excluded.contains(pId)) { - result.add(pId); - } - } - - return result; - } - - private static void markPartitionDelayed(JobConfig cfg, JobContext ctx, int p) { - long delayInterval = cfg.getTaskRetryDelay(); - if (delayInterval <= 0) { - return; - } - long nextStartTime = ctx.getPartitionFinishTime(p) + delayInterval; - ctx.setNextRetryTime(p, nextStartTime); - } - - private static void markPartitionCompleted(JobContext ctx, int pId) { - ctx.setPartitionState(pId, TaskPartitionState.COMPLETED); - ctx.setPartitionFinishTime(pId, System.currentTimeMillis()); - ctx.incrementNumAttempts(pId); - } - - private static void markPartitionError(JobContext ctx, int pId, TaskPartitionState state, - boolean incrementAttempts) { - ctx.setPartitionState(pId, state); - ctx.setPartitionFinishTime(pId, System.currentTimeMillis()); - if (incrementAttempts) { - ctx.incrementNumAttempts(pId); - } - } - - private static void markAllPartitionsError(JobContext ctx, TaskPartitionState state, - boolean incrementAttempts) { - for (int pId : ctx.getPartitionSet()) { - markPartitionError(ctx, pId, state, incrementAttempts); - } - } - - /** - * Return the assignment of task partitions per instance. 
- */ - private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments( - Iterable<String> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) { - Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>(); - for (String instance : instanceList) { - result.put(instance, new TreeSet<Integer>()); - } - - for (Partition partition : assignment.getMappedPartitions()) { - int pId = TaskUtil.getPartitionId(partition.getPartitionName()); - if (includeSet.contains(pId)) { - Map<String, String> replicaMap = assignment.getReplicaMap(partition); - for (String instance : replicaMap.keySet()) { - SortedSet<Integer> pList = result.get(instance); - if (pList != null) { - pList.add(pId); - } - } - } - } - return result; - } - - private static Set<Integer> getNonReadyPartitions(JobContext ctx, long now) { - Set<Integer> nonReadyPartitions = Sets.newHashSet(); - for (int p : ctx.getPartitionSet()) { - long toStart = ctx.getNextRetryTime(p); - if (now < toStart) { - nonReadyPartitions.add(p); - } - } - return nonReadyPartitions; - } - - private TaskAssignmentCalculator getAssignmentCalulator(JobConfig jobConfig) { - Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap(); - if (taskConfigMap != null && !taskConfigMap.isEmpty()) { - return genericTaskAssignmentCal; - } else { - return fixTaskAssignmentCal; - } - } - - /** - * Computes the partition name given the resource name and partition id. - */ - private String pName(String resource, int pId) { - return resource + "_" + pId; - } - - /** - * An (instance, state) pair. - */ - private static class PartitionAssignment { - private final String _instance; - private final String _state; - - private PartitionAssignment(String instance, String state) { - _instance = instance; - _state = state; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java index aaf5f05..10ef3db 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java @@ -307,6 +307,9 @@ public class GobblinHelixJobLauncherTest { gobblinHelixJobLauncher.close(); + // job queue deleted asynchronously after close + waitForQueueCleanup(taskDriver, jobName); + jobContext = taskDriver.getJobContext(jobContextName); // job context should have been deleted @@ -325,7 +328,9 @@ public class GobblinHelixJobLauncherTest { gobblinHelixJobLauncher2.close(); - // job queue deleted after close + // job queue deleted asynchronously after close + waitForQueueCleanup(taskDriver, jobName2); + workflowConfig = taskDriver.getWorkflowConfig(jobName2); Assert.assertNull(workflowConfig); @@ -358,4 +363,19 @@ public class GobblinHelixJobLauncherTest { this.closer.close(); } } + + private void waitForQueueCleanup(TaskDriver taskDriver, String queueName) { + for (int i = 0; i < 60; i++) { + WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(queueName); + + if (workflowConfig == null) { + break; + } + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } + } } 
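Note on the test change above: with Helix 0.6.9, queue deletion is asynchronous. TaskDriver.delete(queueName) only flips the workflow's target state to DELETE and returns; the Helix controller performs the actual cleanup later, which is why both the launcher's deleteWorkflow and the test's waitForQueueCleanup poll for the WorkflowConfig to disappear. A minimal sketch of that request-then-poll pattern, built only from TaskDriver calls that appear in this commit (the class and method names below are illustrative, not part of the patch):

import org.apache.helix.HelixManager;
import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.WorkflowConfig;

public class QueueCleanupSketch {
  /**
   * Request deletion of a job queue, then poll until the Helix controller
   * has removed the queue's WorkflowConfig or the timeout expires.
   */
  public static boolean deleteQueueAndWait(HelixManager manager, String queueName,
      long timeoutSeconds) throws InterruptedException {
    TaskDriver taskDriver = new TaskDriver(manager);
    WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(queueName);

    // delete() only sets the target state to DELETE and returns immediately,
    // so skip the request if a delete is already pending.
    if (workflowConfig != null && workflowConfig.getTargetState() != TargetState.DELETE) {
      taskDriver.delete(queueName);
    }

    // Poll for the controller to finish removing the workflow config.
    long endTime = System.currentTimeMillis() + timeoutSeconds * 1000L;
    while (System.currentTimeMillis() < endTime) {
      if (taskDriver.getWorkflowConfig(queueName) == null) {
        return true;
      }
      Thread.sleep(1000L);
    }
    return taskDriver.getWorkflowConfig(queueName) == null;
  }
}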
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gradle/scripts/computeVersions.gradle ---------------------------------------------------------------------- diff --git a/gradle/scripts/computeVersions.gradle b/gradle/scripts/computeVersions.gradle index 2c05350..cf7baaa 100644 --- a/gradle/scripts/computeVersions.gradle +++ b/gradle/scripts/computeVersions.gradle @@ -62,7 +62,7 @@ ext.gradleVersionMajor = Integer.parseInt(gradleVersions[0]) ext.gradleVersionMinor = Integer.parseInt(gradleVersions[1]) println "Detected Gradle version major=" + gradleVersionMajor + " minor=" + gradleVersionMinor -ext.dropwizardMetricsVersion = '3.1.0' +ext.dropwizardMetricsVersion = '3.2.3' ext.findBugsVersion = '3.0.0' ext.googleVersion = '1.22.0' ext.slf4jVersion = '1.7.21' http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d29b72f4/gradle/scripts/dependencyDefinitions.gradle ---------------------------------------------------------------------- diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle index d138a55..ad1c1cf 100644 --- a/gradle/scripts/dependencyDefinitions.gradle +++ b/gradle/scripts/dependencyDefinitions.gradle @@ -61,7 +61,7 @@ ext.externalDependency = [ "hadoopYarnMiniCluster": "org.apache.hadoop:hadoop-minicluster:" + hadoopVersion, "hadoopAnnotations": "org.apache.hadoop:hadoop-annotations:" + hadoopVersion, "hadoopAws": "org.apache.hadoop:hadoop-aws:2.6.0", - "helix": "org.apache.helix:helix-core:0.6.7", + "helix": "org.apache.helix:helix-core:0.6.9", "hiveCommon": "org.apache.hive:hive-common:" + hiveVersion, "hiveService": "org.apache.hive:hive-service:" + hiveVersion, "hiveJdbc": "org.apache.hive:hive-jdbc:" + hiveVersion,
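The dependency bump to helix-core 0.6.9 above is what allows the launcher changes earlier in this commit: the ~700-line GobblinJobRebalancer (and the IdealStateChangeListener that swapped it in) is replaced by the JobConfig.Builder#setRebalanceRunningTask(true) flag, which 0.6.9 supports natively for long-running jobs. A sketch of the resulting submission path under those assumptions (queue and job names are placeholders, error handling elided; only calls exercised by this commit are used):

import org.apache.helix.HelixManager;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.TaskDriver;

public class JobSubmissionSketch {
  /**
   * Submit a job into a per-job Helix queue, letting Helix 0.6.9's built-in
   * rebalancer move running tasks instead of a custom JobRebalancer.
   */
  public static void submitJob(HelixManager manager, String queueName, String jobId,
      JobConfig.Builder jobConfigBuilder, boolean isStreaming) throws Exception {
    TaskDriver taskDriver = new TaskDriver(manager);

    // For long-running (streaming) jobs, ask Helix to rebalance tasks that
    // are already running; this replaces the removed GobblinJobRebalancer.
    if (isStreaming) {
      jobConfigBuilder.setRebalanceRunningTask(true);
    }

    // Create the queue on first use, mirroring the one-queue-per-job layout.
    if (taskDriver.getWorkflowConfig(queueName) == null) {
      taskDriver.createQueue(new JobQueue.Builder(queueName).build());
    }

    // Enqueue the job into the queue.
    taskDriver.enqueueJob(queueName, jobId, jobConfigBuilder);
  }
}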