YARN-6363. Extending SLS: Synthetic Load Generator. (Carlo Curino via wangda)
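As a quick illustration of the new entry point (a minimal sketch, not part of the commit; the trace file and output directory below are assumed example paths), SLSRunner now extends Configured and implements Tool, so a synthetic-workload simulation can be launched via slsrun.sh --tracetype=SYNTH --tracelocation=<FILE> or programmatically, the same way the patched main() does:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.sls.SLSRunner;

public class SynthSlsDemo {
  public static void main(String[] args) throws Exception {
    String[] argv = {
        "-tracetype", "SYNTH",          // new generalized trace selector
        "-tracelocation", "syn.json",   // synthetic workload spec (assumed path)
        "-output", "/tmp/sls-output"    // metrics output directory (assumed path)
    };
    // Mirrors main() in the patch: ToolRunner injects the Configuration into
    // the runner (now a Configured Tool) and invokes run(argv).
    System.exit(ToolRunner.run(new Configuration(), new SLSRunner(), argv));
  }
}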
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/de69d6e8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/de69d6e8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/de69d6e8 Branch: refs/heads/YARN-5355 Commit: de69d6e81128470dd5d2fd865d4b3a79188f740b Parents: 667966c Author: Wangda Tan <[email protected]> Authored: Thu Apr 20 21:54:18 2017 -0700 Committer: Wangda Tan <[email protected]> Committed: Thu Apr 20 21:54:30 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/tools/rumen/TaskInfo.java | 29 +- .../apache/hadoop/tools/rumen/ZombieJob.java | 9 +- hadoop-tools/hadoop-sls/pom.xml | 3 + hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh | 36 +- .../hadoop/yarn/sls/ReservationClientUtil.java | 78 +++ .../org/apache/hadoop/yarn/sls/SLSRunner.java | 569 +++++++++++++------ .../hadoop/yarn/sls/appmaster/AMSimulator.java | 89 ++- .../yarn/sls/appmaster/MRAMSimulator.java | 9 +- .../sls/resourcemanager/MockAMLauncher.java | 5 + .../sls/scheduler/SLSCapacityScheduler.java | 24 +- .../yarn/sls/scheduler/SLSFairScheduler.java | 22 +- .../hadoop/yarn/sls/scheduler/TaskRunner.java | 9 +- .../hadoop/yarn/sls/synthetic/SynthJob.java | 306 ++++++++++ .../yarn/sls/synthetic/SynthJobClass.java | 180 ++++++ .../sls/synthetic/SynthTraceJobProducer.java | 316 ++++++++++ .../hadoop/yarn/sls/synthetic/SynthUtils.java | 101 ++++ .../yarn/sls/synthetic/SynthWorkload.java | 121 ++++ .../hadoop/yarn/sls/synthetic/package-info.java | 22 + .../apache/hadoop/yarn/sls/utils/SLSUtils.java | 9 + .../src/site/markdown/SchedulerLoadSimulator.md | 122 +++- .../hadoop/yarn/sls/BaseSLSRunnerTest.java | 120 ++++ .../apache/hadoop/yarn/sls/TestSLSRunner.java | 90 +-- .../hadoop/yarn/sls/TestSynthJobGeneration.java | 96 ++++ .../yarn/sls/appmaster/TestAMSimulator.java | 2 +- .../yarn/sls/scheduler/TestTaskRunner.java | 2 +- .../src/test/resources/capacity-scheduler.xml | 10 + .../src/test/resources/fair-scheduler.xml | 8 +- .../hadoop-sls/src/test/resources/inputsls.json | 55 ++ .../hadoop-sls/src/test/resources/nodes.json | 84 +++ .../src/test/resources/sls-runner.xml | 6 +- .../hadoop-sls/src/test/resources/syn.json | 53 ++ .../hadoop-sls/src/test/resources/yarn-site.xml | 10 +- 32 files changed, 2303 insertions(+), 292 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java index 9aa6373..6159f85 100644 --- a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java +++ b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/TaskInfo.java @@ -23,21 +23,37 @@ public class TaskInfo { private final long bytesOut; private final int recsOut; private final long maxMemory; + private final long maxVcores; private final ResourceUsageMetrics metrics; + + public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut, + long maxMemory) { + this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, 1, + new ResourceUsageMetrics()); + } + public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut, - long maxMemory) { - this(bytesIn, 
recsIn, bytesOut, recsOut, maxMemory, + long maxMemory, ResourceUsageMetrics + metrics) { + this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, 1, metrics); + } + + public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut, + long maxMemory, long maxVcores) { + this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, maxVcores, new ResourceUsageMetrics()); } public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut, - long maxMemory, ResourceUsageMetrics metrics) { + long maxMemory, long maxVcores, ResourceUsageMetrics + metrics) { this.bytesIn = bytesIn; this.recsIn = recsIn; this.bytesOut = bytesOut; this.recsOut = recsOut; this.maxMemory = maxMemory; + this.maxVcores = maxVcores; this.metrics = metrics; } @@ -79,6 +95,13 @@ public class TaskInfo { } /** + * @return Vcores used by the task. + */ + public long getTaskVCores() { + return maxVcores; + } + + /** * @return Resource usage metrics */ public ResourceUsageMetrics getResourceUsageMetrics() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/ZombieJob.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/ZombieJob.java b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/ZombieJob.java index 3857e1f..6400840 100644 --- a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/ZombieJob.java +++ b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/ZombieJob.java @@ -426,7 +426,7 @@ public class ZombieJob implements JobStory { LoggedTask loggedTask = getLoggedTask(taskType, taskNumber); if (loggedTask == null) { // TODO insert parameters - TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0); + TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0, 0); return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber, taskNumber, locality); } @@ -473,7 +473,7 @@ public class ZombieJob implements JobStory { LoggedTask loggedTask = getLoggedTask(taskType, taskNumber); if (loggedTask == null) { // TODO insert parameters - TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0); + TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0, 0); return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber, taskNumber, locality); } @@ -639,7 +639,7 @@ public class ZombieJob implements JobStory { private TaskInfo getTaskInfo(LoggedTask loggedTask) { if (loggedTask == null) { - return new TaskInfo(0, 0, 0, 0, 0); + return new TaskInfo(0, 0, 0, 0, 0, 0); } List<LoggedTaskAttempt> attempts = loggedTask.getAttempts(); @@ -688,9 +688,10 @@ public class ZombieJob implements JobStory { break; } + //note: hardcoding vCores, as they are not collected TaskInfo taskInfo = new TaskInfo(inputBytes, (int) inputRecords, outputBytes, - (int) outputRecords, (int) heapMegabytes, + (int) outputRecords, (int) heapMegabytes, 1, metrics); return taskInfo; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/pom.xml b/hadoop-tools/hadoop-sls/pom.xml index 0d4ef58..8fb57f3 100644 --- a/hadoop-tools/hadoop-sls/pom.xml +++ b/hadoop-tools/hadoop-sls/pom.xml @@ -132,6 +132,9 @@ <exclude>src/test/resources/simulate.html.template</exclude> <exclude>src/test/resources/simulate.info.html.template</exclude> <exclude>src/test/resources/track.html.template</exclude> + 
<exclude>src/test/resources/syn.json</exclude> + <exclude>src/test/resources/inputsls.json</exclude> + <exclude>src/test/resources/nodes.json</exclude> </excludes> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh b/hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh index 5f8d9fc..cbc5bc9 100644 --- a/hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh +++ b/hadoop-tools/hadoop-sls/src/main/bin/slsrun.sh @@ -16,7 +16,9 @@ function hadoop_usage() { echo "Usage: slsrun.sh <OPTIONS> " - echo " --input-rumen=<FILE1,FILE2,...> | --input-sls=<FILE1,FILE2,...>" + echo " --tracetype=<SYNTH | SLS | RUMEN>" + echo " --tracelocation=<FILE1,FILE2,...>" + echo " (deprecated --input-rumen=<FILE1,FILE2,...> | --input-sls=<FILE1,FILE2,...>)" echo " --output-dir=<SLS_SIMULATION_OUTPUT_DIRECTORY>" echo " [--nodes=<SLS_NODES_FILE>]" echo " [--track-jobs=<JOBID1,JOBID2,...>]" @@ -33,6 +35,12 @@ function parse_args() --input-sls=*) inputsls=${i#*=} ;; + --tracetype=*) + tracetype=${i#*=} + ;; + --tracelocation=*) + tracelocation=${i#*=} + ;; --output-dir=*) outputdir=${i#*=} ;; @@ -52,14 +60,12 @@ function parse_args() esac done - if [[ -z "${inputrumen}" && -z "${inputsls}" ]] ; then - hadoop_error "ERROR: Either --input-rumen or --input-sls must be specified." - hadoop_exit_with_usage 1 + if [[ -z "${inputrumen}" && -z "${inputsls}" && -z "${tracetype}" ]] ; then + hadoop_error "ERROR: Either --input-rumen, --input-sls, or --tracetype (with --tracelocation) must be specified." fi - if [[ -n "${inputrumen}" && -n "${inputsls}" ]] ; then - hadoop_error "ERROR: Only specify one of --input-rumen or --input-sls." - hadoop_exit_with_usage 1 + if [[ -n "${inputrumen}" && -n "${inputsls}" && -n "${tracetype}" ]] ; then + hadoop_error "ERROR: Only specify one of --input-rumen, --input-sls, or --tracetype (with --tracelocation)" fi if [[ -z "${outputdir}" ]] ; then @@ -74,11 +80,17 @@ function calculate_classpath } function run_simulation() { - if [[ "${inputsls}" == "" ]] ; then - hadoop_add_param args -inputrumen "-inputrumen ${inputrumen}" - else - hadoop_add_param args -inputsls "-inputsls ${inputsls}" - fi + + if [[ "${inputsls}" != "" ]] ; then + hadoop_add_param args -inputsls "-inputsls ${inputsls}" + fi + if [[ "${inputrumen}" != "" ]] ; then + hadoop_add_param args -inputrumen "-inputrumen ${inputrumen}" + fi + if [[ "${tracetype}" != "" ]] ; then + hadoop_add_param args -tracetype "-tracetype ${tracetype}" + hadoop_add_param args -tracelocation "-tracelocation ${tracelocation}" + fi hadoop_add_param args -output "-output ${outputdir}" http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ReservationClientUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ReservationClientUtil.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ReservationClientUtil.java new file mode 100644 index 0000000..7c10a57 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/ReservationClientUtil.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls; + +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.records.*; + +import java.util.ArrayList; +import java.util.List; + +/** + * Simple support class, used to create reservation requests. + */ +public final class ReservationClientUtil { + + private ReservationClientUtil(){ + //avoid instantiation + } + + /** + * Creates a request that envelopes an MR job, picking the max number of maps + * and reducers, max durations, and max resources per container. + * + * @param reservationId the id of the reservation + * @param name the name of a reservation + * @param maxMapRes maximum resources used by any mapper + * @param numberMaps number of mappers + * @param maxMapDur maximum duration of any mapper + * @param maxRedRes maximum resources used by any reducer + * @param numberReduces number of reducers + * @param maxRedDur maximum duration of any reducer + * @param arrival start time of valid range for reservation + * @param deadline deadline for this reservation + * @param queueName queue to submit to + * @return a submission request + */ + @SuppressWarnings("checkstyle:parameternumber") + public static ReservationSubmissionRequest createMRReservation( + ReservationId reservationId, String name, Resource maxMapRes, + int numberMaps, long maxMapDur, Resource maxRedRes, int numberReduces, + long maxRedDur, long arrival, long deadline, String queueName) { + + ReservationRequest mapRR = ReservationRequest.newInstance(maxMapRes, + numberMaps, numberMaps, maxMapDur); + ReservationRequest redRR = ReservationRequest.newInstance(maxRedRes, + numberReduces, numberReduces, maxRedDur); + + List<ReservationRequest> listResReq = new ArrayList<ReservationRequest>(); + listResReq.add(mapRR); + listResReq.add(redRR); + + ReservationRequests reservationRequests = ReservationRequests + .newInstance(listResReq, ReservationRequestInterpreter.R_ORDER_NO_GAP); + ReservationDefinition resDef = ReservationDefinition.newInstance(arrival, + deadline, reservationRequests, name); + + // outermost request + ReservationSubmissionRequest request = ReservationSubmissionRequest + .newInstance(resDef, queueName, reservationId); + + return request; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index ba43816..523d22a 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -41,17
+41,25 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.tools.rumen.JobTraceReader; import org.apache.hadoop.tools.rumen.LoggedJob; import org.apache.hadoop.tools.rumen.LoggedTask; import org.apache.hadoop.tools.rumen.LoggedTaskAttempt; +import org.apache.hadoop.tools.rumen.TaskAttemptInfo; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -67,25 +75,27 @@ import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator; import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher; import org.apache.hadoop.yarn.sls.scheduler.*; +import org.apache.hadoop.yarn.sls.synthetic.SynthJob; +import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer; import org.apache.hadoop.yarn.sls.utils.SLSUtils; +import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.log4j.Logger; @Private @Unstable -public class SLSRunner { +public class SLSRunner extends Configured implements Tool { // RM, Runner private ResourceManager rm; private static TaskRunner runner = new TaskRunner(); private String[] inputTraces; - private Configuration conf; private Map<String, Integer> queueAppNumMap; - + // NM simulator private HashMap<NodeId, NMSimulator> nmMap; private int nmMemoryMB, nmVCores; private String nodeFile; - + // AM simulator private int AM_ID; private Map<String, AMSimulator> amMap; @@ -106,43 +116,64 @@ public class SLSRunner { // logger public final static Logger LOG = Logger.getLogger(SLSRunner.class); - // input traces, input-rumen or input-sls - private boolean isSLS; - - public SLSRunner(boolean isSLS, String inputTraces[], String nodeFile, - String outputDir, Set<String> trackedApps, - boolean printsimulation) - throws IOException, ClassNotFoundException { - this.isSLS = isSLS; - this.inputTraces = inputTraces.clone(); - this.nodeFile = nodeFile; - this.trackedApps = trackedApps; - this.printSimulation = printsimulation; - metricsOutputDir = outputDir; - + /** + * The type of trace in input. 
+ */ + public enum TraceType { + SLS, RUMEN, SYNTH + } + + private TraceType inputType; + private SynthTraceJobProducer stjp; + + public SLSRunner() throws ClassNotFoundException { + Configuration tempConf = new Configuration(false); + init(tempConf); + } + + public SLSRunner(Configuration tempConf) throws ClassNotFoundException { + init(tempConf); + } + + private void init(Configuration tempConf) throws ClassNotFoundException { nmMap = new HashMap<>(); queueAppNumMap = new HashMap<>(); amMap = new ConcurrentHashMap<>(); amClassMap = new HashMap<>(); - + // runner configuration - conf = new Configuration(false); - conf.addResource("sls-runner.xml"); + tempConf.addResource("sls-runner.xml"); + super.setConf(tempConf); + // runner - int poolSize = conf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, - SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); + int poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, + SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); SLSRunner.runner.setQueueSize(poolSize); // <AMType, Class> map - for (Map.Entry e : conf) { + for (Map.Entry e : tempConf) { String key = e.getKey().toString(); if (key.startsWith(SLSConfiguration.AM_TYPE)) { String amType = key.substring(SLSConfiguration.AM_TYPE.length()); - amClassMap.put(amType, Class.forName(conf.get(key))); + amClassMap.put(amType, Class.forName(tempConf.get(key))); } } } - - public void start() throws Exception { + + public void setSimulationParams(TraceType inType, String[] inTraces, + String nodes, String outDir, Set<String> trackApps, + boolean printsimulation) throws IOException, ClassNotFoundException { + + this.inputType = inType; + this.inputTraces = inTraces.clone(); + this.nodeFile = nodes; + this.trackedApps = trackApps; + this.printSimulation = printsimulation; + metricsOutputDir = outDir; + + } + + public void start() throws IOException, ClassNotFoundException, YarnException, + InterruptedException { // start resource manager startRM(); // start node managers @@ -151,9 +182,9 @@ public class SLSRunner { startAM(); // set queue & tracked apps information ((SchedulerWrapper) rm.getResourceScheduler()).getTracker() - .setQueueSet(this.queueAppNumMap.keySet()); + .setQueueSet(this.queueAppNumMap.keySet()); ((SchedulerWrapper) rm.getResourceScheduler()).getTracker() - .setTrackedAppSet(this.trackedApps); + .setTrackedAppSet(this.trackedApps); // print out simulation info printSimulationInfo(); // blocked until all nodes RUNNING @@ -162,23 +193,23 @@ public class SLSRunner { runner.start(); } - private void startRM() throws Exception { - Configuration rmConf = new YarnConfiguration(); + private void startRM() throws ClassNotFoundException, YarnException { + Configuration rmConf = new YarnConfiguration(getConf()); String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER); // For CapacityScheduler we use a sub-classing instead of wrapping // to allow scheduler-specific invocations from monitors to work // this can be used for other schedulers as well if we care to // exercise/track behaviors that are not common to the scheduler api - if(Class.forName(schedulerClass) == CapacityScheduler.class) { + if (Class.forName(schedulerClass) == CapacityScheduler.class) { rmConf.set(YarnConfiguration.RM_SCHEDULER, SLSCapacityScheduler.class.getName()); } else if (Class.forName(schedulerClass) == FairScheduler.class) { rmConf.set(YarnConfiguration.RM_SCHEDULER, SLSFairScheduler.class.getName()); - } else if (Class.forName(schedulerClass) == FifoScheduler.class){ + } else if (Class.forName(schedulerClass) == 
FifoScheduler.class) { // TODO add support for FifoScheduler - throw new Exception("Fifo Scheduler is not supported yet."); + throw new YarnException("Fifo Scheduler is not supported yet."); } rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir); @@ -196,37 +227,47 @@ public class SLSRunner { private void startNM() throws YarnException, IOException { // nm configuration - nmMemoryMB = conf.getInt(SLSConfiguration.NM_MEMORY_MB, - SLSConfiguration.NM_MEMORY_MB_DEFAULT); - nmVCores = conf.getInt(SLSConfiguration.NM_VCORES, - SLSConfiguration.NM_VCORES_DEFAULT); - int heartbeatInterval = conf.getInt( - SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS, + nmMemoryMB = getConf().getInt(SLSConfiguration.NM_MEMORY_MB, + SLSConfiguration.NM_MEMORY_MB_DEFAULT); + nmVCores = getConf().getInt(SLSConfiguration.NM_VCORES, + SLSConfiguration.NM_VCORES_DEFAULT); + int heartbeatInterval = + getConf().getInt(SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS, SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT); // nm information (fetch from topology file, or from sls/rumen json file) Set<String> nodeSet = new HashSet<String>(); if (nodeFile.isEmpty()) { - if (isSLS) { - for (String inputTrace : inputTraces) { + for (String inputTrace : inputTraces) { + + switch (inputType) { + case SLS: nodeSet.addAll(SLSUtils.parseNodesFromSLSTrace(inputTrace)); - } - } else { - for (String inputTrace : inputTraces) { + break; + case RUMEN: nodeSet.addAll(SLSUtils.parseNodesFromRumenTrace(inputTrace)); + break; + case SYNTH: + stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); + nodeSet.addAll(SLSUtils.generateNodesFromSynth(stjp.getNumNodes(), + stjp.getNodesPerRack())); + break; + default: + throw new YarnException("Input configuration not recognized, " + + "trace type should be SLS, RUMEN, or SYNTH"); } } - } else { nodeSet.addAll(SLSUtils.parseNodesFromNodeFile(nodeFile)); } + // create NM simulators Random random = new Random(); Set<String> rackSet = new HashSet<String>(); for (String hostName : nodeSet) { // we randomize the heartbeat start time from zero to 1 interval NMSimulator nm = new NMSimulator(); - nm.init(hostName, nmMemoryMB, nmVCores, - random.nextInt(heartbeatInterval), heartbeatInterval, rm); + nm.init(hostName, nmMemoryMB, nmVCores, random.nextInt(heartbeatInterval), + heartbeatInterval, rm); nmMap.put(nm.getNode().getNodeID(), nm); runner.schedule(nm); rackSet.add(nm.getNode().getRackName()); @@ -241,39 +282,50 @@ public class SLSRunner { int numRunningNodes = 0; for (RMNode node : rm.getRMContext().getRMNodes().values()) { if (node.getState() == NodeState.RUNNING) { - numRunningNodes ++; + numRunningNodes++; } } if (numRunningNodes == numNMs) { break; } - LOG.info(MessageFormat.format("SLSRunner is waiting for all " + - "nodes RUNNING. {0} of {1} NMs initialized.", - numRunningNodes, numNMs)); + LOG.info(MessageFormat.format( + "SLSRunner is waiting for all " + + "nodes RUNNING. 
{0} of {1} NMs initialized.", + numRunningNodes, numNMs)); Thread.sleep(1000); } LOG.info(MessageFormat.format("SLSRunner takes {0} ms to launch all nodes.", - (System.currentTimeMillis() - startTimeMS))); + (System.currentTimeMillis() - startTimeMS))); } @SuppressWarnings("unchecked") private void startAM() throws YarnException, IOException { // application/container configuration - int heartbeatInterval = conf.getInt( - SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS, + int heartbeatInterval = + getConf().getInt(SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS, SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT); - int containerMemoryMB = conf.getInt(SLSConfiguration.CONTAINER_MEMORY_MB, + int containerMemoryMB = + getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB, SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT); - int containerVCores = conf.getInt(SLSConfiguration.CONTAINER_VCORES, - SLSConfiguration.CONTAINER_VCORES_DEFAULT); + int containerVCores = getConf().getInt(SLSConfiguration.CONTAINER_VCORES, + SLSConfiguration.CONTAINER_VCORES_DEFAULT); Resource containerResource = - BuilderUtils.newResource(containerMemoryMB, containerVCores); + BuilderUtils.newResource(containerMemoryMB, containerVCores); // application workload - if (isSLS) { + switch (inputType) { + case SLS: startAMFromSLSTraces(containerResource, heartbeatInterval); - } else { + break; + case RUMEN: startAMFromRumenTraces(containerResource, heartbeatInterval); + break; + case SYNTH: + startAMFromSynthGenerator(heartbeatInterval); + break; + default: + throw new YarnException("Input configuration not recognized, " + + "trace type should be SLS, RUMEN, or SYNTH"); } numAMs = amMap.size(); remainingApps = numAMs; @@ -284,7 +336,7 @@ public class SLSRunner { */ @SuppressWarnings("unchecked") private void startAMFromSLSTraces(Resource containerResource, - int heartbeatInterval) throws IOException { + int heartbeatInterval) throws IOException { // parse from sls traces JsonFactory jsonF = new JsonFactory(); ObjectMapper mapper = new ObjectMapper(); @@ -292,26 +344,28 @@ public class SLSRunner { Reader input = new InputStreamReader(new FileInputStream(inputTrace), "UTF-8"); try { - Iterator<Map> i = mapper.readValues(jsonF.createParser(input), - Map.class); + Iterator<Map> i = + mapper.readValues(jsonF.createParser(input), Map.class); while (i.hasNext()) { Map jsonJob = i.next(); // load job information - long jobStartTime = Long.parseLong( - jsonJob.get("job.start.ms").toString()); - long jobFinishTime = Long.parseLong( - jsonJob.get("job.end.ms").toString()); + long jobStartTime = + Long.parseLong(jsonJob.get("job.start.ms").toString()); + long jobFinishTime = + Long.parseLong(jsonJob.get("job.end.ms").toString()); String user = (String) jsonJob.get("job.user"); - if (user == null) user = "default"; + if (user == null) { + user = "default"; + } String queue = jsonJob.get("job.queue.name").toString(); String oldAppId = jsonJob.get("job.id").toString(); boolean isTracked = trackedApps.contains(oldAppId); - int queueSize = queueAppNumMap.containsKey(queue) ? - queueAppNumMap.get(queue) : 0; - queueSize ++; + int queueSize = + queueAppNumMap.containsKey(queue) ? 
queueAppNumMap.get(queue) : 0; + queueSize++; queueAppNumMap.put(queue, queueSize); // tasks List tasks = (List) jsonJob.get("job.tasks"); @@ -319,45 +373,45 @@ public class SLSRunner { continue; } List<ContainerSimulator> containerList = - new ArrayList<ContainerSimulator>(); + new ArrayList<ContainerSimulator>(); for (Object o : tasks) { Map jsonTask = (Map) o; String hostname = jsonTask.get("container.host").toString(); - long taskStart = Long.parseLong( - jsonTask.get("container.start.ms").toString()); - long taskFinish = Long.parseLong( - jsonTask.get("container.end.ms").toString()); + long taskStart = + Long.parseLong(jsonTask.get("container.start.ms").toString()); + long taskFinish = + Long.parseLong(jsonTask.get("container.end.ms").toString()); long lifeTime = taskFinish - taskStart; // Set memory and vcores from job trace file Resource res = Resources.clone(containerResource); if (jsonTask.containsKey("container.memory")) { - int containerMemory = Integer.parseInt( - jsonTask.get("container.memory").toString()); + int containerMemory = + Integer.parseInt(jsonTask.get("container.memory").toString()); res.setMemorySize(containerMemory); } if (jsonTask.containsKey("container.vcores")) { - int containerVCores = Integer.parseInt( - jsonTask.get("container.vcores").toString()); + int containerVCores = + Integer.parseInt(jsonTask.get("container.vcores").toString()); res.setVirtualCores(containerVCores); } - int priority = Integer.parseInt( - jsonTask.get("container.priority").toString()); + int priority = + Integer.parseInt(jsonTask.get("container.priority").toString()); String type = jsonTask.get("container.type").toString(); - containerList.add(new ContainerSimulator(res, - lifeTime, hostname, priority, type)); + containerList.add(new ContainerSimulator(res, lifeTime, hostname, + priority, type)); } // create a new AM String amType = jsonJob.get("am.type").toString(); - AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance( - amClassMap.get(amType), new Configuration()); + AMSimulator amSim = (AMSimulator) ReflectionUtils + .newInstance(amClassMap.get(amType), new Configuration()); if (amSim != null) { - amSim.init(AM_ID++, heartbeatInterval, containerList, rm, - this, jobStartTime, jobFinishTime, user, queue, - isTracked, oldAppId); + amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this, + jobStartTime, jobFinishTime, user, queue, isTracked, oldAppId, + null, runner.getStartTimeMS()); runner.schedule(amSim); maxRuntime = Math.max(maxRuntime, jobFinishTime); numTasks += containerList.size(); @@ -375,22 +429,21 @@ public class SLSRunner { */ @SuppressWarnings("unchecked") private void startAMFromRumenTraces(Resource containerResource, - int heartbeatInterval) - throws IOException { + int heartbeatInterval) throws IOException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "file:///"); long baselineTimeMS = 0; for (String inputTrace : inputTraces) { File fin = new File(inputTrace); - JobTraceReader reader = new JobTraceReader( - new Path(fin.getAbsolutePath()), conf); + JobTraceReader reader = + new JobTraceReader(new Path(fin.getAbsolutePath()), conf); try { LoggedJob job = null; while ((job = reader.getNext()) != null) { // only support MapReduce currently String jobType = "mapreduce"; - String user = job.getUser() == null ? - "default" : job.getUser().getValue(); + String user = + job.getUser() == null ? 
"default" : job.getUser().getValue(); String jobQueue = job.getQueue().getValue(); String oldJobId = job.getJobID().toString(); long jobStartTimeMS = job.getSubmitTime(); @@ -407,48 +460,48 @@ public class SLSRunner { } boolean isTracked = trackedApps.contains(oldJobId); - int queueSize = queueAppNumMap.containsKey(jobQueue) ? - queueAppNumMap.get(jobQueue) : 0; - queueSize ++; + int queueSize = queueAppNumMap.containsKey(jobQueue) + ? queueAppNumMap.get(jobQueue) : 0; + queueSize++; queueAppNumMap.put(jobQueue, queueSize); List<ContainerSimulator> containerList = - new ArrayList<ContainerSimulator>(); + new ArrayList<ContainerSimulator>(); // map tasks - for(LoggedTask mapTask : job.getMapTasks()) { + for (LoggedTask mapTask : job.getMapTasks()) { if (mapTask.getAttempts().size() == 0) { continue; } - LoggedTaskAttempt taskAttempt = mapTask.getAttempts() - .get(mapTask.getAttempts().size() - 1); + LoggedTaskAttempt taskAttempt = + mapTask.getAttempts().get(mapTask.getAttempts().size() - 1); String hostname = taskAttempt.getHostName().getValue(); - long containerLifeTime = taskAttempt.getFinishTime() - - taskAttempt.getStartTime(); + long containerLifeTime = + taskAttempt.getFinishTime() - taskAttempt.getStartTime(); containerList.add(new ContainerSimulator(containerResource, - containerLifeTime, hostname, 10, "map")); + containerLifeTime, hostname, 10, "map")); } // reduce tasks - for(LoggedTask reduceTask : job.getReduceTasks()) { + for (LoggedTask reduceTask : job.getReduceTasks()) { if (reduceTask.getAttempts().size() == 0) { continue; } LoggedTaskAttempt taskAttempt = reduceTask.getAttempts() - .get(reduceTask.getAttempts().size() - 1); + .get(reduceTask.getAttempts().size() - 1); String hostname = taskAttempt.getHostName().getValue(); - long containerLifeTime = taskAttempt.getFinishTime() - - taskAttempt.getStartTime(); + long containerLifeTime = + taskAttempt.getFinishTime() - taskAttempt.getStartTime(); containerList.add(new ContainerSimulator(containerResource, - containerLifeTime, hostname, 20, "reduce")); + containerLifeTime, hostname, 20, "reduce")); } // create a new AM - AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance( - amClassMap.get(jobType), conf); + AMSimulator amSim = (AMSimulator) ReflectionUtils + .newInstance(amClassMap.get(jobType), conf); if (amSim != null) { - amSim.init(AM_ID ++, heartbeatInterval, containerList, - rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue, - isTracked, oldJobId); + amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this, + jobStartTimeMS, jobFinishTimeMS, user, jobQueue, isTracked, + oldJobId, null, runner.getStartTimeMS()); runner.schedule(amSim); maxRuntime = Math.max(maxRuntime, jobFinishTimeMS); numTasks += containerList.size(); @@ -460,34 +513,168 @@ public class SLSRunner { } } } - + + /** + * parse workload information from synth-generator trace files. + */ + @SuppressWarnings("unchecked") + private void startAMFromSynthGenerator(int heartbeatInterval) + throws IOException { + Configuration localConf = new Configuration(); + localConf.set("fs.defaultFS", "file:///"); + long baselineTimeMS = 0; + + // reservations use wall clock time, so need to have a reference for that + UTCClock clock = new UTCClock(); + long now = clock.getTime(); + + try { + + // if we use the nodeFile this could have been not initialized yet. 
+ if (stjp == null) { + stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); + } + + SynthJob job = null; + // we use stjp, a reference to the job producer instantiated during node + // creation + while ((job = (SynthJob) stjp.getNextJob()) != null) { + // only support MapReduce currently + String jobType = "mapreduce"; + String user = job.getUser(); + String jobQueue = job.getQueueName(); + String oldJobId = job.getJobID().toString(); + long jobStartTimeMS = job.getSubmissionTime(); + + // CARLO: Finish time is only used for logging, omit for now + long jobFinishTimeMS = -1L; + + if (baselineTimeMS == 0) { + baselineTimeMS = jobStartTimeMS; + } + jobStartTimeMS -= baselineTimeMS; + jobFinishTimeMS -= baselineTimeMS; + if (jobStartTimeMS < 0) { + LOG.warn("Warning: reset job " + oldJobId + " start time to 0."); + jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS; + jobStartTimeMS = 0; + } + + boolean isTracked = trackedApps.contains(oldJobId); + int queueSize = queueAppNumMap.containsKey(jobQueue) + ? queueAppNumMap.get(jobQueue) : 0; + queueSize++; + queueAppNumMap.put(jobQueue, queueSize); + + List<ContainerSimulator> containerList = + new ArrayList<ContainerSimulator>(); + ArrayList<NodeId> keyAsArray = new ArrayList<NodeId>(nmMap.keySet()); + Random rand = new Random(stjp.getSeed()); + + Resource maxMapRes = Resource.newInstance(0, 0); + long maxMapDur = 0; + // map tasks + for (int i = 0; i < job.getNumberMaps(); i++) { + TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.MAP, i, 0); + RMNode node = nmMap + .get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode(); + String hostname = "/" + node.getRackName() + "/" + node.getHostName(); + long containerLifeTime = tai.getRuntime(); + Resource containerResource = + Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(), + (int) tai.getTaskInfo().getTaskVCores()); + containerList.add(new ContainerSimulator(containerResource, + containerLifeTime, hostname, 10, "map")); + maxMapRes = Resources.componentwiseMax(maxMapRes, containerResource); + maxMapDur = + containerLifeTime > maxMapDur ? containerLifeTime : maxMapDur; + + } + + Resource maxRedRes = Resource.newInstance(0, 0); + long maxRedDur = 0; + // reduce tasks + for (int i = 0; i < job.getNumberReduces(); i++) { + TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.REDUCE, i, 0); + RMNode node = nmMap + .get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode(); + String hostname = "/" + node.getRackName() + "/" + node.getHostName(); + long containerLifeTime = tai.getRuntime(); + Resource containerResource = + Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(), + (int) tai.getTaskInfo().getTaskVCores()); + containerList.add(new ContainerSimulator(containerResource, + containerLifeTime, hostname, 20, "reduce")); + maxRedRes = Resources.componentwiseMax(maxRedRes, containerResource); + maxRedDur = + containerLifeTime > maxRedDur ? 
containerLifeTime : maxRedDur; + + } + + // generating reservations for the jobs that require them + + ReservationSubmissionRequest rr = null; + if (job.hasDeadline()) { + ReservationId reservationId = + ReservationId.newInstance(this.rm.getStartTime(), AM_ID); + + rr = ReservationClientUtil.createMRReservation(reservationId, + "reservation_" + AM_ID, maxMapRes, job.getNumberMaps(), maxMapDur, + maxRedRes, job.getNumberReduces(), maxRedDur, + now + jobStartTimeMS, now + job.getDeadline(), + job.getQueueName()); + + } + // create a new AM + AMSimulator amSim = (AMSimulator) ReflectionUtils + .newInstance(amClassMap.get(jobType), localConf); + if (amSim != null) { + amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this, + jobStartTimeMS, jobFinishTimeMS, user, jobQueue, isTracked, + oldJobId, rr, runner.getStartTimeMS()); + runner.schedule(amSim); + maxRuntime = Math.max(maxRuntime, jobFinishTimeMS); + numTasks += containerList.size(); + amMap.put(oldJobId, amSim); + } + } + } finally { + stjp.close(); + } + + } + private void printSimulationInfo() { if (printSimulation) { // node LOG.info("------------------------------------"); - LOG.info(MessageFormat.format("# nodes = {0}, # racks = {1}, capacity " + - "of each node {2} MB memory and {3} vcores.", - numNMs, numRacks, nmMemoryMB, nmVCores)); + LOG.info(MessageFormat.format( + "# nodes = {0}, # racks = {1}, capacity " + + "of each node {2} MB memory and {3} vcores.", + numNMs, numRacks, nmMemoryMB, nmVCores)); LOG.info("------------------------------------"); // job - LOG.info(MessageFormat.format("# applications = {0}, # total " + - "tasks = {1}, average # tasks per application = {2}", - numAMs, numTasks, (int)(Math.ceil((numTasks + 0.0) / numAMs)))); + LOG.info(MessageFormat.format( + "# applications = {0}, # total " + + "tasks = {1}, average # tasks per application = {2}", + numAMs, numTasks, (int) (Math.ceil((numTasks + 0.0) / numAMs)))); LOG.info("JobId\tQueue\tAMType\tDuration\t#Tasks"); for (Map.Entry<String, AMSimulator> entry : amMap.entrySet()) { AMSimulator am = entry.getValue(); - LOG.info(entry.getKey() + "\t" + am.getQueue() + "\t" + am.getAMType() + LOG.info(entry.getKey() + "\t" + am.getQueue() + "\t" + am.getAMType() + "\t" + am.getDuration() + "\t" + am.getNumTasks()); } LOG.info("------------------------------------"); // queue - LOG.info(MessageFormat.format("number of queues = {0} average " + - "number of apps = {1}", queueAppNumMap.size(), - (int)(Math.ceil((numAMs + 0.0) / queueAppNumMap.size())))); + LOG.info(MessageFormat.format( + "number of queues = {0} average " + "number of apps = {1}", + queueAppNumMap.size(), + (int) (Math.ceil((numAMs + 0.0) / queueAppNumMap.size())))); LOG.info("------------------------------------"); // runtime - LOG.info(MessageFormat.format("estimated simulation time is {0}" + - " seconds", (long)(Math.ceil(maxRuntime / 1000.0)))); + LOG.info( + MessageFormat.format("estimated simulation time is {0}" + " seconds", + (long) (Math.ceil(maxRuntime / 1000.0)))); LOG.info("------------------------------------"); } // package these information in the simulateInfoMap used by other places @@ -510,69 +697,121 @@ public class SLSRunner { return nmMap; } - public static TaskRunner getRunner() { - return runner; - } - public static void decreaseRemainingApps() { - remainingApps --; + remainingApps--; if (remainingApps == 0) { LOG.info("SLSRunner tears down."); - System.exit(0); } } - public static void main(String args[]) throws Exception { + public void stop() throws InterruptedException { 
+ rm.stop(); + runner.stop(); + } + + public int run(final String[] argv) throws IOException, InterruptedException, + ParseException, ClassNotFoundException, YarnException { + Options options = new Options(); + + // Left for compatibility options.addOption("inputrumen", true, "input rumen files"); options.addOption("inputsls", true, "input sls files"); + + // New more general format + options.addOption("tracetype", true, "the type of trace"); + options.addOption("tracelocation", true, "input trace files"); + options.addOption("nodes", true, "input topology"); options.addOption("output", true, "output directory"); options.addOption("trackjobs", true, - "jobs to be tracked during simulating"); + "jobs to be tracked during simulating"); options.addOption("printsimulation", false, - "print out simulation information"); - + "print out simulation information"); + CommandLineParser parser = new GnuParser(); - CommandLine cmd = parser.parse(options, args); + CommandLine cmd = parser.parse(options, argv); - String inputRumen = cmd.getOptionValue("inputrumen"); - String inputSLS = cmd.getOptionValue("inputsls"); - String output = cmd.getOptionValue("output"); - - if ((inputRumen == null && inputSLS == null) || output == null) { - System.err.println(); - System.err.println("ERROR: Missing input or output file"); - System.err.println(); - System.err.println("Options: -inputrumen|-inputsls FILE,FILE... " + - "-output FILE [-nodes FILE] [-trackjobs JobId,JobId...] " + - "[-printsimulation]"); - System.err.println(); - System.exit(1); + String traceType = null; + String traceLocation = null; + + // compatibility with old commandline + if (cmd.hasOption("inputrumen")) { + traceType = "RUMEN"; + traceLocation = cmd.getOptionValue("inputrumen"); + } + if (cmd.hasOption("inputsls")) { + traceType = "SLS"; + traceLocation = cmd.getOptionValue("inputsls"); + } + + if (cmd.hasOption("tracetype")) { + traceType = cmd.getOptionValue("tracetype"); + traceLocation = cmd.getOptionValue("tracelocation"); } - + + String output = cmd.getOptionValue("output"); + File outputFile = new File(output); - if (! outputFile.exists() - && ! outputFile.mkdirs()) { + if (!outputFile.exists() && !outputFile.mkdirs()) { System.err.println("ERROR: Cannot create output directory " - + outputFile.getAbsolutePath()); - System.exit(1); + + outputFile.getAbsolutePath()); + throw new YarnException("Cannot create output directory"); } - + Set<String> trackedJobSet = new HashSet<String>(); if (cmd.hasOption("trackjobs")) { String trackjobs = cmd.getOptionValue("trackjobs"); String jobIds[] = trackjobs.split(","); trackedJobSet.addAll(Arrays.asList(jobIds)); } - - String nodeFile = cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : ""; - boolean isSLS = inputSLS != null; - String inputFiles[] = isSLS ? inputSLS.split(",") : inputRumen.split(","); - SLSRunner sls = new SLSRunner(isSLS, inputFiles, nodeFile, output, + String tempNodeFile = + cmd.hasOption("nodes") ? 
cmd.getOptionValue("nodes") : ""; + + TraceType tempTraceType = TraceType.SLS; + switch (traceType) { + case "SLS": + tempTraceType = TraceType.SLS; + break; + case "RUMEN": + tempTraceType = TraceType.RUMEN; + break; + + case "SYNTH": + tempTraceType = TraceType.SYNTH; + break; + default: + printUsage(); + throw new YarnException("Misconfigured input"); + } + + String[] inputFiles = traceLocation.split(","); + + setSimulationParams(tempTraceType, inputFiles, tempNodeFile, output, trackedJobSet, cmd.hasOption("printsimulation")); - sls.start(); + + start(); + + return 0; } + + public static void main(String[] argv) throws Exception { + ToolRunner.run(new Configuration(), new SLSRunner(), argv); + } + + static void printUsage() { + System.err.println(); + System.err.println("ERROR: Wrong tracetype"); + System.err.println(); + System.err.println( + "Options: -tracetype " + "SLS|RUMEN|SYNTH -tracelocation FILE,FILE... " + + "(deprecated alternative options --inputsls FILE, FILE,... " + + " | --inputrumen FILE,FILE,...)" + + "-output FILE [-nodes FILE] [-trackjobs JobId,JobId...] " + + "[-printsimulation]"); + System.err.println(); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index a62f2b6..45a3c07 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.sls.appmaster; import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.text.MessageFormat; @@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords .FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; @@ -55,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -97,6 +100,7 @@ public abstract class AMSimulator extends TaskRunner.Task { // am type protected String amtype; // job start/end time + private long baselineTimeMS; protected long traceStartTimeMS; protected long traceFinishTimeMS; protected long simulateStartTimeMS; @@ -117,25 +121,30 @@ public abstract class AMSimulator extends TaskRunner.Task { private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024; private final static int 
MR_AM_CONTAINER_RESOURCE_VCORES = 1; + private ReservationSubmissionRequest reservationRequest; + public AMSimulator() { this.responseQueue = new LinkedBlockingQueue<>(); } - public void init(int id, int heartbeatInterval, - List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se, - long traceStartTime, long traceFinishTime, String user, String queue, - boolean isTracked, String oldAppId) { - super.init(traceStartTime, traceStartTime + 1000000L * heartbeatInterval, - heartbeatInterval); - this.user = user; - this.rm = rm; - this.se = se; - this.user = user; - this.queue = queue; - this.oldAppId = oldAppId; - this.isTracked = isTracked; - this.traceStartTimeMS = traceStartTime; - this.traceFinishTimeMS = traceFinishTime; + @SuppressWarnings("checkstyle:parameternumber") + public void init(int id, int heartbeatInterval, + List<ContainerSimulator> containerList, ResourceManager resourceManager, + SLSRunner slsRunnner, long startTime, long finishTime, String simUser, + String simQueue, boolean tracked, String oldApp, + ReservationSubmissionRequest rr, long baseTimeMS) { + super.init(startTime, startTime + 1000000L * heartbeatInterval, + heartbeatInterval); + this.user = simUser; + this.rm = resourceManager; + this.se = slsRunnner; + this.queue = simQueue; + this.oldAppId = oldApp; + this.isTracked = tracked; + this.baselineTimeMS = baseTimeMS; + this.traceStartTimeMS = startTime; + this.traceFinishTimeMS = finishTime; + this.reservationRequest = rr; } /** @@ -143,11 +152,21 @@ public abstract class AMSimulator extends TaskRunner.Task { */ @Override public void firstStep() throws Exception { - simulateStartTimeMS = System.currentTimeMillis() - - SLSRunner.getRunner().getStartTimeMS(); + simulateStartTimeMS = System.currentTimeMillis() - baselineTimeMS; + + ReservationId reservationId = null; + + // submit a reservation if one is required, exceptions naturally happen + // when the reservation does not fit, catch, log, and move on running job + // without reservation. 
+ try { + reservationId = submitReservationWhenSpecified(); + } catch (UndeclaredThrowableException y) { + LOG.warn("Unable to place reservation: " + y.getMessage()); + } // submit application, waiting until ACCEPTED - submitApp(); + submitApp(reservationId); // track app metrics trackApp(); @@ -161,6 +180,26 @@ public abstract class AMSimulator extends TaskRunner.Task { isAMContainerRunning = true; } + private ReservationId submitReservationWhenSpecified() + throws IOException, InterruptedException { + if (reservationRequest != null) { + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); + ugi.doAs(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws YarnException, IOException { + rm.getClientRMService().submitReservation(reservationRequest); + LOG.info("RESERVATION SUCCESSFULLY SUBMITTED " + + reservationRequest.getReservationId()); + return null; + + } + }); + return reservationRequest.getReservationId(); + } else { + return null; + } + } + @Override public void middleStep() throws Exception { if (isAMContainerRunning) { @@ -217,14 +256,13 @@ public abstract class AMSimulator extends TaskRunner.Task { } }); - simulateFinishTimeMS = System.currentTimeMillis() - - SLSRunner.getRunner().getStartTimeMS(); + simulateFinishTimeMS = System.currentTimeMillis() - baselineTimeMS; // record job running information SchedulerMetrics schedulerMetrics = - ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics(); + ((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics(); if (schedulerMetrics != null) { schedulerMetrics.addAMRuntime(appId, traceStartTimeMS, traceFinishTimeMS, - simulateStartTimeMS, simulateFinishTimeMS); + simulateStartTimeMS, simulateFinishTimeMS); } } @@ -261,7 +299,7 @@ public abstract class AMSimulator extends TaskRunner.Task { protected abstract void checkStop(); - private void submitApp() + private void submitApp(ReservationId reservationId) throws YarnException, InterruptedException, IOException { // ask for new application GetNewApplicationRequest newAppRequest = @@ -291,6 +329,11 @@ public abstract class AMSimulator extends TaskRunner.Task { appSubContext.setResource(Resources .createResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB, MR_AM_CONTAINER_RESOURCE_VCORES)); + + if(reservationId != null) { + appSubContext.setReservationID(reservationId); + } + subAppRequest.setApplicationSubmissionContext(appSubContext); UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); ugi.doAs(new PrivilegedExceptionAction<Object>() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java index e726b09..de6d19d 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java @@ -27,13 +27,13 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import org.apache.avro.Protocol; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; import 
org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; import org.apache.hadoop.yarn.sls.SLSRunner; @@ -114,13 +113,15 @@ public class MRAMSimulator extends AMSimulator { public final Logger LOG = Logger.getLogger(MRAMSimulator.class); + @SuppressWarnings("checkstyle:parameternumber") public void init(int id, int heartbeatInterval, List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se, long traceStartTime, long traceFinishTime, String user, String queue, - boolean isTracked, String oldAppId) { + boolean isTracked, String oldAppId, ReservationSubmissionRequest rr, + long baselineStartTimeMS) { super.init(id, heartbeatInterval, containerList, rm, se, traceStartTime, traceFinishTime, user, queue, - isTracked, oldAppId); + isTracked, oldAppId, rr, baselineStartTimeMS); amtype = "mapreduce"; // get map/reduce tasks http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java index 20cf3e5..b4ffb61 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java @@ -65,6 +65,11 @@ public class MockAMLauncher extends ApplicationMasterLauncher // Do nothing } + @Override + protected void serviceStop() throws Exception { + // Do nothing + } + private void setupAMRMToken(RMAppAttempt appAttempt) { // Setup AMRMToken Token<AMRMTokenIdentifier> amrmToken = http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java index 7c37465..56190df 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java @@ -28,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; -import 
org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; @@ -52,7 +51,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; -import org.apache.hadoop.yarn.sls.utils.SLSUtils; import org.apache.hadoop.yarn.util.resource.Resources; import com.codahale.metrics.Timer; @@ -96,16 +94,6 @@ public class SLSCapacityScheduler extends CapacityScheduler implements } catch (Exception e) { e.printStackTrace(); } - - ShutdownHookManager.get().addShutdownHook(new Runnable() { - @Override public void run() { - try { - schedulerMetrics.tearDown(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }, SLSUtils.SHUTDOWN_HOOK_PRIORITY); } } @@ -344,7 +332,6 @@ public class SLSCapacityScheduler extends CapacityScheduler implements initQueueMetrics(child); } } - @Override public void serviceInit(Configuration configuration) throws Exception { super.serviceInit(configuration); @@ -354,6 +341,17 @@ public class SLSCapacityScheduler extends CapacityScheduler implements } } + @Override + public void serviceStop() throws Exception { + try { + schedulerMetrics.tearDown(); + } catch (Exception e) { + e.printStackTrace(); + } + super.serviceStop(); + } + + public SchedulerMetrics getSchedulerMetrics() { return schedulerMetrics; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java index 572dacf..f740f5a 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java @@ -22,7 +22,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; @@ -44,7 +43,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; -import org.apache.hadoop.yarn.sls.utils.SLSUtils; import org.apache.hadoop.yarn.util.resource.Resources; import java.io.IOException; @@ -90,16 +88,6 @@ public class SLSFairScheduler extends FairScheduler } catch (Exception e) { e.printStackTrace(); } - - ShutdownHookManager.get().addShutdownHook(new Runnable() { - @Override public void run() { - try { - schedulerMetrics.tearDown(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }, SLSUtils.SHUTDOWN_HOOK_PRIORITY); } } @@ -335,5 +323,15 @@ 
public class SLSFairScheduler extends FairScheduler initQueueMetrics(getQueueManager().getRootQueue()); } } + + @Override + public void serviceStop() throws Exception { + try { + schedulerMetrics.tearDown(); + } catch (Exception e) { + e.printStackTrace(); + } + super.serviceStop(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java index d352904..19cfe88 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.yarn.sls.scheduler; -import java.io.IOException; import java.text.MessageFormat; import java.util.Queue; import java.util.concurrent.DelayQueue; @@ -27,7 +26,6 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.exceptions.YarnException; @Private @Unstable @@ -148,8 +146,8 @@ public class TaskRunner { @SuppressWarnings("unchecked") public void start() { - if (executor != null) { - throw new IllegalStateException("Already started"); + if (executor != null && !executor.isTerminated()) { + throw new IllegalStateException("Executor already running"); } DelayQueue preStartQueue = queue; @@ -164,8 +162,9 @@ public class TaskRunner { } } - public void stop() { + public void stop() throws InterruptedException { executor.shutdownNow(); + executor.awaitTermination(20, TimeUnit.SECONDS); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java new file mode 100644 index 0000000..3ed81e1 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.sls.synthetic; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.math3.distribution.LogNormalDistribution; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TaskStatus.State; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.tools.rumen.*; +import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.mapreduce.MRJobConfig.QUEUE_NAME; + +/** + * Generates random task data for a synthetic job. + */ +public class SynthJob implements JobStory { + + @SuppressWarnings("StaticVariableName") + private static Log LOG = LogFactory.getLog(SynthJob.class); + + private final Configuration conf; + private final int id; + + @SuppressWarnings("ConstantName") + private static final AtomicInteger sequence = new AtomicInteger(0); + private final String name; + private final String queueName; + private final SynthJobClass jobClass; + + // job timing + private final long submitTime; + private final long duration; + private final long deadline; + + private final int numMapTasks; + private final int numRedTasks; + private final long mapMaxMemory; + private final long reduceMaxMemory; + private final long mapMaxVcores; + private final long reduceMaxVcores; + private final long[] mapRuntime; + private final float[] reduceRuntime; + private long totMapRuntime; + private long totRedRuntime; + + public SynthJob(JDKRandomGenerator rand, Configuration conf, + SynthJobClass jobClass, long actualSubmissionTime) { + + this.conf = conf; + this.jobClass = jobClass; + + this.duration = MILLISECONDS.convert(jobClass.getDur(), SECONDS); + this.numMapTasks = jobClass.getMtasks(); + this.numRedTasks = jobClass.getRtasks(); + + // sample memory distributions, correct for sub-minAlloc sizes + long tempMapMaxMemory = jobClass.getMapMaxMemory(); + this.mapMaxMemory = tempMapMaxMemory < MRJobConfig.DEFAULT_MAP_MEMORY_MB + ? MRJobConfig.DEFAULT_MAP_MEMORY_MB : tempMapMaxMemory; + long tempReduceMaxMemory = jobClass.getReduceMaxMemory(); + this.reduceMaxMemory = + tempReduceMaxMemory < MRJobConfig.DEFAULT_REDUCE_MEMORY_MB + ? MRJobConfig.DEFAULT_REDUCE_MEMORY_MB : tempReduceMaxMemory; + + // sample vcores distributions, correct for sub-minAlloc sizes + long tempMapMaxVCores = jobClass.getMapMaxVcores(); + this.mapMaxVcores = tempMapMaxVCores < MRJobConfig.DEFAULT_MAP_CPU_VCORES + ? MRJobConfig.DEFAULT_MAP_CPU_VCORES : tempMapMaxVCores; + long tempReduceMaxVcores = jobClass.getReduceMaxVcores(); + this.reduceMaxVcores = + tempReduceMaxVcores < MRJobConfig.DEFAULT_REDUCE_CPU_VCORES + ? 
MRJobConfig.DEFAULT_REDUCE_CPU_VCORES : tempReduceMaxVcores; + + if (numMapTasks > 0) { + conf.setLong(MRJobConfig.MAP_MEMORY_MB, this.mapMaxMemory); + conf.set(MRJobConfig.MAP_JAVA_OPTS, + "-Xmx" + (this.mapMaxMemory - 100) + "m"); + } + + if (numRedTasks > 0) { + conf.setLong(MRJobConfig.REDUCE_MEMORY_MB, this.reduceMaxMemory); + conf.set(MRJobConfig.REDUCE_JAVA_OPTS, + "-Xmx" + (this.reduceMaxMemory - 100) + "m"); + } + + boolean hasDeadline = + (rand.nextDouble() <= jobClass.jobClass.chance_of_reservation); + + LogNormalDistribution deadlineFactor = + SynthUtils.getLogNormalDist(rand, jobClass.jobClass.deadline_factor_avg, + jobClass.jobClass.deadline_factor_stddev); + + double deadlineFactorSample = + (deadlineFactor != null) ? deadlineFactor.sample() : -1; + + this.queueName = jobClass.workload.getQueueName(); + + this.submitTime = MILLISECONDS.convert(actualSubmissionTime, SECONDS); + + this.deadline = + hasDeadline ? MILLISECONDS.convert(actualSubmissionTime, SECONDS) + + (long) Math.ceil(deadlineFactorSample * duration) : -1; + + conf.set(QUEUE_NAME, queueName); + + // name and initialize job randomness + final long seed = rand.nextLong(); + rand.setSeed(seed); + id = sequence.getAndIncrement(); + + name = String.format(jobClass.getClassName() + "_%06d", id); + LOG.debug(name + " (" + seed + ")"); + + LOG.info("JOB TIMING: job: " + name + " submission:" + submitTime + + " deadline:" + deadline + " duration:" + duration + + " deadline-submission: " + (deadline - submitTime)); + + // generate map and reduce runtimes + mapRuntime = new long[numMapTasks]; + for (int i = 0; i < numMapTasks; i++) { + mapRuntime[i] = jobClass.getMapTimeSample(); + totMapRuntime += mapRuntime[i]; + } + reduceRuntime = new float[numRedTasks]; + for (int i = 0; i < numRedTasks; i++) { + reduceRuntime[i] = jobClass.getReduceTimeSample(); + totRedRuntime += (long) Math.ceil(reduceRuntime[i]); + } + } + + public boolean hasDeadline() { + return deadline > 0; + } + + @Override + public String getName() { + return name; + } + + @Override + public String getUser() { + return jobClass.getUserName(); + } + + @Override + public JobID getJobID() { + return new JobID("job_mock_" + name, id); + } + + @Override + public Values getOutcome() { + return Values.SUCCESS; + } + + @Override + public long getSubmissionTime() { + return submitTime; + } + + @Override + public int getNumberMaps() { + return numMapTasks; + } + + @Override + public int getNumberReduces() { + return numRedTasks; + } + + @Override + public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) { + switch (taskType) { + case MAP: + return new TaskInfo(-1, -1, -1, -1, mapMaxMemory, mapMaxVcores); + case REDUCE: + return new TaskInfo(-1, -1, -1, -1, reduceMaxMemory, reduceMaxVcores); + default: + throw new IllegalArgumentException("Not interested"); + } + } + + @Override + public InputSplit[] getInputSplits() { + throw new UnsupportedOperationException(); + } + + @Override + public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber, + int taskAttemptNumber) { + switch (taskType) { + case MAP: + return new MapTaskAttemptInfo(State.SUCCEEDED, + getTaskInfo(taskType, taskNumber), mapRuntime[taskNumber], null); + + case REDUCE: + // We assume uniform split between pull/sort/reduce + // aligned with naive progress reporting assumptions + return new ReduceTaskAttemptInfo(State.SUCCEEDED, + getTaskInfo(taskType, taskNumber), + (long) Math.round((reduceRuntime[taskNumber] / 3)), + (long) Math.round((reduceRuntime[taskNumber] / 3)), + (long) 
Math.round((reduceRuntime[taskNumber] / 3)), null); + + default: + break; + } + throw new UnsupportedOperationException(); + } + + @Override + public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber, + int taskAttemptNumber, int locality) { + throw new UnsupportedOperationException(); + } + + @Override + public org.apache.hadoop.mapred.JobConf getJobConf() { + return new JobConf(conf); + } + + @Override + public String getQueueName() { + return queueName; + } + + @Override + public String toString() { + return "SynthJob [\n" + " workload=" + jobClass.getWorkload().getId() + + "\n" + " jobClass=" + + jobClass.getWorkload().getClassList().indexOf(jobClass) + "\n" + + " conf=" + conf + ",\n" + " id=" + id + ",\n" + " name=" + name + + ",\n" + " mapRuntime=" + Arrays.toString(mapRuntime) + ",\n" + + " reduceRuntime=" + Arrays.toString(reduceRuntime) + ",\n" + + " submitTime=" + submitTime + ",\n" + " numMapTasks=" + numMapTasks + + ",\n" + " numRedTasks=" + numRedTasks + ",\n" + " mapMaxMemory=" + + mapMaxMemory + ",\n" + " reduceMaxMemory=" + reduceMaxMemory + ",\n" + + " queueName=" + queueName + "\n" + "]"; + } + + public SynthJobClass getJobClass() { + return jobClass; + } + + public long getTotalSlotTime() { + return totMapRuntime + totRedRuntime; + } + + public long getDuration() { + return duration; + } + + public long getDeadline() { + return deadline; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SynthJob)) { + return false; + } + SynthJob o = (SynthJob) other; + return Arrays.equals(mapRuntime, o.mapRuntime) + && Arrays.equals(reduceRuntime, o.reduceRuntime) + && submitTime == o.submitTime && numMapTasks == o.numMapTasks + && numRedTasks == o.numRedTasks && mapMaxMemory == o.mapMaxMemory + && reduceMaxMemory == o.reduceMaxMemory + && mapMaxVcores == o.mapMaxVcores + && reduceMaxVcores == o.reduceMaxVcores && queueName.equals(o.queueName) + && jobClass.equals(o.jobClass) && totMapRuntime == o.totMapRuntime + && totRedRuntime == o.totRedRuntime; + } + + @Override + public int hashCode() { + // could have a bad distr; investigate if a relevant use case exists + return jobClass.hashCode() * (int) submitTime; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de69d6e8/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java new file mode 100644 index 0000000..439698f --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.sls.synthetic; + +import org.apache.commons.math3.distribution.AbstractRealDistribution; +import org.apache.commons.math3.distribution.LogNormalDistribution; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.tools.rumen.JobStory; +import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.JobClass; +import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace; + +/** + * This class represents a class of jobs. It is used to generate an + * individual job by picking random durations, task counts, container sizes, + * etc. + */ +public class SynthJobClass { + + private final JDKRandomGenerator rand; + private final LogNormalDistribution dur; + private final LogNormalDistribution mapRuntime; + private final LogNormalDistribution redRuntime; + private final LogNormalDistribution mtasks; + private final LogNormalDistribution rtasks; + private final LogNormalDistribution mapMem; + private final LogNormalDistribution redMem; + private final LogNormalDistribution mapVcores; + private final LogNormalDistribution redVcores; + + private final Trace trace; + @SuppressWarnings("VisibilityModifier") + protected final SynthWorkload workload; + @SuppressWarnings("VisibilityModifier") + protected final JobClass jobClass; + + public SynthJobClass(JDKRandomGenerator rand, Trace trace, + SynthWorkload workload, int classId) { + + this.trace = trace; + this.workload = workload; + this.rand = new JDKRandomGenerator(); + this.rand.setSeed(rand.nextLong()); + jobClass = trace.workloads.get(workload.getId()).job_classes.get(classId); + + this.dur = SynthUtils.getLogNormalDist(rand, jobClass.dur_avg, + jobClass.dur_stddev); + this.mapRuntime = SynthUtils.getLogNormalDist(rand, jobClass.mtime_avg, + jobClass.mtime_stddev); + this.redRuntime = SynthUtils.getLogNormalDist(rand, jobClass.rtime_avg, + jobClass.rtime_stddev); + this.mtasks = SynthUtils.getLogNormalDist(rand, jobClass.mtasks_avg, + jobClass.mtasks_stddev); + this.rtasks = SynthUtils.getLogNormalDist(rand, jobClass.rtasks_avg, + jobClass.rtasks_stddev); + + this.mapMem = SynthUtils.getLogNormalDist(rand, jobClass.map_max_memory_avg, + jobClass.map_max_memory_stddev); + this.redMem = SynthUtils.getLogNormalDist(rand, + jobClass.reduce_max_memory_avg, jobClass.reduce_max_memory_stddev); + this.mapVcores = SynthUtils.getLogNormalDist(rand, + jobClass.map_max_vcores_avg, jobClass.map_max_vcores_stddev); + this.redVcores = SynthUtils.getLogNormalDist(rand, + jobClass.reduce_max_vcores_avg, jobClass.reduce_max_vcores_stddev); + } + + public JobStory getJobStory(Configuration conf, long actualSubmissionTime) { + return new SynthJob(rand, conf, this, actualSubmissionTime); + } + + @Override + public String toString() { + return "SynthJobClass [workload=" + workload.getName() + ", class=" + + jobClass.class_name + " job_count=" + jobClass.class_weight + ", dur=" + + ((dur != null) ? dur.getNumericalMean() : 0) + ", mapRuntime=" + + ((mapRuntime != null) ? mapRuntime.getNumericalMean() : 0) + + ", redRuntime=" + + ((redRuntime != null) ? redRuntime.getNumericalMean() : 0) + + ", mtasks=" + ((mtasks != null) ? mtasks.getNumericalMean() : 0) + + ", rtasks=" + ((rtasks != null) ? 
rtasks.getNumericalMean() : 0) + + ", chance_of_reservation=" + jobClass.chance_of_reservation + "]\n"; + + } + + public double getClassWeight() { + return jobClass.class_weight; + } + + public long getDur() { + return genLongSample(dur); + } + + public int getMtasks() { + return genIntSample(mtasks); + } + + public int getRtasks() { + return genIntSample(rtasks); + } + + public long getMapMaxMemory() { + return genLongSample(mapMem); + } + + public long getReduceMaxMemory() { + return genLongSample(redMem); + } + + public long getMapMaxVcores() { + return genLongSample(mapVcores); + } + + public long getReduceMaxVcores() { + return genLongSample(redVcores); + } + + public SynthWorkload getWorkload() { + return workload; + } + + public int genIntSample(AbstractRealDistribution dist) { + if (dist == null) { + return 0; + } + double baseSample = dist.sample(); + if (baseSample < 0) { + baseSample = 0; + } + return (int) (Integer.MAX_VALUE & (long) Math.ceil(baseSample)); + } + + public long genLongSample(AbstractRealDistribution dist) { + return dist != null ? (long) Math.ceil(dist.sample()) : 0; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SynthJobClass)) { + return false; + } + SynthJobClass o = (SynthJobClass) other; + return workload.equals(o.workload); + } + + @Override + public int hashCode() { + return workload.hashCode() * workload.getId(); + } + + public String getClassName() { + return jobClass.class_name; + } + + public long getMapTimeSample() { + return genLongSample(mapRuntime); + } + + public long getReduceTimeSample() { + return genLongSample(redRuntime); + } + + public String getUserName() { + return jobClass.user_name; + } +}
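----------------------------------------------------------------------
The SynthUtils.getLogNormalDist(rand, avg, stddev) helper called throughout SynthJob and SynthJobClass is not shown in the hunks above. A minimal sketch of what such a helper would compute, assuming the standard moment-matching construction for a commons-math3 LogNormalDistribution (the class and method names below are illustrative, not the patch's own):

    import org.apache.commons.math3.distribution.LogNormalDistribution;
    import org.apache.commons.math3.random.JDKRandomGenerator;

    public final class LogNormalSketch {
      private LogNormalSketch() {
      }

      /**
       * Builds a log-normal distribution whose samples have (approximately)
       * the requested mean and standard deviation, by deriving the scale
       * (mu) and shape (sigma) parameters of the underlying normal.
       */
      public static LogNormalDistribution of(JDKRandomGenerator rand,
          double mean, double stdDev) {
        if (mean <= 0) {
          // assumed behavior: no distribution configured; this would match
          // the null checks the call sites above perform before sampling
          return null;
        }
        double b = 1 + (stdDev * stdDev) / (mean * mean);
        double sigma = Math.sqrt(Math.log(b));
        double mu = Math.log(mean / Math.sqrt(b));
        return new LogNormalDistribution(rand, mu, sigma);
      }

      public static void main(String[] args) {
        JDKRandomGenerator rand = new JDKRandomGenerator();
        rand.setSeed(42L);
        // e.g. a job-duration statistic averaging 60s with 5s spread
        LogNormalDistribution dur = of(rand, 60, 5);
        System.out.println("sampled duration (s): " + dur.sample());
      }
    }

This is consistent with every sampling call site above (genIntSample, genLongSample, the deadlineFactor logic in SynthJob) tolerating a null distribution: a job class that omits a statistic simply yields a default value instead of a sample.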

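----------------------------------------------------------------------
A note on the scheduler lifecycle changes above: moving schedulerMetrics.tearDown() from a ShutdownHookManager hook into serviceStop() means metrics are flushed whenever the scheduler service stops, not only at JVM exit, which matters when several simulations run back to back in one process. The reworked TaskRunner.stop() uses the standard ExecutorService drain pattern; a minimal self-contained sketch of that pattern (the timeout check and message are an illustrative addition, not part of the patch):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public final class DrainSketch {

      /** Interrupts running tasks, then waits (bounded) for them to exit. */
      static void stop(ExecutorService executor) throws InterruptedException {
        executor.shutdownNow(); // delivers an interrupt to every worker
        // Bounded wait so a wedged task cannot hang teardown forever.
        if (!executor.awaitTermination(20, TimeUnit.SECONDS)) {
          System.err.println("pool did not terminate within 20s");
        }
      }

      public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        pool.submit(() -> {
          try {
            Thread.sleep(60_000); // stands in for a long-running simulator
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // exit promptly on shutdownNow
          }
        });
        stop(pool); // returns as soon as the task observes the interrupt
      }
    }

The matching relaxation in start(), which now only rejects a restart while the previous executor is still running, is what allows a TaskRunner instance to be reused across consecutive simulation runs.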