YARN-5654. Not be able to run SLS with FairScheduler (yufeigu via rkanter)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5cfe3957 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5cfe3957 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5cfe3957 Branch: refs/heads/HDFS-10467 Commit: 5cfe39577249c3a78bf176f0691a4489af045185 Parents: 954e761 Author: Robert Kanter <rkan...@apache.org> Authored: Wed Mar 29 16:18:13 2017 -0700 Committer: Inigo <inigo...@apache.org> Committed: Wed Mar 29 19:32:12 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/yarn/sls/SLSRunner.java | 22 +- .../hadoop/yarn/sls/appmaster/AMSimulator.java | 24 +- .../sls/scheduler/ResourceSchedulerWrapper.java | 969 ------------------- .../sls/scheduler/SLSCapacityScheduler.java | 685 ++----------- .../yarn/sls/scheduler/SLSFairScheduler.java | 339 +++++++ .../yarn/sls/scheduler/SchedulerMetrics.java | 533 +++++++++- .../yarn/sls/scheduler/SchedulerWrapper.java | 18 +- .../hadoop/yarn/sls/scheduler/Tracker.java | 46 + .../apache/hadoop/yarn/sls/utils/SLSUtils.java | 1 + .../apache/hadoop/yarn/sls/web/SLSWebApp.java | 12 +- .../yarn/sls/appmaster/TestAMSimulator.java | 53 +- .../yarn/sls/nodemanager/TestNMSimulator.java | 32 +- 12 files changed, 1103 insertions(+), 1631 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cfe3957/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index 61b7f36..ba43816 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ 
-59,16 +59,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.sls.appmaster.AMSimulator; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator; import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher; -import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; -import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper; -import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler; -import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; -import org.apache.hadoop.yarn.sls.scheduler.TaskRunner; +import org.apache.hadoop.yarn.sls.scheduler.*; import org.apache.hadoop.yarn.sls.utils.SLSUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.log4j.Logger; @@ -152,9 +150,9 @@ public class SLSRunner { // start application masters startAM(); // set queue & tracked apps information - ((SchedulerWrapper) rm.getResourceScheduler()) + ((SchedulerWrapper) rm.getResourceScheduler()).getTracker() .setQueueSet(this.queueAppNumMap.keySet()); - ((SchedulerWrapper) rm.getResourceScheduler()) + ((SchedulerWrapper) rm.getResourceScheduler()).getTracker() .setTrackedAppSet(this.trackedApps); // print out simulation info printSimulationInfo(); @@ -164,7 +162,7 @@ public class SLSRunner { runner.start(); } - private void startRM() throws IOException, ClassNotFoundException { + private void startRM() throws Exception { Configuration rmConf = new 
YarnConfiguration(); String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER); @@ -175,10 +173,12 @@ public class SLSRunner { if(Class.forName(schedulerClass) == CapacityScheduler.class) { rmConf.set(YarnConfiguration.RM_SCHEDULER, SLSCapacityScheduler.class.getName()); - } else { + } else if (Class.forName(schedulerClass) == FairScheduler.class) { rmConf.set(YarnConfiguration.RM_SCHEDULER, - ResourceSchedulerWrapper.class.getName()); - rmConf.set(SLSConfiguration.RM_SCHEDULER, schedulerClass); + SLSFairScheduler.class.getName()); + } else if (Class.forName(schedulerClass) == FifoScheduler.class){ + // TODO add support for FifoScheduler + throw new Exception("Fifo Scheduler is not supported yet."); } rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cfe3957/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 0573bae..a62f2b6 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.log4j.Logger; @@ -219,10 +220,12 @@ public abstract class AMSimulator extends TaskRunner.Task { 
simulateFinishTimeMS = System.currentTimeMillis() - SLSRunner.getRunner().getStartTimeMS(); // record job running information - ((SchedulerWrapper)rm.getResourceScheduler()) - .addAMRuntime(appId, - traceStartTimeMS, traceFinishTimeMS, - simulateStartTimeMS, simulateFinishTimeMS); + SchedulerMetrics schedulerMetrics = + ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics(); + if (schedulerMetrics != null) { + schedulerMetrics.addAMRuntime(appId, traceStartTimeMS, traceFinishTimeMS, + simulateStartTimeMS, simulateFinishTimeMS); + } } protected ResourceRequest createResourceRequest( @@ -330,13 +333,20 @@ public abstract class AMSimulator extends TaskRunner.Task { private void trackApp() { if (isTracked) { - ((SchedulerWrapper) rm.getResourceScheduler()) - .addTrackedApp(appId, oldAppId); + SchedulerMetrics schedulerMetrics = + ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics(); + if (schedulerMetrics != null) { + schedulerMetrics.addTrackedApp(appId, oldAppId); + } } } public void untrackApp() { if (isTracked) { - ((SchedulerWrapper) rm.getResourceScheduler()).removeTrackedApp(oldAppId); + SchedulerMetrics schedulerMetrics = + ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics(); + if (schedulerMetrics != null) { + schedulerMetrics.removeTrackedApp(oldAppId); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cfe3957/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java deleted file mode 100644 index a4b8e64..0000000 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ /dev/null @@ -1,969 +0,0 @@ -/** - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.sls.scheduler; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.ShutdownHookManager; -import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.QueueACL; -import org.apache.hadoop.yarn.api.records.QueueInfo; -import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; -import org.apache.hadoop.yarn.sls.SLSRunner; -import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; -import org.apache.hadoop.yarn.sls.web.SLSWebApp; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.apache.log4j.Logger; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.CsvReporter; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.SlidingWindowReservoir; -import com.codahale.metrics.Timer; - -@Private -@Unstable -final public class ResourceSchedulerWrapper - extends AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> - implements SchedulerWrapper, ResourceScheduler, Configurable { - private static final String EOL = System.getProperty("line.separator"); - private static final int SAMPLING_SIZE = 60; - private ScheduledExecutorService pool; - // counters for scheduler allocate/handle 
operations - private Counter schedulerAllocateCounter; - private Counter schedulerHandleCounter; - private Map<SchedulerEventType, Counter> schedulerHandleCounterMap; - // Timers for scheduler allocate/handle operations - private Timer schedulerAllocateTimer; - private Timer schedulerHandleTimer; - private Map<SchedulerEventType, Timer> schedulerHandleTimerMap; - private List<Histogram> schedulerHistogramList; - private Map<Histogram, Timer> histogramTimerMap; - private Lock samplerLock; - private Lock queueLock; - - private Configuration conf; - private ResourceScheduler scheduler; - private Map<ApplicationId, String> appQueueMap = - new ConcurrentHashMap<ApplicationId, String>(); - private BufferedWriter jobRuntimeLogBW; - - // Priority of the ResourceSchedulerWrapper shutdown hook. - public static final int SHUTDOWN_HOOK_PRIORITY = 30; - - // web app - private SLSWebApp web; - - private Map<ContainerId, Resource> preemptionContainerMap = - new ConcurrentHashMap<ContainerId, Resource>(); - - // metrics - private MetricRegistry metrics; - private SchedulerMetrics schedulerMetrics; - private boolean metricsON; - private String metricsOutputDir; - private BufferedWriter metricsLogBW; - private boolean running = false; - private static Map<Class, Class> defaultSchedulerMetricsMap = - new HashMap<Class, Class>(); - static { - defaultSchedulerMetricsMap.put(FairScheduler.class, - FairSchedulerMetrics.class); - defaultSchedulerMetricsMap.put(FifoScheduler.class, - FifoSchedulerMetrics.class); - defaultSchedulerMetricsMap.put(CapacityScheduler.class, - CapacitySchedulerMetrics.class); - } - // must set by outside - private Set<String> queueSet; - private Set<String> trackedAppSet; - - public final Logger LOG = Logger.getLogger(ResourceSchedulerWrapper.class); - - public ResourceSchedulerWrapper() { - super(ResourceSchedulerWrapper.class.getName()); - samplerLock = new ReentrantLock(); - queueLock = new ReentrantLock(); - } - - @Override - public void 
setConf(Configuration conf) { - this.conf = conf; - // set scheduler - Class<? extends ResourceScheduler> klass = conf.getClass( - SLSConfiguration.RM_SCHEDULER, null, ResourceScheduler.class); - - scheduler = ReflectionUtils.newInstance(klass, conf); - // start metrics - metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true); - if (metricsON) { - try { - initMetrics(); - } catch (Exception e) { - e.printStackTrace(); - } - } - - ShutdownHookManager.get().addShutdownHook(new Runnable() { - @Override - public void run() { - try { - if (metricsLogBW != null) { - metricsLogBW.write("]"); - metricsLogBW.close(); - } - if (web != null) { - web.stop(); - } - tearDown(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }, SHUTDOWN_HOOK_PRIORITY); - } - - @Override - public Allocation allocate(ApplicationAttemptId attemptId, - List<ResourceRequest> resourceRequests, List<ContainerId> containerIds, - List<String> strings, List<String> strings2, - ContainerUpdates updateRequests) { - if (metricsON) { - final Timer.Context context = schedulerAllocateTimer.time(); - Allocation allocation = null; - try { - allocation = scheduler.allocate(attemptId, resourceRequests, - containerIds, strings, strings2, updateRequests); - return allocation; - } finally { - context.stop(); - schedulerAllocateCounter.inc(); - try { - updateQueueWithAllocateRequest(allocation, attemptId, - resourceRequests, containerIds); - } catch (IOException e) { - e.printStackTrace(); - } - } - } else { - return scheduler.allocate(attemptId, - resourceRequests, containerIds, strings, strings2, updateRequests); - } - } - - @Override - public void handle(SchedulerEvent schedulerEvent) { - // metrics off - if (! 
metricsON) { - scheduler.handle(schedulerEvent); - return; - } - if(!running) running = true; - - // metrics on - Timer.Context handlerTimer = null; - Timer.Context operationTimer = null; - - NodeUpdateSchedulerEventWrapper eventWrapper; - try { - //if (schedulerEvent instanceof NodeUpdateSchedulerEvent) { - if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE - && schedulerEvent instanceof NodeUpdateSchedulerEvent) { - eventWrapper = new NodeUpdateSchedulerEventWrapper( - (NodeUpdateSchedulerEvent)schedulerEvent); - schedulerEvent = eventWrapper; - updateQueueWithNodeUpdate(eventWrapper); - } else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED - && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { - // check if having AM Container, update resource usage information - AppAttemptRemovedSchedulerEvent appRemoveEvent = - (AppAttemptRemovedSchedulerEvent) schedulerEvent; - ApplicationAttemptId appAttemptId = - appRemoveEvent.getApplicationAttemptID(); - String queue = appQueueMap.get(appAttemptId.getApplicationId()); - SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId); - if (! 
app.getLiveContainers().isEmpty()) { // have 0 or 1 - // should have one container which is AM container - RMContainer rmc = app.getLiveContainers().iterator().next(); - updateQueueMetrics(queue, - rmc.getContainer().getResource().getMemorySize(), - rmc.getContainer().getResource().getVirtualCores()); - } - } - - handlerTimer = schedulerHandleTimer.time(); - operationTimer = schedulerHandleTimerMap - .get(schedulerEvent.getType()).time(); - - scheduler.handle(schedulerEvent); - } finally { - if (handlerTimer != null) handlerTimer.stop(); - if (operationTimer != null) operationTimer.stop(); - schedulerHandleCounter.inc(); - schedulerHandleCounterMap.get(schedulerEvent.getType()).inc(); - - if (schedulerEvent.getType() == SchedulerEventType.APP_REMOVED - && schedulerEvent instanceof AppRemovedSchedulerEvent) { - SLSRunner.decreaseRemainingApps(); - AppRemovedSchedulerEvent appRemoveEvent = - (AppRemovedSchedulerEvent) schedulerEvent; - appQueueMap.remove(appRemoveEvent.getApplicationID()); - } else if (schedulerEvent.getType() == SchedulerEventType.APP_ADDED - && schedulerEvent instanceof AppAddedSchedulerEvent) { - AppAddedSchedulerEvent appAddEvent = - (AppAddedSchedulerEvent) schedulerEvent; - String queueName = appAddEvent.getQueue(); - appQueueMap.put(appAddEvent.getApplicationId(), queueName); - } - } - } - - private void updateQueueWithNodeUpdate( - NodeUpdateSchedulerEventWrapper eventWrapper) { - RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode(); - List<UpdatedContainerInfo> containerList = node.getContainerUpdates(); - for (UpdatedContainerInfo info : containerList) { - for (ContainerStatus status : info.getCompletedContainers()) { - ContainerId containerId = status.getContainerId(); - SchedulerAppReport app = scheduler.getSchedulerAppInfo( - containerId.getApplicationAttemptId()); - - if (app == null) { - // this happens for the AM container - // The app have already removed when the NM sends the release - // information. 
- continue; - } - - String queue = - appQueueMap.get(containerId.getApplicationAttemptId() - .getApplicationId()); - int releasedMemory = 0, releasedVCores = 0; - if (status.getExitStatus() == ContainerExitStatus.SUCCESS) { - for (RMContainer rmc : app.getLiveContainers()) { - if (rmc.getContainerId() == containerId) { - releasedMemory += rmc.getContainer().getResource().getMemorySize(); - releasedVCores += rmc.getContainer() - .getResource().getVirtualCores(); - break; - } - } - } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) { - if (preemptionContainerMap.containsKey(containerId)) { - Resource preResource = preemptionContainerMap.get(containerId); - releasedMemory += preResource.getMemorySize(); - releasedVCores += preResource.getVirtualCores(); - preemptionContainerMap.remove(containerId); - } - } - // update queue counters - updateQueueMetrics(queue, releasedMemory, releasedVCores); - } - } - } - - private void updateQueueWithAllocateRequest(Allocation allocation, - ApplicationAttemptId attemptId, - List<ResourceRequest> resourceRequests, - List<ContainerId> containerIds) throws IOException { - // update queue information - Resource pendingResource = Resources.createResource(0, 0); - Resource allocatedResource = Resources.createResource(0, 0); - String queueName = appQueueMap.get(attemptId.getApplicationId()); - // container requested - for (ResourceRequest request : resourceRequests) { - if (request.getResourceName().equals(ResourceRequest.ANY)) { - Resources.addTo(pendingResource, - Resources.multiply(request.getCapability(), - request.getNumContainers())); - } - } - // container allocated - for (Container container : allocation.getContainers()) { - Resources.addTo(allocatedResource, container.getResource()); - Resources.subtractFrom(pendingResource, container.getResource()); - } - // container released from AM - SchedulerAppReport report = scheduler.getSchedulerAppInfo(attemptId); - for (ContainerId containerId : containerIds) { - Container 
container = null; - for (RMContainer c : report.getLiveContainers()) { - if (c.getContainerId().equals(containerId)) { - container = c.getContainer(); - break; - } - } - if (container != null) { - // released allocated containers - Resources.subtractFrom(allocatedResource, container.getResource()); - } else { - for (RMContainer c : report.getReservedContainers()) { - if (c.getContainerId().equals(containerId)) { - container = c.getContainer(); - break; - } - } - if (container != null) { - // released reserved containers - Resources.subtractFrom(pendingResource, container.getResource()); - } - } - } - // containers released/preemption from scheduler - Set<ContainerId> preemptionContainers = new HashSet<ContainerId>(); - if (allocation.getContainerPreemptions() != null) { - preemptionContainers.addAll(allocation.getContainerPreemptions()); - } - if (allocation.getStrictContainerPreemptions() != null) { - preemptionContainers.addAll(allocation.getStrictContainerPreemptions()); - } - if (! preemptionContainers.isEmpty()) { - for (ContainerId containerId : preemptionContainers) { - if (! preemptionContainerMap.containsKey(containerId)) { - Container container = null; - for (RMContainer c : report.getLiveContainers()) { - if (c.getContainerId().equals(containerId)) { - container = c.getContainer(); - break; - } - } - if (container != null) { - preemptionContainerMap.put(containerId, container.getResource()); - } - } - - } - } - - // update metrics - SortedMap<String, Counter> counterMap = metrics.getCounters(); - String names[] = new String[]{ - "counter.queue." + queueName + ".pending.memory", - "counter.queue." + queueName + ".pending.cores", - "counter.queue." + queueName + ".allocated.memory", - "counter.queue." 
+ queueName + ".allocated.cores"}; - long values[] = new long[]{pendingResource.getMemorySize(), - pendingResource.getVirtualCores(), - allocatedResource.getMemorySize(), allocatedResource.getVirtualCores()}; - for (int i = names.length - 1; i >= 0; i --) { - if (! counterMap.containsKey(names[i])) { - metrics.counter(names[i]); - counterMap = metrics.getCounters(); - } - counterMap.get(names[i]).inc(values[i]); - } - - queueLock.lock(); - try { - if (! schedulerMetrics.isTracked(queueName)) { - schedulerMetrics.trackQueue(queueName); - } - } finally { - queueLock.unlock(); - } - } - - private void tearDown() throws IOException { - // close job runtime writer - if (jobRuntimeLogBW != null) { - jobRuntimeLogBW.close(); - } - // shut pool - if (pool != null) pool.shutdown(); - } - - @SuppressWarnings("unchecked") - private void initMetrics() throws Exception { - metrics = new MetricRegistry(); - // configuration - metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR); - int metricsWebAddressPort = conf.getInt( - SLSConfiguration.METRICS_WEB_ADDRESS_PORT, - SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT); - // create SchedulerMetrics for current scheduler - String schedulerMetricsType = conf.get(scheduler.getClass().getName()); - Class schedulerMetricsClass = schedulerMetricsType == null? 
- defaultSchedulerMetricsMap.get(scheduler.getClass()) : - Class.forName(schedulerMetricsType); - schedulerMetrics = (SchedulerMetrics)ReflectionUtils - .newInstance(schedulerMetricsClass, new Configuration()); - schedulerMetrics.init(scheduler, metrics); - - // register various metrics - registerJvmMetrics(); - registerClusterResourceMetrics(); - registerContainerAppNumMetrics(); - registerSchedulerMetrics(); - - // .csv output - initMetricsCSVOutput(); - - // start web app to provide real-time tracking - web = new SLSWebApp(this, metricsWebAddressPort); - web.start(); - - // a thread to update histogram timer - pool = new ScheduledThreadPoolExecutor(2); - pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000, - TimeUnit.MILLISECONDS); - - // a thread to output metrics for real-tiem tracking - pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000, - TimeUnit.MILLISECONDS); - - // application running information - jobRuntimeLogBW = - new BufferedWriter(new OutputStreamWriter(new FileOutputStream( - metricsOutputDir + "/jobruntime.csv"), "UTF-8")); - jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," + - "simulate_start_time,simulate_end_time" + EOL); - jobRuntimeLogBW.flush(); - } - - private void registerJvmMetrics() { - // add JVM gauges - metrics.register("variable.jvm.free.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return Runtime.getRuntime().freeMemory(); - } - } - ); - metrics.register("variable.jvm.max.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return Runtime.getRuntime().maxMemory(); - } - } - ); - metrics.register("variable.jvm.total.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return Runtime.getRuntime().totalMemory(); - } - } - ); - } - - private void registerClusterResourceMetrics() { - metrics.register("variable.cluster.allocated.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - if(scheduler == null || 
scheduler.getRootQueueMetrics() == null) { - return 0L; - } else { - return scheduler.getRootQueueMetrics().getAllocatedMB(); - } - } - } - ); - metrics.register("variable.cluster.allocated.vcores", - new Gauge<Integer>() { - @Override - public Integer getValue() { - if(scheduler == null || scheduler.getRootQueueMetrics() == null) { - return 0; - } else { - return scheduler.getRootQueueMetrics().getAllocatedVirtualCores(); - } - } - } - ); - metrics.register("variable.cluster.available.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - if(scheduler == null || scheduler.getRootQueueMetrics() == null) { - return 0L; - } else { - return scheduler.getRootQueueMetrics().getAvailableMB(); - } - } - } - ); - metrics.register("variable.cluster.available.vcores", - new Gauge<Integer>() { - @Override - public Integer getValue() { - if(scheduler == null || scheduler.getRootQueueMetrics() == null) { - return 0; - } else { - return scheduler.getRootQueueMetrics().getAvailableVirtualCores(); - } - } - } - ); - } - - private void registerContainerAppNumMetrics() { - metrics.register("variable.running.application", - new Gauge<Integer>() { - @Override - public Integer getValue() { - if (scheduler == null || scheduler.getRootQueueMetrics() == null) { - return 0; - } else { - return scheduler.getRootQueueMetrics().getAppsRunning(); - } - } - } - ); - metrics.register("variable.running.container", - new Gauge<Integer>() { - @Override - public Integer getValue() { - if(scheduler == null || scheduler.getRootQueueMetrics() == null) { - return 0; - } else { - return scheduler.getRootQueueMetrics().getAllocatedContainers(); - } - } - } - ); - } - - private void registerSchedulerMetrics() { - samplerLock.lock(); - try { - // counters for scheduler operations - schedulerAllocateCounter = metrics.counter( - "counter.scheduler.operation.allocate"); - schedulerHandleCounter = metrics.counter( - "counter.scheduler.operation.handle"); - schedulerHandleCounterMap = new 
HashMap<SchedulerEventType, Counter>(); - for (SchedulerEventType e : SchedulerEventType.values()) { - Counter counter = metrics.counter( - "counter.scheduler.operation.handle." + e); - schedulerHandleCounterMap.put(e, counter); - } - // timers for scheduler operations - int timeWindowSize = conf.getInt( - SLSConfiguration.METRICS_TIMER_WINDOW_SIZE, - SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT); - schedulerAllocateTimer = new Timer( - new SlidingWindowReservoir(timeWindowSize)); - schedulerHandleTimer = new Timer( - new SlidingWindowReservoir(timeWindowSize)); - schedulerHandleTimerMap = new HashMap<SchedulerEventType, Timer>(); - for (SchedulerEventType e : SchedulerEventType.values()) { - Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize)); - schedulerHandleTimerMap.put(e, timer); - } - // histogram for scheduler operations (Samplers) - schedulerHistogramList = new ArrayList<Histogram>(); - histogramTimerMap = new HashMap<Histogram, Timer>(); - Histogram schedulerAllocateHistogram = new Histogram( - new SlidingWindowReservoir(SAMPLING_SIZE)); - metrics.register("sampler.scheduler.operation.allocate.timecost", - schedulerAllocateHistogram); - schedulerHistogramList.add(schedulerAllocateHistogram); - histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer); - Histogram schedulerHandleHistogram = new Histogram( - new SlidingWindowReservoir(SAMPLING_SIZE)); - metrics.register("sampler.scheduler.operation.handle.timecost", - schedulerHandleHistogram); - schedulerHistogramList.add(schedulerHandleHistogram); - histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer); - for (SchedulerEventType e : SchedulerEventType.values()) { - Histogram histogram = new Histogram( - new SlidingWindowReservoir(SAMPLING_SIZE)); - metrics.register( - "sampler.scheduler.operation.handle." 
+ e + ".timecost", - histogram); - schedulerHistogramList.add(histogram); - histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e)); - } - } finally { - samplerLock.unlock(); - } - } - - private void initMetricsCSVOutput() { - int timeIntervalMS = conf.getInt( - SLSConfiguration.METRICS_RECORD_INTERVAL_MS, - SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT); - File dir = new File(metricsOutputDir + "/metrics"); - if(! dir.exists() - && ! dir.mkdirs()) { - LOG.error("Cannot create directory " + dir.getAbsoluteFile()); - } - final CsvReporter reporter = CsvReporter.forRegistry(metrics) - .formatFor(Locale.US) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build(new File(metricsOutputDir + "/metrics")); - reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS); - } - - class HistogramsRunnable implements Runnable { - @Override - public void run() { - samplerLock.lock(); - try { - for (Histogram histogram : schedulerHistogramList) { - Timer timer = histogramTimerMap.get(histogram); - histogram.update((int) timer.getSnapshot().getMean()); - } - } finally { - samplerLock.unlock(); - } - } - } - - class MetricsLogRunnable implements Runnable { - private boolean firstLine = true; - public MetricsLogRunnable() { - try { - metricsLogBW = - new BufferedWriter(new OutputStreamWriter(new FileOutputStream( - metricsOutputDir + "/realtimetrack.json"), "UTF-8")); - metricsLogBW.write("["); - } catch (IOException e) { - e.printStackTrace(); - } - } - - @Override - public void run() { - if(running) { - // all WebApp to get real tracking json - String metrics = web.generateRealTimeTrackingMetrics(); - // output - try { - if(firstLine) { - metricsLogBW.write(metrics + EOL); - firstLine = false; - } else { - metricsLogBW.write("," + metrics + EOL); - } - metricsLogBW.flush(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - } - - // the following functions are used by AMSimulator - public void addAMRuntime(ApplicationId 
appId, - long traceStartTimeMS, long traceEndTimeMS, - long simulateStartTimeMS, long simulateEndTimeMS) { - if (metricsON) { - try { - // write job runtime information - StringBuilder sb = new StringBuilder(); - sb.append(appId).append(",").append(traceStartTimeMS).append(",") - .append(traceEndTimeMS).append(",").append(simulateStartTimeMS) - .append(",").append(simulateEndTimeMS); - jobRuntimeLogBW.write(sb.toString() + EOL); - jobRuntimeLogBW.flush(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - private void updateQueueMetrics(String queue, - long releasedMemory, int releasedVCores) { - // update queue counters - SortedMap<String, Counter> counterMap = metrics.getCounters(); - if (releasedMemory != 0) { - String name = "counter.queue." + queue + ".allocated.memory"; - if (! counterMap.containsKey(name)) { - metrics.counter(name); - counterMap = metrics.getCounters(); - } - counterMap.get(name).inc(-releasedMemory); - } - if (releasedVCores != 0) { - String name = "counter.queue." + queue + ".allocated.cores"; - if (! 
counterMap.containsKey(name)) { - metrics.counter(name); - counterMap = metrics.getCounters(); - } - counterMap.get(name).inc(-releasedVCores); - } - } - - public void setQueueSet(Set<String> queues) { - this.queueSet = queues; - } - - public Set<String> getQueueSet() { - return this.queueSet; - } - - public void setTrackedAppSet(Set<String> apps) { - this.trackedAppSet = apps; - } - - public Set<String> getTrackedAppSet() { - return this.trackedAppSet; - } - - public MetricRegistry getMetrics() { - return metrics; - } - - public SchedulerMetrics getSchedulerMetrics() { - return schedulerMetrics; - } - - // API open to out classes - public void addTrackedApp(ApplicationId appId, String oldAppId) { - if (metricsON) { - schedulerMetrics.trackApp(appId, oldAppId); - } - } - - public void removeTrackedApp(String oldAppId) { - if (metricsON) { - schedulerMetrics.untrackApp(oldAppId); - } - } - - @Override - public Configuration getConf() { - return conf; - } - - @SuppressWarnings("unchecked") - @Override - public void serviceInit(Configuration conf) throws Exception { - ((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) - scheduler).init(conf); - super.serviceInit(conf); - initScheduler(conf); - } - - private synchronized void initScheduler(Configuration configuration) throws - IOException { - this.applications = - new ConcurrentHashMap<ApplicationId, - SchedulerApplication<SchedulerApplicationAttempt>>(); - } - - @SuppressWarnings("unchecked") - @Override - public void serviceStart() throws Exception { - ((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) - scheduler).start(); - super.serviceStart(); - } - - @SuppressWarnings("unchecked") - @Override - public void serviceStop() throws Exception { - ((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) - scheduler).stop(); - super.serviceStop(); - } - - @Override - public void setRMContext(RMContext rmContext) { - scheduler.setRMContext(rmContext); - } - - @Override - 
public void reinitialize(Configuration conf, RMContext rmContext) - throws IOException { - scheduler.reinitialize(conf, rmContext); - } - - @Override - public void recover(RMStateStore.RMState rmState) throws Exception { - scheduler.recover(rmState); - } - - @Override - public QueueInfo getQueueInfo(String s, boolean b, boolean b2) - throws IOException { - return scheduler.getQueueInfo(s, b, b2); - } - - @Override - public List<QueueUserACLInfo> getQueueUserAclInfo() { - return scheduler.getQueueUserAclInfo(); - } - - @Override - public Resource getMinimumResourceCapability() { - return scheduler.getMinimumResourceCapability(); - } - - @Override - public Resource getMaximumResourceCapability() { - return scheduler.getMaximumResourceCapability(); - } - - @Override - public ResourceCalculator getResourceCalculator() { - return scheduler.getResourceCalculator(); - } - - @Override - public int getNumClusterNodes() { - return scheduler.getNumClusterNodes(); - } - - @Override - public SchedulerNodeReport getNodeReport(NodeId nodeId) { - return scheduler.getNodeReport(nodeId); - } - - @Override - public SchedulerAppReport getSchedulerAppInfo( - ApplicationAttemptId attemptId) { - return scheduler.getSchedulerAppInfo(attemptId); - } - - @Override - public QueueMetrics getRootQueueMetrics() { - return scheduler.getRootQueueMetrics(); - } - - @Override - public synchronized boolean checkAccess(UserGroupInformation callerUGI, - QueueACL acl, String queueName) { - return scheduler.checkAccess(callerUGI, acl, queueName); - } - - @Override - public ApplicationResourceUsageReport getAppResourceUsageReport( - ApplicationAttemptId appAttemptId) { - return scheduler.getAppResourceUsageReport(appAttemptId); - } - - @Override - public List<ApplicationAttemptId> getAppsInQueue(String queue) { - return scheduler.getAppsInQueue(queue); - } - - @Override - public RMContainer getRMContainer(ContainerId containerId) { - return null; - } - - @Override - public String 
moveApplication(ApplicationId appId, String newQueue) - throws YarnException { - return scheduler.moveApplication(appId, newQueue); - } - - @Override - @LimitedPrivate("yarn") - @Unstable - public Resource getClusterResource() { - return super.getClusterResource(); - } - - @Override - public synchronized List<Container> getTransferredContainers( - ApplicationAttemptId currentAttempt) { - return new ArrayList<Container>(); - } - - @Override - public Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> - getSchedulerApplications() { - return new HashMap<ApplicationId, - SchedulerApplication<SchedulerApplicationAttempt>>(); - } - - @Override - protected void completedContainerInternal(RMContainer rmContainer, - ContainerStatus containerStatus, RMContainerEventType event) { - // do nothing - } - - @Override - public Priority checkAndGetApplicationPriority(Priority priority, - UserGroupInformation user, String queueName, ApplicationId applicationId) - throws YarnException { - // TODO Dummy implementation. 
- return Priority.newInstance(0); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cfe3957/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java index 6ea2ab0..7c37465 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java @@ -17,34 +17,19 @@ */ package org.apache.hadoop.yarn.sls.scheduler; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStreamWriter; -import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Set; -import java.util.SortedMap; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import 
org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -65,117 +50,63 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptR import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; -import org.apache.hadoop.yarn.sls.web.SLSWebApp; +import org.apache.hadoop.yarn.sls.utils.SLSUtils; import org.apache.hadoop.yarn.util.resource.Resources; -import org.apache.log4j.Logger; -import com.codahale.metrics.Counter; -import com.codahale.metrics.CsvReporter; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.SlidingWindowReservoir; import com.codahale.metrics.Timer; @Private @Unstable public class SLSCapacityScheduler extends CapacityScheduler implements SchedulerWrapper,Configurable { - private static final String EOL = System.getProperty("line.separator"); - private static final String QUEUE_COUNTER_PREFIX = "counter.queue."; - private static final int SAMPLING_SIZE = 60; - private ScheduledExecutorService pool; - // counters for scheduler allocate/handle operations - private Counter schedulerAllocateCounter; - private Counter schedulerHandleCounter; - private Map<SchedulerEventType, Counter> schedulerHandleCounterMap; - // Timers for scheduler allocate/handle operations - private Timer schedulerAllocateTimer; - private Timer schedulerHandleTimer; - private Map<SchedulerEventType, Timer> schedulerHandleTimerMap; - 
private List<Histogram> schedulerHistogramList; - private Map<Histogram, Timer> histogramTimerMap; - private Lock samplerLock; - private Lock queueLock; private Configuration conf; private Map<ApplicationAttemptId, String> appQueueMap = new ConcurrentHashMap<ApplicationAttemptId, String>(); - private BufferedWriter jobRuntimeLogBW; - - // Priority of the ResourceSchedulerWrapper shutdown hook. - public static final int SHUTDOWN_HOOK_PRIORITY = 30; - - // web app - private SLSWebApp web; private Map<ContainerId, Resource> preemptionContainerMap = new ConcurrentHashMap<ContainerId, Resource>(); // metrics - private MetricRegistry metrics; private SchedulerMetrics schedulerMetrics; private boolean metricsON; - private String metricsOutputDir; - private BufferedWriter metricsLogBW; - private boolean running = false; - private static Map<Class, Class> defaultSchedulerMetricsMap = - new HashMap<Class, Class>(); - static { - defaultSchedulerMetricsMap.put(FairScheduler.class, - FairSchedulerMetrics.class); - defaultSchedulerMetricsMap.put(FifoScheduler.class, - FifoSchedulerMetrics.class); - defaultSchedulerMetricsMap.put(CapacityScheduler.class, - CapacitySchedulerMetrics.class); - } - // must set by outside - private Set<String> queueSet; - private Set<String> trackedAppSet; + private Tracker tracker; - public final Logger LOG = Logger.getLogger(SLSCapacityScheduler.class); + public Tracker getTracker() { + return tracker; + } public SLSCapacityScheduler() { - samplerLock = new ReentrantLock(); - queueLock = new ReentrantLock(); + tracker = new Tracker(); } @Override public void setConf(Configuration conf) { this.conf = conf; super.setConf(conf); - // start metrics metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true); if (metricsON) { try { - initMetrics(); + schedulerMetrics = SchedulerMetrics.getInstance(conf, + CapacityScheduler.class); + schedulerMetrics.init(this, conf); } catch (Exception e) { e.printStackTrace(); } - } - 
ShutdownHookManager.get().addShutdownHook(new Runnable() { - @Override - public void run() { - try { - if (metricsLogBW != null) { - metricsLogBW.write("]"); - metricsLogBW.close(); - } - if (web != null) { - web.stop(); + ShutdownHookManager.get().addShutdownHook(new Runnable() { + @Override public void run() { + try { + schedulerMetrics.tearDown(); + } catch (Exception e) { + e.printStackTrace(); } - tearDown(); - } catch (Exception e) { - e.printStackTrace(); } - } - }, SHUTDOWN_HOOK_PRIORITY); + }, SLSUtils.SHUTDOWN_HOOK_PRIORITY); + } } @Override @@ -184,7 +115,8 @@ public class SLSCapacityScheduler extends CapacityScheduler implements List<String> strings, List<String> strings2, ContainerUpdates updateRequests) { if (metricsON) { - final Timer.Context context = schedulerAllocateTimer.time(); + final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer() + .time(); Allocation allocation = null; try { allocation = super @@ -193,7 +125,7 @@ public class SLSCapacityScheduler extends CapacityScheduler implements return allocation; } finally { context.stop(); - schedulerAllocateCounter.inc(); + schedulerMetrics.increaseSchedulerAllocationCounter(); try { updateQueueWithAllocateRequest(allocation, attemptId, resourceRequests, containerIds); @@ -209,74 +141,76 @@ public class SLSCapacityScheduler extends CapacityScheduler implements @Override public void handle(SchedulerEvent schedulerEvent) { - // metrics off - if (! 
metricsON) { - super.handle(schedulerEvent); - return; - } - if(!running) running = true; + if (!metricsON) { + super.handle(schedulerEvent); + return; + } - // metrics on - Timer.Context handlerTimer = null; - Timer.Context operationTimer = null; + if (!schedulerMetrics.isRunning()) { + schedulerMetrics.setRunning(true); + } - NodeUpdateSchedulerEventWrapper eventWrapper; - try { - //if (schedulerEvent instanceof NodeUpdateSchedulerEvent) { - if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE - && schedulerEvent instanceof NodeUpdateSchedulerEvent) { - eventWrapper = new NodeUpdateSchedulerEventWrapper( - (NodeUpdateSchedulerEvent)schedulerEvent); - schedulerEvent = eventWrapper; - updateQueueWithNodeUpdate(eventWrapper); - } else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED - && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { - // check if having AM Container, update resource usage information - AppAttemptRemovedSchedulerEvent appRemoveEvent = - (AppAttemptRemovedSchedulerEvent) schedulerEvent; - ApplicationAttemptId appAttemptId = - appRemoveEvent.getApplicationAttemptID(); - String queue = appQueueMap.get(appAttemptId); - SchedulerAppReport app = super.getSchedulerAppInfo(appAttemptId); - if (! 
app.getLiveContainers().isEmpty()) { // have 0 or 1 - // should have one container which is AM container - RMContainer rmc = app.getLiveContainers().iterator().next(); - updateQueueMetrics(queue, - rmc.getContainer().getResource().getMemorySize(), - rmc.getContainer().getResource().getVirtualCores()); - } - } + Timer.Context handlerTimer = null; + Timer.Context operationTimer = null; - handlerTimer = schedulerHandleTimer.time(); - operationTimer = schedulerHandleTimerMap - .get(schedulerEvent.getType()).time(); + NodeUpdateSchedulerEventWrapper eventWrapper; + try { + if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE + && schedulerEvent instanceof NodeUpdateSchedulerEvent) { + eventWrapper = new NodeUpdateSchedulerEventWrapper( + (NodeUpdateSchedulerEvent)schedulerEvent); + schedulerEvent = eventWrapper; + updateQueueWithNodeUpdate(eventWrapper); + } else if (schedulerEvent.getType() == + SchedulerEventType.APP_ATTEMPT_REMOVED + && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { + // check if having AM Container, update resource usage information + AppAttemptRemovedSchedulerEvent appRemoveEvent = + (AppAttemptRemovedSchedulerEvent) schedulerEvent; + ApplicationAttemptId appAttemptId = + appRemoveEvent.getApplicationAttemptID(); + String queue = appQueueMap.get(appAttemptId); + SchedulerAppReport app = super.getSchedulerAppInfo(appAttemptId); + if (!app.getLiveContainers().isEmpty()) { // have 0 or 1 + // should have one container which is AM container + RMContainer rmc = app.getLiveContainers().iterator().next(); + schedulerMetrics.updateQueueMetricsByRelease( + rmc.getContainer().getResource(), queue); + } + } - super.handle(schedulerEvent); - } finally { - if (handlerTimer != null) handlerTimer.stop(); - if (operationTimer != null) operationTimer.stop(); - schedulerHandleCounter.inc(); - schedulerHandleCounterMap.get(schedulerEvent.getType()).inc(); + handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time(); + operationTimer = 
schedulerMetrics.getSchedulerHandleTimer( + schedulerEvent.getType()).time(); - if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED - && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { - SLSRunner.decreaseRemainingApps(); - AppAttemptRemovedSchedulerEvent appRemoveEvent = - (AppAttemptRemovedSchedulerEvent) schedulerEvent; - ApplicationAttemptId appAttemptId = - appRemoveEvent.getApplicationAttemptID(); - appQueueMap.remove(appRemoveEvent.getApplicationAttemptID()); - } else if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_ADDED - && schedulerEvent instanceof AppAttemptAddedSchedulerEvent) { - AppAttemptAddedSchedulerEvent appAddEvent = - (AppAttemptAddedSchedulerEvent) schedulerEvent; - SchedulerApplication app = - applications.get(appAddEvent.getApplicationAttemptId() + super.handle(schedulerEvent); + } finally { + if (handlerTimer != null) { + handlerTimer.stop(); + } + if (operationTimer != null) { + operationTimer.stop(); + } + schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType()); + + if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED + && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) { + SLSRunner.decreaseRemainingApps(); + AppAttemptRemovedSchedulerEvent appRemoveEvent = + (AppAttemptRemovedSchedulerEvent) schedulerEvent; + appQueueMap.remove(appRemoveEvent.getApplicationAttemptID()); + } else if (schedulerEvent.getType() == + SchedulerEventType.APP_ATTEMPT_ADDED + && schedulerEvent instanceof AppAttemptAddedSchedulerEvent) { + AppAttemptAddedSchedulerEvent appAddEvent = + (AppAttemptAddedSchedulerEvent) schedulerEvent; + SchedulerApplication app = + applications.get(appAddEvent.getApplicationAttemptId() .getApplicationId()); - appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue() - .getQueueName()); - } - } + appQueueMap.put(appAddEvent.getApplicationAttemptId(), app.getQueue() + .getQueueName()); + } + } } private void 
updateQueueWithNodeUpdate( @@ -316,7 +250,8 @@ public class SLSCapacityScheduler extends CapacityScheduler implements } } // update queue counters - updateQueueMetrics(queue, releasedMemory, releasedVCores); + schedulerMetrics.updateQueueMetricsByRelease( + Resource.newInstance(releasedMemory, releasedVCores), queue); } } } @@ -395,410 +330,13 @@ public class SLSCapacityScheduler extends CapacityScheduler implements } // update metrics - SortedMap<String, Counter> counterMap = metrics.getCounters(); - String names[] = new String[]{ - "counter.queue." + queueName + ".pending.memory", - "counter.queue." + queueName + ".pending.cores", - "counter.queue." + queueName + ".allocated.memory", - "counter.queue." + queueName + ".allocated.cores"}; - long values[] = new long[]{pendingResource.getMemorySize(), - pendingResource.getVirtualCores(), - allocatedResource.getMemorySize(), allocatedResource.getVirtualCores()}; - for (int i = names.length - 1; i >= 0; i --) { - if (! counterMap.containsKey(names[i])) { - metrics.counter(names[i]); - counterMap = metrics.getCounters(); - } - counterMap.get(names[i]).inc(values[i]); - } - - queueLock.lock(); - try { - if (! 
schedulerMetrics.isTracked(queueName)) { - schedulerMetrics.trackQueue(queueName); - } - } finally { - queueLock.unlock(); - } - } - - private void tearDown() throws IOException { - // close job runtime writer - if (jobRuntimeLogBW != null) { - jobRuntimeLogBW.close(); - } - // shut pool - if (pool != null) pool.shutdown(); - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - private void initMetrics() throws Exception { - metrics = new MetricRegistry(); - // configuration - metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR); - int metricsWebAddressPort = conf.getInt( - SLSConfiguration.METRICS_WEB_ADDRESS_PORT, - SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT); - // create SchedulerMetrics for current scheduler - String schedulerMetricsType = conf.get(CapacityScheduler.class.getName()); - Class schedulerMetricsClass = schedulerMetricsType == null? - defaultSchedulerMetricsMap.get(CapacityScheduler.class) : - Class.forName(schedulerMetricsType); - schedulerMetrics = (SchedulerMetrics)ReflectionUtils - .newInstance(schedulerMetricsClass, new Configuration()); - schedulerMetrics.init(this, metrics); - - // register various metrics - registerJvmMetrics(); - registerClusterResourceMetrics(); - registerContainerAppNumMetrics(); - registerSchedulerMetrics(); - - // .csv output - initMetricsCSVOutput(); - - // start web app to provide real-time tracking - web = new SLSWebApp(this, metricsWebAddressPort); - web.start(); - - // a thread to update histogram timer - pool = new ScheduledThreadPoolExecutor(2); - pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000, - TimeUnit.MILLISECONDS); - - // a thread to output metrics for real-tiem tracking - pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000, - TimeUnit.MILLISECONDS); - - // application running information - jobRuntimeLogBW = - new BufferedWriter(new OutputStreamWriter(new FileOutputStream( - metricsOutputDir + "/jobruntime.csv"), "UTF-8")); - 
jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," + - "simulate_start_time,simulate_end_time" + EOL); - jobRuntimeLogBW.flush(); - } - - private void registerJvmMetrics() { - // add JVM gauges - metrics.register("variable.jvm.free.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return Runtime.getRuntime().freeMemory(); - } - } - ); - metrics.register("variable.jvm.max.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return Runtime.getRuntime().maxMemory(); - } - } - ); - metrics.register("variable.jvm.total.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - return Runtime.getRuntime().totalMemory(); - } - } - ); - } - - private void registerClusterResourceMetrics() { - metrics.register("variable.cluster.allocated.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - if( getRootQueueMetrics() == null) { - return 0L; - } else { - return getRootQueueMetrics().getAllocatedMB(); - } - } - } - ); - metrics.register("variable.cluster.allocated.vcores", - new Gauge<Integer>() { - @Override - public Integer getValue() { - if(getRootQueueMetrics() == null) { - return 0; - } else { - return getRootQueueMetrics().getAllocatedVirtualCores(); - } - } - } - ); - metrics.register("variable.cluster.available.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - if(getRootQueueMetrics() == null) { - return 0L; - } else { - return getRootQueueMetrics().getAvailableMB(); - } - } - } - ); - metrics.register("variable.cluster.available.vcores", - new Gauge<Integer>() { - @Override - public Integer getValue() { - if(getRootQueueMetrics() == null) { - return 0; - } else { - return getRootQueueMetrics().getAvailableVirtualCores(); - } - } - } - ); - metrics.register("variable.cluster.reserved.memory", - new Gauge<Long>() { - @Override - public Long getValue() { - if(getRootQueueMetrics() == null) { - return 0L; - } else { - return getRootQueueMetrics().getReservedMB(); - } - } 
- } - ); - metrics.register("variable.cluster.reserved.vcores", - new Gauge<Integer>() { - @Override - public Integer getValue() { - if(getRootQueueMetrics() == null) { - return 0; - } else { - return getRootQueueMetrics().getReservedVirtualCores(); - } - } - } - ); - } - - private void registerContainerAppNumMetrics() { - metrics.register("variable.running.application", - new Gauge<Integer>() { - @Override - public Integer getValue() { - if(getRootQueueMetrics() == null) { - return 0; - } else { - return getRootQueueMetrics().getAppsRunning(); - } - } - } - ); - metrics.register("variable.running.container", - new Gauge<Integer>() { - @Override - public Integer getValue() { - if(getRootQueueMetrics() == null) { - return 0; - } else { - return getRootQueueMetrics().getAllocatedContainers(); - } - } - } - ); - } - - private void registerSchedulerMetrics() { - samplerLock.lock(); - try { - // counters for scheduler operations - schedulerAllocateCounter = metrics.counter( - "counter.scheduler.operation.allocate"); - schedulerHandleCounter = metrics.counter( - "counter.scheduler.operation.handle"); - schedulerHandleCounterMap = new HashMap<SchedulerEventType, Counter>(); - for (SchedulerEventType e : SchedulerEventType.values()) { - Counter counter = metrics.counter( - "counter.scheduler.operation.handle." 
+ e); - schedulerHandleCounterMap.put(e, counter); - } - // timers for scheduler operations - int timeWindowSize = conf.getInt( - SLSConfiguration.METRICS_TIMER_WINDOW_SIZE, - SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT); - schedulerAllocateTimer = new Timer( - new SlidingWindowReservoir(timeWindowSize)); - schedulerHandleTimer = new Timer( - new SlidingWindowReservoir(timeWindowSize)); - schedulerHandleTimerMap = new HashMap<SchedulerEventType, Timer>(); - for (SchedulerEventType e : SchedulerEventType.values()) { - Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize)); - schedulerHandleTimerMap.put(e, timer); - } - // histogram for scheduler operations (Samplers) - schedulerHistogramList = new ArrayList<Histogram>(); - histogramTimerMap = new HashMap<Histogram, Timer>(); - Histogram schedulerAllocateHistogram = new Histogram( - new SlidingWindowReservoir(SAMPLING_SIZE)); - metrics.register("sampler.scheduler.operation.allocate.timecost", - schedulerAllocateHistogram); - schedulerHistogramList.add(schedulerAllocateHistogram); - histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer); - Histogram schedulerHandleHistogram = new Histogram( - new SlidingWindowReservoir(SAMPLING_SIZE)); - metrics.register("sampler.scheduler.operation.handle.timecost", - schedulerHandleHistogram); - schedulerHistogramList.add(schedulerHandleHistogram); - histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer); - for (SchedulerEventType e : SchedulerEventType.values()) { - Histogram histogram = new Histogram( - new SlidingWindowReservoir(SAMPLING_SIZE)); - metrics.register( - "sampler.scheduler.operation.handle." 
+ e + ".timecost", - histogram); - schedulerHistogramList.add(histogram); - histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e)); - } - } finally { - samplerLock.unlock(); - } - } - - private void initMetricsCSVOutput() { - int timeIntervalMS = conf.getInt( - SLSConfiguration.METRICS_RECORD_INTERVAL_MS, - SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT); - File dir = new File(metricsOutputDir + "/metrics"); - if(! dir.exists() - && ! dir.mkdirs()) { - LOG.error("Cannot create directory " + dir.getAbsoluteFile()); - } - final CsvReporter reporter = CsvReporter.forRegistry(metrics) - .formatFor(Locale.US) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build(new File(metricsOutputDir + "/metrics")); - reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS); - } - - class HistogramsRunnable implements Runnable { - @Override - public void run() { - samplerLock.lock(); - try { - for (Histogram histogram : schedulerHistogramList) { - Timer timer = histogramTimerMap.get(histogram); - histogram.update((int) timer.getSnapshot().getMean()); - } - } finally { - samplerLock.unlock(); - } - } - } - - class MetricsLogRunnable implements Runnable { - private boolean firstLine = true; - public MetricsLogRunnable() { - try { - metricsLogBW = - new BufferedWriter(new OutputStreamWriter(new FileOutputStream( - metricsOutputDir + "/realtimetrack.json"), "UTF-8")); - metricsLogBW.write("["); - } catch (IOException e) { - e.printStackTrace(); - } - } - - @Override - public void run() { - if(running) { - // all WebApp to get real tracking json - String metrics = web.generateRealTimeTrackingMetrics(); - // output - try { - if(firstLine) { - metricsLogBW.write(metrics + EOL); - firstLine = false; - } else { - metricsLogBW.write("," + metrics + EOL); - } - metricsLogBW.flush(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - } - - // the following functions are used by AMSimulator - public void addAMRuntime(ApplicationId 
appId, - long traceStartTimeMS, long traceEndTimeMS, - long simulateStartTimeMS, long simulateEndTimeMS) { - - if (metricsON) { - try { - // write job runtime information - StringBuilder sb = new StringBuilder(); - sb.append(appId).append(",").append(traceStartTimeMS).append(",") - .append(traceEndTimeMS).append(",").append(simulateStartTimeMS) - .append(",").append(simulateEndTimeMS); - jobRuntimeLogBW.write(sb.toString() + EOL); - jobRuntimeLogBW.flush(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - private void updateQueueMetrics(String queue, - long releasedMemory, int releasedVCores) { - // update queue counters - SortedMap<String, Counter> counterMap = metrics.getCounters(); - if (releasedMemory != 0) { - String name = "counter.queue." + queue + ".allocated.memory"; - if (! counterMap.containsKey(name)) { - metrics.counter(name); - counterMap = metrics.getCounters(); - } - counterMap.get(name).inc(-releasedMemory); - } - if (releasedVCores != 0) { - String name = "counter.queue." + queue + ".allocated.cores"; - if (! 
counterMap.containsKey(name)) { - metrics.counter(name); - counterMap = metrics.getCounters(); - } - counterMap.get(name).inc(-releasedVCores); - } + schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource, + queueName); } private void initQueueMetrics(CSQueue queue) { if (queue instanceof LeafQueue) { - SortedMap<String, Counter> counterMap = metrics.getCounters(); - String queueName = queue.getQueueName(); - String[] names = new String[]{ - QUEUE_COUNTER_PREFIX + queueName + ".pending.memory", - QUEUE_COUNTER_PREFIX + queueName + ".pending.cores", - QUEUE_COUNTER_PREFIX + queueName + ".allocated.memory", - QUEUE_COUNTER_PREFIX + queueName + ".allocated.cores" }; - - for (int i = names.length - 1; i >= 0; i--) { - if (!counterMap.containsKey(names[i])) { - metrics.counter(names[i]); - counterMap = metrics.getCounters(); - } - } - - queueLock.lock(); - try { - if (!schedulerMetrics.isTracked(queueName)) { - schedulerMetrics.trackQueue(queueName); - } - } finally { - queueLock.unlock(); - } - + schedulerMetrics.initQueueMetric(queue.getQueueName()); return; } @@ -811,54 +349,17 @@ public class SLSCapacityScheduler extends CapacityScheduler implements public void serviceInit(Configuration configuration) throws Exception { super.serviceInit(configuration); - initQueueMetrics(getRootQueue()); - } - - public void setQueueSet(Set<String> queues) { - this.queueSet = queues; - } - - public Set<String> getQueueSet() { - return this.queueSet; - } - - public void setTrackedAppSet(Set<String> apps) { - this.trackedAppSet = apps; - } - - public Set<String> getTrackedAppSet() { - return this.trackedAppSet; - } - - public MetricRegistry getMetrics() { - return metrics; - } - - public SchedulerMetrics getSchedulerMetrics() { - return schedulerMetrics; - } - - // API open to out classes - public void addTrackedApp(ApplicationId appId, - String oldAppId) { if (metricsON) { - schedulerMetrics.trackApp(appId, oldAppId); + initQueueMetrics(getRootQueue()); } } - public 
void removeTrackedApp(String oldAppId) { - if (metricsON) { - schedulerMetrics.untrackApp(oldAppId); - } + public SchedulerMetrics getSchedulerMetrics() { + return schedulerMetrics; } @Override public Configuration getConf() { return conf; } - - - - -} - +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/5cfe3957/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java new file mode 100644 index 0000000..572dacf --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.sls.scheduler; + +import com.codahale.metrics.Timer; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.sls.SLSRunner; +import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.apache.hadoop.yarn.sls.utils.SLSUtils; +import 
org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * {@link FairScheduler} subclass used by SLS. When metrics are enabled
+ * ({@code SLSConfiguration.METRICS_SWITCH}, default {@code true}) it times
+ * {@code allocate}/{@code handle} calls and keeps per-queue pending and
+ * allocated resource metrics up to date through {@link SchedulerMetrics}.
+ */
+@Private
+@Unstable
+public class SLSFairScheduler extends FairScheduler
+    implements SchedulerWrapper, Configurable {
+  private SchedulerMetrics schedulerMetrics;
+  private boolean metricsON;
+  private Tracker tracker;
+
+  /**
+   * Resources of containers listed for preemption in an {@link Allocation},
+   * keyed by container id. An entry is consumed when that container is later
+   * reported with exit status ABORTED.
+   */
+  private Map<ContainerId, Resource> preemptionContainerMap =
+      new ConcurrentHashMap<>();
+
+  public SchedulerMetrics getSchedulerMetrics() {
+    return schedulerMetrics;
+  }
+
+  public Tracker getTracker() {
+    return tracker;
+  }
+
+  public SLSFairScheduler() {
+    tracker = new Tracker();
+  }
+
+  /**
+   * Applies the configuration. If metrics are enabled, this also creates and
+   * initializes the scheduler metrics and registers a shutdown hook that
+   * tears them down.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConfig(conf);
+
+    metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
+    if (metricsON) {
+      try {
+        schedulerMetrics = SchedulerMetrics.getInstance(conf,
+            FairScheduler.class);
+        schedulerMetrics.init(this, conf);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+
+      ShutdownHookManager.get().addShutdownHook(new Runnable() {
+        @Override public void run() {
+          try {
+            schedulerMetrics.tearDown();
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+      }, SLSUtils.SHUTDOWN_HOOK_PRIORITY);
+    }
+  }
+
+  /**
+   * Delegates to {@link FairScheduler#allocate}. With metrics enabled, the
+   * call is timed, the allocation counter is incremented, and the queue
+   * metrics are updated from the request and the returned allocation.
+   */
+  @Override
+  public Allocation allocate(ApplicationAttemptId attemptId,
+      List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
+      List<String> blacklistAdditions, List<String> blacklistRemovals,
+      ContainerUpdates updateRequests) {
+    if (!metricsON) {
+      return super.allocate(attemptId, resourceRequests, containerIds,
+          blacklistAdditions, blacklistRemovals, updateRequests);
+    }
+
+    final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer()
+        .time();
+    Allocation allocation = null;
+    try {
+      allocation = super.allocate(attemptId, resourceRequests, containerIds,
+          blacklistAdditions, blacklistRemovals, updateRequests);
+      return allocation;
+    } finally {
+      context.stop();
+      schedulerMetrics.increaseSchedulerAllocationCounter();
+      // allocation is still null when super.allocate threw. Skip the queue
+      // update in that case: otherwise an NPE thrown here would replace the
+      // original exception.
+      if (allocation != null) {
+        try {
+          updateQueueWithAllocateRequest(allocation, attemptId,
+              resourceRequests, containerIds);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+
+  /**
+   * Delegates to {@link FairScheduler#handle}. With metrics enabled:
+   * NODE_UPDATE events are wrapped and used to release completed container
+   * resources from the queue metrics; APP_ATTEMPT_REMOVED events release the
+   * remaining live container and call
+   * {@code SLSRunner.decreaseRemainingApps()}; every event is timed and
+   * counted per type.
+   */
+  @Override
+  public void handle(SchedulerEvent schedulerEvent) {
+    // metrics off
+    if (!metricsON) {
+      super.handle(schedulerEvent);
+      return;
+    }
+
+    // metrics on
+    if (!schedulerMetrics.isRunning()) {
+      schedulerMetrics.setRunning(true);
+    }
+
+    Timer.Context handlerTimer = null;
+    Timer.Context operationTimer = null;
+
+    NodeUpdateSchedulerEventWrapper eventWrapper;
+    try {
+      if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
+          && schedulerEvent instanceof NodeUpdateSchedulerEvent) {
+        eventWrapper = new NodeUpdateSchedulerEventWrapper(
+            (NodeUpdateSchedulerEvent) schedulerEvent);
+        schedulerEvent = eventWrapper;
+        updateQueueWithNodeUpdate(eventWrapper);
+      } else if (
+          schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
+          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
+        // check if having AM Container, update resource usage information
+        AppAttemptRemovedSchedulerEvent appRemoveEvent =
+            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
+        ApplicationAttemptId appAttemptId =
+            appRemoveEvent.getApplicationAttemptID();
+        SchedulerAppReport app = getSchedulerAppInfo(appAttemptId);
+        // app is null when the scheduler no longer knows the attempt. Skip
+        // the metrics update so that super.handle() below still runs.
+        if (app != null && !app.getLiveContainers().isEmpty()) { // have 0 or 1
+          // should have one container which is AM container
+          String queueName =
+              getSchedulerApp(appAttemptId).getQueue().getName();
+          RMContainer rmc = app.getLiveContainers().iterator().next();
+          schedulerMetrics.updateQueueMetricsByRelease(
+              rmc.getContainer().getResource(), queueName);
+        }
+      }
+
+      handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time();
+      operationTimer = schedulerMetrics.getSchedulerHandleTimer(
+          schedulerEvent.getType()).time();
+
+      super.handle(schedulerEvent);
+    } finally {
+      if (handlerTimer != null) {
+        handlerTimer.stop();
+      }
+      if (operationTimer != null) {
+        operationTimer.stop();
+      }
+      schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType());
+
+      if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
+          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
+        SLSRunner.decreaseRemainingApps();
+      }
+    }
+  }
+
+  /**
+   * Returns the container whose id equals {@code containerId} among
+   * {@code containers}, or {@code null} if none matches. Ids are compared with
+   * {@code equals}, not {@code ==}: the id reported back by a node is not
+   * necessarily the same object as the scheduler's.
+   */
+  private static Container findContainer(Iterable<RMContainer> containers,
+      ContainerId containerId) {
+    for (RMContainer c : containers) {
+      if (c.getContainerId().equals(containerId)) {
+        return c.getContainer();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Releases the resources of containers completed on a node from their
+   * queue's metrics. SUCCESS containers are looked up among the attempt's
+   * live containers. ABORTED containers are looked up in
+   * {@link #preemptionContainerMap}, and their entry is removed.
+   */
+  private void updateQueueWithNodeUpdate(
+      NodeUpdateSchedulerEventWrapper eventWrapper) {
+    RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
+    List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
+    for (UpdatedContainerInfo info : containerList) {
+      for (ContainerStatus status : info.getCompletedContainers()) {
+        ContainerId containerId = status.getContainerId();
+        SchedulerAppReport app = super.getSchedulerAppInfo(
+            containerId.getApplicationAttemptId());
+
+        if (app == null) {
+          // this happens for the AM container
+          // The app have already removed when the NM sends the release
+          // information.
+          continue;
+        }
+
+        Resource released = Resources.createResource(0, 0);
+        if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
+          Container container =
+              findContainer(app.getLiveContainers(), containerId);
+          if (container != null) {
+            Resources.addTo(released, container.getResource());
+          }
+        } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
+          Resource preempted = preemptionContainerMap.remove(containerId);
+          if (preempted != null) {
+            Resources.addTo(released, preempted);
+          }
+        }
+        // update queue counters
+        String queue = getSchedulerApp(containerId.getApplicationAttemptId())
+            .getQueueName();
+        schedulerMetrics.updateQueueMetricsByRelease(released, queue);
+      }
+    }
+  }
+
+  /**
+   * Updates the metrics of the attempt's queue after an allocate call.
+   * Pending grows by the ANY-level requests, and shrinks by newly allocated
+   * containers and by released reserved containers. Allocated grows by newly
+   * allocated containers and shrinks by released live containers. Live
+   * containers listed for preemption are recorded in
+   * {@link #preemptionContainerMap}. Does nothing if the scheduler no longer
+   * knows the attempt.
+   */
+  private void updateQueueWithAllocateRequest(Allocation allocation,
+      ApplicationAttemptId attemptId,
+      List<ResourceRequest> resourceRequests,
+      List<ContainerId> containerIds) throws IOException {
+    SchedulerAppReport report = super.getSchedulerAppInfo(attemptId);
+    if (report == null) {
+      // The attempt is gone (e.g. allocate on a removed app). There is no
+      // queue to charge, and the lookups below would throw an NPE.
+      return;
+    }
+    // update queue information
+    Resource pendingResource = Resources.createResource(0, 0);
+    Resource allocatedResource = Resources.createResource(0, 0);
+    // container requested
+    for (ResourceRequest request : resourceRequests) {
+      if (request.getResourceName().equals(ResourceRequest.ANY)) {
+        Resources.addTo(pendingResource,
+            Resources.multiply(request.getCapability(),
+                request.getNumContainers()));
+      }
+    }
+    // container allocated
+    for (Container container : allocation.getContainers()) {
+      Resources.addTo(allocatedResource, container.getResource());
+      Resources.subtractFrom(pendingResource, container.getResource());
+    }
+    // container released from AM
+    for (ContainerId containerId : containerIds) {
+      Container container =
+          findContainer(report.getLiveContainers(), containerId);
+      if (container != null) {
+        // released allocated containers
+        Resources.subtractFrom(allocatedResource, container.getResource());
+      } else {
+        container = findContainer(report.getReservedContainers(), containerId);
+        if (container != null) {
+          // released reserved containers
+          Resources.subtractFrom(pendingResource, container.getResource());
+        }
+      }
+    }
+    // containers released/preemption from scheduler
+    Set<ContainerId> preemptionContainers = new HashSet<ContainerId>();
+    if (allocation.getContainerPreemptions() != null) {
+      preemptionContainers.addAll(allocation.getContainerPreemptions());
+    }
+    if (allocation.getStrictContainerPreemptions() != null) {
+      preemptionContainers.addAll(allocation.getStrictContainerPreemptions());
+    }
+    for (ContainerId containerId : preemptionContainers) {
+      if (!preemptionContainerMap.containsKey(containerId)) {
+        Container container =
+            findContainer(report.getLiveContainers(), containerId);
+        if (container != null) {
+          preemptionContainerMap.put(containerId, container.getResource());
+        }
+      }
+    }
+
+    // update metrics
+    String queueName = getSchedulerApp(attemptId).getQueueName();
+    schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource,
+        queueName);
+  }
+
+  /** Recursively registers queue metrics for every leaf queue under queue. */
+  private void initQueueMetrics(FSQueue queue) {
+    if (queue instanceof FSLeafQueue) {
+      schedulerMetrics.initQueueMetric(queue.getQueueName());
+      return;
+    }
+
+    for (FSQueue child : queue.getChildQueues()) {
+      initQueueMetrics(child);
+    }
+  }
+
+  /** Initializes the scheduler and, with metrics enabled, all leaf queues. */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    if (metricsON) {
+      initQueueMetrics(getQueueManager().getRootQueue());
+    }
+  }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org