TEZ-2191. Simulation improvements to MockDAGAppMaster (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b18552b2 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b18552b2 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b18552b2 Branch: refs/heads/TEZ-2003 Commit: b18552b2a426a72ea7ab7e24137bc14580c7cd7e Parents: a809f96 Author: Bikas Saha <[email protected]> Authored: Fri Mar 13 17:23:54 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Fri Mar 13 17:23:54 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/dag/app/DAGAppMaster.java | 12 +- .../apache/tez/dag/app/MockDAGAppMaster.java | 250 +++++++++++++++---- .../org/apache/tez/dag/app/MockLocalClient.java | 14 +- .../org/apache/tez/dag/app/MockTezClient.java | 13 +- .../tez/dag/app/TestMockDAGAppMaster.java | 71 +++++- .../org/apache/tez/dag/app/TestPreemption.java | 8 +- .../org/apache/tez/dag/app/TestSpeculation.java | 11 +- .../java/org/apache/tez/test/TestInput.java | 3 +- 9 files changed, 294 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/b18552b2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ccf804e..d222619 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ Release 0.7.0: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2191. Simulation improvements to MockDAGAppMaster TEZ-2195. TestTezJobs::testInvalidQueueSubmission/testInvalidQueueSubmissionToSession fail with hadoop branch-2. TEZ-1827. MiniTezCluster takes 10 minutes to shut down. http://git-wip-us.apache.org/repos/asf/tez/blob/b18552b2/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index f6256d7..67efefc 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -1252,12 +1252,7 @@ public class DAGAppMaster extends AbstractService { @Override public DAG getCurrentDAG() { - try { - rLock.lock(); - return dag; - } finally { - rLock.unlock(); - } + return dag; } @Override @@ -1676,7 +1671,6 @@ public class DAGAppMaster extends AbstractService { if (this.dagSubmissionTimer != null) { this.dagSubmissionTimer.cancel(); } - stopServices(); // Given pre-emption, we should delete tez scratch dir only if unregister is @@ -1715,7 +1709,9 @@ public class DAGAppMaster extends AbstractService { } } - execService.shutdownNow(); + if (execService != null) { + execService.shutdownNow(); + } super.serviceStop(); } http://git-wip-us.apache.org/repos/asf/tez/blob/b18552b2/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index 04a47c6..9943232 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -18,14 +18,21 @@ package org.apache.tez.dag.app; -import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; import java.net.UnknownHostException; import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -41,8 +48,9 @@ import org.apache.hadoop.yarn.util.Clock; import org.apache.tez.client.TezApiVersionInfo; import org.apache.tez.common.ContainerContext; import org.apache.tez.common.ContainerTask; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; -import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent; import org.apache.tez.dag.app.launcher.ContainerLauncher; import org.apache.tez.dag.app.rm.NMCommunicatorEvent; import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; @@ -52,8 +60,6 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched; import org.apache.tez.dag.app.rm.container.AMContainerEventType; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; -import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; @@ -63,8 +69,18 @@ import org.apache.tez.runtime.api.impl.OutputSpec; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; +import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; +import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; @SuppressWarnings("unchecked") public class MockDAGAppMaster extends DAGAppMaster { @@ -74,6 +90,20 @@ public class MockDAGAppMaster extends DAGAppMaster { boolean initFailFlag; boolean startFailFlag; boolean sendDMEvents; + CountersDelegate countersDelegate; + long launcherSleepTime = 1; + boolean doSleep = true; + int handlerConcurrency = 1; + int numConcurrentContainers = 1; + + ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean(); + AtomicLong heartbeatCpu = new AtomicLong(0); + AtomicLong heartbeatTime = new AtomicLong(0); + AtomicLong numHearbeats = new AtomicLong(0); + + public static interface CountersDelegate { + public TezCounters getCounters(TaskSpec taskSpec); + } // mock container launcher does not launch real tasks. // Upon, launch of a container is simulates the container asking for tasks @@ -83,14 +113,18 @@ public class MockDAGAppMaster extends DAGAppMaster { BlockingQueue<NMCommunicatorEvent> eventQueue = new LinkedBlockingQueue<NMCommunicatorEvent>(); Thread eventHandlingThread; + ListeningExecutorService executorService; Map<ContainerId, ContainerData> containers = Maps.newConcurrentMap(); + ArrayBlockingQueue<Worker> workers; TaskAttemptListenerImpTezDag taListener; AtomicBoolean startScheduling = new AtomicBoolean(true); AtomicBoolean goFlag; boolean updateProgress = true; + LinkedBlockingQueue<ContainerData> containersToProcess = new LinkedBlockingQueue<ContainerData>(); + Map<TezTaskID, Integer> preemptedTasks = Maps.newConcurrentMap(); Map<TezTaskAttemptID, Integer> tasksWithStatusUpdates = Maps.newConcurrentMap(); @@ -107,19 +141,31 @@ public class MockDAGAppMaster extends DAGAppMaster { TaskSpec taskSpec; ContainerLaunchContext launchContext; int numUpdates = 0; + int nextFromEventId = 0; boolean completed; + String cIdStr; + AtomicBoolean remove = new AtomicBoolean(false); public ContainerData(ContainerId cId, ContainerLaunchContext context) { this.cId = cId; + this.cIdStr = cId.toString(); this.launchContext = context; } + void remove() { + remove.set(true); + } + void clear() { taId = null; vName = null; taskSpec = null; completed = false; launchContext = null; + numUpdates = 0; + nextFromEventId = 0; + cIdStr = null; + remove.set(false); } } @@ -128,6 +174,15 @@ public class MockDAGAppMaster extends DAGAppMaster { taListener = (TaskAttemptListenerImpTezDag) getTaskAttemptListener(); eventHandlingThread = new Thread(this); eventHandlingThread.start(); + ExecutorService rawExecutor = Executors.newFixedThreadPool(handlerConcurrency, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MockLauncherExecutionThread [%d]") + .build()); + this.executorService = MoreExecutors.listeningDecorator(rawExecutor); + int numWorkers = numConcurrentContainers*2; // handle races that cause extra + workers = new ArrayBlockingQueue<Worker>(numWorkers); + for (int i=0; i<numWorkers; ++i) { + workers.add(new Worker()); + } } @Override @@ -136,6 +191,9 @@ public class MockDAGAppMaster extends DAGAppMaster { eventHandlingThread.interrupt(); eventHandlingThread.join(2000l); } + if (executorService != null) { + executorService.shutdownNow(); + } } @Override @@ -201,8 +259,10 @@ public class MockDAGAppMaster extends DAGAppMaster { void launch(NMCommunicatorLaunchRequestEvent event) { // launch container by putting it in simulated container list - containers.put(event.getContainerId(), new ContainerData(event.getContainerId(), - event.getContainerLaunchContext())); + ContainerData cData = new ContainerData(event.getContainerId(), + event.getContainerLaunchContext()); + containers.put(event.getContainerId(), cData); + containersToProcess.add(cData); getContext().getEventHandler().handle(new AMContainerEventLaunched(event.getContainerId())); } @@ -218,37 +278,102 @@ public class MockDAGAppMaster extends DAGAppMaster { ((MockClock) clock).incrementTime(inc); } } - + @Override public void run() { + Thread.currentThread().setName("MockLauncher"); // wait for test to sync with us and get a reference to us. Go when sync is done LOG.info("Waiting to go"); waitToGo(); LOG.info("Signal to go"); - while(true) { - if (!startScheduling.get()) { // schedule when asked to do so by the test code - continue; + try { + while (true) { + if (!startScheduling.get()) { // schedule when asked to do so by the test code + Thread.sleep(launcherSleepTime); + continue; + } + incrementTime(1000); + ContainerData cData = containersToProcess.take(); + if (!cData.remove.get()) { + Worker worker = workers.remove(); + worker.setContainerData(cData); + ListenableFuture<Void> future = executorService.submit(worker); + Futures.addCallback(future, worker.getCallback()); + } else { + containers.remove(cData.cId); + } + if (doSleep) { + Thread.sleep(launcherSleepTime); + } + } + } catch (InterruptedException ie) { + LOG.warn("Exception in mock container launcher thread", ie); + } + } + + private void doHeartbeat(TezHeartbeatRequest request, ContainerData cData) throws Exception { + long startTime = System.nanoTime(); + long startCpuTime = threadMxBean.getCurrentThreadCpuTime(); + TezHeartbeatResponse response = taListener.heartbeat(request); + if (response.shouldDie()) { + cData.remove(); + } else { + cData.nextFromEventId += response.getEvents().size(); + if (!response.getEvents().isEmpty()) { + long stopTime = System.nanoTime(); + long stopCpuTime = threadMxBean.getCurrentThreadCpuTime(); + heartbeatTime.addAndGet((stopTime-startTime)/1000); + heartbeatCpu.addAndGet((stopCpuTime-startCpuTime)/1000); + numHearbeats.incrementAndGet(); + } + } + } + + class Worker implements Callable<Void> { + class WorkerCallback implements FutureCallback<Void> { + @Override + public void onSuccess(Void arg) { + completeOperation(); + } + + @Override + public void onFailure(Throwable t) { + LOG.fatal("Unexpected error during processing", t); + Worker.this.cData.remove(); + completeOperation(); + } + + void completeOperation() { + workers.add(Worker.this); + containersToProcess.add(Worker.this.cData); } - incrementTime(1000); - for (Map.Entry<ContainerId, ContainerData> entry : containers.entrySet()) { - ContainerData cData = entry.getValue(); - ContainerId cId = entry.getKey(); + } + + volatile ContainerData cData; + WorkerCallback callback = new WorkerCallback(); + + WorkerCallback getCallback() { + return callback; + } + + void setContainerData(ContainerData cData) { + this.cData = cData; + } + + @Override + public Void call() throws Exception { + try { if (cData.taId == null) { // if container is not assigned a task, ask for a task - try { - ContainerTask cTask = taListener.getTask(new ContainerContext(cId.toString())); - if (cTask == null) { - continue; - } + ContainerTask cTask = taListener.getTask(new ContainerContext(cData.cIdStr)); + if (cTask != null) { if (cTask.shouldDie()) { - containers.remove(cId); + cData.remove(); } else { cData.taId = cTask.getTaskSpec().getTaskAttemptID(); cData.vName = cTask.getTaskSpec().getVertexName(); cData.taskSpec = cTask.getTaskSpec(); } - } catch (IOException e) { - e.printStackTrace(); } } else if (!cData.completed) { // container is assigned a task and task is not completed @@ -257,52 +382,59 @@ public class MockDAGAppMaster extends DAGAppMaster { Integer updatesToMake = tasksWithStatusUpdates.get(cData.taId); if (cData.numUpdates == 0 || // do at least one update updatesToMake != null && cData.numUpdates < updatesToMake) { + List<TezEvent> events = Lists.newArrayListWithCapacity( + cData.taskSpec.getOutputs().size() + 1); + if (sendDMEvents) { + for (OutputSpec output : cData.taskSpec.getOutputs()) { + if (output.getPhysicalEdgeCount() == 1) { + events.add(new TezEvent(DataMovementEvent.create(0, 0, 0, null), new EventMetaData( + EventProducerConsumerType.OUTPUT, cData.vName, output + .getDestinationVertexName(), cData.taId))); + } else { + events.add(new TezEvent(CompositeDataMovementEvent.create(0, + output.getPhysicalEdgeCount(), null), new EventMetaData( + EventProducerConsumerType.OUTPUT, cData.vName, output + .getDestinationVertexName(), cData.taId))); + } + } + } + TezCounters counters = null; + if (countersDelegate != null) { + counters = countersDelegate.getCounters(cData.taskSpec); + } cData.numUpdates++; float maxUpdates = (updatesToMake != null) ? updatesToMake.intValue() : 1; float progress = updateProgress ? cData.numUpdates/maxUpdates : 0f; - TezVertexID vertexId = cData.taId.getTaskID().getVertexID(); - getContext().getEventHandler().handle( - new VertexEventRouteEvent(vertexId, Collections.singletonList(new TezEvent( - new TaskStatusUpdateEvent(null, progress), new EventMetaData( - EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId))))); + events.add(new TezEvent(new TaskStatusUpdateEvent(counters, progress), new EventMetaData( + EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId))); + TezHeartbeatRequest request = new TezHeartbeatRequest(cData.numUpdates, events, + cData.cIdStr, cData.taId, cData.nextFromEventId, 10000); + doHeartbeat(request, cData); } else if (version != null && cData.taId.getId() <= version.intValue()) { preemptContainer(cData); } else { // send a done notification - TezVertexID vertexId = cData.taId.getTaskID().getVertexID(); cData.completed = true; - if (sendDMEvents) { - Event event = null; - for (OutputSpec output : cData.taskSpec.getOutputs()) { - if (output.getPhysicalEdgeCount() == 1) { - event = DataMovementEvent.create(0, null); - } else { - event = CompositeDataMovementEvent.create(0, output.getPhysicalEdgeCount(), null); - } - getContext().getEventHandler().handle( - new VertexEventRouteEvent(vertexId, Collections.singletonList(new TezEvent( - event, new EventMetaData(EventProducerConsumerType.OUTPUT, cData.vName, - output.getDestinationVertexName(), cData.taId))))); - } - } - getContext().getEventHandler().handle( - new VertexEventRouteEvent(vertexId, Collections.singletonList(new TezEvent( - new TaskAttemptCompletedEvent(), new EventMetaData( - EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId))))); + List<TezEvent> events = Collections.singletonList(new TezEvent( + new TaskAttemptCompletedEvent(), new EventMetaData( + EventProducerConsumerType.SYSTEM, cData.vName, "", cData.taId))); + TezHeartbeatRequest request = new TezHeartbeatRequest(++cData.numUpdates, events, + cData.cIdStr, cData.taId, cData.nextFromEventId, 10000); + doHeartbeat(request, cData); cData.clear(); } } + } catch (Exception e) { + // exception from TA listener. Behave like real. Die and continue with others + LOG.warn("Exception in mock container launcher thread for cId: " + cData.cIdStr, e); + cData.remove(); } - try { - Thread.sleep(10); - } catch (InterruptedException e) { - System.out.println("Interrupted in mock container launcher thread"); - break; - } + return null; } + } - } + public class MockDAGAppMasterShutdownHandler extends DAGAppMasterShutdownHandler { public AtomicInteger shutdownInvoked = new AtomicInteger(0); @@ -329,7 +461,7 @@ public class MockDAGAppMaster extends DAGAppMaster { String nmHost, int nmPort, int nmHttpPort, Clock clock, long appSubmitTime, boolean isSession, String workingDirectory, String[] localDirs, String[] logDirs, AtomicBoolean launcherGoFlag, boolean initFailFlag, boolean startFailFlag, - Credentials credentials, String jobUserName) { + Credentials credentials, String jobUserName, int handlerConcurrency, int numConcurrentContainers) { super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime, isSession, workingDirectory, localDirs, logDirs, new TezApiVersionInfo().getVersion(), 1, credentials, jobUserName); @@ -337,6 +469,9 @@ public class MockDAGAppMaster extends DAGAppMaster { shutdownHandler = new MockDAGAppMasterShutdownHandler(); this.initFailFlag = initFailFlag; this.startFailFlag = startFailFlag; + Preconditions.checkArgument(handlerConcurrency > 0); + this.handlerConcurrency = handlerConcurrency; + this.numConcurrentContainers = numConcurrentContainers; } // use mock container launcher for tests @@ -353,9 +488,16 @@ public class MockDAGAppMaster extends DAGAppMaster { public MockDAGAppMasterShutdownHandler getShutdownHandler() { return (MockDAGAppMasterShutdownHandler) this.shutdownHandler; } + + public void clearStats() { + heartbeatCpu.set(0); + heartbeatTime.set(0); + numHearbeats.set(0); + } @Override public synchronized void serviceInit(Configuration conf) throws Exception { + conf.setInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, numConcurrentContainers); super.serviceInit(conf); if (initFailFlag) { throw new Exception("FailInit"); http://git-wip-us.apache.org/repos/asf/tez/blob/b18552b2/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java index 3cb9d8c..5526516 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java @@ -32,20 +32,21 @@ public class MockLocalClient extends LocalClient { Clock mockClock; final boolean initFailFlag; final boolean startFailFlag; + final int concurrency; + final int containers; public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag, Clock clock) { - this.mockAppLauncherGoFlag = mockAppLauncherGoFlag; - this.mockClock = clock; - this.initFailFlag = false; - this.startFailFlag = false; + this(mockAppLauncherGoFlag, clock, false, false, 1, 1); } public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag, Clock clock, - boolean initFailFlag, boolean startFailFlag) { + boolean initFailFlag, boolean startFailFlag, int concurrency, int containers) { this.mockAppLauncherGoFlag = mockAppLauncherGoFlag; this.mockClock = clock; this.initFailFlag = initFailFlag; this.startFailFlag = startFailFlag; + this.concurrency = concurrency; + this.containers = containers; } @Override @@ -55,7 +56,8 @@ public class MockLocalClient extends LocalClient { String[] localDirs, String[] logDirs, Credentials credentials, String jobUserName) { mockApp = new MockDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort, (mockClock!=null ? mockClock : clock), appSubmitTime, isSession, userDir, localDirs, logDirs, - mockAppLauncherGoFlag, initFailFlag, startFailFlag, credentials, jobUserName); + mockAppLauncherGoFlag, initFailFlag, startFailFlag, credentials, jobUserName, + concurrency, containers); return mockApp; } http://git-wip-us.apache.org/repos/asf/tez/blob/b18552b2/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java index 89df25c..ff66970 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java @@ -41,9 +41,18 @@ public class MockTezClient extends TezClient { MockTezClient(String name, TezConfiguration tezConf, boolean isSession, Map<String, LocalResource> localResources, Credentials credentials, Clock clock, AtomicBoolean mockAppLauncherGoFlag, - boolean initFailFlag, boolean startFailFlag) { + boolean initFailFlag, boolean startFailFlag) { + this(name, tezConf, isSession, localResources, credentials, clock, mockAppLauncherGoFlag, + initFailFlag, startFailFlag, 1, 1); + } + + MockTezClient(String name, TezConfiguration tezConf, boolean isSession, + Map<String, LocalResource> localResources, Credentials credentials, + Clock clock, AtomicBoolean mockAppLauncherGoFlag, + boolean initFailFlag, boolean startFailFlag, int concurrency, int containers) { super(name, tezConf, isSession, localResources, credentials); - this.client = new MockLocalClient(mockAppLauncherGoFlag, clock, initFailFlag, startFailFlag); + this.client = new MockLocalClient(mockAppLauncherGoFlag, clock, initFailFlag, startFailFlag, + concurrency, containers); } protected FrameworkClient createFrameworkClient() { http://git-wip-us.apache.org/repos/asf/tez/blob/b18552b2/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java index f4734d5..a7feef8 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.URL; +import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.EdgeProperty; @@ -45,6 +46,7 @@ import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.app.MockDAGAppMaster.CountersDelegate; import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher; import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData; import org.apache.tez.dag.app.dag.DAGState; @@ -57,17 +59,18 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.impl.InputSpec; +import org.apache.tez.runtime.api.impl.OutputSpec; +import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; import org.junit.Assert; import org.junit.Test; import com.google.common.collect.Maps; -@SuppressWarnings("deprecation") public class TestMockDAGAppMaster { static Configuration defaultConf; static FileSystem localFs; - static Path workDir; static { try { @@ -75,8 +78,8 @@ public class TestMockDAGAppMaster { defaultConf.set("fs.defaultFS", "file:///"); defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); localFs = FileSystem.getLocal(defaultConf); - workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), - "TestDAGAppMaster").makeQualified(localFs); + String stagingDir = "target" + Path.SEPARATOR + TestMockDAGAppMaster.class.getName() + "-tmpDir"; + defaultConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir); } catch (IOException e) { throw new RuntimeException("init failure", e); } @@ -221,6 +224,66 @@ public class TestMockDAGAppMaster { tezClient.stop(); } + + @Test (timeout = 10000) + public void testBasicCounters() throws Exception { + TezConfiguration tezconf = new TezConfiguration(defaultConf); + MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, + null, false, false); + tezClient.start(); + + final String vAName = "A"; + final String vBName = "B"; + final String procCounterName = "Proc"; + final String globalCounterName = "Global"; + DAG dag = DAG.create("testBasicCounters"); + Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), 10); + Vertex vB = Vertex.create(vBName, ProcessorDescriptor.create("Proc.class"), 1); + dag.addVertex(vA) + .addVertex(vB) + .addEdge( + Edge.create(vA, vB, EdgeProperty.create(DataMovementType.SCATTER_GATHER, + DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, + OutputDescriptor.create("Out"), InputDescriptor.create("In")))); + + MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp(); + MockContainerLauncher mockLauncher = mockApp.getContainerLauncher(); + mockLauncher.startScheduling(false); + mockApp.countersDelegate = new CountersDelegate() { + @Override + public TezCounters getCounters(TaskSpec taskSpec) { + String vName = taskSpec.getVertexName(); + TezCounters counters = new TezCounters(); + counters.findCounter(globalCounterName, globalCounterName).increment(1); + counters.findCounter(vName, procCounterName).increment(1); + for (OutputSpec output : taskSpec.getOutputs()) { + counters.findCounter(vName, output.getDestinationVertexName()).increment(1); + } + for (InputSpec input : taskSpec.getInputs()) { + counters.findCounter(vName, input.getSourceVertexName()).increment(1); + } + return counters; + } + }; + mockApp.doSleep = false; + DAGClient dagClient = tezClient.submitDAG(dag); + mockLauncher.waitTillContainersLaunched(); + DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG(); + mockLauncher.startScheduling(true); + DAGStatus status = dagClient.waitForCompletion(); + Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState()); + TezCounters counters = dagImpl.getAllCounters(); + // verify processor counters + Assert.assertEquals(10, counters.findCounter(vAName, procCounterName).getValue()); + Assert.assertEquals(1, counters.findCounter(vBName, procCounterName).getValue()); + // verify edge counters + Assert.assertEquals(10, counters.findCounter(vAName, vBName).getValue()); + Assert.assertEquals(1, counters.findCounter(vBName, vAName).getValue()); + // verify global counters + Assert.assertEquals(11, counters.findCounter(globalCounterName, globalCounterName).getValue()); + + tezClient.stop(); + } @Test (timeout = 10000) public void testMultipleSubmissions() throws Exception { http://git-wip-us.apache.org/repos/asf/tez/blob/b18552b2/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java index 8cc2e8b..10917f0 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java @@ -48,12 +48,10 @@ import org.apache.tez.dag.records.TezVertexID; import org.junit.Assert; import org.junit.Test; -@SuppressWarnings("deprecation") public class TestPreemption { static Configuration defaultConf; static FileSystem localFs; - static Path workDir; static { try { @@ -61,8 +59,8 @@ public class TestPreemption { defaultConf.set("fs.defaultFS", "file:///"); defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); localFs = FileSystem.getLocal(defaultConf); - workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), - "TestDAGAppMaster").makeQualified(localFs); + String stagingDir = "target" + Path.SEPARATOR + TestPreemption.class.getName() + "-tmpDir"; + defaultConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir); } catch (IOException e) { throw new RuntimeException("init failure", e); } @@ -93,7 +91,7 @@ public class TestPreemption { tezconf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 0); AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false); MockTezClient tezClient = new MockTezClient("testPreemption", tezconf, false, null, null, - null, mockAppLauncherGoFlag); + null, mockAppLauncherGoFlag, false, false, 2, 2); tezClient.start(); DAGClient dagClient = tezClient.submitDAG(createDAG(DataMovementType.SCATTER_GATHER)); http://git-wip-us.apache.org/repos/asf/tez/blob/b18552b2/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java index c349957..3413762 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.ProcessorDescriptor; @@ -45,12 +44,9 @@ import org.junit.Test; import com.google.common.base.Joiner; - -@SuppressWarnings("deprecation") public class TestSpeculation { static Configuration defaultConf; static FileSystem localFs; - static Path workDir; MockDAGAppMaster mockApp; MockContainerLauncher mockLauncher; @@ -61,10 +57,9 @@ public class TestSpeculation { defaultConf.set("fs.defaultFS", "file:///"); defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); defaultConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true); - defaultConf.setInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, 2); localFs = FileSystem.getLocal(defaultConf); - workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), - "TestSpeculation").makeQualified(localFs); + String stagingDir = "target" + Path.SEPARATOR + TestSpeculation.class.getName() + "-tmpDir"; + defaultConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir); } catch (IOException e) { throw new RuntimeException("init failure", e); } @@ -74,7 +69,7 @@ public class TestSpeculation { TezConfiguration tezconf = new TezConfiguration(defaultConf); AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false); MockTezClient tezClient = new MockTezClient("testspeculation", tezconf, true, null, null, - new MockClock(), mockAppLauncherGoFlag); + new MockClock(), mockAppLauncherGoFlag, false, false, 1, 2); tezClient.start(); syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient); return tezClient; http://git-wip-us.apache.org/repos/asf/tez/blob/b18552b2/tez-tests/src/test/java/org/apache/tez/test/TestInput.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java index 0050961..1205d41 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java @@ -18,7 +18,6 @@ package org.apache.tez.test; -import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; import java.util.Set; @@ -332,7 +331,7 @@ public class TestInput extends AbstractLogicalInput { if (event instanceof DataMovementEvent) { DataMovementEvent dmEvent = (DataMovementEvent) event; numCompletedInputs++; - LOG.info("Received DataMovement event sourceId : " + dmEvent.getSourceIndex() + + LOG.info(getContext().getSourceVertexName() + " Received DataMovement event sourceId : " + dmEvent.getSourceIndex() + " targetId: " + dmEvent.getTargetIndex() + " version: " + dmEvent.getVersion() + " numInputs: " + getNumPhysicalInputs() +
