YARN-5654. Not be able to run SLS with FairScheduler (yufeigu via rkanter)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5cfe3957
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5cfe3957
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5cfe3957

Branch: refs/heads/HDFS-10467
Commit: 5cfe39577249c3a78bf176f0691a4489af045185
Parents: 954e761
Author: Robert Kanter <rkan...@apache.org>
Authored: Wed Mar 29 16:18:13 2017 -0700
Committer: Inigo <inigo...@apache.org>
Committed: Wed Mar 29 19:32:12 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/yarn/sls/SLSRunner.java   |  22 +-
 .../hadoop/yarn/sls/appmaster/AMSimulator.java  |  24 +-
 .../sls/scheduler/ResourceSchedulerWrapper.java | 969 -------------------
 .../sls/scheduler/SLSCapacityScheduler.java     | 685 ++-----------
 .../yarn/sls/scheduler/SLSFairScheduler.java    | 339 +++++++
 .../yarn/sls/scheduler/SchedulerMetrics.java    | 533 +++++++++-
 .../yarn/sls/scheduler/SchedulerWrapper.java    |  18 +-
 .../hadoop/yarn/sls/scheduler/Tracker.java      |  46 +
 .../apache/hadoop/yarn/sls/utils/SLSUtils.java  |   1 +
 .../apache/hadoop/yarn/sls/web/SLSWebApp.java   |  12 +-
 .../yarn/sls/appmaster/TestAMSimulator.java     |  53 +-
 .../yarn/sls/nodemanager/TestNMSimulator.java   |  32 +-
 12 files changed, 1103 insertions(+), 1631 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cfe3957/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 61b7f36..ba43816 100644
--- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -59,16 +59,14 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
 import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
 import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
-import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
-import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper;
-import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler;
-import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
-import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
+import org.apache.hadoop.yarn.sls.scheduler.*;
 import org.apache.hadoop.yarn.sls.utils.SLSUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.log4j.Logger;
