TEZ-2191. Simulation improvements to MockDAGAppMaster (bikas)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b18552b2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b18552b2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b18552b2

Branch: refs/heads/TEZ-2003
Commit: b18552b2a426a72ea7ab7e24137bc14580c7cd7e
Parents: a809f96
Author: Bikas Saha <[email protected]>
Authored: Fri Mar 13 17:23:54 2015 -0700
Committer: Bikas Saha <[email protected]>
Committed: Fri Mar 13 17:23:54 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  12 +-
 .../apache/tez/dag/app/MockDAGAppMaster.java    | 250 +++++++++++++++----
 .../org/apache/tez/dag/app/MockLocalClient.java |  14 +-
 .../org/apache/tez/dag/app/MockTezClient.java   |  13 +-
 .../tez/dag/app/TestMockDAGAppMaster.java       |  71 +++++-
 .../org/apache/tez/dag/app/TestPreemption.java  |   8 +-
 .../org/apache/tez/dag/app/TestSpeculation.java |  11 +-
 .../java/org/apache/tez/test/TestInput.java     |   3 +-
 9 files changed, 294 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b18552b2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ccf804e..d222619 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2191. Simulation improvements to MockDAGAppMaster
   TEZ-2195. TestTezJobs::testInvalidQueueSubmission/testInvalidQueueSubmissionToSession
     fail with hadoop branch-2.
   TEZ-1827. MiniTezCluster takes 10 minutes to shut down.

http://git-wip-us.apache.org/repos/asf/tez/blob/b18552b2/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index f6256d7..67efefc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1252,12 +1252,7 @@ public class DAGAppMaster extends AbstractService {
 
     @Override
     public DAG getCurrentDAG() {
-      try {
-        rLock.lock();
-        return dag;
-      } finally {
-        rLock.unlock();
-      }
+      return dag;
     }
     
     @Override
@@ -1676,7 +1671,6 @@ public class DAGAppMaster extends AbstractService {
     if (this.dagSubmissionTimer != null) {
       this.dagSubmissionTimer.cancel();
     }
-        
     stopServices();
 
     // Given pre-emption, we should delete tez scratch dir only if unregister is
@@ -1715,7 +1709,9 @@ public class DAGAppMaster extends AbstractService {
       }
     }
 
-    execService.shutdownNow();
+    if (execService != null) {
+      execService.shutdownNow();
+    }
 
     super.serviceStop();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/b18552b2/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 04a47c6..9943232 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -18,14 +18,21 @@
 
 package org.apache.tez.dag.app;
 
-import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
 import java.net.UnknownHostException;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,8 +48,9 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.client.TezApiVersionInfo;
 import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.launcher.ContainerLauncher;
 import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
 import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
