TEZ-2707. Fix comments from reviews - part 2. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c03a6ff5 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c03a6ff5 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c03a6ff5 Branch: refs/heads/master Commit: c03a6ff5fc9193909d1087817bdda47a9df7ae47 Parents: 2ecffa7 Author: Siddharth Seth <[email protected]> Authored: Tue Aug 11 16:52:32 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri Aug 21 18:15:23 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../org/apache/tez/dag/app/DAGAppMaster.java | 2 +- .../dag/app/TaskAttemptListenerImpTezDag.java | 4 ++ .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 6 +- .../apache/tez/dag/app/rm/AMSchedulerEvent.java | 10 ++- .../rm/AMSchedulerEventDeallocateContainer.java | 10 +-- .../rm/AMSchedulerEventNodeBlacklistUpdate.java | 8 +-- .../tez/dag/app/rm/AMSchedulerEventTAEnded.java | 8 +-- .../app/rm/AMSchedulerEventTALaunchRequest.java | 8 +-- .../apache/tez/dag/app/rm/node/AMNodeEvent.java | 10 +-- .../apache/tez/dag/app/rm/node/AMNodeImpl.java | 14 ++--- .../tez/dag/app/rm/node/AMNodeTracker.java | 32 +++++----- .../apache/tez/dag/app/MockDAGAppMaster.java | 2 - .../tez/dag/app/dag/impl/TestTaskAttempt.java | 65 ++++++-------------- .../tez/dag/app/dag/impl/TestVertexImpl2.java | 56 ++++++++++++++--- 15 files changed, 114 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index fd3374e..adb800b 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -46,5 +46,6 @@ ALL CHANGES: TEZ-2698. rebase 08/05 TEZ-2675. Add javadocs for new pluggable components, fix problems reported by jenkins TEZ-2678. Fix comments from reviews - part 1. + TEZ-2707. Fix comments from reviews - part 2. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index f88c1de..84b3095 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -2422,7 +2422,7 @@ public class DAGAppMaster extends AbstractService { @VisibleForTesting - static void parsePlugin(List<NamedEntityDescriptor> resultList, + public static void parsePlugin(List<NamedEntityDescriptor> resultList, BiMap<String, Integer> pluginMap, List<TezNamedEntityDescriptorProto> namedEntityDescriptorProtos, boolean tezYarnEnabled, boolean uberEnabled, UserPayload defaultPayload) { http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java index 2f6e93c..185193f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java @@ -225,6 +225,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements " events: " + (inEvents != null ? inEvents.size() : -1)); } + long currTime = context.getClock().getTime(); List<TezEvent> otherEvents = new ArrayList<TezEvent>(); // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT) @@ -232,6 +233,9 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements // 1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent // 2. TaskStatusEvent is handled before TaskAttemptFinishedEvent for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) { + // for now, set the event time on the AM when it is received. + // this avoids any time disparity between machines. + tezEvent.setEventReceivedTime(currTime); final EventType eventType = tezEvent.getEventType(); if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) { TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID, http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index c6d8a7e..1c4102d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -134,7 +134,6 @@ public class TaskAttemptImpl implements TaskAttempt, protected final AppContext appContext; private final TaskHeartbeatHandler taskHeartbeatHandler; private long launchTime = 0; - private long scheduleTime = 0; private long finishTime = 0; private String trackerName; private int httpPort; @@ -440,6 +439,7 @@ public class TaskAttemptImpl implements TaskAttempt, this.vertex = this.task.getVertex(); this.creationCausalTA = schedulingCausalTA; this.creationTime = clock.getTime(); + this.schedulingCausalTA = schedulingCausalTA; this.reportedStatus = new TaskAttemptStatus(this.attemptId); initTaskAttemptStatus(reportedStatus); @@ -703,7 +703,7 @@ public class TaskAttemptImpl implements TaskAttempt, public long getScheduleTime() { readLock.lock(); try { - return scheduleTime; + return scheduledTime; } finally { readLock.unlock(); } @@ -1071,7 +1071,7 @@ public class TaskAttemptImpl implements TaskAttempt, public TaskAttemptStateInternal transition(TaskAttemptImpl ta, TaskAttemptEvent event) { TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event; - ta.scheduleTime = ta.clock.getTime(); + ta.scheduledTime = ta.clock.getTime(); // TODO Creating the remote task here may not be required in case of // recovery. http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java index af0bed0..dd9d951 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEvent.java @@ -22,8 +22,14 @@ import org.apache.hadoop.yarn.event.AbstractEvent; public class AMSchedulerEvent extends AbstractEvent<AMSchedulerEventType> { - // TODO Not a very useful class... - public AMSchedulerEvent(AMSchedulerEventType type) { + private final int schedulerId; + + public AMSchedulerEvent(AMSchedulerEventType type, int schedulerId) { super(type); + this.schedulerId = schedulerId; + } + + public int getSchedulerId() { + return this.schedulerId; } } http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java index 5270aa2..d1ca99e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventDeallocateContainer.java @@ -23,20 +23,14 @@ import org.apache.hadoop.yarn.api.records.ContainerId; public class AMSchedulerEventDeallocateContainer extends AMSchedulerEvent { private final ContainerId containerId; - private final int schedulerId; - + public AMSchedulerEventDeallocateContainer(ContainerId containerId, int schedulerId) { - super(AMSchedulerEventType.S_CONTAINER_DEALLOCATE); + super(AMSchedulerEventType.S_CONTAINER_DEALLOCATE, schedulerId); this.containerId = containerId; - this.schedulerId = schedulerId; } public ContainerId getContainerId() { return this.containerId; } - - public int getSchedulerId() { - return schedulerId; - } } http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java index 679705a..d22c0ec 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventNodeBlacklistUpdate.java @@ -23,20 +23,14 @@ import org.apache.hadoop.yarn.api.records.NodeId; public class AMSchedulerEventNodeBlacklistUpdate extends AMSchedulerEvent { private final NodeId nodeId; - private final int schedulerId; public AMSchedulerEventNodeBlacklistUpdate(NodeId nodeId, boolean add, int schedulerId) { super((add ? AMSchedulerEventType.S_NODE_BLACKLISTED - : AMSchedulerEventType.S_NODE_UNBLACKLISTED)); + : AMSchedulerEventType.S_NODE_UNBLACKLISTED), schedulerId); this.nodeId = nodeId; - this.schedulerId = schedulerId; } public NodeId getNodeId() { return this.nodeId; } - - public int getSchedulerId() { - return schedulerId; - } } http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java index ccc5465..f7fee3a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAEnded.java @@ -30,17 +30,15 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent { private final TaskAttemptState state; private final TaskAttemptEndReason taskAttemptEndReason; private final String diagnostics; - private final int schedulerId; public AMSchedulerEventTAEnded(TaskAttempt attempt, ContainerId containerId, TaskAttemptState state, TaskAttemptEndReason taskAttemptEndReason, String diagnostics, int schedulerId) { - super(AMSchedulerEventType.S_TA_ENDED); + super(AMSchedulerEventType.S_TA_ENDED, schedulerId); this.attempt = attempt; this.containerId = containerId; this.state = state; this.taskAttemptEndReason = taskAttemptEndReason; this.diagnostics = diagnostics; - this.schedulerId = schedulerId; } public TezTaskAttemptID getAttemptID() { @@ -59,10 +57,6 @@ public class AMSchedulerEventTAEnded extends AMSchedulerEvent { return this.containerId; } - public int getSchedulerId() { - return schedulerId; - } - public TaskAttemptEndReason getTaskAttemptEndReason() { return taskAttemptEndReason; } http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java index c59193c..0424c97 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java @@ -38,7 +38,6 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent { private final TaskSpec remoteTaskSpec; private final TaskAttempt taskAttempt; - private final int schedulerId; private final int launcherId; private final int taskCommId; @@ -48,7 +47,7 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent { TaskLocationHint locationHint, int priority, ContainerContext containerContext, int schedulerId, int launcherId, int taskCommId) { - super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST); + super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST, schedulerId); this.attemptId = attemptId; this.capability = capability; this.remoteTaskSpec = remoteTaskSpec; @@ -56,7 +55,6 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent { this.locationHint = locationHint; this.priority = priority; this.containerContext = containerContext; - this.schedulerId = schedulerId; this.launcherId = launcherId; this.taskCommId = taskCommId; } @@ -89,10 +87,6 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent { return this.containerContext; } - public int getSchedulerId() { - return schedulerId; - } - public int getLauncherId() { return launcherId; } http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java index 85bc513..1a975b0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeEvent.java @@ -24,19 +24,19 @@ import org.apache.hadoop.yarn.event.AbstractEvent; public class AMNodeEvent extends AbstractEvent<AMNodeEventType> { private final NodeId nodeId; - private final int sourceId; // Effectively the schedulerId + private final int schedulerId; - public AMNodeEvent(NodeId nodeId, int sourceId, AMNodeEventType type) { + public AMNodeEvent(NodeId nodeId, int schedulerId, AMNodeEventType type) { super(type); this.nodeId = nodeId; - this.sourceId = sourceId; + this.schedulerId = schedulerId; } public NodeId getNodeId() { return this.nodeId; } - public int getSourceId() { - return sourceId; + public int getSchedulerId() { + return schedulerId; } } http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java index 88b36cb1f..18d5978 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeImpl.java @@ -54,7 +54,7 @@ public class AMNodeImpl implements AMNode { private final ReadLock readLock; private final WriteLock writeLock; private final NodeId nodeId; - private final int sourceId; + private final int schedulerId; private final AppContext appContext; private final int maxTaskFailuresPerNode; private boolean blacklistingEnabled; @@ -173,14 +173,14 @@ public class AMNodeImpl implements AMNode { @SuppressWarnings("rawtypes") - public AMNodeImpl(NodeId nodeId, int sourceId, int maxTaskFailuresPerNode, + public AMNodeImpl(NodeId nodeId, int schedulerId, int maxTaskFailuresPerNode, EventHandler eventHandler, boolean blacklistingEnabled, AppContext appContext) { ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); this.readLock = rwLock.readLock(); this.writeLock = rwLock.writeLock(); this.nodeId = nodeId; - this.sourceId = sourceId; + this.schedulerId = schedulerId; this.appContext = appContext; this.eventHandler = eventHandler; this.blacklistingEnabled = blacklistingEnabled; @@ -249,7 +249,7 @@ public class AMNodeImpl implements AMNode { /* Blacklist the node with the AMNodeTracker and check if the node should be blacklisted */ protected boolean registerBadNodeAndShouldBlacklist() { - return appContext.getNodeTracker().registerBadNodeAndShouldBlacklist(this, sourceId); + return appContext.getNodeTracker().registerBadNodeAndShouldBlacklist(this, schedulerId); } protected void blacklistSelf() { @@ -259,8 +259,7 @@ public class AMNodeImpl implements AMNode { // these containers are not useful anymore pastContainers.addAll(containers); containers.clear(); - // TODO TEZ-2124 node tracking per ext source - sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true, 0)); + sendEvent(new AMSchedulerEventNodeBlacklistUpdate(getNodeId(), true, schedulerId)); } @SuppressWarnings("unchecked") @@ -366,8 +365,7 @@ public class AMNodeImpl implements AMNode { public void transition(AMNodeImpl node, AMNodeEvent nEvent) { node.ignoreBlacklisting = ignore; if (node.getState() == AMNodeState.BLACKLISTED) { - // TODO TEZ-2124 node tracking per ext source - node.sendEvent(new AMSchedulerEventNodeBlacklistUpdate(node.getNodeId(), false, 0)); + node.sendEvent(new AMSchedulerEventNodeBlacklistUpdate(node.getNodeId(), false, node.schedulerId)); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java index 32e515b..751276e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java @@ -82,14 +82,14 @@ public class AMNodeTracker extends AbstractService implements } } - public void nodeSeen(NodeId nodeId, int sourceId) { - PerSourceNodeTracker nodeTracker = getAndCreateIfNeededPerSourceTracker(sourceId); + public void nodeSeen(NodeId nodeId, int schedulerId) { + PerSourceNodeTracker nodeTracker = getAndCreateIfNeededPerSourceTracker(schedulerId); nodeTracker.nodeSeen(nodeId); } - boolean registerBadNodeAndShouldBlacklist(AMNode amNode, int sourceId) { - return perSourceNodeTrackers.get(sourceId).registerBadNodeAndShouldBlacklist(amNode); + boolean registerBadNodeAndShouldBlacklist(AMNode amNode, int schedulerId) { + return perSourceNodeTrackers.get(schedulerId).registerBadNodeAndShouldBlacklist(amNode); } public void handle(AMNodeEvent rEvent) { @@ -101,42 +101,42 @@ public class AMNodeTracker extends AbstractService implements case N_IGNORE_BLACKLISTING_ENABLED: case N_IGNORE_BLACKLISTING_DISABLED: // All of these will only be seen after a node has been registered. - perSourceNodeTrackers.get(rEvent.getSourceId()).handle(rEvent); + perSourceNodeTrackers.get(rEvent.getSchedulerId()).handle(rEvent); break; case N_TURNED_UNHEALTHY: case N_TURNED_HEALTHY: case N_NODE_COUNT_UPDATED: // These events can be seen without a node having been marked as 'seen' before - getAndCreateIfNeededPerSourceTracker(rEvent.getSourceId()).handle(rEvent); + getAndCreateIfNeededPerSourceTracker(rEvent.getSchedulerId()).handle(rEvent); break; } } - public AMNode get(NodeId nodeId, int sourceId) { - return perSourceNodeTrackers.get(sourceId).get(nodeId); + public AMNode get(NodeId nodeId, int schedulerId) { + return perSourceNodeTrackers.get(schedulerId).get(nodeId); } - public int getNumNodes(int sourceId) { - return perSourceNodeTrackers.get(sourceId).getNumNodes(); + public int getNumNodes(int schedulerId) { + return perSourceNodeTrackers.get(schedulerId).getNumNodes(); } @Private @VisibleForTesting - public boolean isBlacklistingIgnored(int sourceId) { - return perSourceNodeTrackers.get(sourceId).isBlacklistingIgnored(); + public boolean isBlacklistingIgnored(int schedulerId) { + return perSourceNodeTrackers.get(schedulerId).isBlacklistingIgnored(); } public void dagComplete(DAG dag) { // TODO TEZ-2337 Maybe reset failures from previous DAGs } - private PerSourceNodeTracker getAndCreateIfNeededPerSourceTracker(int sourceId) { - PerSourceNodeTracker nodeTracker = perSourceNodeTrackers.get(sourceId); + private PerSourceNodeTracker getAndCreateIfNeededPerSourceTracker(int schedulerId) { + PerSourceNodeTracker nodeTracker = perSourceNodeTrackers.get(schedulerId); if (nodeTracker == null) { nodeTracker = - new PerSourceNodeTracker(sourceId, eventHandler, appContext, maxTaskFailuresPerNode, + new PerSourceNodeTracker(schedulerId, eventHandler, appContext, maxTaskFailuresPerNode, nodeBlacklistingEnabled, blacklistDisablePercent); - PerSourceNodeTracker old = perSourceNodeTrackers.putIfAbsent(sourceId, nodeTracker); + PerSourceNodeTracker old = perSourceNodeTrackers.putIfAbsent(schedulerId, nodeTracker); nodeTracker = old != null ? old : nodeTracker; } return nodeTracker; http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index b04b461..fe3e4ef 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -417,8 +417,6 @@ public class MockDAGAppMaster extends DAGAppMaster { events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress, stats), new EventMetaData( EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId), MockDAGAppMaster.this.getContext().getClock().getTime())); -// TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events, -// cData.cIdStr, cData.taId, cData.nextFromEventId, 50000); TaskHeartbeatRequest request = new TaskHeartbeatRequest(cData.cIdStr, cData.taId, events, cData.nextFromEventId, cData.nextPreRoutedFromEventId, 50000); http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 947ea93..04bb2df 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -21,7 +21,6 @@ package org.apache.tez.dag.app.dag.impl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -286,10 +285,7 @@ public class TestTaskAttempt { TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0); MockEventHandler eventHandler = new MockEventHandler(); - TaskAttemptListener taListener = mock(TaskAttemptListener.class); - TaskCommunicator taskComm = mock(TaskCommunicator.class); - doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress(); - doReturn(taskComm).when(taListener).getTaskCommunicator(0); + TaskAttemptListener taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -338,10 +334,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); MockEventHandler eventHandler = spy(new MockEventHandler()); - TaskAttemptListener taListener = mock(TaskAttemptListener.class); - TaskCommunicator taskComm = mock(TaskCommunicator.class); - doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress(); - doReturn(taskComm).when(taListener).getTaskCommunicator(0); + TaskAttemptListener taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -441,10 +434,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); MockEventHandler eventHandler = new MockEventHandler(); - TaskAttemptListener taListener = mock(TaskAttemptListener.class); - TaskCommunicator taskComm = mock(TaskCommunicator.class); - doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress(); - doReturn(taskComm).when(taListener).getTaskCommunicator(0); + TaskAttemptListener taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -508,10 +498,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); MockEventHandler eventHandler = spy(new MockEventHandler()); - TaskAttemptListener taListener = mock(TaskAttemptListener.class); - TaskCommunicator taskComm = mock(TaskCommunicator.class); - doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress(); - doReturn(taskComm).when(taListener).getTaskCommunicator(0); + TaskAttemptListener taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -602,10 +589,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); MockEventHandler eventHandler = spy(new MockEventHandler()); - TaskAttemptListener taListener = mock(TaskAttemptListener.class); - TaskCommunicator taskComm = mock(TaskCommunicator.class); - doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress(); - doReturn(taskComm).when(taListener).getTaskCommunicator(0); + TaskAttemptListener taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -735,10 +719,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); MockEventHandler eventHandler = spy(new MockEventHandler()); - TaskAttemptListener taListener = mock(TaskAttemptListener.class); - TaskCommunicator taskComm = mock(TaskCommunicator.class); - doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress(); - doReturn(taskComm).when(taListener).getTaskCommunicator(0); + TaskAttemptListener taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -829,10 +810,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); MockEventHandler eventHandler = spy(new MockEventHandler()); - TaskAttemptListener taListener = mock(TaskAttemptListener.class); - TaskCommunicator taskComm = mock(TaskCommunicator.class); - doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress(); - doReturn(taskComm).when(taListener).getTaskCommunicator(0); + TaskAttemptListener taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -926,10 +904,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); MockEventHandler eventHandler = spy(new MockEventHandler()); - TaskAttemptListener taListener = mock(TaskAttemptListener.class); - TaskCommunicator taskComm = mock(TaskCommunicator.class); - doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress(); - doReturn(taskComm).when(taListener).getTaskCommunicator(0); + TaskAttemptListener taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -1031,10 +1006,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); MockEventHandler eventHandler = spy(new MockEventHandler()); - TaskAttemptListener taListener = mock(TaskAttemptListener.class); - TaskCommunicator taskComm = mock(TaskCommunicator.class); - doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress(); - doReturn(taskComm).when(taListener).getTaskCommunicator(0); + TaskAttemptListener taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -1133,10 +1105,7 @@ public class TestTaskAttempt { MockEventHandler mockEh = new MockEventHandler(); MockEventHandler eventHandler = spy(mockEh); - TaskAttemptListener taListener = mock(TaskAttemptListener.class); - TaskCommunicator taskComm = mock(TaskCommunicator.class); - doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress(); - doReturn(taskComm).when(taListener).getTaskCommunicator(0); + TaskAttemptListener taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -1280,11 +1249,7 @@ public class TestTaskAttempt { TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); MockEventHandler eventHandler = spy(new MockEventHandler()); - TaskAttemptListener taListener = mock(TaskAttemptListener.class); - TaskCommunicator mockTaskComm = mock(TaskCommunicator.class); - when(mockTaskComm.getAddress()).thenReturn( - new InetSocketAddress("localhost", 0)); - when(taListener.getTaskCommunicator(any(int.class))).thenReturn(mockTaskComm); + TaskAttemptListener taListener = createMockTaskAttemptListener(); Configuration taskConf = new Configuration(); taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); @@ -1412,4 +1377,12 @@ public class TestTaskAttempt { return new ContainerContext(new HashMap<String, LocalResource>(), new Credentials(), new HashMap<String, String>(), ""); } + + private TaskAttemptListener createMockTaskAttemptListener() { + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + TaskCommunicator taskComm = mock(TaskCommunicator.class); + doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress(); + doReturn(taskComm).when(taListener).getTaskCommunicator(0); + return taListener; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/c03a6ff5/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java index 352ad87..0e34f68 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java @@ -24,36 +24,42 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import java.io.IOException; import java.util.LinkedList; import java.util.List; import java.util.Map; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; +import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.DagTypeConverters; +import org.apache.tez.dag.api.NamedEntityDescriptor; import org.apache.tez.dag.api.TaskLocationHint; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; -import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.Vertex.VertexExecutionContext; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.records.DAGProtos; +import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerContext; +import org.apache.tez.dag.app.DAGAppMaster; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TaskHeartbeatHandler; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.StateChangeNotifier; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption; -import org.apache.tez.runtime.api.ExecutionContext; import org.junit.Test; /** @@ -363,16 +369,46 @@ public class TestVertexImpl2 { this.vertexName = "testvertex"; this.vertexExecutionContext = vertexExecutionContext; this.defaultExecutionContext = defaultDagExecitionContext; - if (numPlugins == 0) { - this.taskSchedulers.put(TezConstants.getTezYarnServicePluginName(), 0); - this.containerLaunchers.put(TezConstants.getTezYarnServicePluginName(), 0); - this.taskSchedulers.put(TezConstants.getTezYarnServicePluginName(), 0); - } else { + if (numPlugins == 0) { // Add default container plugins only + UserPayload defaultPayload; + try { + defaultPayload = TezUtils.createUserPayloadFromConf(new Configuration(false)); + } catch (IOException e) { + throw new TezUncheckedException(e); + } + DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), taskSchedulers, null, + true, false, defaultPayload); + DAGAppMaster + .parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), containerLaunchers, null, + true, false, defaultPayload); + DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), taskComms, null, + true, false, defaultPayload); + } else { // Add N plugins, no YARN defaults + List<TezNamedEntityDescriptorProto> schedulerList = new LinkedList<>(); + List<TezNamedEntityDescriptorProto> launcherList = new LinkedList<>(); + List<TezNamedEntityDescriptorProto> taskCommList = new LinkedList<>(); for (int i = 0; i < numPlugins; i++) { - this.taskSchedulers.put(append(TASK_SCHEDULER_NAME_BASE, i), i); - this.containerLaunchers.put(append(CONTAINER_LAUNCHER_NAME_BASE, i), i); - this.taskComms.put(append(TASK_COMM_NAME_BASE, i), i); + schedulerList.add(TezNamedEntityDescriptorProto.newBuilder() + .setName(append(TASK_SCHEDULER_NAME_BASE, i)).setEntityDescriptor( + DAGProtos.TezEntityDescriptorProto.newBuilder() + .setClassName(append(TASK_SCHEDULER_NAME_BASE, i))).build()); + launcherList.add(TezNamedEntityDescriptorProto.newBuilder() + .setName(append(CONTAINER_LAUNCHER_NAME_BASE, i)).setEntityDescriptor( + DAGProtos.TezEntityDescriptorProto.newBuilder() + .setClassName(append(CONTAINER_LAUNCHER_NAME_BASE, i))).build()); + taskCommList.add( + TezNamedEntityDescriptorProto.newBuilder().setName(append(TASK_COMM_NAME_BASE, i)) + .setEntityDescriptor( + DAGProtos.TezEntityDescriptorProto.newBuilder() + .setClassName(append(TASK_COMM_NAME_BASE, i))).build()); } + DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), taskSchedulers, + schedulerList, false, false, null); + DAGAppMaster.parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), containerLaunchers, + launcherList, false, false, null); + DAGAppMaster + .parsePlugin(Lists.<NamedEntityDescriptor>newLinkedList(), taskComms, taskCommList, + false, false, null); } this.appContext = createDefaultMockAppContext();