@@ -152,9 +150,9 @@ public class SLSRunner {
     // start application masters
     startAM();
     // set queue & tracked apps information
-    ((SchedulerWrapper) rm.getResourceScheduler())
+    ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
                             .setQueueSet(this.queueAppNumMap.keySet());
-    ((SchedulerWrapper) rm.getResourceScheduler())
+    ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
                             .setTrackedAppSet(this.trackedApps);
     // print out simulation info
     printSimulationInfo();
@@ -164,7 +162,7 @@ public class SLSRunner {
     runner.start();
   }
 
-  private void startRM() throws IOException, ClassNotFoundException {
+  private void startRM() throws Exception {
     Configuration rmConf = new YarnConfiguration();
     String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER);
 
@@ -175,10 +173,12 @@ public class SLSRunner {
     if(Class.forName(schedulerClass) == CapacityScheduler.class) {
       rmConf.set(YarnConfiguration.RM_SCHEDULER,
           SLSCapacityScheduler.class.getName());
-    } else {
+    } else if (Class.forName(schedulerClass) == FairScheduler.class) {
       rmConf.set(YarnConfiguration.RM_SCHEDULER,
-              ResourceSchedulerWrapper.class.getName());
-      rmConf.set(SLSConfiguration.RM_SCHEDULER, schedulerClass);
+          SLSFairScheduler.class.getName());
+    } else if (Class.forName(schedulerClass) == FifoScheduler.class){
+      // TODO add support for FifoScheduler
+      throw new Exception("Fifo Scheduler is not supported yet.");
     }
 
     rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cfe3957/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index 0573bae..a62f2b6 100644
--- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.log4j.Logger;
@@ -219,10 +220,12 @@ public abstract class AMSimulator extends TaskRunner.Task 
{
     simulateFinishTimeMS = System.currentTimeMillis() -
         SLSRunner.getRunner().getStartTimeMS();
     // record job running information
-    ((SchedulerWrapper)rm.getResourceScheduler())
-         .addAMRuntime(appId, 
-                      traceStartTimeMS, traceFinishTimeMS, 
-                      simulateStartTimeMS, simulateFinishTimeMS);
+    SchedulerMetrics schedulerMetrics =
+        ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics();
+    if (schedulerMetrics != null) {
+      schedulerMetrics.addAMRuntime(appId, traceStartTimeMS, traceFinishTimeMS,
+          simulateStartTimeMS, simulateFinishTimeMS);
+    }
   }
   
   protected ResourceRequest createResourceRequest(
@@ -330,13 +333,20 @@ public abstract class AMSimulator extends TaskRunner.Task 
{
 
   private void trackApp() {
     if (isTracked) {
-      ((SchedulerWrapper) rm.getResourceScheduler())
-          .addTrackedApp(appId, oldAppId);
+      SchedulerMetrics schedulerMetrics =
+          ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics();
+      if (schedulerMetrics != null) {
+        schedulerMetrics.addTrackedApp(appId, oldAppId);
+      }
     }
   }
   public void untrackApp() {
     if (isTracked) {
-      ((SchedulerWrapper) 
rm.getResourceScheduler()).removeTrackedApp(oldAppId);
+      SchedulerMetrics schedulerMetrics =
+          ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics();
+      if (schedulerMetrics != null) {
+        schedulerMetrics.removeTrackedApp(oldAppId);
+      }
     }
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cfe3957/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
deleted file mode 100644
index a4b8e64..0000000
--- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java
+++ /dev/null
@@ -1,969 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.sls.scheduler;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.QueueACL;
-import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
-import org.apache.hadoop.yarn.sls.SLSRunner;
-import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
-import org.apache.hadoop.yarn.sls.web.SLSWebApp;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.log4j.Logger;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.CsvReporter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.SlidingWindowReservoir;
-import com.codahale.metrics.Timer;
-
-@Private
-@Unstable
-final public class ResourceSchedulerWrapper
-    extends AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>
-    implements SchedulerWrapper, ResourceScheduler, Configurable {
-  private static final String EOL = System.getProperty("line.separator");
-  private static final int SAMPLING_SIZE = 60;
-  private ScheduledExecutorService pool;
-  // counters for scheduler allocate/handle operations
-  private Counter schedulerAllocateCounter;
-  private Counter schedulerHandleCounter;
-  private Map<SchedulerEventType, Counter> schedulerHandleCounterMap;
-  // Timers for scheduler allocate/handle operations
-  private Timer schedulerAllocateTimer;
-  private Timer schedulerHandleTimer;
-  private Map<SchedulerEventType, Timer> schedulerHandleTimerMap;
-  private List<Histogram> schedulerHistogramList;
-  private Map<Histogram, Timer> histogramTimerMap;
-  private Lock samplerLock;
-  private Lock queueLock;
-
-  private Configuration conf;
-  private ResourceScheduler scheduler;
-  private Map<ApplicationId, String> appQueueMap =
-          new ConcurrentHashMap<ApplicationId, String>();
-  private BufferedWriter jobRuntimeLogBW;
-
-  // Priority of the ResourceSchedulerWrapper shutdown hook.
-  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
-
-  // web app
-  private SLSWebApp web;
-
-  private Map<ContainerId, Resource> preemptionContainerMap =
-          new ConcurrentHashMap<ContainerId, Resource>();
-
-  // metrics
-  private MetricRegistry metrics;
-  private SchedulerMetrics schedulerMetrics;
-  private boolean metricsON;
-  private String metricsOutputDir;
-  private BufferedWriter metricsLogBW;
-  private boolean running = false;
-  private static Map<Class, Class> defaultSchedulerMetricsMap =
-          new HashMap<Class, Class>();
-  static {
-    defaultSchedulerMetricsMap.put(FairScheduler.class,
-            FairSchedulerMetrics.class);
-    defaultSchedulerMetricsMap.put(FifoScheduler.class,
-            FifoSchedulerMetrics.class);
-    defaultSchedulerMetricsMap.put(CapacityScheduler.class,
-            CapacitySchedulerMetrics.class);
-  }
-  // must set by outside
-  private Set<String> queueSet;
-  private Set<String> trackedAppSet;
-
-  public final Logger LOG = Logger.getLogger(ResourceSchedulerWrapper.class);
-
-  public ResourceSchedulerWrapper() {
-    super(ResourceSchedulerWrapper.class.getName());
-    samplerLock = new ReentrantLock();
-    queueLock = new ReentrantLock();
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-    // set scheduler
-    Class<? extends ResourceScheduler> klass = conf.getClass(
-        SLSConfiguration.RM_SCHEDULER, null, ResourceScheduler.class);
-
-    scheduler = ReflectionUtils.newInstance(klass, conf);
-    // start metrics
-    metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
-    if (metricsON) {
-      try {
-        initMetrics();
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
-    }
-
-    ShutdownHookManager.get().addShutdownHook(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          if (metricsLogBW != null)  {
-            metricsLogBW.write("]");
-            metricsLogBW.close();
-          }
-          if (web != null) {
-            web.stop();
-          }
-          tearDown();
-        } catch (Exception e) {
-          e.printStackTrace();
-        }
-      }
-    }, SHUTDOWN_HOOK_PRIORITY);
-  }
-
-  @Override
-  public Allocation allocate(ApplicationAttemptId attemptId,
-      List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
-      List<String> strings, List<String> strings2,
-      ContainerUpdates updateRequests) {
-    if (metricsON) {
-      final Timer.Context context = schedulerAllocateTimer.time();
-      Allocation allocation = null;
-      try {
-        allocation = scheduler.allocate(attemptId, resourceRequests,
-                containerIds, strings, strings2, updateRequests);
-        return allocation;
-      } finally {
-        context.stop();
-        schedulerAllocateCounter.inc();
-        try {
-          updateQueueWithAllocateRequest(allocation, attemptId,
-                  resourceRequests, containerIds);
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
-      }
-    } else {
-      return scheduler.allocate(attemptId,
-              resourceRequests, containerIds, strings, strings2, 
updateRequests);
-    }
-  }
-
-  @Override
-  public void handle(SchedulerEvent schedulerEvent) {
-    // metrics off
-    if (! metricsON) {
-      scheduler.handle(schedulerEvent);
-      return;
-    }
-    if(!running)    running = true;
-
-    // metrics on
-    Timer.Context handlerTimer = null;
-    Timer.Context operationTimer = null;
-
-    NodeUpdateSchedulerEventWrapper eventWrapper;
-    try {
-      //if (schedulerEvent instanceof NodeUpdateSchedulerEvent) {
-      if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
-              && schedulerEvent instanceof NodeUpdateSchedulerEvent) {
-        eventWrapper = new NodeUpdateSchedulerEventWrapper(
-                (NodeUpdateSchedulerEvent)schedulerEvent);
-        schedulerEvent = eventWrapper;
-        updateQueueWithNodeUpdate(eventWrapper);
-      } else if (schedulerEvent.getType() == 
SchedulerEventType.APP_ATTEMPT_REMOVED
-          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
-        // check if having AM Container, update resource usage information
-        AppAttemptRemovedSchedulerEvent appRemoveEvent =
-            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
-        ApplicationAttemptId appAttemptId =
-                appRemoveEvent.getApplicationAttemptID();
-        String queue = appQueueMap.get(appAttemptId.getApplicationId());
-        SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId);
-        if (! app.getLiveContainers().isEmpty()) {  // have 0 or 1
-          // should have one container which is AM container
-          RMContainer rmc = app.getLiveContainers().iterator().next();
-          updateQueueMetrics(queue,
-                  rmc.getContainer().getResource().getMemorySize(),
-                  rmc.getContainer().getResource().getVirtualCores());
-        }
-      }
-
-      handlerTimer = schedulerHandleTimer.time();
-      operationTimer = schedulerHandleTimerMap
-              .get(schedulerEvent.getType()).time();
-
-      scheduler.handle(schedulerEvent);
-    } finally {
-      if (handlerTimer != null)     handlerTimer.stop();
-      if (operationTimer != null)   operationTimer.stop();
-      schedulerHandleCounter.inc();
-      schedulerHandleCounterMap.get(schedulerEvent.getType()).inc();
-
-      if (schedulerEvent.getType() == SchedulerEventType.APP_REMOVED
-          && schedulerEvent instanceof AppRemovedSchedulerEvent) {
-        SLSRunner.decreaseRemainingApps();
-        AppRemovedSchedulerEvent appRemoveEvent =
-                (AppRemovedSchedulerEvent) schedulerEvent;
-        appQueueMap.remove(appRemoveEvent.getApplicationID());
-      } else if (schedulerEvent.getType() == SchedulerEventType.APP_ADDED
-          && schedulerEvent instanceof AppAddedSchedulerEvent) {
-        AppAddedSchedulerEvent appAddEvent =
-                (AppAddedSchedulerEvent) schedulerEvent;
-        String queueName = appAddEvent.getQueue();
-        appQueueMap.put(appAddEvent.getApplicationId(), queueName);
-      }
-    }
-  }
-
-  private void updateQueueWithNodeUpdate(
-          NodeUpdateSchedulerEventWrapper eventWrapper) {
-    RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
-    List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
-    for (UpdatedContainerInfo info : containerList) {
-      for (ContainerStatus status : info.getCompletedContainers()) {
-        ContainerId containerId = status.getContainerId();
-        SchedulerAppReport app = scheduler.getSchedulerAppInfo(
-                containerId.getApplicationAttemptId());
-
-        if (app == null) {
-          // this happens for the AM container
-          // The app have already removed when the NM sends the release
-          // information.
-          continue;
-        }
-
-        String queue =
-            appQueueMap.get(containerId.getApplicationAttemptId()
-              .getApplicationId());
-        int releasedMemory = 0, releasedVCores = 0;
-        if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
-          for (RMContainer rmc : app.getLiveContainers()) {
-            if (rmc.getContainerId() == containerId) {
-              releasedMemory += 
rmc.getContainer().getResource().getMemorySize();
-              releasedVCores += rmc.getContainer()
-                      .getResource().getVirtualCores();
-              break;
-            }
-          }
-        } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
-          if (preemptionContainerMap.containsKey(containerId)) {
-            Resource preResource = preemptionContainerMap.get(containerId);
-            releasedMemory += preResource.getMemorySize();
-            releasedVCores += preResource.getVirtualCores();
-            preemptionContainerMap.remove(containerId);
-          }
-        }
-        // update queue counters
-        updateQueueMetrics(queue, releasedMemory, releasedVCores);
-      }
-    }
-  }
-
-  private void updateQueueWithAllocateRequest(Allocation allocation,
-                        ApplicationAttemptId attemptId,
-                        List<ResourceRequest> resourceRequests,
-                        List<ContainerId> containerIds) throws IOException {
-    // update queue information
-    Resource pendingResource = Resources.createResource(0, 0);
-    Resource allocatedResource = Resources.createResource(0, 0);
-    String queueName = appQueueMap.get(attemptId.getApplicationId());
-    // container requested
-    for (ResourceRequest request : resourceRequests) {
-      if (request.getResourceName().equals(ResourceRequest.ANY)) {
-        Resources.addTo(pendingResource,
-                Resources.multiply(request.getCapability(),
-                        request.getNumContainers()));
-      }
-    }
-    // container allocated
-    for (Container container : allocation.getContainers()) {
-      Resources.addTo(allocatedResource, container.getResource());
-      Resources.subtractFrom(pendingResource, container.getResource());
-    }
-    // container released from AM
-    SchedulerAppReport report = scheduler.getSchedulerAppInfo(attemptId);
-    for (ContainerId containerId : containerIds) {
-      Container container = null;
-      for (RMContainer c : report.getLiveContainers()) {
-        if (c.getContainerId().equals(containerId)) {
-          container = c.getContainer();
-          break;
-        }
-      }
-      if (container != null) {
-        // released allocated containers
-        Resources.subtractFrom(allocatedResource, container.getResource());
-      } else {
-        for (RMContainer c : report.getReservedContainers()) {
-          if (c.getContainerId().equals(containerId)) {
-            container = c.getContainer();
-            break;
-          }
-        }
-        if (container != null) {
-          // released reserved containers
-          Resources.subtractFrom(pendingResource, container.getResource());
-        }
-      }
-    }
-    // containers released/preemption from scheduler
-    Set<ContainerId> preemptionContainers = new HashSet<ContainerId>();
-    if (allocation.getContainerPreemptions() != null) {
-      preemptionContainers.addAll(allocation.getContainerPreemptions());
-    }
-    if (allocation.getStrictContainerPreemptions() != null) {
-      preemptionContainers.addAll(allocation.getStrictContainerPreemptions());
-    }
-    if (! preemptionContainers.isEmpty()) {
-      for (ContainerId containerId : preemptionContainers) {
-        if (! preemptionContainerMap.containsKey(containerId)) {
-          Container container = null;
-          for (RMContainer c : report.getLiveContainers()) {
-            if (c.getContainerId().equals(containerId)) {
-              container = c.getContainer();
-              break;
-            }
-          }
-          if (container != null) {
-            preemptionContainerMap.put(containerId, container.getResource());
-          }
-        }
-
-      }
-    }
-
-    // update metrics
-    SortedMap<String, Counter> counterMap = metrics.getCounters();
-    String names[] = new String[]{
-            "counter.queue." + queueName + ".pending.memory",
-            "counter.queue." + queueName + ".pending.cores",
-            "counter.queue." + queueName + ".allocated.memory",
-            "counter.queue." + queueName + ".allocated.cores"};
-    long values[] = new long[]{pendingResource.getMemorySize(),
-            pendingResource.getVirtualCores(),
-            allocatedResource.getMemorySize(), 
allocatedResource.getVirtualCores()};
-    for (int i = names.length - 1; i >= 0; i --) {
-      if (! counterMap.containsKey(names[i])) {
-        metrics.counter(names[i]);
-        counterMap = metrics.getCounters();
-      }
-      counterMap.get(names[i]).inc(values[i]);
-    }
-
-    queueLock.lock();
-    try {
-      if (! schedulerMetrics.isTracked(queueName)) {
-        schedulerMetrics.trackQueue(queueName);
-      }
-    } finally {
-      queueLock.unlock();
-    }
-  }
-
-  private void tearDown() throws IOException {
-    // close job runtime writer
-    if (jobRuntimeLogBW != null) {
-      jobRuntimeLogBW.close();
-    }
-    // shut pool
-    if (pool != null)  pool.shutdown();
-  }
-
-  @SuppressWarnings("unchecked")
-  private void initMetrics() throws Exception {
-    metrics = new MetricRegistry();
-    // configuration
-    metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR);
-    int metricsWebAddressPort = conf.getInt(
-            SLSConfiguration.METRICS_WEB_ADDRESS_PORT,
-            SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT);
-    // create SchedulerMetrics for current scheduler
-    String schedulerMetricsType = conf.get(scheduler.getClass().getName());
-    Class schedulerMetricsClass = schedulerMetricsType == null?
-            defaultSchedulerMetricsMap.get(scheduler.getClass()) :
-            Class.forName(schedulerMetricsType);
-    schedulerMetrics = (SchedulerMetrics)ReflectionUtils
-            .newInstance(schedulerMetricsClass, new Configuration());
-    schedulerMetrics.init(scheduler, metrics);
-
-    // register various metrics
-    registerJvmMetrics();
-    registerClusterResourceMetrics();
-    registerContainerAppNumMetrics();
-    registerSchedulerMetrics();
-
-    // .csv output
-    initMetricsCSVOutput();
-
-    // start web app to provide real-time tracking
-    web = new SLSWebApp(this, metricsWebAddressPort);
-    web.start();
-
-    // a thread to update histogram timer
-    pool = new ScheduledThreadPoolExecutor(2);
-    pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000,
-            TimeUnit.MILLISECONDS);
-
-    // a thread to output metrics for real-tiem tracking
-    pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000,
-            TimeUnit.MILLISECONDS);
-
-    // application running information
-    jobRuntimeLogBW =
-        new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
-            metricsOutputDir + "/jobruntime.csv"), "UTF-8"));
-    jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," +
-            "simulate_start_time,simulate_end_time" + EOL);
-    jobRuntimeLogBW.flush();
-  }
-
-  private void registerJvmMetrics() {
-    // add JVM gauges
-    metrics.register("variable.jvm.free.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return Runtime.getRuntime().freeMemory();
-        }
-      }
-    );
-    metrics.register("variable.jvm.max.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return Runtime.getRuntime().maxMemory();
-        }
-      }
-    );
-    metrics.register("variable.jvm.total.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return Runtime.getRuntime().totalMemory();
-        }
-      }
-    );
-  }
-
-  private void registerClusterResourceMetrics() {
-    metrics.register("variable.cluster.allocated.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
-            return 0L;
-          } else {
-            return scheduler.getRootQueueMetrics().getAllocatedMB();
-          }
-        }
-      }
-    );
-    metrics.register("variable.cluster.allocated.vcores",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
-            return 0;
-          } else {
-            return scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
-          }
-        }
-      }
-    );
-    metrics.register("variable.cluster.available.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
-            return 0L;
-          } else {
-            return scheduler.getRootQueueMetrics().getAvailableMB();
-          }
-        }
-      }
-    );
-    metrics.register("variable.cluster.available.vcores",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
-            return 0;
-          } else {
-            return scheduler.getRootQueueMetrics().getAvailableVirtualCores();
-          }
-        }
-      }
-    );
-  }
-
-  private void registerContainerAppNumMetrics() {
-    metrics.register("variable.running.application",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          if (scheduler == null || scheduler.getRootQueueMetrics() == null) {
-            return 0;
-          } else {
-            return scheduler.getRootQueueMetrics().getAppsRunning();
-          }
-        }
-      }
-    );
-    metrics.register("variable.running.container",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
-            return 0;
-          } else {
-            return scheduler.getRootQueueMetrics().getAllocatedContainers();
-          }
-        }
-      }
-    );
-  }
-
-  private void registerSchedulerMetrics() {
-    samplerLock.lock();
-    try {
-      // counters for scheduler operations
-      schedulerAllocateCounter = metrics.counter(
-              "counter.scheduler.operation.allocate");
-      schedulerHandleCounter = metrics.counter(
-              "counter.scheduler.operation.handle");
-      schedulerHandleCounterMap = new HashMap<SchedulerEventType, Counter>();
-      for (SchedulerEventType e : SchedulerEventType.values()) {
-        Counter counter = metrics.counter(
-                "counter.scheduler.operation.handle." + e);
-        schedulerHandleCounterMap.put(e, counter);
-      }
-      // timers for scheduler operations
-      int timeWindowSize = conf.getInt(
-              SLSConfiguration.METRICS_TIMER_WINDOW_SIZE,
-              SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT);
-      schedulerAllocateTimer = new Timer(
-              new SlidingWindowReservoir(timeWindowSize));
-      schedulerHandleTimer = new Timer(
-              new SlidingWindowReservoir(timeWindowSize));
-      schedulerHandleTimerMap = new HashMap<SchedulerEventType, Timer>();
-      for (SchedulerEventType e : SchedulerEventType.values()) {
-        Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize));
-        schedulerHandleTimerMap.put(e, timer);
-      }
-      // histogram for scheduler operations (Samplers)
-      schedulerHistogramList = new ArrayList<Histogram>();
-      histogramTimerMap = new HashMap<Histogram, Timer>();
-      Histogram schedulerAllocateHistogram = new Histogram(
-              new SlidingWindowReservoir(SAMPLING_SIZE));
-      metrics.register("sampler.scheduler.operation.allocate.timecost",
-              schedulerAllocateHistogram);
-      schedulerHistogramList.add(schedulerAllocateHistogram);
-      histogramTimerMap.put(schedulerAllocateHistogram, 
schedulerAllocateTimer);
-      Histogram schedulerHandleHistogram = new Histogram(
-              new SlidingWindowReservoir(SAMPLING_SIZE));
-      metrics.register("sampler.scheduler.operation.handle.timecost",
-              schedulerHandleHistogram);
-      schedulerHistogramList.add(schedulerHandleHistogram);
-      histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer);
-      for (SchedulerEventType e : SchedulerEventType.values()) {
-        Histogram histogram = new Histogram(
-                new SlidingWindowReservoir(SAMPLING_SIZE));
-        metrics.register(
-                "sampler.scheduler.operation.handle." + e + ".timecost",
-                histogram);
-        schedulerHistogramList.add(histogram);
-        histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e));
-      }
-    } finally {
-      samplerLock.unlock();
-    }
-  }
-
-  private void initMetricsCSVOutput() {
-    int timeIntervalMS = conf.getInt(
-            SLSConfiguration.METRICS_RECORD_INTERVAL_MS,
-            SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT);
-    File dir = new File(metricsOutputDir + "/metrics");
-    if(! dir.exists()
-            && ! dir.mkdirs()) {
-      LOG.error("Cannot create directory " + dir.getAbsoluteFile());
-    }
-    final CsvReporter reporter = CsvReporter.forRegistry(metrics)
-            .formatFor(Locale.US)
-            .convertRatesTo(TimeUnit.SECONDS)
-            .convertDurationsTo(TimeUnit.MILLISECONDS)
-            .build(new File(metricsOutputDir + "/metrics"));
-    reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS);
-  }
-
-  class HistogramsRunnable implements Runnable {
-    @Override
-    public void run() {
-      samplerLock.lock();
-      try {
-        for (Histogram histogram : schedulerHistogramList) {
-          Timer timer = histogramTimerMap.get(histogram);
-          histogram.update((int) timer.getSnapshot().getMean());
-        }
-      } finally {
-        samplerLock.unlock();
-      }
-    }
-  }
-
-  class MetricsLogRunnable implements Runnable {
-    private boolean firstLine = true;
-    public MetricsLogRunnable() {
-      try {
-        metricsLogBW =
-            new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
-                metricsOutputDir + "/realtimetrack.json"), "UTF-8"));
-        metricsLogBW.write("[");
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-
-    @Override
-    public void run() {
-      if(running) {
-        // all WebApp to get real tracking json
-        String metrics = web.generateRealTimeTrackingMetrics();
-        // output
-        try {
-          if(firstLine) {
-            metricsLogBW.write(metrics + EOL);
-            firstLine = false;
-          } else {
-            metricsLogBW.write("," + metrics + EOL);
-          }
-          metricsLogBW.flush();
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
-      }
-    }
-  }
-
-  // the following functions are used by AMSimulator
-  public void addAMRuntime(ApplicationId appId,
-                           long traceStartTimeMS, long traceEndTimeMS,
-                           long simulateStartTimeMS, long simulateEndTimeMS) {
-    if (metricsON) {
-      try {
-        // write job runtime information
-        StringBuilder sb = new StringBuilder();
-        sb.append(appId).append(",").append(traceStartTimeMS).append(",")
-            .append(traceEndTimeMS).append(",").append(simulateStartTimeMS)
-            .append(",").append(simulateEndTimeMS);
-        jobRuntimeLogBW.write(sb.toString() + EOL);
-        jobRuntimeLogBW.flush();
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-
-  private void updateQueueMetrics(String queue,
-                                  long releasedMemory, int releasedVCores) {
-    // update queue counters
-    SortedMap<String, Counter> counterMap = metrics.getCounters();
-    if (releasedMemory != 0) {
-      String name = "counter.queue." + queue + ".allocated.memory";
-      if (! counterMap.containsKey(name)) {
-        metrics.counter(name);
-        counterMap = metrics.getCounters();
-      }
-      counterMap.get(name).inc(-releasedMemory);
-    }
-    if (releasedVCores != 0) {
-      String name = "counter.queue." + queue + ".allocated.cores";
-      if (! counterMap.containsKey(name)) {
-        metrics.counter(name);
-        counterMap = metrics.getCounters();
-      }
-      counterMap.get(name).inc(-releasedVCores);
-    }
-  }
-
-  public void setQueueSet(Set<String> queues) {
-    this.queueSet = queues;
-  }
-
-  public Set<String> getQueueSet() {
-    return this.queueSet;
-  }
-
-  public void setTrackedAppSet(Set<String> apps) {
-    this.trackedAppSet = apps;
-  }
-
-  public Set<String> getTrackedAppSet() {
-    return this.trackedAppSet;
-  }
-
-  public MetricRegistry getMetrics() {
-    return metrics;
-  }
-
-  public SchedulerMetrics getSchedulerMetrics() {
-    return schedulerMetrics;
-  }
-
-  // API open to out classes
-  public void addTrackedApp(ApplicationId appId, String oldAppId) {
-    if (metricsON) {
-      schedulerMetrics.trackApp(appId, oldAppId);
-    }
-  }
-
-  public void removeTrackedApp(String oldAppId) {
-    if (metricsON) {
-      schedulerMetrics.untrackApp(oldAppId);
-    }
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    ((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>)
-        scheduler).init(conf);
-    super.serviceInit(conf);
-    initScheduler(conf);
-  }
-
-  private synchronized void initScheduler(Configuration configuration) throws
-  IOException {
-    this.applications =
-        new ConcurrentHashMap<ApplicationId,
-        SchedulerApplication<SchedulerApplicationAttempt>>();
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void serviceStart() throws Exception {
-    ((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>)
-        scheduler).start();
-    super.serviceStart();
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public void serviceStop() throws Exception {
-    ((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>)
-        scheduler).stop();
-    super.serviceStop();
-  }
-
-  @Override
-  public void setRMContext(RMContext rmContext) {
-    scheduler.setRMContext(rmContext);
-  }
-
-  @Override
-  public void reinitialize(Configuration conf, RMContext rmContext)
-      throws IOException {
-    scheduler.reinitialize(conf, rmContext);
-  }
-
-  @Override
-  public void recover(RMStateStore.RMState rmState) throws Exception {
-    scheduler.recover(rmState);
-  }
-
-  @Override
-  public QueueInfo getQueueInfo(String s, boolean b, boolean b2)
-          throws IOException {
-    return scheduler.getQueueInfo(s, b, b2);
-  }
-
-  @Override
-  public List<QueueUserACLInfo> getQueueUserAclInfo() {
-    return scheduler.getQueueUserAclInfo();
-  }
-
-  @Override
-  public Resource getMinimumResourceCapability() {
-    return scheduler.getMinimumResourceCapability();
-  }
-
-  @Override
-  public Resource getMaximumResourceCapability() {
-    return scheduler.getMaximumResourceCapability();
-  }
-
-  @Override
-  public ResourceCalculator getResourceCalculator() {
-    return scheduler.getResourceCalculator();
-  }
-
-  @Override
-  public int getNumClusterNodes() {
-    return scheduler.getNumClusterNodes();
-  }
-
-  @Override
-  public SchedulerNodeReport getNodeReport(NodeId nodeId) {
-    return scheduler.getNodeReport(nodeId);
-  }
-
-  @Override
-  public SchedulerAppReport getSchedulerAppInfo(
-          ApplicationAttemptId attemptId) {
-    return scheduler.getSchedulerAppInfo(attemptId);
-  }
-
-  @Override
-  public QueueMetrics getRootQueueMetrics() {
-    return scheduler.getRootQueueMetrics();
-  }
-
-  @Override
-  public synchronized boolean checkAccess(UserGroupInformation callerUGI,
-      QueueACL acl, String queueName) {
-    return scheduler.checkAccess(callerUGI, acl, queueName);
-  }
-
-  @Override
-  public ApplicationResourceUsageReport getAppResourceUsageReport(
-      ApplicationAttemptId appAttemptId) {
-    return scheduler.getAppResourceUsageReport(appAttemptId);
-  }
-
-  @Override
-  public List<ApplicationAttemptId> getAppsInQueue(String queue) {
-    return scheduler.getAppsInQueue(queue);
-  }
-
-  @Override
-  public RMContainer getRMContainer(ContainerId containerId) {
-    return null;
-  }
-
-  @Override
-  public String moveApplication(ApplicationId appId, String newQueue)
-      throws YarnException {
-    return scheduler.moveApplication(appId, newQueue);
-  }
-
-  @Override
-  @LimitedPrivate("yarn")
-  @Unstable
-  public Resource getClusterResource() {
-    return super.getClusterResource();
-  }
-
-  @Override
-  public synchronized List<Container> getTransferredContainers(
-      ApplicationAttemptId currentAttempt) {
-    return new ArrayList<Container>();
-  }
-
-  @Override
-  public Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>>
-      getSchedulerApplications() {
-    return new HashMap<ApplicationId,
-        SchedulerApplication<SchedulerApplicationAttempt>>();
-  }
-
-  @Override
-  protected void completedContainerInternal(RMContainer rmContainer,
-      ContainerStatus containerStatus, RMContainerEventType event) {
-    // do nothing
-  }
-
-  @Override
-  public Priority checkAndGetApplicationPriority(Priority priority,
-      UserGroupInformation user, String queueName, ApplicationId applicationId)
-      throws YarnException {
-    // TODO Dummy implementation.
-    return Priority.newInstance(0);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cfe3957/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
index 6ea2ab0..7c37465 100644
--- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
+++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java
@@ -17,34 +17,19 @@
  */
 package org.apache.hadoop.yarn.sls.scheduler;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -65,117 +50,63 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptR
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.sls.SLSRunner;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
-import org.apache.hadoop.yarn.sls.web.SLSWebApp;
+import org.apache.hadoop.yarn.sls.utils.SLSUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.log4j.Logger;
 
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.CsvReporter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.SlidingWindowReservoir;
 import com.codahale.metrics.Timer;
 
 @Private
 @Unstable
 public class SLSCapacityScheduler extends CapacityScheduler implements
         SchedulerWrapper,Configurable {
-  private static final String EOL = System.getProperty("line.separator");
-  private static final String QUEUE_COUNTER_PREFIX = "counter.queue.";
-  private static final int SAMPLING_SIZE = 60;
-  private ScheduledExecutorService pool;
-  // counters for scheduler allocate/handle operations
-  private Counter schedulerAllocateCounter;
-  private Counter schedulerHandleCounter;
-  private Map<SchedulerEventType, Counter> schedulerHandleCounterMap;
-  // Timers for scheduler allocate/handle operations
-  private Timer schedulerAllocateTimer;
-  private Timer schedulerHandleTimer;
-  private Map<SchedulerEventType, Timer> schedulerHandleTimerMap;
-  private List<Histogram> schedulerHistogramList;
-  private Map<Histogram, Timer> histogramTimerMap;
-  private Lock samplerLock;
-  private Lock queueLock;
 
   private Configuration conf;
  
   private Map<ApplicationAttemptId, String> appQueueMap =
           new ConcurrentHashMap<ApplicationAttemptId, String>();
-  private BufferedWriter jobRuntimeLogBW;
-
-  // Priority of the ResourceSchedulerWrapper shutdown hook.
-  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
-
-  // web app
-  private SLSWebApp web;
 
   private Map<ContainerId, Resource> preemptionContainerMap =
           new ConcurrentHashMap<ContainerId, Resource>();
 
   // metrics
-  private MetricRegistry metrics;
   private SchedulerMetrics schedulerMetrics;
   private boolean metricsON;
-  private String metricsOutputDir;
-  private BufferedWriter metricsLogBW;
-  private boolean running = false;
-  private static Map<Class, Class> defaultSchedulerMetricsMap =
-          new HashMap<Class, Class>();
-  static {
-    defaultSchedulerMetricsMap.put(FairScheduler.class,
-            FairSchedulerMetrics.class);
-    defaultSchedulerMetricsMap.put(FifoScheduler.class,
-            FifoSchedulerMetrics.class);
-    defaultSchedulerMetricsMap.put(CapacityScheduler.class,
-            CapacitySchedulerMetrics.class);
-  }
-  // must set by outside
-  private Set<String> queueSet;
-  private Set<String> trackedAppSet;
+  private Tracker tracker;
 
-  public final Logger LOG = Logger.getLogger(SLSCapacityScheduler.class);
+  public Tracker getTracker() {
+    return tracker;
+  }
 
   public SLSCapacityScheduler() {
-    samplerLock = new ReentrantLock();
-    queueLock = new ReentrantLock();
+    tracker = new Tracker();
   }
 
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
     super.setConf(conf);
-    // start metrics
     metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
     if (metricsON) {
       try {
-        initMetrics();
+        schedulerMetrics = SchedulerMetrics.getInstance(conf,
+            CapacityScheduler.class);
+        schedulerMetrics.init(this, conf);
       } catch (Exception e) {
         e.printStackTrace();
       }
-    }
 
-    ShutdownHookManager.get().addShutdownHook(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          if (metricsLogBW != null)  {
-            metricsLogBW.write("]");
-            metricsLogBW.close();
-          }
-          if (web != null) {
-            web.stop();
+      ShutdownHookManager.get().addShutdownHook(new Runnable() {
+        @Override public void run() {
+          try {
+            schedulerMetrics.tearDown();
+          } catch (Exception e) {
+            e.printStackTrace();
           }
-          tearDown();
-        } catch (Exception e) {
-          e.printStackTrace();
         }
-      }
-    }, SHUTDOWN_HOOK_PRIORITY);
+      }, SLSUtils.SHUTDOWN_HOOK_PRIORITY);
+    }
   }
 
   @Override
@@ -184,7 +115,8 @@ public class SLSCapacityScheduler extends CapacityScheduler 
implements
       List<String> strings, List<String> strings2,
       ContainerUpdates updateRequests) {
     if (metricsON) {
-      final Timer.Context context = schedulerAllocateTimer.time();
+      final Timer.Context context = 
schedulerMetrics.getSchedulerAllocateTimer()
+          .time();
       Allocation allocation = null;
       try {
         allocation = super
@@ -193,7 +125,7 @@ public class SLSCapacityScheduler extends CapacityScheduler 
implements
         return allocation;
       } finally {
         context.stop();
-        schedulerAllocateCounter.inc();
+        schedulerMetrics.increaseSchedulerAllocationCounter();
         try {
           updateQueueWithAllocateRequest(allocation, attemptId,
                   resourceRequests, containerIds);
@@ -209,74 +141,76 @@ public class SLSCapacityScheduler extends 
CapacityScheduler implements
 
   @Override
   public void handle(SchedulerEvent schedulerEvent) {
-           // metrics off
-           if (! metricsON) {
-             super.handle(schedulerEvent);
-             return;
-           }
-           if(!running)    running = true;
+    if (!metricsON) {
+      super.handle(schedulerEvent);
+      return;
+    }
 
-           // metrics on
-           Timer.Context handlerTimer = null;
-           Timer.Context operationTimer = null;
+    if (!schedulerMetrics.isRunning()) {
+      schedulerMetrics.setRunning(true);
+    }
 
-           NodeUpdateSchedulerEventWrapper eventWrapper;
-           try {
-             //if (schedulerEvent instanceof NodeUpdateSchedulerEvent) {
-             if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
-                     && schedulerEvent instanceof NodeUpdateSchedulerEvent) {
-               eventWrapper = new NodeUpdateSchedulerEventWrapper(
-                       (NodeUpdateSchedulerEvent)schedulerEvent);
-               schedulerEvent = eventWrapper;
-               updateQueueWithNodeUpdate(eventWrapper);
-             } else if (schedulerEvent.getType() == 
SchedulerEventType.APP_ATTEMPT_REMOVED
-                 && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) 
{
-               // check if having AM Container, update resource usage 
information
-               AppAttemptRemovedSchedulerEvent appRemoveEvent =
-                   (AppAttemptRemovedSchedulerEvent) schedulerEvent;
-               ApplicationAttemptId appAttemptId =
-                       appRemoveEvent.getApplicationAttemptID();
-               String queue = appQueueMap.get(appAttemptId);
-               SchedulerAppReport app = 
super.getSchedulerAppInfo(appAttemptId);
-               if (! app.getLiveContainers().isEmpty()) {  // have 0 or 1
-                 // should have one container which is AM container
-                 RMContainer rmc = app.getLiveContainers().iterator().next();
-                 updateQueueMetrics(queue,
-                         rmc.getContainer().getResource().getMemorySize(),
-                         rmc.getContainer().getResource().getVirtualCores());
-               }
-             }
+    Timer.Context handlerTimer = null;
+    Timer.Context operationTimer = null;
 
-             handlerTimer = schedulerHandleTimer.time();
-             operationTimer = schedulerHandleTimerMap
-                     .get(schedulerEvent.getType()).time();
+    NodeUpdateSchedulerEventWrapper eventWrapper;
+    try {
+      if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
+          && schedulerEvent instanceof NodeUpdateSchedulerEvent) {
+        eventWrapper = new NodeUpdateSchedulerEventWrapper(
+            (NodeUpdateSchedulerEvent)schedulerEvent);
+        schedulerEvent = eventWrapper;
+        updateQueueWithNodeUpdate(eventWrapper);
+      } else if (schedulerEvent.getType() ==
+          SchedulerEventType.APP_ATTEMPT_REMOVED
+          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
+        // check if having AM Container, update resource usage information
+        AppAttemptRemovedSchedulerEvent appRemoveEvent =
+            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
+        ApplicationAttemptId appAttemptId =
+            appRemoveEvent.getApplicationAttemptID();
+        String queue = appQueueMap.get(appAttemptId);
+        SchedulerAppReport app = super.getSchedulerAppInfo(appAttemptId);
+        if (!app.getLiveContainers().isEmpty()) {  // have 0 or 1
+          // should have one container which is AM container
+          RMContainer rmc = app.getLiveContainers().iterator().next();
+          schedulerMetrics.updateQueueMetricsByRelease(
+              rmc.getContainer().getResource(), queue);
+        }
+      }
 
-             super.handle(schedulerEvent);
-           } finally {
-             if (handlerTimer != null)     handlerTimer.stop();
-             if (operationTimer != null)   operationTimer.stop();
-             schedulerHandleCounter.inc();
-             schedulerHandleCounterMap.get(schedulerEvent.getType()).inc();
+      handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time();
+      operationTimer = schedulerMetrics.getSchedulerHandleTimer(
+          schedulerEvent.getType()).time();
 
-             if (schedulerEvent.getType() == 
SchedulerEventType.APP_ATTEMPT_REMOVED
-                 && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) 
{
-               SLSRunner.decreaseRemainingApps();
-               AppAttemptRemovedSchedulerEvent appRemoveEvent =
-                       (AppAttemptRemovedSchedulerEvent) schedulerEvent;
-               ApplicationAttemptId appAttemptId =
-                       appRemoveEvent.getApplicationAttemptID();
-               appQueueMap.remove(appRemoveEvent.getApplicationAttemptID());
-             } else if (schedulerEvent.getType() == 
SchedulerEventType.APP_ATTEMPT_ADDED
-                 && schedulerEvent instanceof AppAttemptAddedSchedulerEvent) {
-          AppAttemptAddedSchedulerEvent appAddEvent =
-              (AppAttemptAddedSchedulerEvent) schedulerEvent;
-          SchedulerApplication app =
-              applications.get(appAddEvent.getApplicationAttemptId()
+      super.handle(schedulerEvent);
+    } finally {
+      if (handlerTimer != null) {
+        handlerTimer.stop();
+      }
+      if (operationTimer != null) {
+        operationTimer.stop();
+      }
+      
schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType());
+
+      if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
+          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
+        SLSRunner.decreaseRemainingApps();
+        AppAttemptRemovedSchedulerEvent appRemoveEvent =
+            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
+        appQueueMap.remove(appRemoveEvent.getApplicationAttemptID());
+      } else if (schedulerEvent.getType() ==
+          SchedulerEventType.APP_ATTEMPT_ADDED
+          && schedulerEvent instanceof AppAttemptAddedSchedulerEvent) {
+        AppAttemptAddedSchedulerEvent appAddEvent =
+            (AppAttemptAddedSchedulerEvent) schedulerEvent;
+        SchedulerApplication app =
+            applications.get(appAddEvent.getApplicationAttemptId()
                 .getApplicationId());
-          appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue()
-              .getQueueName());
-             }
-           }
+        appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue()
+            .getQueueName());
+      }
+    }
   }
 
   private void updateQueueWithNodeUpdate(
@@ -316,7 +250,8 @@ public class SLSCapacityScheduler extends CapacityScheduler 
implements
           }
         }
         // update queue counters
-        updateQueueMetrics(queue, releasedMemory, releasedVCores);
+        schedulerMetrics.updateQueueMetricsByRelease(
+            Resource.newInstance(releasedMemory, releasedVCores), queue);
       }
     }
   }
