Add partition task start time. Add a setter and a getter for the start time of partition tasks.
RB=715086 G=nuage-reviewers R=lxia,cji A=lxia Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ddefebb2 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ddefebb2 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ddefebb2 Branch: refs/heads/helix-0.6.x Commit: ddefebb245f4f43c8c64a9f8cc1a2536ae25eab4 Parents: 2efa448 Author: Junkai Xue <j...@linkedin.com> Authored: Thu Apr 28 11:36:20 2016 -0700 Committer: Lei Xia <l...@linkedin.com> Committed: Tue Jul 5 16:17:38 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/JobContext.java | 23 +++++++++++++++++--- .../org/apache/helix/task/JobRebalancer.java | 1 + .../org/apache/helix/task/WorkflowContext.java | 1 + 3 files changed, 22 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/ddefebb2/helix-core/src/main/java/org/apache/helix/task/JobContext.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobContext.java b/helix-core/src/main/java/org/apache/helix/task/JobContext.java index 77885cd..2057f27 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobContext.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobContext.java @@ -59,7 +59,7 @@ public class JobContext extends HelixProperty { public long getStartTime() { String tStr = _record.getSimpleField(ContextProperties.START_TIME.toString()); if (tStr == null) { - return -1; + return WorkflowContext.UNSTARTED; } return Long.parseLong(tStr); } @@ -121,6 +121,23 @@ public class JobContext extends HelixProperty { return Integer.parseInt(nStr); } + public void setPartitionStartTime(int p, long t) { + Map<String, String> map = getMapField(p); + map.put(ContextProperties.START_TIME.toString(), String.valueOf(t)); + } + + public long 
getPartitionStartTime(int p) { + Map<String, String> map = getMapField(p); + if (map == null) { + return WorkflowContext.UNSTARTED; + } + String tStr = map.get(ContextProperties.START_TIME.toString()); + if (tStr == null) { + return WorkflowContext.UNSTARTED; + } + return Long.parseLong(tStr); + } + public void setPartitionFinishTime(int p, long t) { Map<String, String> map = getMapField(p); map.put(ContextProperties.FINISH_TIME.toString(), String.valueOf(t)); @@ -129,11 +146,11 @@ public class JobContext extends HelixProperty { public long getPartitionFinishTime(int p) { Map<String, String> map = getMapField(p); if (map == null) { - return -1; + return WorkflowContext.UNFINISHED; } String tStr = map.get(ContextProperties.FINISH_TIME.toString()); if (tStr == null) { - return -1; + return WorkflowContext.UNFINISHED; } return Long.parseLong(tStr); } http://git-wip-us.apache.org/repos/asf/helix/blob/ddefebb2/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java index b02089f..0f34178 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java @@ -421,6 +421,7 @@ public class JobRebalancer extends TaskRebalancer { excludeSet.add(pId); jobCtx.setAssignedParticipant(pId, instance); jobCtx.setPartitionState(pId, TaskPartitionState.INIT); + jobCtx.setPartitionStartTime(pId, System.currentTimeMillis()); LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName, TaskPartitionState.RUNNING, instance)); } http://git-wip-us.apache.org/repos/asf/helix/blob/ddefebb2/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java index 0e0a283..9c1f77a 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java @@ -36,6 +36,7 @@ public class WorkflowContext extends HelixProperty { public static final String FINISH_TIME = "FINISH_TIME"; public static final String JOB_STATES = "JOB_STATES"; public static final String LAST_SCHEDULED_WORKFLOW = "LAST_SCHEDULED_WORKFLOW"; + public static final int UNSTARTED = -1; public static final int UNFINISHED = -1; public WorkflowContext(ZNRecord record) {