MAPREDUCE-6546. reconcile the two versions of the timeline service performance tests. (Sangjin Lee via Naganarasimha G R)
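This commit merges the two parallel performance drivers into a single org.apache.hadoop.mapreduce.TimelineServicePerformance tool (registered in MapredTestDriver, per the file list below). As a rough illustration only, here is a minimal sketch of invoking it programmatically. It assumes the merged class keeps the Tool interface and the option names (-m, -mtype, -s, -t) of the removed TimelineServicePerformanceV2 driver shown later in the diff; the demo class itself is hypothetical and not part of this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TimelineServicePerformance;
    import org.apache.hadoop.util.ToolRunner;

    public class TimelineServicePerfDemo {
      public static void main(String[] args) throws Exception {
        // 4 mappers, simple entity writer (mtype=1), 10 KB per put, 50 puts per mapper;
        // option names are taken from the removed V2 driver's usage text and are
        // assumed to be unchanged in the consolidated tool.
        String[] perfArgs = {"-m", "4", "-mtype", "1", "-s", "10", "-t", "50"};
        int exitCode = ToolRunner.run(new Configuration(),
            new TimelineServicePerformance(), perfArgs);
        System.exit(exitCode);
      }
    }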
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cd444089
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cd444089
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cd444089

Branch: refs/heads/YARN-2928
Commit: cd4440899a2cebbbfb15e18228d3821961319986
Parents: 34248bd
Author: naganarasimha <naganarasimha...@apache.com>
Authored: Wed Mar 9 11:20:32 2016 +0530
Committer: Li Lu <gtcarre...@apache.org>
Committed: Wed May 4 16:35:05 2016 -0700

----------------------------------------------------------------------
 .../hadoop/mapred/JobHistoryFileParser.java     |  53 ----
 .../mapred/JobHistoryFileReplayMapper.java      | 301 -------------------
 .../hadoop/mapred/SimpleEntityWriter.java       | 140 ---------
 .../hadoop/mapred/TimelineEntityConverter.java  | 211 -------------
 .../mapred/TimelineServicePerformanceV2.java    | 229 --------------
 .../apache/hadoop/mapreduce/EntityWriterV2.java |  56 ++++
 .../mapreduce/JobHistoryFileReplayMapperV1.java |  14 +-
 .../mapreduce/JobHistoryFileReplayMapperV2.java | 161 ++++++++++
 .../mapreduce/SimpleEntityWriterConstants.java  |  43 +++
 .../hadoop/mapreduce/SimpleEntityWriterV1.java  |  28 +-
 .../hadoop/mapreduce/SimpleEntityWriterV2.java  | 131 ++++++++
 .../mapreduce/TimelineEntityConverterV1.java    |   5 -
 .../mapreduce/TimelineEntityConverterV2.java    | 211 +++++++++++++
 .../mapreduce/TimelineServicePerformance.java   | 129 +++++---
 .../apache/hadoop/test/MapredTestDriver.java    |  35 +--
 15 files changed, 704 insertions(+), 1043 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd444089/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileParser.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileParser.java
deleted file mode 100644
index 9d051df..0000000
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileParser.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.hadoop.mapred; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; - -class JobHistoryFileParser { - private static final Log LOG = LogFactory.getLog(JobHistoryFileParser.class); - - private final FileSystem fs; - - public JobHistoryFileParser(FileSystem fs) { - LOG.info("JobHistoryFileParser created with " + fs); - this.fs = fs; - } - - public JobInfo parseHistoryFile(Path path) throws IOException { - LOG.info("parsing job history file " + path); - JobHistoryParser parser = new JobHistoryParser(fs, path); - return parser.parse(); - } - - public Configuration parseConfiguration(Path path) throws IOException { - LOG.info("parsing job configuration file " + path); - Configuration conf = new Configuration(false); - conf.addResource(fs.open(path)); - return conf; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd444089/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java deleted file mode 100644 index 4fb5308..0000000 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/JobHistoryFileReplayMapper.java +++ /dev/null @@ -1,301 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.mapred; - -import java.io.IOException; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.mapred.TimelineServicePerformanceV2.EntityWriter; -import org.apache.hadoop.mapred.TimelineServicePerformanceV2.PerfCounters; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.TypeConverter; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; -import org.apache.hadoop.mapreduce.v2.api.records.JobId; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; -import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; -import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; - -/** - * Mapper for TimelineServicePerformanceV2 that replays job history files to the - * timeline service. - * - */ -class JobHistoryFileReplayMapper extends EntityWriter { - private static final Log LOG = - LogFactory.getLog(JobHistoryFileReplayMapper.class); - - static final String PROCESSING_PATH = "processing path"; - static final String REPLAY_MODE = "replay mode"; - static final int WRITE_ALL_AT_ONCE = 1; - static final int WRITE_PER_ENTITY = 2; - static final int REPLAY_MODE_DEFAULT = WRITE_ALL_AT_ONCE; - - private static final Pattern JOB_ID_PARSER = - Pattern.compile("^(job_[0-9]+_([0-9]+)).*"); - - public static class JobFiles { - private final String jobId; - private Path jobHistoryFilePath; - private Path jobConfFilePath; - - public JobFiles(String jobId) { - this.jobId = jobId; - } - - public String getJobId() { - return jobId; - } - - public Path getJobHistoryFilePath() { - return jobHistoryFilePath; - } - - public void setJobHistoryFilePath(Path jobHistoryFilePath) { - this.jobHistoryFilePath = jobHistoryFilePath; - } - - public Path getJobConfFilePath() { - return jobConfFilePath; - } - - public void setJobConfFilePath(Path jobConfFilePath) { - this.jobConfFilePath = jobConfFilePath; - } - - @Override - public int hashCode() { - return jobId.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - JobFiles other = (JobFiles) obj; - return jobId.equals(other.jobId); - } - } - - private enum FileType { JOB_HISTORY_FILE, JOB_CONF_FILE, UNKNOWN } - - - @Override - protected void writeEntities(Configuration tlConf, - TimelineCollectorManager manager, Context context) throws IOException { - // collect the apps it needs to process - Configuration conf = context.getConfiguration(); - int taskId = context.getTaskAttemptID().getTaskID().getId(); - int size = conf.getInt(MRJobConfig.NUM_MAPS, - TimelineServicePerformanceV2.NUM_MAPS_DEFAULT); - String 
processingDir = - conf.get(JobHistoryFileReplayMapper.PROCESSING_PATH); - int replayMode = - conf.getInt(JobHistoryFileReplayMapper.REPLAY_MODE, - JobHistoryFileReplayMapper.REPLAY_MODE_DEFAULT); - Path processingPath = new Path(processingDir); - FileSystem processingFs = processingPath.getFileSystem(conf); - JobHistoryFileParser parser = new JobHistoryFileParser(processingFs); - TimelineEntityConverter converter = new TimelineEntityConverter(); - - Collection<JobFiles> jobs = - selectJobFiles(processingFs, processingPath, taskId, size); - if (jobs.isEmpty()) { - LOG.info(context.getTaskAttemptID().getTaskID() + - " will process no jobs"); - } else { - LOG.info(context.getTaskAttemptID().getTaskID() + " will process " + - jobs.size() + " jobs"); - } - for (JobFiles job: jobs) { - // process each job - String jobIdStr = job.getJobId(); - LOG.info("processing " + jobIdStr + "..."); - JobId jobId = TypeConverter.toYarn(JobID.forName(jobIdStr)); - ApplicationId appId = jobId.getAppId(); - - // create the app level timeline collector and start it - AppLevelTimelineCollector collector = - new AppLevelTimelineCollector(appId); - manager.putIfAbsent(appId, collector); - try { - // parse the job info and configuration - JobInfo jobInfo = - parser.parseHistoryFile(job.getJobHistoryFilePath()); - Configuration jobConf = - parser.parseConfiguration(job.getJobConfFilePath()); - LOG.info("parsed the job history file and the configuration file for job" - + jobIdStr); - - // set the context - // flow id: job name, flow run id: timestamp, user id - TimelineCollectorContext tlContext = - collector.getTimelineEntityContext(); - tlContext.setFlowName(jobInfo.getJobname()); - tlContext.setFlowRunId(jobInfo.getSubmitTime()); - tlContext.setUserId(jobInfo.getUsername()); - - // create entities from job history and write them - long totalTime = 0; - List<TimelineEntity> entitySet = - converter.createTimelineEntities(jobInfo, jobConf); - LOG.info("converted them into timeline entities for job " + jobIdStr); - // use the current user for this purpose - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - long startWrite = System.nanoTime(); - try { - switch (replayMode) { - case JobHistoryFileReplayMapper.WRITE_ALL_AT_ONCE: - writeAllEntities(collector, entitySet, ugi); - break; - case JobHistoryFileReplayMapper.WRITE_PER_ENTITY: - writePerEntity(collector, entitySet, ugi); - break; - default: - break; - } - } catch (Exception e) { - context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES). - increment(1); - LOG.error("writing to the timeline service failed", e); - } - long endWrite = System.nanoTime(); - totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite); - int numEntities = entitySet.size(); - LOG.info("wrote " + numEntities + " entities in " + totalTime + " ms"); - - context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME). - increment(totalTime); - context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER). 
- increment(numEntities); - } finally { - manager.remove(appId); - context.progress(); // move it along - } - } - } - - private void writeAllEntities(AppLevelTimelineCollector collector, - List<TimelineEntity> entitySet, UserGroupInformation ugi) - throws IOException { - TimelineEntities entities = new TimelineEntities(); - entities.setEntities(entitySet); - collector.putEntities(entities, ugi); - } - - private void writePerEntity(AppLevelTimelineCollector collector, - List<TimelineEntity> entitySet, UserGroupInformation ugi) - throws IOException { - for (TimelineEntity entity : entitySet) { - TimelineEntities entities = new TimelineEntities(); - entities.addEntity(entity); - collector.putEntities(entities, ugi); - LOG.info("wrote entity " + entity.getId()); - } - } - - private Collection<JobFiles> selectJobFiles(FileSystem fs, - Path processingRoot, int i, int size) throws IOException { - Map<String,JobFiles> jobs = new HashMap<>(); - RemoteIterator<LocatedFileStatus> it = fs.listFiles(processingRoot, true); - while (it.hasNext()) { - LocatedFileStatus status = it.next(); - Path path = status.getPath(); - String fileName = path.getName(); - Matcher m = JOB_ID_PARSER.matcher(fileName); - if (!m.matches()) { - continue; - } - String jobId = m.group(1); - int lastId = Integer.parseInt(m.group(2)); - int mod = lastId % size; - if (mod != i) { - continue; - } - LOG.info("this mapper will process file " + fileName); - // it's mine - JobFiles jobFiles = jobs.get(jobId); - if (jobFiles == null) { - jobFiles = new JobFiles(jobId); - jobs.put(jobId, jobFiles); - } - setFilePath(fileName, path, jobFiles); - } - return jobs.values(); - } - - private void setFilePath(String fileName, Path path, - JobFiles jobFiles) { - // determine if we're dealing with a job history file or a job conf file - FileType type = getFileType(fileName); - switch (type) { - case JOB_HISTORY_FILE: - if (jobFiles.getJobHistoryFilePath() == null) { - jobFiles.setJobHistoryFilePath(path); - } else { - LOG.warn("we already have the job history file " + - jobFiles.getJobHistoryFilePath() + ": skipping " + path); - } - break; - case JOB_CONF_FILE: - if (jobFiles.getJobConfFilePath() == null) { - jobFiles.setJobConfFilePath(path); - } else { - LOG.warn("we already have the job conf file " + - jobFiles.getJobConfFilePath() + ": skipping " + path); - } - break; - case UNKNOWN: - LOG.warn("unknown type: " + path); - } - } - - private FileType getFileType(String fileName) { - if (fileName.endsWith(".jhist")) { - return FileType.JOB_HISTORY_FILE; - } - if (fileName.endsWith("_conf.xml")) { - return FileType.JOB_CONF_FILE; - } - return FileType.UNKNOWN; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd444089/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SimpleEntityWriter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SimpleEntityWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SimpleEntityWriter.java deleted file mode 100644 index 625c32a..0000000 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/SimpleEntityWriter.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapred; - -import java.io.IOException; -import java.util.Random; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.TimelineServicePerformanceV2.EntityWriter; -import org.apache.hadoop.mapred.TimelineServicePerformanceV2.PerfCounters; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; -import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; -import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; - -/** - * Adds simple entities with random string payload, events, metrics, and - * configuration. 
- */ -class SimpleEntityWriter extends EntityWriter { - private static final Log LOG = LogFactory.getLog(SimpleEntityWriter.class); - - // constants for mtype = 1 - static final String KBS_SENT = "kbs sent"; - static final int KBS_SENT_DEFAULT = 1; - static final String TEST_TIMES = "testtimes"; - static final int TEST_TIMES_DEFAULT = 100; - static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID = - "timeline.server.performance.run.id"; - - protected void writeEntities(Configuration tlConf, - TimelineCollectorManager manager, Context context) throws IOException { - Configuration conf = context.getConfiguration(); - // simulate the app id with the task id - int taskId = context.getTaskAttemptID().getTaskID().getId(); - long timestamp = conf.getLong(TIMELINE_SERVICE_PERFORMANCE_RUN_ID, 0); - ApplicationId appId = ApplicationId.newInstance(timestamp, taskId); - - // create the app level timeline collector - AppLevelTimelineCollector collector = - new AppLevelTimelineCollector(appId); - manager.putIfAbsent(appId, collector); - - try { - // set the context - // flow id: job name, flow run id: timestamp, user id - TimelineCollectorContext tlContext = - collector.getTimelineEntityContext(); - tlContext.setFlowName(context.getJobName()); - tlContext.setFlowRunId(timestamp); - tlContext.setUserId(context.getUser()); - - final int kbs = conf.getInt(KBS_SENT, KBS_SENT_DEFAULT); - - long totalTime = 0; - final int testtimes = conf.getInt(TEST_TIMES, TEST_TIMES_DEFAULT); - final Random rand = new Random(); - final TaskAttemptID taskAttemptId = context.getTaskAttemptID(); - final char[] payLoad = new char[kbs * 1024]; - - for (int i = 0; i < testtimes; i++) { - // Generate a fixed length random payload - for (int xx = 0; xx < kbs * 1024; xx++) { - int alphaNumIdx = - rand.nextInt(TimelineServicePerformanceV2.alphaNums.length); - payLoad[xx] = TimelineServicePerformanceV2.alphaNums[alphaNumIdx]; - } - String entId = taskAttemptId + "_" + Integer.toString(i); - final TimelineEntity entity = new TimelineEntity(); - entity.setId(entId); - entity.setType("FOO_ATTEMPT"); - entity.addInfo("PERF_TEST", payLoad); - // add an event - TimelineEvent event = new TimelineEvent(); - event.setId("foo_event_id"); - event.setTimestamp(System.currentTimeMillis()); - event.addInfo("foo_event", "test"); - entity.addEvent(event); - // add a metric - TimelineMetric metric = new TimelineMetric(); - metric.setId("foo_metric"); - metric.addValue(System.currentTimeMillis(), 123456789L); - entity.addMetric(metric); - // add a config - entity.addConfig("foo", "bar"); - - TimelineEntities entities = new TimelineEntities(); - entities.addEntity(entity); - // use the current user for this purpose - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - long startWrite = System.nanoTime(); - try { - collector.putEntities(entities, ugi); - } catch (Exception e) { - context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES). - increment(1); - LOG.error("writing to the timeline service failed", e); - } - long endWrite = System.nanoTime(); - totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite); - } - LOG.info("wrote " + testtimes + " entities (" + kbs*testtimes + - " kB) in " + totalTime + " ms"); - context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME). - increment(totalTime); - context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER). - increment(testtimes); - context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS). 
- increment(kbs*testtimes); - } finally { - // clean up - manager.remove(appId); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd444089/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java deleted file mode 100644 index 0e2eb72..0000000 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineEntityConverter.java +++ /dev/null @@ -1,211 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapred; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.CounterGroup; -import org.apache.hadoop.mapreduce.Counters; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskID; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; -import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; - -class TimelineEntityConverter { - private static final Log LOG = - LogFactory.getLog(TimelineEntityConverter.class); - - static final String JOB = "MAPREDUCE_JOB"; - static final String TASK = "MAPREDUCE_TASK"; - static final String TASK_ATTEMPT = "MAPREDUCE_TASK_ATTEMPT"; - - /** - * Creates job, task, and task attempt entities based on the job history info - * and configuration. - * - * Note: currently these are plan timeline entities created for mapreduce - * types. These are not meant to be the complete and accurate entity set-up - * for mapreduce jobs. We do not leverage hierarchical timeline entities. If - * we create canonical mapreduce hierarchical timeline entities with proper - * parent-child relationship, we could modify this to use that instead. 
- * - * Note that we also do not add info to the YARN application entity, which - * would be needed for aggregation. - */ - public List<TimelineEntity> createTimelineEntities(JobInfo jobInfo, - Configuration conf) { - List<TimelineEntity> entities = new ArrayList<>(); - - // create the job entity - TimelineEntity job = createJobEntity(jobInfo, conf); - entities.add(job); - - // create the task and task attempt entities - List<TimelineEntity> tasksAndAttempts = - createTaskAndTaskAttemptEntities(jobInfo); - entities.addAll(tasksAndAttempts); - - return entities; - } - - private TimelineEntity createJobEntity(JobInfo jobInfo, Configuration conf) { - TimelineEntity job = new TimelineEntity(); - job.setType(JOB); - job.setId(jobInfo.getJobId().toString()); - job.setCreatedTime(jobInfo.getSubmitTime()); - - job.addInfo("JOBNAME", jobInfo.getJobname()); - job.addInfo("USERNAME", jobInfo.getUsername()); - job.addInfo("JOB_QUEUE_NAME", jobInfo.getJobQueueName()); - job.addInfo("SUBMIT_TIME", jobInfo.getSubmitTime()); - job.addInfo("LAUNCH_TIME", jobInfo.getLaunchTime()); - job.addInfo("FINISH_TIME", jobInfo.getFinishTime()); - job.addInfo("JOB_STATUS", jobInfo.getJobStatus()); - job.addInfo("PRIORITY", jobInfo.getPriority()); - job.addInfo("TOTAL_MAPS", jobInfo.getTotalMaps()); - job.addInfo("TOTAL_REDUCES", jobInfo.getTotalReduces()); - job.addInfo("UBERIZED", jobInfo.getUberized()); - job.addInfo("ERROR_INFO", jobInfo.getErrorInfo()); - - // add metrics from total counters - // we omit the map counters and reduce counters for now as it's kind of - // awkward to put them (map/reduce/total counters are really a group of - // related counters) - Counters totalCounters = jobInfo.getTotalCounters(); - if (totalCounters != null) { - addMetrics(job, totalCounters); - } - // finally add configuration to the job - addConfiguration(job, conf); - LOG.info("converted job " + jobInfo.getJobId() + " to a timeline entity"); - return job; - } - - private void addConfiguration(TimelineEntity job, Configuration conf) { - for (Map.Entry<String,String> e: conf) { - job.addConfig(e.getKey(), e.getValue()); - } - } - - private void addMetrics(TimelineEntity entity, Counters counters) { - for (CounterGroup g: counters) { - String groupName = g.getName(); - for (Counter c: g) { - String name = groupName + ":" + c.getName(); - TimelineMetric metric = new TimelineMetric(); - metric.setId(name); - metric.addValue(System.currentTimeMillis(), c.getValue()); - entity.addMetric(metric); - } - } - } - - private List<TimelineEntity> createTaskAndTaskAttemptEntities( - JobInfo jobInfo) { - List<TimelineEntity> entities = new ArrayList<>(); - Map<TaskID,TaskInfo> taskInfoMap = jobInfo.getAllTasks(); - LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() + - " tasks"); - for (TaskInfo taskInfo: taskInfoMap.values()) { - TimelineEntity task = createTaskEntity(taskInfo); - entities.add(task); - // add the task attempts from this task - Set<TimelineEntity> taskAttempts = createTaskAttemptEntities(taskInfo); - entities.addAll(taskAttempts); - } - return entities; - } - - private TimelineEntity createTaskEntity(TaskInfo taskInfo) { - TimelineEntity task = new TimelineEntity(); - task.setType(TASK); - task.setId(taskInfo.getTaskId().toString()); - task.setCreatedTime(taskInfo.getStartTime()); - - task.addInfo("START_TIME", taskInfo.getStartTime()); - task.addInfo("FINISH_TIME", taskInfo.getFinishTime()); - task.addInfo("TASK_TYPE", taskInfo.getTaskType()); - task.addInfo("TASK_STATUS", taskInfo.getTaskStatus()); - 
task.addInfo("ERROR_INFO", taskInfo.getError()); - - // add metrics from counters - Counters counters = taskInfo.getCounters(); - if (counters != null) { - addMetrics(task, counters); - } - LOG.info("converted task " + taskInfo.getTaskId() + - " to a timeline entity"); - return task; - } - - private Set<TimelineEntity> createTaskAttemptEntities(TaskInfo taskInfo) { - Set<TimelineEntity> taskAttempts = new HashSet<TimelineEntity>(); - Map<TaskAttemptID,TaskAttemptInfo> taskAttemptInfoMap = - taskInfo.getAllTaskAttempts(); - LOG.info("task " + taskInfo.getTaskId() + " has " + - taskAttemptInfoMap.size() + " task attempts"); - for (TaskAttemptInfo taskAttemptInfo: taskAttemptInfoMap.values()) { - TimelineEntity taskAttempt = createTaskAttemptEntity(taskAttemptInfo); - taskAttempts.add(taskAttempt); - } - return taskAttempts; - } - - private TimelineEntity createTaskAttemptEntity( - TaskAttemptInfo taskAttemptInfo) { - TimelineEntity taskAttempt = new TimelineEntity(); - taskAttempt.setType(TASK_ATTEMPT); - taskAttempt.setId(taskAttemptInfo.getAttemptId().toString()); - taskAttempt.setCreatedTime(taskAttemptInfo.getStartTime()); - - taskAttempt.addInfo("START_TIME", taskAttemptInfo.getStartTime()); - taskAttempt.addInfo("FINISH_TIME", taskAttemptInfo.getFinishTime()); - taskAttempt.addInfo("MAP_FINISH_TIME", - taskAttemptInfo.getMapFinishTime()); - taskAttempt.addInfo("SHUFFLE_FINISH_TIME", - taskAttemptInfo.getShuffleFinishTime()); - taskAttempt.addInfo("SORT_FINISH_TIME", - taskAttemptInfo.getSortFinishTime()); - taskAttempt.addInfo("TASK_STATUS", taskAttemptInfo.getTaskStatus()); - taskAttempt.addInfo("STATE", taskAttemptInfo.getState()); - taskAttempt.addInfo("ERROR", taskAttemptInfo.getError()); - taskAttempt.addInfo("CONTAINER_ID", - taskAttemptInfo.getContainerId().toString()); - - // add metrics from counters - Counters counters = taskAttemptInfo.getCounters(); - if (counters != null) { - addMetrics(taskAttempt, counters); - } - LOG.info("converted task attempt " + taskAttemptInfo.getAttemptId() + - " to a timeline entity"); - return taskAttempt; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd444089/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java deleted file mode 100644 index f674ae1..0000000 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java +++ /dev/null @@ -1,229 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.mapred; - -import java.io.IOException; -import java.util.Date; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.SleepJob.SleepInputFormat; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.apache.hadoop.util.GenericOptionsParser; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; - -public class TimelineServicePerformanceV2 extends Configured implements Tool { - static final int NUM_MAPS_DEFAULT = 1; - - static final int SIMPLE_ENTITY_WRITER = 1; - static final int JOB_HISTORY_FILE_REPLAY_MAPPER = 2; - static int mapperType = SIMPLE_ENTITY_WRITER; - - protected static int printUsage() { - System.err.println( - "Usage: [-m <maps>] number of mappers (default: " + NUM_MAPS_DEFAULT + - ")\n" + - " [-mtype <mapper type in integer>]\n" + - " 1. simple entity write mapper\n" + - " 2. job history file replay mapper\n" + - " [-s <(KBs)test>] number of KB per put (mtype=1, default: " + - SimpleEntityWriter.KBS_SENT_DEFAULT + " KB)\n" + - " [-t] package sending iterations per mapper (mtype=1, default: " + - SimpleEntityWriter.TEST_TIMES_DEFAULT + ")\n" + - " [-d <path>] root path of job history files (mtype=2)\n" + - " [-r <replay mode>] (mtype=2)\n" + - " 1. write all entities for a job in one put (default)\n" + - " 2. write one entity at a time\n"); - GenericOptionsParser.printGenericCommandUsage(System.err); - return -1; - } - - /** - * Configure a job given argv. 
- */ - public static boolean parseArgs(String[] args, Job job) throws IOException { - // set the common defaults - Configuration conf = job.getConfiguration(); - conf.setInt(MRJobConfig.NUM_MAPS, NUM_MAPS_DEFAULT); - - for (int i = 0; i < args.length; i++) { - if (args.length == i + 1) { - System.out.println("ERROR: Required parameter missing from " + args[i]); - return printUsage() == 0; - } - try { - if ("-m".equals(args[i])) { - if (Integer.parseInt(args[++i]) > 0) { - job.getConfiguration() - .setInt(MRJobConfig.NUM_MAPS, Integer.parseInt(args[i])); - } - } else if ("-mtype".equals(args[i])) { - mapperType = Integer.parseInt(args[++i]); - } else if ("-s".equals(args[i])) { - if (Integer.parseInt(args[++i]) > 0) { - conf.setInt(SimpleEntityWriter.KBS_SENT, Integer.parseInt(args[i])); - } - } else if ("-t".equals(args[i])) { - if (Integer.parseInt(args[++i]) > 0) { - conf.setInt(SimpleEntityWriter.TEST_TIMES, - Integer.parseInt(args[i])); - } - } else if ("-d".equals(args[i])) { - conf.set(JobHistoryFileReplayMapper.PROCESSING_PATH, args[++i]); - } else if ("-r".equals(args[i])) { - conf.setInt(JobHistoryFileReplayMapper.REPLAY_MODE, - Integer.parseInt(args[++i])); - } else { - System.out.println("Unexpected argument: " + args[i]); - return printUsage() == 0; - } - } catch (NumberFormatException except) { - System.out.println("ERROR: Integer expected instead of " + args[i]); - return printUsage() == 0; - } catch (Exception e) { - throw (IOException)new IOException().initCause(e); - } - } - - // handle mapper-specific settings - switch (mapperType) { - case JOB_HISTORY_FILE_REPLAY_MAPPER: - job.setMapperClass(JobHistoryFileReplayMapper.class); - String processingPath = - conf.get(JobHistoryFileReplayMapper.PROCESSING_PATH); - if (processingPath == null || processingPath.isEmpty()) { - System.out.println("processing path is missing while mtype = 2"); - return printUsage() == 0; - } - break; - case SIMPLE_ENTITY_WRITER: - default: - job.setMapperClass(SimpleEntityWriter.class); - // use the current timestamp as the "run id" of the test: this will - // be used as simulating the cluster timestamp for apps - conf.setLong(SimpleEntityWriter.TIMELINE_SERVICE_PERFORMANCE_RUN_ID, - System.currentTimeMillis()); - break; - } - - return true; - } - - /** - * TimelineServer Performance counters - */ - static enum PerfCounters { - TIMELINE_SERVICE_WRITE_TIME, - TIMELINE_SERVICE_WRITE_COUNTER, - TIMELINE_SERVICE_WRITE_FAILURES, - TIMELINE_SERVICE_WRITE_KBS, - } - - public int run(String[] args) throws Exception { - - Job job = Job.getInstance(getConf()); - job.setJarByClass(TimelineServicePerformanceV2.class); - job.setMapperClass(SimpleEntityWriter.class); - job.setInputFormatClass(SleepInputFormat.class); - job.setOutputFormatClass(NullOutputFormat.class); - job.setNumReduceTasks(0); - if (!parseArgs(args, job)) { - return -1; - } - - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int ret = job.waitForCompletion(true) ? 
0 : 1; - org.apache.hadoop.mapreduce.Counters counters = job.getCounters(); - long writetime = - counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).getValue(); - long writecounts = - counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).getValue(); - long writesize = - counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).getValue(); - double transacrate = writecounts * 1000 / (double)writetime; - double iorate = writesize * 1000 / (double)writetime; - int numMaps = - Integer.parseInt(job.getConfiguration().get(MRJobConfig.NUM_MAPS)); - - System.out.println("TRANSACTION RATE (per mapper): " + transacrate + - " ops/s"); - System.out.println("IO RATE (per mapper): " + iorate + " KB/s"); - - System.out.println("TRANSACTION RATE (total): " + transacrate*numMaps + - " ops/s"); - System.out.println("IO RATE (total): " + iorate*numMaps + " KB/s"); - - return ret; - } - - public static void main(String[] args) throws Exception { - int res = - ToolRunner.run(new Configuration(), new TimelineServicePerformanceV2(), - args); - System.exit(res); - } - - /** - * To ensure that the compression really gets exercised, generate a - * random alphanumeric fixed length payload - */ - static final char[] alphaNums = new char[] { 'a', 'b', 'c', 'd', 'e', 'f', - 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', - 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', - 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', - 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2', - '3', '4', '5', '6', '7', '8', '9', '0', ' ' }; - - /** - * Base mapper for writing entities to the timeline service. Subclasses - * override {@link #writeEntities(Configuration, TimelineCollectorManager, - * org.apache.hadoop.mapreduce.Mapper.Context)} to create and write entities - * to the timeline service. - */ - public static abstract class EntityWriter - extends org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> { - @Override - public void map(IntWritable key, IntWritable val, Context context) - throws IOException { - - // create the timeline collector manager wired with the writer - Configuration tlConf = new YarnConfiguration(); - TimelineCollectorManager manager = new TimelineCollectorManager("test"); - manager.init(tlConf); - manager.start(); - try { - // invoke the method to have the subclass write entities - writeEntities(tlConf, manager, context); - } finally { - manager.close(); - } - } - - protected abstract void writeEntities(Configuration tlConf, - TimelineCollectorManager manager, Context context) throws IOException; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd444089/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java new file mode 100644 index 0000000..f5d95c3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/EntityWriterV2.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; + +/** + * Base mapper for writing entities to the timeline service. Subclasses + * override {@link #writeEntities(Configuration, TimelineCollectorManager, + * org.apache.hadoop.mapreduce.Mapper.Context)} to create and write entities + * to the timeline service. + */ +abstract class EntityWriterV2 + extends org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> { + @Override + public void map(IntWritable key, IntWritable val, Context context) + throws IOException { + + // create the timeline collector manager wired with the writer + Configuration tlConf = new YarnConfiguration(); + TimelineCollectorManager manager = new TimelineCollectorManager("test"); + manager.init(tlConf); + manager.start(); + try { + // invoke the method to have the subclass write entities + writeEntities(tlConf, manager, context); + } finally { + manager.close(); + } + } + + protected abstract void writeEntities(Configuration tlConf, + TimelineCollectorManager manager, Context context) throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd444089/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java index 5e10662..447ea4e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV1.java @@ -20,33 +20,21 @@ package org.apache.hadoop.mapreduce; import java.io.IOException; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import 
org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.Mapper.Context; -import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters; -import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper; import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper.JobFiles; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd444089/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java new file mode 100644 index 0000000..6a9a878 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/JobHistoryFileReplayMapperV2.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapreduce.JobHistoryFileReplayHelper.JobFiles; +import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; + +/** + * Mapper for TimelineServicePerformance that replays job history files to the + * timeline service v.2. + * + */ +class JobHistoryFileReplayMapperV2 extends EntityWriterV2 { + private static final Log LOG = + LogFactory.getLog(JobHistoryFileReplayMapperV2.class); + + @Override + protected void writeEntities(Configuration tlConf, + TimelineCollectorManager manager, Context context) throws IOException { + JobHistoryFileReplayHelper helper = new JobHistoryFileReplayHelper(context); + int replayMode = helper.getReplayMode(); + JobHistoryFileParser parser = helper.getParser(); + TimelineEntityConverterV2 converter = new TimelineEntityConverterV2(); + + // collect the apps it needs to process + Collection<JobFiles> jobs = helper.getJobFiles(); + if (jobs.isEmpty()) { + LOG.info(context.getTaskAttemptID().getTaskID() + + " will process no jobs"); + } else { + LOG.info(context.getTaskAttemptID().getTaskID() + " will process " + + jobs.size() + " jobs"); + } + for (JobFiles job: jobs) { + // process each job + String jobIdStr = job.getJobId(); + // skip if either of the file is missing + if (job.getJobConfFilePath() == null || + job.getJobHistoryFilePath() == null) { + LOG.info(jobIdStr + " missing either the job history file or the " + + "configuration file. 
Skipping."); + continue; + } + LOG.info("processing " + jobIdStr + "..."); + JobId jobId = TypeConverter.toYarn(JobID.forName(jobIdStr)); + ApplicationId appId = jobId.getAppId(); + + // create the app level timeline collector and start it + AppLevelTimelineCollector collector = + new AppLevelTimelineCollector(appId); + manager.putIfAbsent(appId, collector); + try { + // parse the job info and configuration + JobInfo jobInfo = + parser.parseHistoryFile(job.getJobHistoryFilePath()); + Configuration jobConf = + parser.parseConfiguration(job.getJobConfFilePath()); + LOG.info("parsed the job history file and the configuration file for job" + + jobIdStr); + + // set the context + // flow id: job name, flow run id: timestamp, user id + TimelineCollectorContext tlContext = + collector.getTimelineEntityContext(); + tlContext.setFlowName(jobInfo.getJobname()); + tlContext.setFlowRunId(jobInfo.getSubmitTime()); + tlContext.setUserId(jobInfo.getUsername()); + + // create entities from job history and write them + long totalTime = 0; + List<TimelineEntity> entitySet = + converter.createTimelineEntities(jobInfo, jobConf); + LOG.info("converted them into timeline entities for job " + jobIdStr); + // use the current user for this purpose + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + long startWrite = System.nanoTime(); + try { + switch (replayMode) { + case JobHistoryFileReplayHelper.WRITE_ALL_AT_ONCE: + writeAllEntities(collector, entitySet, ugi); + break; + case JobHistoryFileReplayHelper.WRITE_PER_ENTITY: + writePerEntity(collector, entitySet, ugi); + break; + default: + break; + } + } catch (Exception e) { + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES). + increment(1); + LOG.error("writing to the timeline service failed", e); + } + long endWrite = System.nanoTime(); + totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite); + int numEntities = entitySet.size(); + LOG.info("wrote " + numEntities + " entities in " + totalTime + " ms"); + + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME). + increment(totalTime); + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER). 
+ increment(numEntities); + } finally { + manager.remove(appId); + context.progress(); // move it along + } + } + } + + private void writeAllEntities(AppLevelTimelineCollector collector, + List<TimelineEntity> entitySet, UserGroupInformation ugi) + throws IOException { + TimelineEntities entities = new TimelineEntities(); + entities.setEntities(entitySet); + collector.putEntities(entities, ugi); + } + + private void writePerEntity(AppLevelTimelineCollector collector, + List<TimelineEntity> entitySet, UserGroupInformation ugi) + throws IOException { + for (TimelineEntity entity : entitySet) { + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entity); + collector.putEntities(entities, ugi); + LOG.info("wrote entity " + entity.getId()); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd444089/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java new file mode 100644 index 0000000..b89d0e8 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterConstants.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +/** + * Constants for simple entity writers. 
+ */ +interface SimpleEntityWriterConstants { + // constants for mtype = 1 + String KBS_SENT = "kbs sent"; + int KBS_SENT_DEFAULT = 1; + String TEST_TIMES = "testtimes"; + int TEST_TIMES_DEFAULT = 100; + String TIMELINE_SERVICE_PERFORMANCE_RUN_ID = + "timeline.server.performance.run.id"; + + /** + * To ensure that the compression really gets exercised, generate a + * random alphanumeric fixed length payload + */ + char[] ALPHA_NUMS = new char[] { 'a', 'b', 'c', 'd', 'e', 'f', + 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', + 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', + 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', + 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2', + '3', '4', '5', '6', '7', '8', '9', '0', ' ' }; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd444089/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java index 2c851e9..b10ae04 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV1.java @@ -27,44 +27,22 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters; -import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; -import org.apache.hadoop.yarn.conf.YarnConfiguration; /** * Adds simple entities with random string payload, events, metrics, and * configuration. 
*/ -class SimpleEntityWriterV1 extends - org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> { +class SimpleEntityWriterV1 + extends org.apache.hadoop.mapreduce.Mapper<IntWritable,IntWritable,Writable,Writable> + implements SimpleEntityWriterConstants { private static final Log LOG = LogFactory.getLog(SimpleEntityWriterV1.class); - // constants for mtype = 1 - static final String KBS_SENT = "kbs sent"; - static final int KBS_SENT_DEFAULT = 1; - static final String TEST_TIMES = "testtimes"; - static final int TEST_TIMES_DEFAULT = 100; - static final String TIMELINE_SERVICE_PERFORMANCE_RUN_ID = - "timeline.server.performance.run.id"; - /** - * To ensure that the compression really gets exercised, generate a - * random alphanumeric fixed length payload - */ - private static char[] ALPHA_NUMS = new char[] { 'a', 'b', 'c', 'd', 'e', 'f', - 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', - 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', - 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', - 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '1', '2', - '3', '4', '5', '6', '7', '8', '9', '0', ' ' }; - public void map(IntWritable key, IntWritable val, Context context) throws IOException { TimelineClient tlc = new TimelineClientImpl(); Configuration conf = context.getConfiguration(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd444089/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java new file mode 100644 index 0000000..d66deb0 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/SimpleEntityWriterV2.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TimelineServicePerformance.PerfCounters; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager; + +/** + * Adds simple entities with random string payload, events, metrics, and + * configuration. + */ +class SimpleEntityWriterV2 extends EntityWriterV2 + implements SimpleEntityWriterConstants { + private static final Log LOG = LogFactory.getLog(SimpleEntityWriterV2.class); + + protected void writeEntities(Configuration tlConf, + TimelineCollectorManager manager, Context context) throws IOException { + Configuration conf = context.getConfiguration(); + // simulate the app id with the task id + int taskId = context.getTaskAttemptID().getTaskID().getId(); + long timestamp = conf.getLong(TIMELINE_SERVICE_PERFORMANCE_RUN_ID, 0); + ApplicationId appId = ApplicationId.newInstance(timestamp, taskId); + + // create the app level timeline collector + AppLevelTimelineCollector collector = + new AppLevelTimelineCollector(appId); + manager.putIfAbsent(appId, collector); + + try { + // set the context + // flow id: job name, flow run id: timestamp, user id + TimelineCollectorContext tlContext = + collector.getTimelineEntityContext(); + tlContext.setFlowName(context.getJobName()); + tlContext.setFlowRunId(timestamp); + tlContext.setUserId(context.getUser()); + + final int kbs = conf.getInt(KBS_SENT, KBS_SENT_DEFAULT); + + long totalTime = 0; + final int testtimes = conf.getInt(TEST_TIMES, TEST_TIMES_DEFAULT); + final Random rand = new Random(); + final TaskAttemptID taskAttemptId = context.getTaskAttemptID(); + final char[] payLoad = new char[kbs * 1024]; + + for (int i = 0; i < testtimes; i++) { + // Generate a fixed length random payload + for (int xx = 0; xx < kbs * 1024; xx++) { + int alphaNumIdx = + rand.nextInt(ALPHA_NUMS.length); + payLoad[xx] = ALPHA_NUMS[alphaNumIdx]; + } + String entId = taskAttemptId + "_" + Integer.toString(i); + final TimelineEntity entity = new TimelineEntity(); + entity.setId(entId); + entity.setType("FOO_ATTEMPT"); + entity.addInfo("PERF_TEST", payLoad); + // add an event + TimelineEvent event = new TimelineEvent(); + event.setId("foo_event_id"); + event.setTimestamp(System.currentTimeMillis()); + event.addInfo("foo_event", "test"); + entity.addEvent(event); + // add a metric + TimelineMetric metric = new TimelineMetric(); + metric.setId("foo_metric"); + metric.addValue(System.currentTimeMillis(), 123456789L); + entity.addMetric(metric); + // add a config + entity.addConfig("foo", "bar"); + + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entity); + // use the current user for this purpose + UserGroupInformation ugi = 
UserGroupInformation.getCurrentUser(); + long startWrite = System.nanoTime(); + try { + collector.putEntities(entities, ugi); + } catch (Exception e) { + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES). + increment(1); + LOG.error("writing to the timeline service failed", e); + } + long endWrite = System.nanoTime(); + totalTime += TimeUnit.NANOSECONDS.toMillis(endWrite-startWrite); + } + LOG.info("wrote " + testtimes + " entities (" + kbs*testtimes + + " kB) in " + totalTime + " ms"); + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME). + increment(totalTime); + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER). + increment(testtimes); + context.getCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS). + increment(kbs*testtimes); + } finally { + // clean up + manager.remove(appId); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd444089/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java index 79d123e..4d8b74b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV1.java @@ -25,11 +25,6 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.CounterGroup; -import org.apache.hadoop.mapreduce.Counters; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd444089/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV2.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV2.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV2.java new file mode 100644 index 0000000..79633d2 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineEntityConverterV2.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; + +class TimelineEntityConverterV2 { + private static final Log LOG = + LogFactory.getLog(TimelineEntityConverterV2.class); + + static final String JOB = "MAPREDUCE_JOB"; + static final String TASK = "MAPREDUCE_TASK"; + static final String TASK_ATTEMPT = "MAPREDUCE_TASK_ATTEMPT"; + + /** + * Creates job, task, and task attempt entities based on the job history info + * and configuration. + * + * Note: currently these are plan timeline entities created for mapreduce + * types. These are not meant to be the complete and accurate entity set-up + * for mapreduce jobs. We do not leverage hierarchical timeline entities. If + * we create canonical mapreduce hierarchical timeline entities with proper + * parent-child relationship, we could modify this to use that instead. + * + * Note that we also do not add info to the YARN application entity, which + * would be needed for aggregation. 
+ */ + public List<TimelineEntity> createTimelineEntities(JobInfo jobInfo, + Configuration conf) { + List<TimelineEntity> entities = new ArrayList<>(); + + // create the job entity + TimelineEntity job = createJobEntity(jobInfo, conf); + entities.add(job); + + // create the task and task attempt entities + List<TimelineEntity> tasksAndAttempts = + createTaskAndTaskAttemptEntities(jobInfo); + entities.addAll(tasksAndAttempts); + + return entities; + } + + private TimelineEntity createJobEntity(JobInfo jobInfo, Configuration conf) { + TimelineEntity job = new TimelineEntity(); + job.setType(JOB); + job.setId(jobInfo.getJobId().toString()); + job.setCreatedTime(jobInfo.getSubmitTime()); + + job.addInfo("JOBNAME", jobInfo.getJobname()); + job.addInfo("USERNAME", jobInfo.getUsername()); + job.addInfo("JOB_QUEUE_NAME", jobInfo.getJobQueueName()); + job.addInfo("SUBMIT_TIME", jobInfo.getSubmitTime()); + job.addInfo("LAUNCH_TIME", jobInfo.getLaunchTime()); + job.addInfo("FINISH_TIME", jobInfo.getFinishTime()); + job.addInfo("JOB_STATUS", jobInfo.getJobStatus()); + job.addInfo("PRIORITY", jobInfo.getPriority()); + job.addInfo("TOTAL_MAPS", jobInfo.getTotalMaps()); + job.addInfo("TOTAL_REDUCES", jobInfo.getTotalReduces()); + job.addInfo("UBERIZED", jobInfo.getUberized()); + job.addInfo("ERROR_INFO", jobInfo.getErrorInfo()); + + // add metrics from total counters + // we omit the map counters and reduce counters for now as it's kind of + // awkward to put them (map/reduce/total counters are really a group of + // related counters) + Counters totalCounters = jobInfo.getTotalCounters(); + if (totalCounters != null) { + addMetrics(job, totalCounters); + } + // finally add configuration to the job + addConfiguration(job, conf); + LOG.info("converted job " + jobInfo.getJobId() + " to a timeline entity"); + return job; + } + + private void addConfiguration(TimelineEntity job, Configuration conf) { + for (Map.Entry<String,String> e: conf) { + job.addConfig(e.getKey(), e.getValue()); + } + } + + private void addMetrics(TimelineEntity entity, Counters counters) { + for (CounterGroup g: counters) { + String groupName = g.getName(); + for (Counter c: g) { + String name = groupName + ":" + c.getName(); + TimelineMetric metric = new TimelineMetric(); + metric.setId(name); + metric.addValue(System.currentTimeMillis(), c.getValue()); + entity.addMetric(metric); + } + } + } + + private List<TimelineEntity> createTaskAndTaskAttemptEntities( + JobInfo jobInfo) { + List<TimelineEntity> entities = new ArrayList<>(); + Map<TaskID,TaskInfo> taskInfoMap = jobInfo.getAllTasks(); + LOG.info("job " + jobInfo.getJobId()+ " has " + taskInfoMap.size() + + " tasks"); + for (TaskInfo taskInfo: taskInfoMap.values()) { + TimelineEntity task = createTaskEntity(taskInfo); + entities.add(task); + // add the task attempts from this task + Set<TimelineEntity> taskAttempts = createTaskAttemptEntities(taskInfo); + entities.addAll(taskAttempts); + } + return entities; + } + + private TimelineEntity createTaskEntity(TaskInfo taskInfo) { + TimelineEntity task = new TimelineEntity(); + task.setType(TASK); + task.setId(taskInfo.getTaskId().toString()); + task.setCreatedTime(taskInfo.getStartTime()); + + task.addInfo("START_TIME", taskInfo.getStartTime()); + task.addInfo("FINISH_TIME", taskInfo.getFinishTime()); + task.addInfo("TASK_TYPE", taskInfo.getTaskType()); + task.addInfo("TASK_STATUS", taskInfo.getTaskStatus()); + task.addInfo("ERROR_INFO", taskInfo.getError()); + + // add metrics from counters + Counters counters = 
taskInfo.getCounters(); + if (counters != null) { + addMetrics(task, counters); + } + LOG.info("converted task " + taskInfo.getTaskId() + + " to a timeline entity"); + return task; + } + + private Set<TimelineEntity> createTaskAttemptEntities(TaskInfo taskInfo) { + Set<TimelineEntity> taskAttempts = new HashSet<TimelineEntity>(); + Map<TaskAttemptID,TaskAttemptInfo> taskAttemptInfoMap = + taskInfo.getAllTaskAttempts(); + LOG.info("task " + taskInfo.getTaskId() + " has " + + taskAttemptInfoMap.size() + " task attempts"); + for (TaskAttemptInfo taskAttemptInfo: taskAttemptInfoMap.values()) { + TimelineEntity taskAttempt = createTaskAttemptEntity(taskAttemptInfo); + taskAttempts.add(taskAttempt); + } + return taskAttempts; + } + + private TimelineEntity createTaskAttemptEntity( + TaskAttemptInfo taskAttemptInfo) { + TimelineEntity taskAttempt = new TimelineEntity(); + taskAttempt.setType(TASK_ATTEMPT); + taskAttempt.setId(taskAttemptInfo.getAttemptId().toString()); + taskAttempt.setCreatedTime(taskAttemptInfo.getStartTime()); + + taskAttempt.addInfo("START_TIME", taskAttemptInfo.getStartTime()); + taskAttempt.addInfo("FINISH_TIME", taskAttemptInfo.getFinishTime()); + taskAttempt.addInfo("MAP_FINISH_TIME", + taskAttemptInfo.getMapFinishTime()); + taskAttempt.addInfo("SHUFFLE_FINISH_TIME", + taskAttemptInfo.getShuffleFinishTime()); + taskAttempt.addInfo("SORT_FINISH_TIME", + taskAttemptInfo.getSortFinishTime()); + taskAttempt.addInfo("TASK_STATUS", taskAttemptInfo.getTaskStatus()); + taskAttempt.addInfo("STATE", taskAttemptInfo.getState()); + taskAttempt.addInfo("ERROR", taskAttemptInfo.getError()); + taskAttempt.addInfo("CONTAINER_ID", + taskAttemptInfo.getContainerId().toString()); + + // add metrics from counters + Counters counters = taskAttemptInfo.getCounters(); + if (counters != null) { + addMetrics(taskAttempt, counters); + } + LOG.info("converted task attempt " + taskAttemptInfo.getAttemptId() + + " to a timeline entity"); + return taskAttempt; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd444089/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineServicePerformance.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineServicePerformance.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineServicePerformance.java index 0753d7f..1a14137 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineServicePerformance.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TimelineServicePerformance.java @@ -23,8 +23,6 @@ import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.SleepJob.SleepInputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; @@ -46,15 +44,19 @@ public class TimelineServicePerformance extends Configured implements Tool { System.err.println( "Usage: [-m <maps>] number of mappers (default: " + NUM_MAPS_DEFAULT + ")\n" + 
- " [-v] timeline service version\n" + - " [-mtype <mapper type in integer>]\n" + - " 1. simple entity write mapper (default)\n" + + " [-v] timeline service version (default: " + + TIMELINE_SERVICE_VERSION_1 + ")\n" + + " 1. version 1.x\n" + + " 2. version 2.x\n" + + " [-mtype <mapper type in integer>] (default: " + + SIMPLE_ENTITY_WRITER + ")\n" + + " 1. simple entity write mapper\n" + " 2. jobhistory files replay mapper\n" + " [-s <(KBs)test>] number of KB per put (mtype=1, default: " + - SimpleEntityWriterV1.KBS_SENT_DEFAULT + " KB)\n" + + SimpleEntityWriterConstants.KBS_SENT_DEFAULT + " KB)\n" + " [-t] package sending iterations per mapper (mtype=1, default: " + - SimpleEntityWriterV1.TEST_TIMES_DEFAULT + ")\n" + - " [-d <path>] root path of job history files (mtype=2)\n" + + SimpleEntityWriterConstants.TEST_TIMES_DEFAULT + ")\n" + + " [-d <path>] hdfs root path of job history files (mtype=2)\n" + " [-r <replay mode>] (mtype=2)\n" + " 1. write all entities for a job in one put (default)\n" + " 2. write one entity at a time\n"); @@ -78,8 +80,7 @@ public class TimelineServicePerformance extends Configured implements Tool { try { if ("-v".equals(args[i])) { timeline_service_version = Integer.parseInt(args[++i]); - } - if ("-m".equals(args[i])) { + } else if ("-m".equals(args[i])) { if (Integer.parseInt(args[++i]) > 0) { job.getConfiguration() .setInt(MRJobConfig.NUM_MAPS, Integer.parseInt(args[i])); @@ -88,11 +89,12 @@ public class TimelineServicePerformance extends Configured implements Tool { mapperType = Integer.parseInt(args[++i]); } else if ("-s".equals(args[i])) { if (Integer.parseInt(args[++i]) > 0) { - conf.setInt(SimpleEntityWriterV1.KBS_SENT, Integer.parseInt(args[i])); + conf.setInt(SimpleEntityWriterConstants.KBS_SENT, + Integer.parseInt(args[i])); } } else if ("-t".equals(args[i])) { if (Integer.parseInt(args[++i]) > 0) { - conf.setInt(SimpleEntityWriterV1.TEST_TIMES, + conf.setInt(SimpleEntityWriterConstants.TEST_TIMES, Integer.parseInt(args[i])); } } else if ("-d".equals(args[i])) { @@ -113,28 +115,40 @@ public class TimelineServicePerformance extends Configured implements Tool { } // handle mapper-specific settings - switch (timeline_service_version) { - case TIMELINE_SERVICE_VERSION_1: - default: - switch (mapperType) { - case JOB_HISTORY_FILE_REPLAY_MAPPER: + switch (mapperType) { + case JOB_HISTORY_FILE_REPLAY_MAPPER: + String processingPath = + conf.get(JobHistoryFileReplayHelper.PROCESSING_PATH); + if (processingPath == null || processingPath.isEmpty()) { + System.out.println("processing path is missing while mtype = 2"); + return printUsage() == 0; + } + switch (timeline_service_version) { + case TIMELINE_SERVICE_VERSION_2: + job.setMapperClass(JobHistoryFileReplayMapperV2.class); + break; + case TIMELINE_SERVICE_VERSION_1: + default: job.setMapperClass(JobHistoryFileReplayMapperV1.class); - String processingPath = - conf.get(JobHistoryFileReplayHelper.PROCESSING_PATH); - if (processingPath == null || processingPath.isEmpty()) { - System.out.println("processing path is missing while mtype = 2"); - return printUsage() == 0; - } break; - case SIMPLE_ENTITY_WRITER: + } + break; + case SIMPLE_ENTITY_WRITER: + default: + // use the current timestamp as the "run id" of the test: this will + // be used as simulating the cluster timestamp for apps + conf.setLong(SimpleEntityWriterConstants.TIMELINE_SERVICE_PERFORMANCE_RUN_ID, + System.currentTimeMillis()); + switch (timeline_service_version) { + case TIMELINE_SERVICE_VERSION_2: + 
job.setMapperClass(SimpleEntityWriterV2.class); + break; + case TIMELINE_SERVICE_VERSION_1: default: job.setMapperClass(SimpleEntityWriterV1.class); - // use the current timestamp as the "run id" of the test: this will - // be used as simulating the cluster timestamp for apps - conf.setLong(SimpleEntityWriterV1.TIMELINE_SERVICE_PERFORMANCE_RUN_ID, - System.currentTimeMillis()); break; } + break; } return true; } @@ -164,25 +178,46 @@ public class TimelineServicePerformance extends Configured implements Tool { Date startTime = new Date(); System.out.println("Job started: " + startTime); int ret = job.waitForCompletion(true) ? 0 : 1; - org.apache.hadoop.mapreduce.Counters counters = job.getCounters(); - long writetime = - counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_TIME).getValue(); - long writecounts = - counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).getValue(); - long writesize = - counters.findCounter(PerfCounters.TIMELINE_SERVICE_WRITE_KBS).getValue(); - double transacrate = writecounts * 1000 / (double)writetime; - double iorate = writesize * 1000 / (double)writetime; - int numMaps = - Integer.parseInt(job.getConfiguration().get(MRJobConfig.NUM_MAPS)); - - System.out.println("TRANSACTION RATE (per mapper): " + transacrate + - " ops/s"); - System.out.println("IO RATE (per mapper): " + iorate + " KB/s"); - - System.out.println("TRANSACTION RATE (total): " + transacrate*numMaps + - " ops/s"); - System.out.println("IO RATE (total): " + iorate*numMaps + " KB/s"); + if (job.isSuccessful()) { + org.apache.hadoop.mapreduce.Counters counters = job.getCounters(); + long writecounts = + counters.findCounter( + PerfCounters.TIMELINE_SERVICE_WRITE_COUNTER).getValue(); + long writefailures = + counters.findCounter( + PerfCounters.TIMELINE_SERVICE_WRITE_FAILURES).getValue(); + if (writefailures > 0 && writefailures == writecounts) { + // see if we have a complete failure to write + System.out.println("Job failed: all writes failed!"); + } else { + long writetime = + counters.findCounter( + PerfCounters.TIMELINE_SERVICE_WRITE_TIME).getValue(); + long writesize = + counters.findCounter( + PerfCounters.TIMELINE_SERVICE_WRITE_KBS).getValue(); + if (writetime == 0L) { + // see if write time is zero (normally shouldn't happen) + System.out.println("Job failed: write time is 0!"); + } else { + double transacrate = writecounts * 1000 / (double)writetime; + double iorate = writesize * 1000 / (double)writetime; + int numMaps = + Integer.parseInt( + job.getConfiguration().get(MRJobConfig.NUM_MAPS)); + + System.out.println("TRANSACTION RATE (per mapper): " + transacrate + + " ops/s"); + System.out.println("IO RATE (per mapper): " + iorate + " KB/s"); + + System.out.println("TRANSACTION RATE (total): " + + transacrate*numMaps + " ops/s"); + System.out.println("IO RATE (total): " + iorate*numMaps + " KB/s"); + } + } + } else { + System.out.println("Job failed: " + job.getStatus().getFailureInfo()); + } return ret; } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org
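A quick note on driving the merged benchmark: everything is controlled by the options listed in printUsage() above (-v selects the timeline service version, -mtype the mapper, -s/-t the payload size and iteration count for mtype=1, -d/-r the history file path and replay mode for mtype=2). Since TimelineServicePerformance is a standard Tool, it can also be launched programmatically; the sketch below is illustrative only (the wrapper class name and the argument values are not taken from the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class TimelinePerfSmokeTest {
      public static void main(String[] args) throws Exception {
        // run the simple entity writer (mtype=1) against timeline service v2:
        // 4 mappers, 10 KB per put, 100 puts per mapper (illustrative values)
        int rc = ToolRunner.run(new Configuration(),
            new org.apache.hadoop.mapreduce.TimelineServicePerformance(),
            new String[] {"-v", "2", "-mtype", "1", "-m", "4", "-s", "10", "-t", "100"});
        System.exit(rc);
      }
    }

On the numbers the job prints at the end: TIMELINE_SERVICE_WRITE_TIME is accumulated in milliseconds, so with the example settings above a mapper that spent 2,000 ms writing its 100 puts (1,000 KB total) would report 100 * 1000 / 2000 = 50 ops/s and 1000 * 1000 / 2000 = 500 KB/s, and the "total" lines simply multiply the per-mapper rates by NUM_MAPS.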