[HELIX-615] Naming problem of scheduled jobs from recurrent queue.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7569a0a7 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7569a0a7 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7569a0a7 Branch: refs/heads/helix-0.6.x Commit: 7569a0a7b999fb6675447919d91e756200364ff5 Parents: d129d3a Author: Lei Xia <[email protected]> Authored: Fri Nov 20 15:50:30 2015 -0800 Committer: Lei Xia <[email protected]> Committed: Fri Nov 20 15:50:30 2015 -0800 ---------------------------------------------------------------------- .../java/org/apache/helix/task/TaskConstants.java | 4 ---- .../main/java/org/apache/helix/task/TaskDriver.java | 16 +++++++++------- .../java/org/apache/helix/task/TaskRebalancer.java | 7 +++++-- .../integration/task/TestRecurringJobQueue.java | 2 +- 4 files changed, 15 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/7569a0a7/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java index 34008d6..305323d 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java @@ -39,8 +39,4 @@ public class TaskConstants { * The root property store path at which the {@link TaskRebalancer} stores context information. */ public static final String REBALANCER_CONTEXT_ROOT = "/TaskRebalancer"; - /** - * Resource prefix for scheduled workflows - */ - public static final String SCHEDULED = "SCHEDULED"; } http://git-wip-us.apache.org/repos/asf/helix/blob/7569a0a7/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index b4b94f8..cc1eac1 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -570,13 +570,15 @@ public class TaskDriver { private void setWorkflowTargetState(String workflowName, TargetState state) { setSingleWorkflowTargetState(workflowName, state); - // For recurring schedules, child workflows must also be handled - List<String> resources = _accessor.getChildNames(_accessor.keyBuilder().resourceConfigs()); - String prefix = workflowName + "_" + TaskConstants.SCHEDULED; - for (String resource : resources) { - if (resource.startsWith(prefix)) { - setSingleWorkflowTargetState(resource, state); - } + // For recurring schedules, last scheduled incomplete workflow must also be handled + WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflowName); + String lastScheduledWorkflow = wCtx.getLastScheduledSingleWorkflow(); + WorkflowContext lastScheduledWorkflowCtx = + TaskUtil.getWorkflowContext(_propertyStore, lastScheduledWorkflow); + if (lastScheduledWorkflowCtx != null && + !(lastScheduledWorkflowCtx.getWorkflowState() == TaskState.COMPLETED + || lastScheduledWorkflowCtx.getWorkflowState() == TaskState.FAILED)) { + setSingleWorkflowTargetState(lastScheduledWorkflow, state); } } http://git-wip-us.apache.org/repos/asf/helix/blob/7569a0a7/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index 5a86c3d..3842b66 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -19,6 +19,8 @@ package org.apache.helix.task; * under the License. */ +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -579,8 +581,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { long timeToSchedule = period * offsetMultiplier + startTime.getTime(); // Now clone the workflow if this clone has not yet been created - String newWorkflowName = - workflowResource + "_" + TaskConstants.SCHEDULED + "_" + offsetMultiplier; + DateFormat df = new SimpleDateFormat("yyyyMMdd'T'HHmmssZ"); + // Now clone the workflow if this clone has not yet been created + String newWorkflowName = workflowResource + "_" + df.format(new java.util.Date(timeToSchedule)); LOG.debug("Ready to start workflow " + newWorkflowName); if (!newWorkflowName.equals(lastScheduled)) { Workflow clonedWf = http://git-wip-us.apache.org/repos/asf/helix/blob/7569a0a7/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java index deca8a7..eef1ce6 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java @@ -25,7 +25,7 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; -import java.util.Map;; +import java.util.Map; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager;
