TEZ-2508. rebase 06/01. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fe9b690f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fe9b690f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fe9b690f Branch: refs/heads/master Commit: fe9b690ffb75dc0f987549b73d45b9eb6e9f16c7 Parents: 5846b8d Author: Siddharth Seth <[email protected]> Authored: Mon Jun 1 16:37:26 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri Aug 21 18:13:55 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../java/org/apache/tez/dag/api/TaskHeartbeatRequest.java | 7 +++++++ .../java/org/apache/tez/dag/api/TaskHeartbeatResponse.java | 8 +++++++- .../org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java | 8 ++++---- .../java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java | 3 ++- .../apache/tez/runtime/LogicalIOProcessorRuntimeTask.java | 3 --- 6 files changed, 21 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/fe9b690f/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index 42c2e1e..55002fe 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -30,5 +30,6 @@ ALL CHANGES: TEZ-2471. NPE in LogicalIOProcessorRuntimeTask while printing thread info. TEZ-2495. Inform TaskCommunicaor about Task and Container termination reasons. TEZ-2502. Fix for TezTaskRunner2 not killing tasks properly in all situations. + TEZ-2508. rebase 06/01 INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/fe9b690f/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java index f6bc8f0..b5ff991 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java @@ -29,15 +29,18 @@ public class TaskHeartbeatRequest { private final TezTaskAttemptID taskAttemptId; private final List<TezEvent> events; private final int startIndex; + private final int preRoutedStartIndex; private final int maxEvents; public TaskHeartbeatRequest(String containerIdentifier, TezTaskAttemptID taskAttemptId, List<TezEvent> events, int startIndex, + int preRoutedStartIndex, int maxEvents) { this.containerIdentifier = containerIdentifier; this.taskAttemptId = taskAttemptId; this.events = events; this.startIndex = startIndex; + this.preRoutedStartIndex = preRoutedStartIndex; this.maxEvents = maxEvents; } @@ -57,6 +60,10 @@ public class TaskHeartbeatRequest { return startIndex; } + public int getPreRoutedStartIndex() { + return preRoutedStartIndex; + } + public int getMaxEvents() { return maxEvents; } http://git-wip-us.apache.org/repos/asf/tez/blob/fe9b690f/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java index b826e76..7f063c4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java @@ -23,12 +23,14 @@ public class TaskHeartbeatResponse { private final boolean shouldDie; private final int nextFromEventId; + private final int nextPreRoutedEventId; private final List<TezEvent> events; - public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events, int nextFromEventId) { + public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events, int nextFromEventId, int nextPreRoutedEventId) { this.shouldDie = shouldDie; this.events = events; this.nextFromEventId = nextFromEventId; + this.nextPreRoutedEventId = nextPreRoutedEventId; } public boolean isShouldDie() { @@ -42,4 +44,8 @@ public class TaskHeartbeatResponse { public int getNextFromEventId() { return nextFromEventId; } + + public int getNextPreRoutedEventId() { + return nextPreRoutedEventId; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/fe9b690f/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index 1c61a0d..e2d44e2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -79,7 +79,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements protected final TaskHeartbeatHandler taskHeartbeatHandler; protected final ContainerHeartbeatHandler containerHeartbeatHandler; - private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0); + private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0, 0); private final ConcurrentMap<TezTaskAttemptID, ContainerId> registeredAttempts = new ConcurrentHashMap<TezTaskAttemptID, ContainerId>(); @@ -195,7 +195,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements // So - avoiding synchronization. pingContainerHeartbeatHandler(containerId); - TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null); + TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null, 0); TezTaskAttemptID taskAttemptID = request.getTaskAttemptId(); if (taskAttemptID != null) { ContainerId containerIdFromMap = registeredAttempts.get(taskAttemptID); @@ -241,10 +241,10 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements eventInfo = context .getCurrentDAG() .getVertex(taskAttemptID.getTaskID().getVertexID()) - .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), + .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getPreRoutedStartIndex(), request.getMaxEvents()); } - return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId()); + return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId(), eventInfo.getNextPreRoutedFromEventId()); } public void taskAlive(TezTaskAttemptID taskAttemptId) { taskHeartbeatHandler.pinged(taskAttemptId); http://git-wip-us.apache.org/repos/asf/tez/blob/fe9b690f/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java index 3774eb4..83322f2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java @@ -363,13 +363,14 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator { } TaskHeartbeatRequest tRequest = new TaskHeartbeatRequest(request.getContainerIdentifier(), request.getCurrentTaskAttemptID(), request.getEvents(), request.getStartIndex(), - request.getMaxEvents()); + request.getPreRoutedStartIndex(), request.getMaxEvents()); tResponse = taskCommunicatorContext.heartbeat(tRequest); } TezHeartbeatResponse response = new TezHeartbeatResponse(); response.setLastRequestId(requestId); response.setEvents(tResponse.getEvents()); response.setNextFromEventId(tResponse.getNextFromEventId()); + response.setNextPreRoutedEventId(tResponse.getNextPreRoutedEventId()); containerInfo.lastRequestId = requestId; containerInfo.lastResponse = response; return response; http://git-wip-us.apache.org/repos/asf/tez/blob/fe9b690f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 449fa0f..c79da5d 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -172,9 +172,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { this.outputsMap = new ConcurrentHashMap<String, LogicalOutput>(numOutputs); this.outputContextMap = new ConcurrentHashMap<String, OutputContext>(numOutputs); - this.initializedInputs = new ConcurrentHashMap<String, LogicalInput>(); - this.initializedOutputs = new ConcurrentHashMap<String, LogicalOutput>(); - this.runInputMap = new LinkedHashMap<String, LogicalInput>(); this.runOutputMap = new LinkedHashMap<String, LogicalOutput>();