@@ -395,410 +330,13 @@ public class SLSCapacityScheduler extends 
CapacityScheduler implements
     }
 
     // update metrics
-    SortedMap<String, Counter> counterMap = metrics.getCounters();
-    String names[] = new String[]{
-            "counter.queue." + queueName + ".pending.memory",
-            "counter.queue." + queueName + ".pending.cores",
-            "counter.queue." + queueName + ".allocated.memory",
-            "counter.queue." + queueName + ".allocated.cores"};
-    long values[] = new long[]{pendingResource.getMemorySize(),
-            pendingResource.getVirtualCores(),
-            allocatedResource.getMemorySize(), 
allocatedResource.getVirtualCores()};
-    for (int i = names.length - 1; i >= 0; i --) {
-      if (! counterMap.containsKey(names[i])) {
-        metrics.counter(names[i]);
-        counterMap = metrics.getCounters();
-      }
-      counterMap.get(names[i]).inc(values[i]);
-    }
-
-    queueLock.lock();
-    try {
-      if (! schedulerMetrics.isTracked(queueName)) {
-        schedulerMetrics.trackQueue(queueName);
-      }
-    } finally {
-      queueLock.unlock();
-    }
-  }
-
-  private void tearDown() throws IOException {
-    // close job runtime writer
-    if (jobRuntimeLogBW != null) {
-      jobRuntimeLogBW.close();
-    }
-    // shut pool
-    if (pool != null)  pool.shutdown();
-  }
-
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  private void initMetrics() throws Exception {
-    metrics = new MetricRegistry();
-    // configuration
-    metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR);
-    int metricsWebAddressPort = conf.getInt(
-            SLSConfiguration.METRICS_WEB_ADDRESS_PORT,
-            SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT);
-    // create SchedulerMetrics for current scheduler
-    String schedulerMetricsType = conf.get(CapacityScheduler.class.getName());
-    Class schedulerMetricsClass = schedulerMetricsType == null?
-            defaultSchedulerMetricsMap.get(CapacityScheduler.class) :
-            Class.forName(schedulerMetricsType);
-    schedulerMetrics = (SchedulerMetrics)ReflectionUtils
-            .newInstance(schedulerMetricsClass, new Configuration());
-    schedulerMetrics.init(this, metrics);
-
-    // register various metrics
-    registerJvmMetrics();
-    registerClusterResourceMetrics();
-    registerContainerAppNumMetrics();
-    registerSchedulerMetrics();
-
-    // .csv output
-    initMetricsCSVOutput();
-
-    // start web app to provide real-time tracking
-    web = new SLSWebApp(this, metricsWebAddressPort);
-    web.start();
-
-    // a thread to update histogram timer
-    pool = new ScheduledThreadPoolExecutor(2);
-    pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000,
-            TimeUnit.MILLISECONDS);
-
-    // a thread to output metrics for real-tiem tracking
-    pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000,
-            TimeUnit.MILLISECONDS);
-
-    // application running information
-    jobRuntimeLogBW =
-        new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
-            metricsOutputDir + "/jobruntime.csv"), "UTF-8"));
-    jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," +
-            "simulate_start_time,simulate_end_time" + EOL);
-    jobRuntimeLogBW.flush();
-  }
-
-  private void registerJvmMetrics() {
-    // add JVM gauges
-    metrics.register("variable.jvm.free.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return Runtime.getRuntime().freeMemory();
-        }
-      }
-    );
-    metrics.register("variable.jvm.max.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return Runtime.getRuntime().maxMemory();
-        }
-      }
-    );
-    metrics.register("variable.jvm.total.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          return Runtime.getRuntime().totalMemory();
-        }
-      }
-    );
-  }
-
-  private void registerClusterResourceMetrics() {
-    metrics.register("variable.cluster.allocated.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          if( getRootQueueMetrics() == null) {
-            return 0L;
-          } else {
-            return getRootQueueMetrics().getAllocatedMB();
-          }
-        }
-      }
-    );
-    metrics.register("variable.cluster.allocated.vcores",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          if(getRootQueueMetrics() == null) {
-            return 0;
-          } else {
-            return getRootQueueMetrics().getAllocatedVirtualCores();
-          }
-        }
-      }
-    );
-    metrics.register("variable.cluster.available.memory",
-      new Gauge<Long>() {
-        @Override
-        public Long getValue() {
-          if(getRootQueueMetrics() == null) {
-            return 0L;
-          } else {
-            return getRootQueueMetrics().getAvailableMB();
-          }
-        }
-      }
-    );
-    metrics.register("variable.cluster.available.vcores",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          if(getRootQueueMetrics() == null) {
-            return 0;
-          } else {
-            return getRootQueueMetrics().getAvailableVirtualCores();
-          }
-        }
-      }
-    );
-    metrics.register("variable.cluster.reserved.memory",
-        new Gauge<Long>() {
-          @Override
-          public Long getValue() {
-            if(getRootQueueMetrics() == null) {
-              return 0L;
-            } else {
-              return getRootQueueMetrics().getReservedMB();
-            }
-          }
-        }
-    );
-    metrics.register("variable.cluster.reserved.vcores",
-        new Gauge<Integer>() {
-          @Override
-          public Integer getValue() {
-            if(getRootQueueMetrics() == null) {
-              return 0;
-            } else {
-              return getRootQueueMetrics().getReservedVirtualCores();
-            }
-          }
-        }
-    );
-  }
-
-  private void registerContainerAppNumMetrics() {
-    metrics.register("variable.running.application",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          if(getRootQueueMetrics() == null) {
-            return 0;
-          } else {
-            return getRootQueueMetrics().getAppsRunning();
-          }
-        }
-      }
-    );
-    metrics.register("variable.running.container",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          if(getRootQueueMetrics() == null) {
-            return 0;
-          } else {
-            return getRootQueueMetrics().getAllocatedContainers();
-          }
-        }
-      }
-    );
-  }
-
-  private void registerSchedulerMetrics() {
-    samplerLock.lock();
-    try {
-      // counters for scheduler operations
-      schedulerAllocateCounter = metrics.counter(
-              "counter.scheduler.operation.allocate");
-      schedulerHandleCounter = metrics.counter(
-              "counter.scheduler.operation.handle");
-      schedulerHandleCounterMap = new HashMap<SchedulerEventType, Counter>();
-      for (SchedulerEventType e : SchedulerEventType.values()) {
-        Counter counter = metrics.counter(
-                "counter.scheduler.operation.handle." + e);
-        schedulerHandleCounterMap.put(e, counter);
-      }
-      // timers for scheduler operations
-      int timeWindowSize = conf.getInt(
-              SLSConfiguration.METRICS_TIMER_WINDOW_SIZE,
-              SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT);
-      schedulerAllocateTimer = new Timer(
-              new SlidingWindowReservoir(timeWindowSize));
-      schedulerHandleTimer = new Timer(
-              new SlidingWindowReservoir(timeWindowSize));
-      schedulerHandleTimerMap = new HashMap<SchedulerEventType, Timer>();
-      for (SchedulerEventType e : SchedulerEventType.values()) {
-        Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize));
-        schedulerHandleTimerMap.put(e, timer);
-      }
-      // histogram for scheduler operations (Samplers)
-      schedulerHistogramList = new ArrayList<Histogram>();
-      histogramTimerMap = new HashMap<Histogram, Timer>();
-      Histogram schedulerAllocateHistogram = new Histogram(
-              new SlidingWindowReservoir(SAMPLING_SIZE));
-      metrics.register("sampler.scheduler.operation.allocate.timecost",
-              schedulerAllocateHistogram);
-      schedulerHistogramList.add(schedulerAllocateHistogram);
-      histogramTimerMap.put(schedulerAllocateHistogram, 
schedulerAllocateTimer);
-      Histogram schedulerHandleHistogram = new Histogram(
-              new SlidingWindowReservoir(SAMPLING_SIZE));
-      metrics.register("sampler.scheduler.operation.handle.timecost",
-              schedulerHandleHistogram);
-      schedulerHistogramList.add(schedulerHandleHistogram);
-      histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer);
-      for (SchedulerEventType e : SchedulerEventType.values()) {
-        Histogram histogram = new Histogram(
-                new SlidingWindowReservoir(SAMPLING_SIZE));
-        metrics.register(
-                "sampler.scheduler.operation.handle." + e + ".timecost",
-                histogram);
-        schedulerHistogramList.add(histogram);
-        histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e));
-      }
-    } finally {
-      samplerLock.unlock();
-    }
-  }
-
-  private void initMetricsCSVOutput() {
-    int timeIntervalMS = conf.getInt(
-            SLSConfiguration.METRICS_RECORD_INTERVAL_MS,
-            SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT);
-    File dir = new File(metricsOutputDir + "/metrics");
-    if(! dir.exists()
-            && ! dir.mkdirs()) {
-      LOG.error("Cannot create directory " + dir.getAbsoluteFile());
-    }
-    final CsvReporter reporter = CsvReporter.forRegistry(metrics)
-            .formatFor(Locale.US)
-            .convertRatesTo(TimeUnit.SECONDS)
-            .convertDurationsTo(TimeUnit.MILLISECONDS)
-            .build(new File(metricsOutputDir + "/metrics"));
-    reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS);
-  }
-
-  class HistogramsRunnable implements Runnable {
-    @Override
-    public void run() {
-      samplerLock.lock();
-      try {
-        for (Histogram histogram : schedulerHistogramList) {
-          Timer timer = histogramTimerMap.get(histogram);
-          histogram.update((int) timer.getSnapshot().getMean());
-        }
-      } finally {
-        samplerLock.unlock();
-      }
-    }
-  }
-
-  class MetricsLogRunnable implements Runnable {
-    private boolean firstLine = true;
-    public MetricsLogRunnable() {
-      try {
-        metricsLogBW =
-            new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
-                metricsOutputDir + "/realtimetrack.json"), "UTF-8"));
-        metricsLogBW.write("[");
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-
-    @Override
-    public void run() {
-      if(running) {
-        // all WebApp to get real tracking json
-        String metrics = web.generateRealTimeTrackingMetrics();
-        // output
-        try {
-          if(firstLine) {
-            metricsLogBW.write(metrics + EOL);
-            firstLine = false;
-          } else {
-            metricsLogBW.write("," + metrics + EOL);
-          }
-          metricsLogBW.flush();
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
-      }
-    }
-  }
-
-  // the following functions are used by AMSimulator
-  public void addAMRuntime(ApplicationId appId,
-                           long traceStartTimeMS, long traceEndTimeMS,
-                           long simulateStartTimeMS, long simulateEndTimeMS) {
-
-    if (metricsON) {
-      try {
-        // write job runtime information
-        StringBuilder sb = new StringBuilder();
-        sb.append(appId).append(",").append(traceStartTimeMS).append(",")
-            .append(traceEndTimeMS).append(",").append(simulateStartTimeMS)
-            .append(",").append(simulateEndTimeMS);
-        jobRuntimeLogBW.write(sb.toString() + EOL);
-        jobRuntimeLogBW.flush();
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-
-  private void updateQueueMetrics(String queue,
-                                  long releasedMemory, int releasedVCores) {
-    // update queue counters
-    SortedMap<String, Counter> counterMap = metrics.getCounters();
-    if (releasedMemory != 0) {
-      String name = "counter.queue." + queue + ".allocated.memory";
-      if (! counterMap.containsKey(name)) {
-        metrics.counter(name);
-        counterMap = metrics.getCounters();
-      }
-      counterMap.get(name).inc(-releasedMemory);
-    }
-    if (releasedVCores != 0) {
-      String name = "counter.queue." + queue + ".allocated.cores";
-      if (! counterMap.containsKey(name)) {
-        metrics.counter(name);
-        counterMap = metrics.getCounters();
-      }
-      counterMap.get(name).inc(-releasedVCores);
-    }
+    schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource,
+        queueName);
   }
 
   private void initQueueMetrics(CSQueue queue) {
     if (queue instanceof LeafQueue) {
-      SortedMap<String, Counter> counterMap = metrics.getCounters();
-      String queueName = queue.getQueueName();
-      String[] names = new String[]{
-          QUEUE_COUNTER_PREFIX + queueName + ".pending.memory",
-          QUEUE_COUNTER_PREFIX + queueName + ".pending.cores",
-          QUEUE_COUNTER_PREFIX + queueName + ".allocated.memory",
-          QUEUE_COUNTER_PREFIX + queueName + ".allocated.cores" };
-
-      for (int i = names.length - 1; i >= 0; i--) {
-        if (!counterMap.containsKey(names[i])) {
-          metrics.counter(names[i]);
-          counterMap = metrics.getCounters();
-        }
-      }
-
-      queueLock.lock();
-      try {
-        if (!schedulerMetrics.isTracked(queueName)) {
-          schedulerMetrics.trackQueue(queueName);
-        }
-      } finally {
-        queueLock.unlock();
-      }
-
+      schedulerMetrics.initQueueMetric(queue.getQueueName());
       return;
     }
 
@@ -811,54 +349,17 @@ public class SLSCapacityScheduler extends 
CapacityScheduler implements
   public void serviceInit(Configuration configuration) throws Exception {
     super.serviceInit(configuration);
 
-    initQueueMetrics(getRootQueue());
-  }
-
-  public void setQueueSet(Set<String> queues) {
-    this.queueSet = queues;
-  }
-
-  public Set<String> getQueueSet() {
-    return this.queueSet;
-  }
-
-  public void setTrackedAppSet(Set<String> apps) {
-    this.trackedAppSet = apps;
-  }
-
-  public Set<String> getTrackedAppSet() {
-    return this.trackedAppSet;
-  }
-
-  public MetricRegistry getMetrics() {
-    return metrics;
-  }
-
-  public SchedulerMetrics getSchedulerMetrics() {
-    return schedulerMetrics;
-  }
-
-  // API open to out classes
-  public void addTrackedApp(ApplicationId appId,
-                            String oldAppId) {
     if (metricsON) {
-      schedulerMetrics.trackApp(appId, oldAppId);
+      initQueueMetrics(getRootQueue());
     }
   }
 
-  public void removeTrackedApp(String oldAppId) {
-    if (metricsON) {
-      schedulerMetrics.untrackApp(oldAppId);
-    }
+  public SchedulerMetrics getSchedulerMetrics() {
+    return schedulerMetrics;
   }
 
   @Override
   public Configuration getConf() {
     return conf;
   }
-
-
-
-
-}
-
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cfe3957/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
new file mode 100644
index 0000000..572dacf
--- /dev/null
+++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.sls.scheduler;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.utils.SLSUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Private
+@Unstable
+public class SLSFairScheduler extends FairScheduler
+    implements SchedulerWrapper, Configurable {
+  private SchedulerMetrics schedulerMetrics;
+  private boolean metricsON;
+  private Tracker tracker;
+
+  private Map<ContainerId, Resource> preemptionContainerMap =
+      new ConcurrentHashMap<>();
+
+  public SchedulerMetrics getSchedulerMetrics() {
+    return schedulerMetrics;
+  }
+
+  public Tracker getTracker() {
+    return tracker;
+  }
+
+  public SLSFairScheduler() {
+    tracker = new Tracker();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConfig(conf);
+
+    metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
+    if (metricsON) {
+      try {
+        schedulerMetrics = SchedulerMetrics.getInstance(conf,
+            FairScheduler.class);
+        schedulerMetrics.init(this, conf);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+
+      ShutdownHookManager.get().addShutdownHook(new Runnable() {
+        @Override public void run() {
+          try {
+            schedulerMetrics.tearDown();
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+      }, SLSUtils.SHUTDOWN_HOOK_PRIORITY);
+    }
+  }
+
+  @Override
+  public Allocation allocate(ApplicationAttemptId attemptId,
+      List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
+      List<String> blacklistAdditions, List<String> blacklistRemovals,
+      ContainerUpdates updateRequests) {
+    if (metricsON) {
+      final Timer.Context context = 
schedulerMetrics.getSchedulerAllocateTimer()
+          .time();
+      Allocation allocation = null;
+      try {
+        allocation = super.allocate(attemptId, resourceRequests, containerIds,
+            blacklistAdditions, blacklistRemovals, updateRequests);
+        return allocation;
+      } finally {
+        context.stop();
+        schedulerMetrics.increaseSchedulerAllocationCounter();
+        try {
+          updateQueueWithAllocateRequest(allocation, attemptId,
+              resourceRequests, containerIds);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    } else {
+      return super.allocate(attemptId, resourceRequests, containerIds,
+          blacklistAdditions, blacklistRemovals, updateRequests);
+    }
+  }
+
+  @Override
+  public void handle(SchedulerEvent schedulerEvent) {
+    // metrics off
+    if (!metricsON) {
+      super.handle(schedulerEvent);
+      return;
+    }
+
+    // metrics on
+    if(!schedulerMetrics.isRunning()) {
+      schedulerMetrics.setRunning(true);
+    }
+
+    Timer.Context handlerTimer = null;
+    Timer.Context operationTimer = null;
+
+    NodeUpdateSchedulerEventWrapper eventWrapper;
+    try {
+      if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
+          && schedulerEvent instanceof NodeUpdateSchedulerEvent) {
+        eventWrapper = new NodeUpdateSchedulerEventWrapper(
+            (NodeUpdateSchedulerEvent)schedulerEvent);
+        schedulerEvent = eventWrapper;
+        updateQueueWithNodeUpdate(eventWrapper);
+      } else if (
+          schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
+          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
+        // check if having AM Container, update resource usage information
+        AppAttemptRemovedSchedulerEvent appRemoveEvent =
+            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
+        ApplicationAttemptId appAttemptId =
+            appRemoveEvent.getApplicationAttemptID();
+        String queueName = getSchedulerApp(appAttemptId).getQueue().getName();
+        SchedulerAppReport app = getSchedulerAppInfo(appAttemptId);
+        if (!app.getLiveContainers().isEmpty()) {  // have 0 or 1
+          // should have one container which is AM container
+          RMContainer rmc = app.getLiveContainers().iterator().next();
+          schedulerMetrics.updateQueueMetricsByRelease(
+              rmc.getContainer().getResource(), queueName);
+        }
+      }
+
+      handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time();
+      operationTimer = schedulerMetrics.getSchedulerHandleTimer(
+          schedulerEvent.getType()).time();
+
+      super.handle(schedulerEvent);
+    } finally {
+      if (handlerTimer != null) {
+        handlerTimer.stop();
+      }
+      if (operationTimer != null) {
+        operationTimer.stop();
+      }
+      
schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType());
+
+      if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
+          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
+        SLSRunner.decreaseRemainingApps();
+      }
+    }
+  }
+
+  private void updateQueueWithNodeUpdate(
+      NodeUpdateSchedulerEventWrapper eventWrapper) {
+    RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
+    List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
+    for (UpdatedContainerInfo info : containerList) {
+      for (ContainerStatus status : info.getCompletedContainers()) {
+        ContainerId containerId = status.getContainerId();
+        SchedulerAppReport app = super.getSchedulerAppInfo(
+            containerId.getApplicationAttemptId());
+
+        if (app == null) {
+          // this happens for the AM container
+          // The app have already removed when the NM sends the release
+          // information.
+          continue;
+        }
+
+        int releasedMemory = 0, releasedVCores = 0;
+        if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
+          for (RMContainer rmc : app.getLiveContainers()) {
+            if (rmc.getContainerId() == containerId) {
+              Resource resource = rmc.getContainer().getResource();
+              releasedMemory += resource.getMemorySize();
+              releasedVCores += resource.getVirtualCores();
+              break;
+            }
+          }
+        } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
+          if (preemptionContainerMap.containsKey(containerId)) {
+            Resource preResource = preemptionContainerMap.get(containerId);
+            releasedMemory += preResource.getMemorySize();
+            releasedVCores += preResource.getVirtualCores();
+            preemptionContainerMap.remove(containerId);
+          }
+        }
+        // update queue counters
+        String queue = getSchedulerApp(containerId.getApplicationAttemptId()).
+            getQueueName();
+        schedulerMetrics.updateQueueMetricsByRelease(
+            Resource.newInstance(releasedMemory, releasedVCores), queue);
+      }
+    }
+  }
+
+  private void updateQueueWithAllocateRequest(Allocation allocation,
+      ApplicationAttemptId attemptId,
+      List<ResourceRequest> resourceRequests,
+      List<ContainerId> containerIds) throws IOException {
+    // update queue information
+    Resource pendingResource = Resources.createResource(0, 0);
+    Resource allocatedResource = Resources.createResource(0, 0);
+    // container requested
+    for (ResourceRequest request : resourceRequests) {
+      if (request.getResourceName().equals(ResourceRequest.ANY)) {
+        Resources.addTo(pendingResource,
+            Resources.multiply(request.getCapability(),
+                request.getNumContainers()));
+      }
+    }
+    // container allocated
+    for (Container container : allocation.getContainers()) {
+      Resources.addTo(allocatedResource, container.getResource());
+      Resources.subtractFrom(pendingResource, container.getResource());
+    }
+    // container released from AM
+    SchedulerAppReport report = super.getSchedulerAppInfo(attemptId);
+    for (ContainerId containerId : containerIds) {
+      Container container = null;
+      for (RMContainer c : report.getLiveContainers()) {
+        if (c.getContainerId().equals(containerId)) {
+          container = c.getContainer();
+          break;
+        }
+      }
+      if (container != null) {
+        // released allocated containers
+        Resources.subtractFrom(allocatedResource, container.getResource());
+      } else {
+        for (RMContainer c : report.getReservedContainers()) {
+          if (c.getContainerId().equals(containerId)) {
+            container = c.getContainer();
+            break;
+          }
+        }
+        if (container != null) {
+          // released reserved containers
+          Resources.subtractFrom(pendingResource, container.getResource());
+        }
+      }
+    }
+    // containers released/preemption from scheduler
+    Set<ContainerId> preemptionContainers = new HashSet<ContainerId>();
+    if (allocation.getContainerPreemptions() != null) {
+      preemptionContainers.addAll(allocation.getContainerPreemptions());
+    }
+    if (allocation.getStrictContainerPreemptions() != null) {
+      preemptionContainers.addAll(allocation.getStrictContainerPreemptions());
+    }
+    if (!preemptionContainers.isEmpty()) {
+      for (ContainerId containerId : preemptionContainers) {
+        if (!preemptionContainerMap.containsKey(containerId)) {
+          Container container = null;
+          for (RMContainer c : report.getLiveContainers()) {
+            if (c.getContainerId().equals(containerId)) {
+              container = c.getContainer();
+              break;
+            }
+          }
+          if (container != null) {
+            preemptionContainerMap.put(containerId, container.getResource());
+          }
+        }
+
+      }
+    }
+
+    // update metrics
+    String queueName = getSchedulerApp(attemptId).getQueueName();
+    schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource,
+        queueName);
+  }
+
+  private void initQueueMetrics(FSQueue queue) {
+    if (queue instanceof FSLeafQueue) {
+      schedulerMetrics.initQueueMetric(queue.getQueueName());
+      return;
+    }
+
+    for (FSQueue child : queue.getChildQueues()) {
+      initQueueMetrics(child);
+    }
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    if (metricsON) {
+      initQueueMetrics(getQueueManager().getRootQueue());
+    }
+  }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to