@@ -52,8 +60,6 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
@@ -63,8 +69,18 @@ import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 @SuppressWarnings("unchecked")
 public class MockDAGAppMaster extends DAGAppMaster {
@@ -74,6 +90,20 @@ public class MockDAGAppMaster extends DAGAppMaster {
   boolean initFailFlag;
   boolean startFailFlag;
   boolean sendDMEvents;
+  CountersDelegate countersDelegate;
+  long launcherSleepTime = 1;
+  boolean doSleep = true;
+  int handlerConcurrency = 1;
+  int numConcurrentContainers = 1;
+  
+  ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
+  AtomicLong heartbeatCpu = new AtomicLong(0);
+  AtomicLong heartbeatTime = new AtomicLong(0);
+  AtomicLong numHearbeats = new AtomicLong(0);
+  
+  public static interface CountersDelegate {
+    public TezCounters getCounters(TaskSpec taskSpec);
+  }
 
   // mock container launcher does not launch real tasks.
   // Upon, launch of a container is simulates the container asking for tasks
@@ -83,14 +113,18 @@ public class MockDAGAppMaster extends DAGAppMaster {
 
     BlockingQueue<NMCommunicatorEvent> eventQueue = new LinkedBlockingQueue<NMCommunicatorEvent>();
     Thread eventHandlingThread;
+    ListeningExecutorService executorService;
     
     Map<ContainerId, ContainerData> containers = Maps.newConcurrentMap();
+    ArrayBlockingQueue<Worker> workers;
     TaskAttemptListenerImpTezDag taListener;
     
     AtomicBoolean startScheduling = new AtomicBoolean(true);
     AtomicBoolean goFlag;
     boolean updateProgress = true;
     
+    LinkedBlockingQueue<ContainerData> containersToProcess = new LinkedBlockingQueue<ContainerData>();
+    
     Map<TezTaskID, Integer> preemptedTasks = Maps.newConcurrentMap();
     
     Map<TezTaskAttemptID, Integer> tasksWithStatusUpdates = Maps.newConcurrentMap();
@@ -107,19 +141,31 @@ public class MockDAGAppMaster extends DAGAppMaster {
       TaskSpec taskSpec;
       ContainerLaunchContext launchContext;
       int numUpdates = 0;
+      int nextFromEventId = 0;
       boolean completed;
+      String cIdStr;
+      AtomicBoolean remove = new AtomicBoolean(false);
       
       public ContainerData(ContainerId cId, ContainerLaunchContext context) {
         this.cId = cId;
+        this.cIdStr = cId.toString();
         this.launchContext = context;
       }
       
+      void remove() {
+        remove.set(true);
+      }
+      
       void clear() {
         taId = null;
         vName = null;
         taskSpec = null;
         completed = false;
         launchContext = null;
+        numUpdates = 0;
+        nextFromEventId = 0;
+        cIdStr = null;
+        remove.set(false);
       }
     }
     
@@ -128,6 +174,15 @@ public class MockDAGAppMaster extends DAGAppMaster {
       taListener = (TaskAttemptListenerImpTezDag) getTaskAttemptListener();
       eventHandlingThread = new Thread(this);
       eventHandlingThread.start();
+      ExecutorService rawExecutor = Executors.newFixedThreadPool(handlerConcurrency,
+          new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("MockLauncherExecutionThread
 [%d]")
+              .build());
+      this.executorService = MoreExecutors.listeningDecorator(rawExecutor);
+      int numWorkers = numConcurrentContainers*2; // handle races that cause extra
+      workers = new ArrayBlockingQueue<Worker>(numWorkers);
+      for (int i=0; i<numWorkers; ++i) {
+        workers.add(new Worker());
+      }
     }
 
     @Override
@@ -136,6 +191,9 @@ public class MockDAGAppMaster extends DAGAppMaster {
         eventHandlingThread.interrupt();
         eventHandlingThread.join(2000l);
       }
+      if (executorService != null) {
+        executorService.shutdownNow();
+      }
     }
     
     @Override
@@ -201,8 +259,10 @@ public class MockDAGAppMaster extends DAGAppMaster {
 
     void launch(NMCommunicatorLaunchRequestEvent event) {
       // launch container by putting it in simulated container list
-      containers.put(event.getContainerId(), new 
ContainerData(event.getContainerId(), 
-          event.getContainerLaunchContext()));
+      ContainerData cData = new ContainerData(event.getContainerId(),
+          event.getContainerLaunchContext());
+      containers.put(event.getContainerId(), cData);
+      containersToProcess.add(cData);
       getContext().getEventHandler().handle(new 
AMContainerEventLaunched(event.getContainerId()));      
     }
     
@@ -218,37 +278,102 @@ public class MockDAGAppMaster extends DAGAppMaster {
         ((MockClock) clock).incrementTime(inc);
       }
     }
-
+    
     @Override
     public void run() {
+      Thread.currentThread().setName("MockLauncher");
       // wait for test to sync with us and get a reference to us. Go when sync is done
       LOG.info("Waiting to go");
       waitToGo();
       LOG.info("Signal to go");
-      while(true) {
-        if (!startScheduling.get()) { // schedule when asked to do so by the test code
-          continue;
+      try {
+        while (true) {
+          if (!startScheduling.get()) { // schedule when asked to do so by the test code
+            Thread.sleep(launcherSleepTime);
+            continue;
+          }
+          incrementTime(1000);
+          ContainerData cData = containersToProcess.take();
+          if (!cData.remove.get()) {
+            Worker worker = workers.remove();
+            worker.setContainerData(cData);
+            ListenableFuture<Void> future = executorService.submit(worker);
+            Futures.addCallback(future, worker.getCallback());            
+          } else {
+            containers.remove(cData.cId);
+          }
+          if (doSleep) {
+            Thread.sleep(launcherSleepTime);
+          }
+        }
+      } catch (InterruptedException ie) {
+        LOG.warn("Exception in mock container launcher thread", ie);
+      }
+    }
+    
+    private void doHeartbeat(TezHeartbeatRequest request, ContainerData cData) throws Exception {
+      long startTime = System.nanoTime();
+      long startCpuTime = threadMxBean.getCurrentThreadCpuTime();
+      TezHeartbeatResponse response = taListener.heartbeat(request);
+      if (response.shouldDie()) {
+        cData.remove();
+      } else {
+        cData.nextFromEventId += response.getEvents().size();
+        if (!response.getEvents().isEmpty()) {
+          long stopTime = System.nanoTime();
+          long stopCpuTime = threadMxBean.getCurrentThreadCpuTime();
+          heartbeatTime.addAndGet((stopTime-startTime)/1000);
+          heartbeatCpu.addAndGet((stopCpuTime-startCpuTime)/1000);
+          numHearbeats.incrementAndGet();
+        }
+      }
+    }
+        
+    class Worker implements Callable<Void> {
+      class WorkerCallback implements FutureCallback<Void> {
+        @Override
+        public void onSuccess(Void arg) {
+          completeOperation();
+        }
+
+        @Override
+        public void onFailure(Throwable t) {
+          LOG.fatal("Unexpected error during processing", t);
+          Worker.this.cData.remove();
+          completeOperation();
+        }
+
+        void completeOperation() {
+          workers.add(Worker.this);
+          containersToProcess.add(Worker.this.cData);
         }
-        incrementTime(1000);
-        for (Map.Entry<ContainerId, ContainerData> entry : 
containers.entrySet()) {
-          ContainerData cData = entry.getValue();
-          ContainerId cId = entry.getKey();
+      }
+
+      volatile ContainerData cData;
+      WorkerCallback callback = new WorkerCallback();
+      
+      WorkerCallback getCallback() {
+        return callback;
+      }
+      
+      void setContainerData(ContainerData cData) {
+        this.cData = cData;
+      }
+      
+      @Override
+      public Void call() throws Exception {
+        try {
           if (cData.taId == null) {
             // if container is not assigned a task, ask for a task
-            try {
-              ContainerTask cTask = taListener.getTask(new 
ContainerContext(cId.toString()));
-              if (cTask == null) {
-                continue;
-              }
+            ContainerTask cTask = taListener.getTask(new 
ContainerContext(cData.cIdStr));
+            if (cTask != null) {
               if (cTask.shouldDie()) {
-                containers.remove(cId);
+                cData.remove();
               } else {
                 cData.taId = cTask.getTaskSpec().getTaskAttemptID();
                 cData.vName = cTask.getTaskSpec().getVertexName();
                 cData.taskSpec = cTask.getTaskSpec();
               }
-            } catch (IOException e) {
-              e.printStackTrace();
             }
           } else if (!cData.completed) {
             // container is assigned a task and task is not completed
@@ -257,52 +382,59 @@ public class MockDAGAppMaster extends DAGAppMaster {
             Integer updatesToMake = tasksWithStatusUpdates.get(cData.taId);
             if (cData.numUpdates == 0 || // do at least one update
                 updatesToMake != null && cData.numUpdates < updatesToMake) {
+              List<TezEvent> events = Lists.newArrayListWithCapacity(
+                                      cData.taskSpec.getOutputs().size() + 1);
+              if (sendDMEvents) {
+                for (OutputSpec output : cData.taskSpec.getOutputs()) {
+                  if (output.getPhysicalEdgeCount() == 1) {
+                    events.add(new TezEvent(DataMovementEvent.create(0, 0, 0, 
null), new EventMetaData(
+                        EventProducerConsumerType.OUTPUT, cData.vName, output
+                            .getDestinationVertexName(), cData.taId)));
+                  } else {
+                    events.add(new 
TezEvent(CompositeDataMovementEvent.create(0,
+                        output.getPhysicalEdgeCount(), null), new 
EventMetaData(
+                        EventProducerConsumerType.OUTPUT, cData.vName, output
+                            .getDestinationVertexName(), cData.taId)));
+                  }
+                }
+              }
+              TezCounters counters = null;
+              if (countersDelegate != null) {
+                 counters = countersDelegate.getCounters(cData.taskSpec);
+              }
               cData.numUpdates++;
               float maxUpdates = (updatesToMake != null) ? 
updatesToMake.intValue() : 1;
               float progress = updateProgress ? cData.numUpdates/maxUpdates : 
0f;
-              TezVertexID vertexId = cData.taId.getTaskID().getVertexID();
-              getContext().getEventHandler().handle(
-                  new VertexEventRouteEvent(vertexId, 
Collections.singletonList(new TezEvent(
-                      new TaskStatusUpdateEvent(null, progress), new 
EventMetaData(
-                          EventProducerConsumerType.SYSTEM, cData.vName, "", 
cData.taId)))));
+              events.add(new TezEvent(new TaskStatusUpdateEvent(counters, 
progress), new EventMetaData(
+                  EventProducerConsumerType.SYSTEM, cData.vName, "", 
cData.taId)));
+              TezHeartbeatRequest request = new 
TezHeartbeatRequest(cData.numUpdates, events,
+                  cData.cIdStr, cData.taId, cData.nextFromEventId, 10000);
+              doHeartbeat(request, cData);
             } else if (version != null && cData.taId.getId() <= 
version.intValue()) {
               preemptContainer(cData);
             } else {
               // send a done notification
-              TezVertexID vertexId = cData.taId.getTaskID().getVertexID();
               cData.completed = true;
-              if (sendDMEvents) {
-                Event event = null;
-                for (OutputSpec output : cData.taskSpec.getOutputs()) {
-                  if (output.getPhysicalEdgeCount() == 1) {
-                    event = DataMovementEvent.create(0, null);
-                  } else {
-                    event = CompositeDataMovementEvent.create(0, 
output.getPhysicalEdgeCount(), null);
-                  }
-                  getContext().getEventHandler().handle(
-                      new VertexEventRouteEvent(vertexId, 
Collections.singletonList(new TezEvent(
-                          event, new 
EventMetaData(EventProducerConsumerType.OUTPUT, cData.vName,
-                              output.getDestinationVertexName(), 
cData.taId)))));
-                }
-              }
-              getContext().getEventHandler().handle(
-                  new VertexEventRouteEvent(vertexId, 
Collections.singletonList(new TezEvent(
-                      new TaskAttemptCompletedEvent(), new EventMetaData(
-                          EventProducerConsumerType.SYSTEM, cData.vName, "", 
cData.taId)))));
+              List<TezEvent> events = Collections.singletonList(new TezEvent(
+                  new TaskAttemptCompletedEvent(), new EventMetaData(
+                      EventProducerConsumerType.SYSTEM, cData.vName, "", 
cData.taId)));
+              TezHeartbeatRequest request = new 
TezHeartbeatRequest(++cData.numUpdates, events,
+                  cData.cIdStr, cData.taId, cData.nextFromEventId, 10000);
+              doHeartbeat(request, cData);
               cData.clear();
             }
           }
+        } catch (Exception e) {
+          // exception from TA listener. Behave like real. Die and continue 
with others 
+          LOG.warn("Exception in mock container launcher thread for cId: " + 
cData.cIdStr, e);
+          cData.remove();
         }
-        try {
-          Thread.sleep(10);
-        } catch (InterruptedException e) {
-          System.out.println("Interrupted in mock container launcher thread");
-          break;
-        }
+        return null;
       }
+      
     }
-    
   }
+  
 
   public class MockDAGAppMasterShutdownHandler extends 
DAGAppMasterShutdownHandler {
     public AtomicInteger shutdownInvoked = new AtomicInteger(0);
@@ -329,7 +461,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
       String nmHost, int nmPort, int nmHttpPort, Clock clock, long 
appSubmitTime,
       boolean isSession, String workingDirectory, String[] localDirs, String[] 
logDirs,
       AtomicBoolean launcherGoFlag, boolean initFailFlag, boolean 
startFailFlag,
-      Credentials credentials, String jobUserName) {
+      Credentials credentials, String jobUserName, int handlerConcurrency, int 
numConcurrentContainers) {
     super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, 
clock, appSubmitTime,
         isSession, workingDirectory, localDirs, logDirs,  new 
TezApiVersionInfo().getVersion(), 1,
         credentials, jobUserName);
@@ -337,6 +469,9 @@ public class MockDAGAppMaster extends DAGAppMaster {
     shutdownHandler = new MockDAGAppMasterShutdownHandler();
     this.initFailFlag = initFailFlag;
     this.startFailFlag = startFailFlag;
+    Preconditions.checkArgument(handlerConcurrency > 0);
+    this.handlerConcurrency = handlerConcurrency;
+    this.numConcurrentContainers = numConcurrentContainers;
   }
   
   // use mock container launcher for tests
@@ -353,9 +488,16 @@ public class MockDAGAppMaster extends DAGAppMaster {
   public MockDAGAppMasterShutdownHandler getShutdownHandler() {
     return (MockDAGAppMasterShutdownHandler) this.shutdownHandler;
   }
+  
+  public void clearStats() {
+    heartbeatCpu.set(0);
+    heartbeatTime.set(0);
+    numHearbeats.set(0);
+  }
 
   @Override
   public synchronized void serviceInit(Configuration conf) throws Exception {
+    conf.setInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, 
numConcurrentContainers);
     super.serviceInit(conf);
     if (initFailFlag) {
       throw new Exception("FailInit");

http://git-wip-us.apache.org/repos/asf/tez/blob/b18552b2/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
index 3cb9d8c..5526516 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockLocalClient.java
@@ -32,20 +32,21 @@ public class MockLocalClient extends LocalClient {
   Clock mockClock;
   final boolean initFailFlag;
   final boolean startFailFlag;
+  final int concurrency;
+  final int containers;
 
   public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag, Clock clock) {
-    this.mockAppLauncherGoFlag = mockAppLauncherGoFlag;
-    this.mockClock = clock;
-    this.initFailFlag = false;
-    this.startFailFlag = false;
+    this(mockAppLauncherGoFlag, clock, false, false, 1, 1);
   }
   
   public MockLocalClient(AtomicBoolean mockAppLauncherGoFlag, Clock clock,
-      boolean initFailFlag, boolean startFailFlag) {
+      boolean initFailFlag, boolean startFailFlag, int concurrency, int 
containers) {
     this.mockAppLauncherGoFlag = mockAppLauncherGoFlag;
     this.mockClock = clock;
     this.initFailFlag = initFailFlag;
     this.startFailFlag = startFailFlag;
+    this.concurrency = concurrency;
+    this.containers = containers;
   }
 
   @Override
@@ -55,7 +56,8 @@ public class MockLocalClient extends LocalClient {
       String[] localDirs, String[] logDirs, Credentials credentials, String 
jobUserName) {
     mockApp = new MockDAGAppMaster(applicationAttemptId, cId, currentHost, 
nmPort, nmHttpPort,
         (mockClock!=null ? mockClock : clock), appSubmitTime, isSession, 
userDir, localDirs, logDirs,
-        mockAppLauncherGoFlag, initFailFlag, startFailFlag, credentials, 
jobUserName);
+        mockAppLauncherGoFlag, initFailFlag, startFailFlag, credentials, 
jobUserName, 
+        concurrency, containers);
     return mockApp;
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/b18552b2/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
index 89df25c..ff66970 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockTezClient.java
@@ -41,9 +41,18 @@ public class MockTezClient extends TezClient {
   MockTezClient(String name, TezConfiguration tezConf, boolean isSession,
       Map<String, LocalResource> localResources, Credentials credentials,
       Clock clock, AtomicBoolean mockAppLauncherGoFlag,
-      boolean initFailFlag, boolean startFailFlag) {
+ boolean initFailFlag, boolean startFailFlag) {
+    this(name, tezConf, isSession, localResources, credentials, clock, 
mockAppLauncherGoFlag,
+        initFailFlag, startFailFlag, 1, 1);
+  }
+  
+  MockTezClient(String name, TezConfiguration tezConf, boolean isSession,
+      Map<String, LocalResource> localResources, Credentials credentials,
+      Clock clock, AtomicBoolean mockAppLauncherGoFlag,
+      boolean initFailFlag, boolean startFailFlag, int concurrency, int 
containers) {
     super(name, tezConf, isSession, localResources, credentials);
-    this.client = new MockLocalClient(mockAppLauncherGoFlag, clock, 
initFailFlag, startFailFlag);
+    this.client = new MockLocalClient(mockAppLauncherGoFlag, clock, 
initFailFlag, startFailFlag, 
+        concurrency, containers);
   }
 
   protected FrameworkClient createFrameworkClient() {

http://git-wip-us.apache.org/repos/asf/tez/blob/b18552b2/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index f4734d5..a7feef8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -45,6 +46,7 @@ import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.app.MockDAGAppMaster.CountersDelegate;
 import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
 import 
org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData;
 import org.apache.tez.dag.app.dag.DAGState;
@@ -57,17 +59,18 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.junit.Assert;
 import org.junit.Test;
 
 import com.google.common.collect.Maps;
 
-@SuppressWarnings("deprecation")
 public class TestMockDAGAppMaster {
   static Configuration defaultConf;
   static FileSystem localFs;
-  static Path workDir;
   
   static {
     try {
@@ -75,8 +78,8 @@ public class TestMockDAGAppMaster {
       defaultConf.set("fs.defaultFS", "file:///");
       defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
       localFs = FileSystem.getLocal(defaultConf);
-      workDir = new Path(new Path(System.getProperty("test.build.data", 
"/tmp")),
-          "TestDAGAppMaster").makeQualified(localFs);
+      String stagingDir = "target" + Path.SEPARATOR + 
TestMockDAGAppMaster.class.getName() + "-tmpDir";
+      defaultConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir);
     } catch (IOException e) {
       throw new RuntimeException("init failure", e);
     }
@@ -221,6 +224,66 @@ public class TestMockDAGAppMaster {
 
     tezClient.stop();
   }
+
+  @Test (timeout = 10000)
+  public void testBasicCounters() throws Exception {
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, 
null, null, null,
+        null, false, false);
+    tezClient.start();
+
+    final String vAName = "A";
+    final String vBName = "B";
+    final String procCounterName = "Proc";
+    final String globalCounterName = "Global";
+    DAG dag = DAG.create("testBasicCounters");
+    Vertex vA = Vertex.create(vAName, 
ProcessorDescriptor.create("Proc.class"), 10);
+    Vertex vB = Vertex.create(vBName, 
ProcessorDescriptor.create("Proc.class"), 1);
+    dag.addVertex(vA)
+        .addVertex(vB)
+        .addEdge(
+            Edge.create(vA, vB, 
EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+                DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+                OutputDescriptor.create("Out"), 
InputDescriptor.create("In"))));
+
+    MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+    MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+    mockLauncher.startScheduling(false);
+    mockApp.countersDelegate = new CountersDelegate() {
+      @Override
+      public TezCounters getCounters(TaskSpec taskSpec) {
+        String vName = taskSpec.getVertexName();
+        TezCounters counters = new TezCounters();
+        counters.findCounter(globalCounterName, 
globalCounterName).increment(1);
+        counters.findCounter(vName, procCounterName).increment(1);
+        for (OutputSpec output : taskSpec.getOutputs()) {
+          counters.findCounter(vName, 
output.getDestinationVertexName()).increment(1);
+        }
+        for (InputSpec input : taskSpec.getInputs()) {
+          counters.findCounter(vName, 
input.getSourceVertexName()).increment(1);
+        }
+        return counters;
+      }
+    };
+    mockApp.doSleep = false;
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    mockLauncher.waitTillContainersLaunched();
+    DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+    mockLauncher.startScheduling(true);
+    DAGStatus status = dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
+    TezCounters counters = dagImpl.getAllCounters();
+    // verify processor counters
+    Assert.assertEquals(10, counters.findCounter(vAName, 
procCounterName).getValue());
+    Assert.assertEquals(1, counters.findCounter(vBName, 
procCounterName).getValue());
+    // verify edge counters
+    Assert.assertEquals(10, counters.findCounter(vAName, vBName).getValue());
+    Assert.assertEquals(1, counters.findCounter(vBName, vAName).getValue());
+    // verify global counters
+    Assert.assertEquals(11, counters.findCounter(globalCounterName, 
globalCounterName).getValue());
+    
+    tezClient.stop();
+  }
   
   @Test (timeout = 10000)
   public void testMultipleSubmissions() throws Exception {

http://git-wip-us.apache.org/repos/asf/tez/blob/b18552b2/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
index 8cc2e8b..10917f0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
@@ -48,12 +48,10 @@ import org.apache.tez.dag.records.TezVertexID;
 import org.junit.Assert;
 import org.junit.Test;
 
-@SuppressWarnings("deprecation")
 public class TestPreemption {
   
   static Configuration defaultConf;
   static FileSystem localFs;
-  static Path workDir;
   
   static {
     try {
@@ -61,8 +59,8 @@ public class TestPreemption {
       defaultConf.set("fs.defaultFS", "file:///");
       defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
       localFs = FileSystem.getLocal(defaultConf);
-      workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")),
-          "TestDAGAppMaster").makeQualified(localFs);
+      String stagingDir = "target" + Path.SEPARATOR + TestPreemption.class.getName() + "-tmpDir";
+      defaultConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir);
     } catch (IOException e) {
       throw new RuntimeException("init failure", e);
     }
@@ -93,7 +91,7 @@ public class TestPreemption {
     tezconf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 0);
     AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false);
     MockTezClient tezClient = new MockTezClient("testPreemption", tezconf, false, null, null,
-        null, mockAppLauncherGoFlag);
+        null, mockAppLauncherGoFlag, false, false, 2, 2);
     tezClient.start();
     
     DAGClient dagClient = tezClient.submitDAG(createDAG(DataMovementType.SCATTER_GATHER));

http://git-wip-us.apache.org/repos/asf/tez/blob/b18552b2/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
index c349957..3413762 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -45,12 +44,9 @@ import org.junit.Test;
 
 import com.google.common.base.Joiner;
 
-
-@SuppressWarnings("deprecation")
 public class TestSpeculation {
   static Configuration defaultConf;
   static FileSystem localFs;
-  static Path workDir;
   
   MockDAGAppMaster mockApp;
   MockContainerLauncher mockLauncher;
@@ -61,10 +57,9 @@ public class TestSpeculation {
       defaultConf.set("fs.defaultFS", "file:///");
       defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
       defaultConf.setBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED, true);
-      defaultConf.setInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, 2);
       localFs = FileSystem.getLocal(defaultConf);
-      workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")),
-          "TestSpeculation").makeQualified(localFs);
+      String stagingDir = "target" + Path.SEPARATOR + TestSpeculation.class.getName() + "-tmpDir";
+      defaultConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir);
     } catch (IOException e) {
       throw new RuntimeException("init failure", e);
     }
@@ -74,7 +69,7 @@ public class TestSpeculation {
     TezConfiguration tezconf = new TezConfiguration(defaultConf);
     AtomicBoolean mockAppLauncherGoFlag = new AtomicBoolean(false);
     MockTezClient tezClient = new MockTezClient("testspeculation", tezconf, true, null, null,
-        new MockClock(), mockAppLauncherGoFlag);
+        new MockClock(), mockAppLauncherGoFlag, false, false, 1, 2);
     tezClient.start();
     syncWithMockAppLauncher(false, mockAppLauncherGoFlag, tezClient);
     return tezClient;

http://git-wip-us.apache.org/repos/asf/tez/blob/b18552b2/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index 0050961..1205d41 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -18,7 +18,6 @@
 
 package org.apache.tez.test;
 
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -332,7 +331,7 @@ public class TestInput extends AbstractLogicalInput {
       if (event instanceof DataMovementEvent) {
         DataMovementEvent dmEvent = (DataMovementEvent) event;
         numCompletedInputs++;
-        LOG.info("Received DataMovement event sourceId : " + dmEvent.getSourceIndex() + 
+        LOG.info(getContext().getSourceVertexName() + " Received DataMovement event sourceId : " + dmEvent.getSourceIndex() + 
             " targetId: " + dmEvent.getTargetIndex() +
             " version: " + dmEvent.getVersion() +
             " numInputs: " + getNumPhysicalInputs() +

Reply via email to