http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a5516c2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java index 8645a69..a8792e8 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java @@ -18,40 +18,170 @@ package org.apache.hadoop.yarn.sls.scheduler; +import java.io.BufferedWriter; +import java.io.IOException; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.SortedMap; +import java.util.Locale; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.Lock; +import com.codahale.metrics.Counter; +import com.codahale.metrics.CsvReporter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.SlidingWindowReservoir; +import com.codahale.metrics.Timer; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; - -import com.codahale.metrics.Gauge; -import com.codahale.metrics.MetricRegistry; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.apache.hadoop.yarn.sls.web.SLSWebApp; +import org.apache.log4j.Logger; @Private @Unstable public abstract class SchedulerMetrics { + private static final String EOL = System.getProperty("line.separator"); + private static final int SAMPLING_SIZE = 60; + private static final Logger LOG = Logger.getLogger(SchedulerMetrics.class); + protected ResourceScheduler scheduler; protected Set<String> trackedQueues; protected MetricRegistry metrics; protected Set<String> appTrackedMetrics; protected Set<String> queueTrackedMetrics; - + + private Configuration conf; + private ScheduledExecutorService pool; + private SLSWebApp web; + + // metrics + private String metricsOutputDir; + private BufferedWriter metricsLogBW; + private BufferedWriter jobRuntimeLogBW; + private boolean running = false; + + // counters for scheduler allocate/handle operations + private Counter schedulerAllocateCounter; + private Counter schedulerHandleCounter; + private Map<SchedulerEventType, Counter> schedulerHandleCounterMap; + + // Timers for scheduler allocate/handle operations + private Timer schedulerAllocateTimer; + private Timer schedulerHandleTimer; + private Map<SchedulerEventType, Timer> schedulerHandleTimerMap; + private List<Histogram> schedulerHistogramList; + private Map<Histogram, Timer> histogramTimerMap; + private Lock samplerLock; + private Lock queueLock; + + static Class getSchedulerMetricsClass(Configuration conf, + Class schedulerClass) throws ClassNotFoundException { + Class metricClass = null; + String schedulerMetricsType = conf.get(schedulerClass.getName()); + if (schedulerMetricsType != null) { + metricClass = Class.forName(schedulerMetricsType); + } + + if (schedulerClass.equals(FairScheduler.class)) { + metricClass = FairSchedulerMetrics.class; + } else if (schedulerClass.equals(CapacityScheduler.class)) { + metricClass = CapacitySchedulerMetrics.class; + } else if (schedulerClass.equals(FifoScheduler.class)) { + metricClass = FifoSchedulerMetrics.class; + } + + return metricClass; + } + + static SchedulerMetrics getInstance(Configuration conf, Class schedulerClass) + throws ClassNotFoundException { + Class schedulerMetricClass = getSchedulerMetricsClass(conf, schedulerClass); + return (SchedulerMetrics) ReflectionUtils + .newInstance(schedulerMetricClass, new Configuration()); + } + public SchedulerMetrics() { + metrics = new MetricRegistry(); + appTrackedMetrics = new HashSet<>(); appTrackedMetrics.add("live.containers"); appTrackedMetrics.add("reserved.containers"); + queueTrackedMetrics = new HashSet<>(); + trackedQueues = new HashSet<>(); + + samplerLock = new ReentrantLock(); + queueLock = new ReentrantLock(); } - - public void init(ResourceScheduler scheduler, MetricRegistry metrics) { - this.scheduler = scheduler; - this.trackedQueues = new HashSet<>(); - this.metrics = metrics; + + void init(ResourceScheduler resourceScheduler, Configuration config) + throws Exception { + this.scheduler = resourceScheduler; + this.conf = config; + + metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR); + + // register various metrics + registerJvmMetrics(); + registerClusterResourceMetrics(); + registerContainerAppNumMetrics(); + registerSchedulerMetrics(); + + // .csv output + initMetricsCSVOutput(); + + // start web app to provide real-time tracking + int metricsWebAddressPort = conf.getInt( + SLSConfiguration.METRICS_WEB_ADDRESS_PORT, + SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT); + web = new SLSWebApp((SchedulerWrapper)scheduler, metricsWebAddressPort); + web.start(); + + // a thread to update histogram timer + pool = new ScheduledThreadPoolExecutor(2); + pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000, + TimeUnit.MILLISECONDS); + + // a thread to output metrics for real-tiem tracking + pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000, + TimeUnit.MILLISECONDS); + + // application running information + jobRuntimeLogBW = + new BufferedWriter(new OutputStreamWriter(new FileOutputStream( + metricsOutputDir + "/jobruntime.csv"), "UTF-8")); + jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," + + "simulate_start_time,simulate_end_time" + EOL); + jobRuntimeLogBW.flush(); + } + + public MetricRegistry getMetrics() { + return metrics; } protected SchedulerApplicationAttempt getSchedulerAppAttempt( @@ -117,7 +247,392 @@ public abstract class SchedulerMetrics { public Set<String> getAppTrackedMetrics() { return appTrackedMetrics; } + public Set<String> getQueueTrackedMetrics() { return queueTrackedMetrics; } + + private void registerJvmMetrics() { + // add JVM gauges + metrics.register("variable.jvm.free.memory", + new Gauge<Long>() { + @Override + public Long getValue() { + return Runtime.getRuntime().freeMemory(); + } + } + ); + metrics.register("variable.jvm.max.memory", + new Gauge<Long>() { + @Override + public Long getValue() { + return Runtime.getRuntime().maxMemory(); + } + } + ); + metrics.register("variable.jvm.total.memory", + new Gauge<Long>() { + @Override + public Long getValue() { + return Runtime.getRuntime().totalMemory(); + } + } + ); + } + + private void registerClusterResourceMetrics() { + metrics.register("variable.cluster.allocated.memory", + new Gauge<Long>() { + @Override + public Long getValue() { + if (scheduler.getRootQueueMetrics() == null) { + return 0L; + } else { + return scheduler.getRootQueueMetrics().getAllocatedMB(); + } + } + } + ); + metrics.register("variable.cluster.allocated.vcores", + new Gauge<Integer>() { + @Override + public Integer getValue() { + if (scheduler.getRootQueueMetrics() == null) { + return 0; + } else { + return scheduler.getRootQueueMetrics().getAllocatedVirtualCores(); + } + } + } + ); + metrics.register("variable.cluster.available.memory", + new Gauge<Long>() { + @Override + public Long getValue() { + if (scheduler.getRootQueueMetrics() == null) { + return 0L; + } else { + return scheduler.getRootQueueMetrics().getAvailableMB(); + } + } + } + ); + metrics.register("variable.cluster.available.vcores", + new Gauge<Integer>() { + @Override + public Integer getValue() { + if (scheduler.getRootQueueMetrics() == null) { + return 0; + } else { + return scheduler.getRootQueueMetrics().getAvailableVirtualCores(); + } + } + } + ); + } + + private void registerContainerAppNumMetrics() { + metrics.register("variable.running.application", + new Gauge<Integer>() { + @Override + public Integer getValue() { + if (scheduler.getRootQueueMetrics() == null) { + return 0; + } else { + return scheduler.getRootQueueMetrics().getAppsRunning(); + } + } + } + ); + metrics.register("variable.running.container", + new Gauge<Integer>() { + @Override + public Integer getValue() { + if (scheduler.getRootQueueMetrics() == null) { + return 0; + } else { + return scheduler.getRootQueueMetrics().getAllocatedContainers(); + } + } + } + ); + } + + private void registerSchedulerMetrics() { + samplerLock.lock(); + try { + // counters for scheduler operations + schedulerAllocateCounter = metrics.counter( + "counter.scheduler.operation.allocate"); + schedulerHandleCounter = metrics.counter( + "counter.scheduler.operation.handle"); + schedulerHandleCounterMap = new HashMap<>(); + for (SchedulerEventType e : SchedulerEventType.values()) { + Counter counter = metrics.counter( + "counter.scheduler.operation.handle." + e); + schedulerHandleCounterMap.put(e, counter); + } + // timers for scheduler operations + int timeWindowSize = conf.getInt( + SLSConfiguration.METRICS_TIMER_WINDOW_SIZE, + SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT); + schedulerAllocateTimer = new Timer( + new SlidingWindowReservoir(timeWindowSize)); + schedulerHandleTimer = new Timer( + new SlidingWindowReservoir(timeWindowSize)); + schedulerHandleTimerMap = new HashMap<>(); + for (SchedulerEventType e : SchedulerEventType.values()) { + Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize)); + schedulerHandleTimerMap.put(e, timer); + } + // histogram for scheduler operations (Samplers) + schedulerHistogramList = new ArrayList<>(); + histogramTimerMap = new HashMap<>(); + Histogram schedulerAllocateHistogram = new Histogram( + new SlidingWindowReservoir(SAMPLING_SIZE)); + metrics.register("sampler.scheduler.operation.allocate.timecost", + schedulerAllocateHistogram); + schedulerHistogramList.add(schedulerAllocateHistogram); + histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer); + Histogram schedulerHandleHistogram = new Histogram( + new SlidingWindowReservoir(SAMPLING_SIZE)); + metrics.register("sampler.scheduler.operation.handle.timecost", + schedulerHandleHistogram); + schedulerHistogramList.add(schedulerHandleHistogram); + histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer); + for (SchedulerEventType e : SchedulerEventType.values()) { + Histogram histogram = new Histogram( + new SlidingWindowReservoir(SAMPLING_SIZE)); + metrics.register( + "sampler.scheduler.operation.handle." + e + ".timecost", + histogram); + schedulerHistogramList.add(histogram); + histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e)); + } + } finally { + samplerLock.unlock(); + } + } + + private void initMetricsCSVOutput() { + int timeIntervalMS = conf.getInt( + SLSConfiguration.METRICS_RECORD_INTERVAL_MS, + SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT); + File dir = new File(metricsOutputDir + "/metrics"); + if(!dir.exists() && !dir.mkdirs()) { + LOG.error("Cannot create directory " + dir.getAbsoluteFile()); + } + final CsvReporter reporter = CsvReporter.forRegistry(metrics) + .formatFor(Locale.US) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(new File(metricsOutputDir + "/metrics")); + reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS); + } + + boolean isRunning() { + return running; + } + + void setRunning(boolean running) { + this.running = running; + } + + class HistogramsRunnable implements Runnable { + @Override + public void run() { + samplerLock.lock(); + try { + for (Histogram histogram : schedulerHistogramList) { + Timer timer = histogramTimerMap.get(histogram); + histogram.update((int) timer.getSnapshot().getMean()); + } + } finally { + samplerLock.unlock(); + } + } + } + + class MetricsLogRunnable implements Runnable { + private boolean firstLine = true; + + MetricsLogRunnable() { + try { + metricsLogBW = + new BufferedWriter(new OutputStreamWriter(new FileOutputStream( + metricsOutputDir + "/realtimetrack.json"), "UTF-8")); + metricsLogBW.write("["); + } catch (IOException e) { + LOG.info(e.getMessage()); + } + } + + @Override + public void run() { + if(running) { + // all WebApp to get real tracking json + String trackingMetrics = web.generateRealTimeTrackingMetrics(); + // output + try { + if(firstLine) { + metricsLogBW.write(trackingMetrics + EOL); + firstLine = false; + } else { + metricsLogBW.write("," + trackingMetrics + EOL); + } + metricsLogBW.flush(); + } catch (IOException e) { + LOG.info(e.getMessage()); + } + } + } + } + + void tearDown() throws Exception { + if (metricsLogBW != null) { + metricsLogBW.write("]"); + metricsLogBW.close(); + } + + if (web != null) { + web.stop(); + } + + if (jobRuntimeLogBW != null) { + jobRuntimeLogBW.close(); + } + + if (pool != null) { + pool.shutdown(); + } + } + + void increaseSchedulerAllocationCounter() { + schedulerAllocateCounter.inc(); + } + + void increaseSchedulerHandleCounter(SchedulerEventType schedulerEventType) { + schedulerHandleCounter.inc(); + schedulerHandleCounterMap.get(schedulerEventType).inc(); + } + + Timer getSchedulerAllocateTimer() { + return schedulerAllocateTimer; + } + + Timer getSchedulerHandleTimer() { + return schedulerHandleTimer; + } + + Timer getSchedulerHandleTimer(SchedulerEventType schedulerEventType) { + return schedulerHandleTimerMap.get(schedulerEventType); + } + + private enum QueueMetric { + PENDING_MEMORY("pending.memory"), + PENDING_VCORES("pending.cores"), + ALLOCATED_MEMORY("allocated.memory"), + ALLOCATED_VCORES("allocated.cores"); + + private String value; + + QueueMetric(String value) { + this.value = value; + } + } + + private String getQueueMetricName(String queue, QueueMetric metric) { + return "counter.queue." + queue + "." + metric.value; + } + + private void traceQueueIfNotTraced(String queue) { + queueLock.lock(); + try { + if (!isTracked(queue)) { + trackQueue(queue); + } + } finally { + queueLock.unlock(); + } + } + + void initQueueMetric(String queueName){ + SortedMap<String, Counter> counterMap = metrics.getCounters(); + + for (QueueMetric queueMetric : QueueMetric.values()) { + String metricName = getQueueMetricName(queueName, queueMetric); + if (!counterMap.containsKey(metricName)) { + metrics.counter(metricName); + counterMap = metrics.getCounters(); + } + } + + traceQueueIfNotTraced(queueName); + } + + void updateQueueMetrics(Resource pendingResource, Resource allocatedResource, + String queueName) { + SortedMap<String, Counter> counterMap = metrics.getCounters(); + for(QueueMetric metric : QueueMetric.values()) { + String metricName = getQueueMetricName(queueName, metric); + if (!counterMap.containsKey(metricName)) { + metrics.counter(metricName); + counterMap = metrics.getCounters(); + } + + if (metric == QueueMetric.PENDING_MEMORY) { + counterMap.get(metricName).inc(pendingResource.getMemorySize()); + } else if (metric == QueueMetric.PENDING_VCORES) { + counterMap.get(metricName).inc(pendingResource.getVirtualCores()); + } else if (metric == QueueMetric.ALLOCATED_MEMORY) { + counterMap.get(metricName).inc(allocatedResource.getMemorySize()); + } else if (metric == QueueMetric.ALLOCATED_VCORES){ + counterMap.get(metricName).inc(allocatedResource.getVirtualCores()); + } + } + + traceQueueIfNotTraced(queueName); + } + + void updateQueueMetricsByRelease(Resource releaseResource, String queue) { + SortedMap<String, Counter> counterMap = metrics.getCounters(); + String name = getQueueMetricName(queue, QueueMetric.ALLOCATED_MEMORY); + if (!counterMap.containsKey(name)) { + metrics.counter(name); + counterMap = metrics.getCounters(); + } + counterMap.get(name).inc(-releaseResource.getMemorySize()); + + String vcoreMetric = + getQueueMetricName(queue, QueueMetric.ALLOCATED_VCORES); + if (!counterMap.containsKey(vcoreMetric)) { + metrics.counter(vcoreMetric); + counterMap = metrics.getCounters(); + } + counterMap.get(vcoreMetric).inc(-releaseResource.getVirtualCores()); + } + + public void addTrackedApp(ApplicationId appId, + String oldAppId) { + trackApp(appId, oldAppId); + } + + public void removeTrackedApp(String oldAppId) { + untrackApp(oldAppId); + } + + public void addAMRuntime(ApplicationId appId, long traceStartTimeMS, + long traceEndTimeMS, long simulateStartTimeMS, long simulateEndTimeMS) { + try { + // write job runtime information + StringBuilder sb = new StringBuilder(); + sb.append(appId).append(",").append(traceStartTimeMS).append(",") + .append(traceEndTimeMS).append(",").append(simulateStartTimeMS) + .append(",").append(simulateEndTimeMS); + jobRuntimeLogBW.write(sb.toString() + EOL); + jobRuntimeLogBW.flush(); + } catch (IOException e) { + LOG.info(e.getMessage()); + } + } }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a5516c2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java index 962b137..406f050 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java @@ -17,28 +17,12 @@ */ package org.apache.hadoop.yarn.sls.scheduler; -import java.util.Set; - import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationId; - -import com.codahale.metrics.MetricRegistry; @Private @Unstable public interface SchedulerWrapper { - - MetricRegistry getMetrics(); SchedulerMetrics getSchedulerMetrics(); - Set<String> getQueueSet(); - void setQueueSet(Set<String> queues); - Set<String> getTrackedAppSet(); - void setTrackedAppSet(Set<String> apps); - void addTrackedApp(ApplicationId appId, String oldAppId); - void removeTrackedApp(String oldAppId); - void addAMRuntime(ApplicationId appId, - long traceStartTimeMS, long traceEndTimeMS, - long simulateStartTimeMS, long simulateEndTimeMS); - + Tracker getTracker(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a5516c2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/Tracker.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/Tracker.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/Tracker.java new file mode 100644 index 0000000..42a5c3c --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/Tracker.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.scheduler; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +import java.util.Set; + +@Private +@Unstable +public class Tracker { + private Set<String> queueSet; + private Set<String> trackedAppSet; + + public void setQueueSet(Set<String> queues) { + queueSet = queues; + } + + public Set<String> getQueueSet() { + return queueSet; + } + + public void setTrackedAppSet(Set<String> apps) { + trackedAppSet = apps; + } + + public Set<String> getTrackedAppSet() { + return trackedAppSet; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a5516c2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java index e5f7cd0..085edc0 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java @@ -43,6 +43,7 @@ import org.apache.hadoop.tools.rumen.LoggedTaskAttempt; @Private @Unstable public class SLSUtils { + public static final int SHUTDOWN_HOOK_PRIORITY = 30; // hostname includes the network path and the host name. for example // "/default-rack/hostFoo" or "/coreSwitchA/TORSwitchB/hostBar". http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a5516c2/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java index 33d4846..2d2ffc5 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/web/SLSWebApp.java @@ -107,12 +107,12 @@ public class SLSWebApp extends HttpServlet { public SLSWebApp(SchedulerWrapper wrapper, int metricsAddressPort) { this.wrapper = wrapper; - metrics = wrapper.getMetrics(); handleOperTimecostHistogramMap = new HashMap<SchedulerEventType, Histogram>(); queueAllocatedMemoryCounterMap = new HashMap<String, Counter>(); queueAllocatedVCoresCounterMap = new HashMap<String, Counter>(); schedulerMetrics = wrapper.getSchedulerMetrics(); + metrics = schedulerMetrics.getMetrics(); port = metricsAddressPort; } @@ -226,7 +226,7 @@ public class SLSWebApp extends HttpServlet { response.setStatus(HttpServletResponse.SC_OK); // queues {0} - Set<String> queues = wrapper.getQueueSet(); + Set<String> queues = wrapper.getTracker().getQueueSet(); StringBuilder queueInfo = new StringBuilder(); int i = 0; @@ -265,7 +265,7 @@ public class SLSWebApp extends HttpServlet { // tracked queues {0} StringBuilder trackedQueueInfo = new StringBuilder(); - Set<String> trackedQueues = wrapper.getQueueSet(); + Set<String> trackedQueues = wrapper.getTracker().getQueueSet(); for(String queue : trackedQueues) { trackedQueueInfo.append("<option value='Queue ").append(queue) .append("'>").append(queue).append("</option>"); @@ -273,7 +273,7 @@ public class SLSWebApp extends HttpServlet { // tracked apps {1} StringBuilder trackedAppInfo = new StringBuilder(); - Set<String> trackedApps = wrapper.getTrackedAppSet(); + Set<String> trackedApps = wrapper.getTracker().getTrackedAppSet(); for(String job : trackedApps) { trackedAppInfo.append("<option value='Job ").append(job) .append("'>").append(job).append("</option>"); @@ -422,7 +422,7 @@ public class SLSWebApp extends HttpServlet { // allocated resource for each queue Map<String, Double> queueAllocatedMemoryMap = new HashMap<String, Double>(); Map<String, Long> queueAllocatedVCoresMap = new HashMap<String, Long>(); - for (String queue : wrapper.getQueueSet()) { + for (String queue : wrapper.getTracker().getQueueSet()) { // memory String key = "counter.queue." + queue + ".allocated.memory"; if (! queueAllocatedMemoryCounterMap.containsKey(queue) && @@ -462,7 +462,7 @@ public class SLSWebApp extends HttpServlet { .append(",\"cluster.available.memory\":").append(availableMemoryGB) .append(",\"cluster.available.vcores\":").append(availableVCoresGB); - for (String queue : wrapper.getQueueSet()) { + for (String queue : wrapper.getTracker().getQueueSet()) { sb.append(",\"queue.").append(queue).append(".allocated.memory\":") .append(queueAllocatedMemoryMap.get(queue)); sb.append(",\"queue.").append(queue).append(".allocated.vcores\":") http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a5516c2/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java index f0d8e6f..ca3d195 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java @@ -22,37 +22,56 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; -import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; -import org.apache.hadoop.yarn.sls.scheduler.FairSchedulerMetrics; -import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper; +import org.apache.hadoop.yarn.sls.scheduler.*; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; +@RunWith(Parameterized.class) public class TestAMSimulator { private ResourceManager rm; private YarnConfiguration conf; private Path metricOutputDir; + private Class slsScheduler; + private Class scheduler; + + @Parameterized.Parameters + public static Collection<Object[]> params() { + return Arrays.asList(new Object[][] { + {SLSFairScheduler.class, FairScheduler.class}, + {SLSCapacityScheduler.class, CapacityScheduler.class} + }); + } + + public TestAMSimulator(Class slsScheduler, Class scheduler) { + this.slsScheduler = slsScheduler; + this.scheduler = scheduler; + } + @Before public void setup() { createMetricOutputDir(); conf = new YarnConfiguration(); conf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricOutputDir.toString()); - conf.set(YarnConfiguration.RM_SCHEDULER, - "org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper"); - conf.set(SLSConfiguration.RM_SCHEDULER, - "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"); + conf.set(YarnConfiguration.RM_SCHEDULER, slsScheduler.getName()); + conf.set(SLSConfiguration.RM_SCHEDULER, scheduler.getName()); conf.setBoolean(SLSConfiguration.METRICS_SWITCH, true); rm = new ResourceManager(); rm.init(conf); @@ -76,15 +95,17 @@ public class TestAMSimulator { } private void verifySchedulerMetrics(String appId) { - SchedulerWrapper schedulerWrapper = (SchedulerWrapper) - rm.getResourceScheduler(); - MetricRegistry metricRegistry = schedulerWrapper.getMetrics(); - for (FairSchedulerMetrics.Metric metric : - FairSchedulerMetrics.Metric.values()) { - String key = "variable.app." + appId + "." + metric.getValue() - + ".memory"; - Assert.assertTrue(metricRegistry.getGauges().containsKey(key)); - Assert.assertNotNull(metricRegistry.getGauges().get(key).getValue()); + if (scheduler.equals(FairScheduler.class)) { + SchedulerMetrics schedulerMetrics = ((SchedulerWrapper) + rm.getResourceScheduler()).getSchedulerMetrics(); + MetricRegistry metricRegistry = schedulerMetrics.getMetrics(); + for (FairSchedulerMetrics.Metric metric : + FairSchedulerMetrics.Metric.values()) { + String key = "variable.app." + appId + "." + metric.getValue() + + ".memory"; + Assert.assertTrue(metricRegistry.getGauges().containsKey(key)); + Assert.assertNotNull(metricRegistry.getGauges().get(key).getValue()); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a5516c2/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java index f9a3932..2f10f7d 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java @@ -21,26 +21,50 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; +import org.apache.hadoop.yarn.sls.scheduler.SLSCapacityScheduler; +import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import java.util.Arrays; +import java.util.Collection; + +@RunWith(Parameterized.class) public class TestNMSimulator { private final int GB = 1024; private ResourceManager rm; private YarnConfiguration conf; + private Class slsScheduler; + private Class scheduler; + + @Parameterized.Parameters + public static Collection<Object[]> params() { + return Arrays.asList(new Object[][] { + {SLSFairScheduler.class, FairScheduler.class}, + {SLSCapacityScheduler.class, CapacityScheduler.class} + }); + } + + public TestNMSimulator(Class slsScheduler, Class scheduler) { + this.slsScheduler = slsScheduler; + this.scheduler = scheduler; + } + @Before public void setup() { conf = new YarnConfiguration(); - conf.set(YarnConfiguration.RM_SCHEDULER, - "org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper"); - conf.set(SLSConfiguration.RM_SCHEDULER, - "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"); + conf.set(YarnConfiguration.RM_SCHEDULER, slsScheduler.getName()); + conf.set(SLSConfiguration.RM_SCHEDULER, scheduler.getName()); conf.setBoolean(SLSConfiguration.METRICS_SWITCH, false); rm = new ResourceManager(); rm.init(conf); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
