[EAGLE-859] MapReduce job performance suggestion https://issues.apache.org/jira/browse/EAGLE-859
Author: Zhao, Qingwen <[email protected]> Author: Qingwen Zhao <[email protected]> Closes #784 from qingwen220/EAGLE-859. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/eae6e8f1 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/eae6e8f1 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/eae6e8f1 Branch: refs/heads/master Commit: eae6e8f11367c397c3a9ce39f2c59cf341f33847 Parents: 015d577 Author: Zhao, Qingwen <[email protected]> Authored: Fri Jan 20 11:14:56 2017 +0800 Committer: Zhao, Qingwen <[email protected]> Committed: Fri Jan 20 11:14:56 2017 +0800 ---------------------------------------------------------------------- eagle-jpm/eagle-jpm-analyzer/pom.xml | 5 + .../eagle/jpm/analyzer/AnalyzerEntity.java | 130 ------------ .../apache/eagle/jpm/analyzer/Evaluator.java | 5 +- .../apache/eagle/jpm/analyzer/JobAnalyzer.java | 10 +- .../apache/eagle/jpm/analyzer/Processor.java | 5 +- .../jpm/analyzer/meta/model/AnalyzerEntity.java | 130 ++++++++++++ .../meta/model/MapReduceAnalyzerEntity.java | 174 +++++++++++++++ .../analyzer/mr/MRJobPerformanceAnalyzer.java | 10 +- .../jpm/analyzer/mr/sla/SLAJobEvaluator.java | 7 +- .../sla/processors/LongStuckJobProcessor.java | 4 +- .../UnExpectedLongDurationJobProcessor.java | 8 +- .../mr/suggestion/JobSuggestionEvaluator.java | 74 ++++++- .../MapReduceCompressionSettingProcessor.java | 82 ++++++++ .../suggestion/MapReduceDataSkewProcessor.java | 63 ++++++ .../mr/suggestion/MapReduceGCTimeProcessor.java | 74 +++++++ .../MapReduceJobSuggestionContext.java | 209 +++++++++++++++++++ .../MapReduceQueueResourceProcessor.java | 85 ++++++++ .../mr/suggestion/MapReduceSpillProcessor.java | 125 +++++++++++ .../MapReduceSplitSettingProcessor.java | 47 +++++ .../suggestion/MapReduceTaskNumProcessor.java | 194 +++++++++++++++++ .../analyzer/publisher/EagleStorePublisher.java | 40 +++- .../jpm/analyzer/publisher/EmailPublisher.java | 21 +- 
.../eagle/jpm/analyzer/publisher/Publisher.java | 2 +- .../eagle/jpm/analyzer/publisher/Result.java | 90 +++++++- .../publisher/dedup/AlertDeduplicator.java | 2 +- .../dedup/impl/SimpleDeduplicator.java | 2 +- .../eagle/jpm/analyzer/util/Constants.java | 1 + .../main/resources/AnalyzerReportTemplate.vm | 12 +- .../mr/historyentity/JPAEntityRepository.java | 1 + .../historyentity/JobSuggestionAPIEntity.java | 63 ++++++ .../TaskAttemptExecutionAPIEntity.java | 46 +++- eagle-jpm/eagle-jpm-mr-history/pom.xml | 5 + .../crawler/DefaultJHFInputStreamCallback.java | 3 +- .../mr/history/parser/JHFEventReaderBase.java | 28 ++- .../eagle/jpm/mr/history/parser/JHFFormat.java | 1 + .../mr/history/parser/JHFMRVer2EventReader.java | 11 +- .../jpm/mr/history/parser/JHFParserFactory.java | 2 + .../parser/JobEntityCreationPublisher.java | 2 +- .../history/parser/JobSuggestionListener.java | 94 +++++++++ .../parser/TaskAttemptCounterListener.java | 1 - .../src/main/resources/application.conf | 2 +- .../jpm/mr/running/parser/MRJobParser.java | 4 +- .../org/apache/eagle/jpm/util/Constants.java | 1 + .../eagle/jpm/util/jobcounter/JobCounters.java | 6 +- 44 files changed, 1674 insertions(+), 207 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/pom.xml b/eagle-jpm/eagle-jpm-analyzer/pom.xml index 50fb803..07f5766 100644 --- a/eagle-jpm/eagle-jpm-analyzer/pom.xml +++ b/eagle-jpm/eagle-jpm-analyzer/pom.xml @@ -47,6 +47,11 @@ </dependency> <dependency> <groupId>org.apache.eagle</groupId> + <artifactId>eagle-jpm-entity</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> <artifactId>eagle-app-base</artifactId> <version>${project.version}</version> </dependency> 
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java deleted file mode 100644 index f9b7af0..0000000 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.jpm.analyzer; - -import java.util.HashMap; -import java.util.Map; - -/** - * will refactor later if other types of job needs this. 
- * AnalyzerEntity for each job needed to be analysised - */ -public class AnalyzerEntity { - private String jobDefId; - private String jobId; - private String siteId; - private String userId; - - private long startTime; - private long endTime; - private long durationTime; - private String currentState; - private double progress; - - private Map<String, Object> jobConfig = new HashMap<>(); - - private Map<String, Object> jobMeta = new HashMap<>(); - - public String getJobDefId() { - return jobDefId; - } - - public void setJobDefId(String jobDefId) { - this.jobDefId = jobDefId; - } - - public String getJobId() { - return jobId; - } - - public void setJobId(String jobId) { - this.jobId = jobId; - } - - public String getSiteId() { - return siteId; - } - - public void setSiteId(String siteId) { - this.siteId = siteId; - } - - public String getUserId() { - return userId; - } - - public void setUserId(String userId) { - this.userId = userId; - } - - public long getStartTime() { - return startTime; - } - - public void setStartTime(long startTime) { - this.startTime = startTime; - } - - public long getEndTime() { - return endTime; - } - - public void setEndTime(long endTime) { - this.endTime = endTime; - } - - public long getDurationTime() { - return durationTime; - } - - public void setDurationTime(long durationTime) { - this.durationTime = durationTime; - } - - public String getCurrentState() { - return currentState; - } - - public void setCurrentState(String currentState) { - this.currentState = currentState; - } - - public Map<String, Object> getJobConfig() { - return jobConfig; - } - - public void setJobConfig(Map<String, Object> jobConfig) { - this.jobConfig = jobConfig; - } - - public Map<String, Object> getJobMeta() { - return jobMeta; - } - - public void setJobMeta(Map<String, Object> jobMeta) { - this.jobMeta = jobMeta; - } - - public double getProgress() { - return progress; - } - - public void setProgress(double progress) { - this.progress = progress; - } -} \ 
No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java index 6617916..60ee8d6 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java @@ -17,8 +17,9 @@ package org.apache.eagle.jpm.analyzer; +import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; import org.apache.eagle.jpm.analyzer.publisher.Result; -public interface Evaluator { - Result.EvaluatorResult evaluate(AnalyzerEntity analyzerEntity); +public interface Evaluator<T extends AnalyzerEntity> { + Result.EvaluatorResult evaluate(T analyzerEntity); } http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java index 6cda1cd..1e9c00e 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java @@ -17,12 +17,14 @@ package org.apache.eagle.jpm.analyzer; +import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; + /** - * Each JobAnalyzer contains one or more Evaluators to analysis each job. + * Each JobAnalyzer contains one or more Evaluators to analyze each job. 
* Each Evaluator is a group of Processors - * Each Processor implements an algorithm or a model to analysis one dimension of a job + * Each Processor implements an algorithm or a model to analyze one dimension of a job * */ -public interface JobAnalyzer { - void analysis(AnalyzerEntity analyzerEntity) throws Exception; +public interface JobAnalyzer<T extends AnalyzerEntity> { + void analyze(T analyzerEntity) throws Exception; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java index d5a8a74..419e402 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java @@ -17,8 +17,9 @@ package org.apache.eagle.jpm.analyzer; +import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; import org.apache.eagle.jpm.analyzer.publisher.Result; -public interface Processor { - Result.ProcessorResult process(AnalyzerEntity jobAnalysisEntity); +public interface Processor<T extends AnalyzerEntity> { + Result.ProcessorResult process(T jobAnalysisEntity); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java new file mode 100644 index 
0000000..189d85d --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.meta.model; + +import java.util.HashMap; +import java.util.Map; + +/** + * will refactor later if other types of job needs this. 
+ * AnalyzerEntity for each job needed to be analyzed + */ +public class AnalyzerEntity { + private String jobDefId; + private String jobId; + private String siteId; + private String userId; + + private long startTime; + private long endTime; + private long durationTime; + private String currentState; + private double progress; + + private Map<String, Object> jobConfig = new HashMap<>(); + + private Map<String, Object> jobMeta = new HashMap<>(); + + public String getJobDefId() { + return jobDefId; + } + + public void setJobDefId(String jobDefId) { + this.jobDefId = jobDefId; + } + + public String getJobId() { + return jobId; + } + + public void setJobId(String jobId) { + this.jobId = jobId; + } + + public String getSiteId() { + return siteId; + } + + public void setSiteId(String siteId) { + this.siteId = siteId; + } + + public String getUserId() { + return userId; + } + + public void setUserId(String userId) { + this.userId = userId; + } + + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + public long getEndTime() { + return endTime; + } + + public void setEndTime(long endTime) { + this.endTime = endTime; + } + + public long getDurationTime() { + return durationTime; + } + + public void setDurationTime(long durationTime) { + this.durationTime = durationTime; + } + + public String getCurrentState() { + return currentState; + } + + public void setCurrentState(String currentState) { + this.currentState = currentState; + } + + public Map<String, Object> getJobConfig() { + return jobConfig; + } + + public void setJobConfig(Map<String, Object> jobConfig) { + this.jobConfig = jobConfig; + } + + public Map<String, Object> getJobMeta() { + return jobMeta; + } + + public void setJobMeta(Map<String, Object> jobMeta) { + this.jobMeta = jobMeta; + } + + public double getProgress() { + return progress; + } + + public void setProgress(double progress) { + this.progress = progress; + } +} \ No 
newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java new file mode 100644 index 0000000..cd6249d --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/MapReduceAnalyzerEntity.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.eagle.jpm.analyzer.meta.model; + +import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity; +import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity; +import org.apache.eagle.jpm.util.jobcounter.JobCounters; +import org.apache.hadoop.conf.Configuration; + +import java.util.HashMap; +import java.util.Map; + +public class MapReduceAnalyzerEntity extends AnalyzerEntity { + private String jobName; + private String jobQueueName; + private String jobType; + private int totalMaps; + private int totalReduces; + private int failedMaps; + private int failedReduces; + private int finishedMaps; + private int finishedReduces; + private JobCounters totalCounters; + private JobCounters mapCounters; + private JobCounters reduceCounters; + private Map<String, TaskExecutionAPIEntity> tasksMap; + private Map<String, TaskAttemptExecutionAPIEntity> completedTaskAttemptsMap; + private Configuration jobConf; + + public MapReduceAnalyzerEntity() { + this.setEndTime(-1); + this.setStartTime(-1); + finishedMaps = finishedReduces = 0; + jobName = jobQueueName = ""; + tasksMap = new HashMap<>(); + completedTaskAttemptsMap = new HashMap<>(); + } + + public String getJobName() { + return jobName; + } + + public String getJobQueueName() { + return jobQueueName; + } + + public String getJobType() { + return jobType; + } + + public int getTotalMaps() { + return totalMaps; + } + + public int getTotalReduces() { + return totalReduces; + } + + public int getFailedMaps() { + return failedMaps; + } + + public int getFailedReduces() { + return failedReduces; + } + + public int getFinishedMaps() { + return finishedMaps; + } + + public int getFinishedReduces() { + return finishedReduces; + } + + public JobCounters getTotalCounters() { + return totalCounters; + } + + public JobCounters getMapCounters() { + return mapCounters; + } + + public JobCounters getReduceCounters() { + return reduceCounters; + } + + public Map<String, TaskExecutionAPIEntity> 
getTasksMap() { + return tasksMap; + } + + public Map<String, TaskAttemptExecutionAPIEntity> getCompletedTaskAttemptsMap() { + return completedTaskAttemptsMap; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public void setJobQueueName(String jobQueueName) { + this.jobQueueName = jobQueueName; + } + + public void setJobType(String jobType) { + this.jobType = jobType; + } + + public void setTotalMaps(int totalMaps) { + this.totalMaps = totalMaps; + } + + public void setTotalReduces(int totalReduces) { + this.totalReduces = totalReduces; + } + + public void setFailedMaps(int failedMaps) { + this.failedMaps = failedMaps; + } + + public void setFailedReduces(int failedReduces) { + this.failedReduces = failedReduces; + } + + public void setFinishedMaps(int finishedMaps) { + this.finishedMaps = finishedMaps; + } + + public void setFinishedReduces(int finishedReduces) { + this.finishedReduces = finishedReduces; + } + + public void setTotalCounters(JobCounters totalCounters) { + this.totalCounters = totalCounters; + } + + public void setMapCounters(JobCounters mapCounters) { + this.mapCounters = mapCounters; + } + + public void setReduceCounters(JobCounters reduceCounters) { + this.reduceCounters = reduceCounters; + } + + public void setTasksMap(Map<String, TaskExecutionAPIEntity> tasksMap) { + this.tasksMap = tasksMap; + } + + public void setCompletedTaskAttemptsMap(Map<String, TaskAttemptExecutionAPIEntity> completedTaskAttemptsMap) { + this.completedTaskAttemptsMap = completedTaskAttemptsMap; + } + + public Configuration getJobConf() { + return jobConf; + } + + public void setJobConf(Configuration jobConf) { + this.jobConf = jobConf; + } + +} http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java ---------------------------------------------------------------------- diff --git 
a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java index e0e579a..e32a37c 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java @@ -20,6 +20,7 @@ package org.apache.eagle.jpm.analyzer.mr; import com.typesafe.config.Config; import org.apache.eagle.jpm.analyzer.*; import org.apache.eagle.jpm.analyzer.Evaluator; +import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; import org.apache.eagle.jpm.analyzer.mr.sla.SLAJobEvaluator; import org.apache.eagle.jpm.analyzer.mr.suggestion.JobSuggestionEvaluator; import org.apache.eagle.jpm.analyzer.publisher.EagleStorePublisher; @@ -33,7 +34,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; -public class MRJobPerformanceAnalyzer implements JobAnalyzer, Serializable { +public class MRJobPerformanceAnalyzer<T extends AnalyzerEntity> implements JobAnalyzer<T>, Serializable { private static final Logger LOG = LoggerFactory.getLogger(MRJobPerformanceAnalyzer.class); private List<Evaluator> evaluators = new ArrayList<>(); @@ -51,11 +52,14 @@ public class MRJobPerformanceAnalyzer implements JobAnalyzer, Serializable { } @Override - public void analysis(AnalyzerEntity analyzerJobEntity) throws Exception { + public void analyze(T analyzerJobEntity) throws Exception { Result result = new Result(); for (Evaluator evaluator : evaluators) { - result.addEvaluatorResult(evaluator.getClass(), evaluator.evaluate(analyzerJobEntity)); + Result.EvaluatorResult evaluatorResult = evaluator.evaluate(analyzerJobEntity); + if (evaluatorResult != null) { + result.addEvaluatorResult(evaluator.getClass(), evaluatorResult); + } } for (Publisher publisher : publishers) { 
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java index f10b68d..a77e55d 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java @@ -18,7 +18,7 @@ package org.apache.eagle.jpm.analyzer.mr.sla; import com.typesafe.config.Config; -import org.apache.eagle.jpm.analyzer.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; import org.apache.eagle.jpm.analyzer.Evaluator; import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; import org.apache.eagle.jpm.analyzer.mr.sla.processors.LongStuckJobProcessor; @@ -26,6 +26,7 @@ import org.apache.eagle.jpm.analyzer.mr.sla.processors.UnExpectedLongDurationJob import org.apache.eagle.jpm.analyzer.Processor; import org.apache.eagle.jpm.analyzer.publisher.Result; import org.apache.eagle.jpm.analyzer.util.Utils; +import org.apache.eagle.jpm.util.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +48,10 @@ public class SLAJobEvaluator implements Evaluator, Serializable { @Override public Result.EvaluatorResult evaluate(AnalyzerEntity analyzerJobEntity) { + if (!analyzerJobEntity.getCurrentState().equalsIgnoreCase(Constants.JobState.RUNNING.toString())) { + return null; + } + Result.EvaluatorResult result = new Result.EvaluatorResult(); List<JobMetaEntity> jobMetaEntities = Utils.getJobMeta(config, analyzerJobEntity.getJobDefId()); 
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java index 35f3b27..b3322ed 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java @@ -18,7 +18,7 @@ package org.apache.eagle.jpm.analyzer.mr.sla.processors; import com.typesafe.config.Config; -import org.apache.eagle.jpm.analyzer.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; import org.apache.eagle.jpm.analyzer.Processor; import org.apache.eagle.jpm.analyzer.publisher.Result; import org.slf4j.Logger; @@ -38,6 +38,6 @@ public class LongStuckJobProcessor implements Processor, Serializable { @Override public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) { LOG.info("Job {} In LongStuckJobProcessor", analyzerJobEntity.getJobDefId()); - return new Result.ProcessorResult(Result.ResultLevel.NONE, ""); + return new Result.ProcessorResult(Result.RuleType.LONG_STUCK_JOB, Result.ResultLevel.NONE, ""); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java 
b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java index 9d4ce2b..8f655ba 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java @@ -18,7 +18,7 @@ package org.apache.eagle.jpm.analyzer.mr.sla.processors; import com.typesafe.config.Config; -import org.apache.eagle.jpm.analyzer.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; import org.apache.eagle.jpm.analyzer.publisher.Result; import org.apache.eagle.jpm.analyzer.Processor; import org.apache.eagle.jpm.analyzer.util.Constants; @@ -50,7 +50,7 @@ public class UnExpectedLongDurationJobProcessor implements Processor, Serializab Map<String, Object> jobMetaData = analyzerJobEntity.getJobMeta(); long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData); if (avgDurationTime == 0L) { - return new Result.ProcessorResult(Result.ResultLevel.NONE, Constants.PROCESS_NONE); + return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, Result.ResultLevel.NONE, Constants.PROCESS_NONE); } Map<Result.ResultLevel, Double> alertThreshold = Constants.DEFAULT_ALERT_THRESHOLD; @@ -62,12 +62,12 @@ public class UnExpectedLongDurationJobProcessor implements Processor, Serializab double expirePercent = (analyzerJobEntity.getDurationTime() - avgDurationTime) * 1.0 / avgDurationTime; for (Map.Entry<Result.ResultLevel, Double> entry : sorted) { if (expirePercent >= entry.getValue()) { - return new Result.ProcessorResult(entry.getKey(), String.format("Job duration exceeds average duration by %d%%, average duration is %ds", + return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, entry.getKey(), String.format("Job duration exceeds average duration by %d%%, average duration is 
%ds", (int)(expirePercent * 100), avgDurationTime / 1000)); } } - return new Result.ProcessorResult(Result.ResultLevel.NONE, Constants.PROCESS_NONE); + return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, Result.ResultLevel.NONE, Constants.PROCESS_NONE); } private long getAvgDuration(AnalyzerEntity mrJobAnalysisEntity, Map<String, Object> jobMetaData) { http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java index 79f5318..ea60ff9 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java @@ -18,15 +18,24 @@ package org.apache.eagle.jpm.analyzer.mr.suggestion; import com.typesafe.config.Config; -import org.apache.eagle.jpm.analyzer.AnalyzerEntity; import org.apache.eagle.jpm.analyzer.Evaluator; +import org.apache.eagle.jpm.analyzer.Processor; +import org.apache.eagle.jpm.mr.historyentity.JobSuggestionAPIEntity; +import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity; import org.apache.eagle.jpm.analyzer.publisher.Result; +import org.apache.eagle.jpm.util.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; -public class JobSuggestionEvaluator implements Evaluator, Serializable { +import static org.apache.eagle.jpm.util.MRJobTagName.*; + +public class JobSuggestionEvaluator 
implements Evaluator<MapReduceAnalyzerEntity>, Serializable { private static final Logger LOG = LoggerFactory.getLogger(JobSuggestionEvaluator.class); private Config config; @@ -35,10 +44,63 @@ public class JobSuggestionEvaluator implements Evaluator, Serializable { this.config = config; } + private List<Processor> loadProcessors(MapReduceJobSuggestionContext context) { + List<Processor> processors = new ArrayList<>(); + processors.add(new MapReduceCompressionSettingProcessor(context)); + processors.add(new MapReduceSplitSettingProcessor(context)); + processors.add(new MapReduceDataSkewProcessor(context)); + processors.add(new MapReduceGCTimeProcessor(context)); + processors.add(new MapReduceSpillProcessor(context)); + processors.add(new MapReduceTaskNumProcessor(context)); + //processors.add(new MapReduceQueueResourceProcessor(context)); + + return processors; + } + @Override - public Result.EvaluatorResult evaluate(AnalyzerEntity mrJobEntity) { - Result.EvaluatorResult result = new Result.EvaluatorResult(); - //TODO - return result; + public Result.EvaluatorResult evaluate(MapReduceAnalyzerEntity analyzerEntity) { + if (analyzerEntity.getCurrentState().equalsIgnoreCase(Constants.JobState.RUNNING.toString())) { + return null; + } + + MapReduceJobSuggestionContext jobContext = new MapReduceJobSuggestionContext(analyzerEntity); + if (jobContext.getNumMaps() == 0) { + return null; + } + + try { + Result.EvaluatorResult result = new Result.EvaluatorResult(); + for (Processor processor : loadProcessors(jobContext)) { + Result.ProcessorResult processorResult = processor.process(analyzerEntity); + if (processorResult != null) { + result.addProcessorResult(processor.getClass(), processorResult); + result.addProcessorEntity(processor.getClass(), createJobSuggestionEntity(processorResult, analyzerEntity)); + } + } + return result; + } catch (Exception ex) { + LOG.error(ex.getMessage(), ex); + return null; + } + } + + private static JobSuggestionAPIEntity 
createJobSuggestionEntity(Result.ProcessorResult processorResult, MapReduceAnalyzerEntity entity) { + Map<String, String> tags = new HashMap<>(); + tags.put(JOB_ID.toString(), entity.getJobId()); + tags.put(JOD_DEF_ID.toString(), entity.getJobDefId()); + tags.put(SITE.toString(), entity.getSiteId()); + tags.put(USER.toString(), entity.getUserId()); + tags.put(RULE_TYPE.toString(), processorResult.getRuleType().toString()); + tags.put(JOB_QUEUE.toString(), entity.getJobQueueName()); + tags.put(JOB_TYPE.toString(), entity.getJobType()); + JobSuggestionAPIEntity jobSuggestionAPIEntity = new JobSuggestionAPIEntity(); + jobSuggestionAPIEntity.setTags(tags); + jobSuggestionAPIEntity.setTimestamp(entity.getStartTime()); // startTime as the job timestamp + jobSuggestionAPIEntity.setOptimizerSuggestion(processorResult.getMessage()); + jobSuggestionAPIEntity.setOptimizerSettings(processorResult.getSettings()); + + return jobSuggestionAPIEntity; + } + } http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java new file mode 100644 index 0000000..62c5c2b --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_OUTPUT_COMPRESS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC;
+import static org.apache.hadoop.mapreduce.MRJobConfig.NUM_REDUCES;
+import static org.apache.hadoop.mapreduce.MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR;
+
+/**
+ * Suggests compression settings for the intermediate map output and for the final job output.
+ * All checks read the job configuration held by the {@link MapReduceJobSuggestionContext}.
+ */
+public class MapReduceCompressionSettingProcessor implements Processor<MapReduceAnalyzerEntity> {
+
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceCompressionSettingProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Checks map-output and job-output compression.
+     *
+     * @param jobAnalysisEntity the analyzed job; not read here, the configuration comes from the context
+     * @return an INFO-level {@code COMPRESS} result with the suggestions and settings,
+     *         or {@code null} when no suggestion applies
+     */
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        StringBuilder sb = new StringBuilder();
+        List<String> optSettings = new ArrayList<>();
+
+        // The configuration is only read, so the context's JobConf is used directly instead of a copy.
+        JobConf jobconf = context.getJobconf();
+        // Map output compression only matters when there is a shuffle, i.e. the job has reducers.
+        if (jobconf.getLong(NUM_REDUCES, 0) > 0) {
+            if (!jobconf.getCompressMapOutput()) {
+                optSettings.add(String.format("%s=true", MAP_OUTPUT_COMPRESS));
+                sb.append("Please set " + MAP_OUTPUT_COMPRESS + " to true to reduce network IO.\n");
+            } else {
+                // The codec key may be unset (Hadoop then falls back to DefaultCodec). Default to "" so the
+                // check below recommends a faster codec instead of throwing a NullPointerException.
+                String codecClassName = jobconf.get(MAP_OUTPUT_COMPRESS_CODEC, "");
+                if (!(codecClassName.endsWith("LzoCodec") || codecClassName.endsWith("SnappyCodec"))) {
+                    optSettings.add(String.format("%s=LzoCodec or SnappyCodec", MAP_OUTPUT_COMPRESS_CODEC));
+                    sb.append("Best practice: use LzoCodec or SnappyCodec for " + MAP_OUTPUT_COMPRESS_CODEC).append("\n");
+                }
+            }
+        }
+
+        if (!jobconf.getBoolean(FileOutputFormat.COMPRESS, false)) {
+            optSettings.add(String.format("%s=true", FileOutputFormat.COMPRESS));
+            sb.append("Please set " + FileOutputFormat.COMPRESS + " to true to reduce disk usage and network IO.\n");
+        } else {
+            String codecName = jobconf.get(FileOutputFormat.COMPRESS_CODEC, "");
+            String outputFileFormat = jobconf.get(OUTPUT_FORMAT_CLASS_ATTR, "");
+
+            // With output compression enabled and no codec configured, TextOutputFormat falls back to
+            // GzipCodec, so an empty codec name is treated like Gzip.
+            boolean unsplittableCodec = codecName.isEmpty()
+                || codecName.endsWith("GzipCodec")
+                || codecName.endsWith("SnappyCodec")
+                || codecName.endsWith("DefaultCodec");
+            if (unsplittableCodec && outputFileFormat.endsWith("TextOutputFormat")) {
+                sb.append("Best practice: don't use Gzip/Snappy/DefaultCodec with TextOutputFormat");
+                sb.append(" as this will cause the output files to be unsplittable. ");
+                sb.append("Please use LZO instead or ");
+                sb.append("use a container file format such as SequenceFileOutputFormat.\n");
+            }
+        }
+
+        if (sb.length() > 0) {
+            return new Result.ProcessorResult(Result.RuleType.COMPRESS, Result.ResultLevel.INFO, sb.toString(), optSettings);
+        }
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java
new file mode 100644
index 0000000..b21a927
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+
+/**
+ * Reports reducer data skew. Skew is flagged when two conditions hold:
+ * the slowest reduce attempt (reduce phase only, shuffle excluded) ran more than 30 minutes longer
+ * than the average, and it received more than five times the average number of reduce input records.
+ */
+public class MapReduceDataSkewProcessor implements Processor<MapReduceAnalyzerEntity> {
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceDataSkewProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    /**
+     * @param jobAnalysisEntity the analyzed job; not read here, statistics come from the context
+     * @return an INFO-level {@code DATA_SKEW} result, or {@code null} when no skew is detected,
+     *         the job has no reducers, or the required counters are missing
+     */
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        final TaskAttemptExecutionAPIEntity slowest = context.getWorstReduce();
+        if (slowest == null || context.getNumReduces() == 0) {
+            return null;
+        }
+
+        final long skewThresholdSec = 30 * 60;
+        try {
+            final long slowestSec = (slowest.getEndTime() - slowest.getShuffleFinishTime()) / DateTimeUtil.ONESECOND;
+            if (slowestSec - context.getAvgReduceTimeInSec() <= skewThresholdSec) {
+                return null;
+            }
+
+            final long totalInputs = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.REDUCE_INPUT_RECORDS);
+            final long avgInputs = totalInputs / context.getNumReduces();
+            final long slowestInputs = slowest.getJobCounters().getCounterValue(JobCounters.CounterName.REDUCE_INPUT_RECORDS);
+            if (slowestInputs <= avgInputs * 5) {
+                return null;
+            }
+
+            final String message = "Data skew detected in reducers. The average reduce time is " + context.getAvgReduceTimeInSec()
+                + " seconds, the worst reduce time is " + slowestSec
+                + " seconds. Please investigate this problem to improve your job performance.\n";
+            return new Result.ProcessorResult(Result.RuleType.DATA_SKEW, Result.ResultLevel.INFO, message);
+        } catch (NullPointerException ignored) {
+            // Counters can be missing when the job failed; no skew advice in that case.
+        }
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java
new file mode 100644
index 0000000..103de7a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_JAVA_OPTS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.REDUCE_JAVA_OPTS;
+
+/**
+ * Flags excessive garbage collection, meaning a phase's total GC time exceeds 10% of its total CPU time.
+ * Map counters are always checked; reduce counters only when the job has reducers.
+ */
+public class MapReduceGCTimeProcessor implements Processor<MapReduceAnalyzerEntity> {
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceGCTimeProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    /**
+     * @param jobAnalysisEntity the analyzed job; not read here, counters come from the context
+     * @return an INFO-level {@code GC_TIME} result with the JVM-option settings to revisit,
+     *         or {@code null} when GC time is acceptable or the counters are unavailable
+     */
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        final StringBuilder advice = new StringBuilder();
+        final List<String> suggested = new ArrayList<>();
+
+        try {
+            final long mapGcMillis = context.getJob().getMapCounters().getCounterValue(JobCounters.CounterName.GC_MILLISECONDS);
+            final long mapCpuMillis = context.getJob().getMapCounters().getCounterValue(JobCounters.CounterName.CPU_MILLISECONDS);
+            if (mapGcMillis > mapCpuMillis * 0.1) {
+                final String mapOpt = String.format("-D%s", MAP_JAVA_OPTS);
+                suggested.add(mapOpt);
+                advice.append("Map GC_TIME_MILLIS took too long. Please increase mapper memory via ")
+                    .append(mapOpt)
+                    .append(", or optimize your mapper class.\n");
+            }
+
+            final boolean hasReducers = context.getNumReduces() > 0;
+            if (hasReducers) {
+                final long reduceGcMillis = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.GC_MILLISECONDS);
+                final long reduceCpuMillis = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.CPU_MILLISECONDS);
+                if (reduceGcMillis > reduceCpuMillis * 0.1) {
+                    final String reduceOpt = String.format("-D%s", REDUCE_JAVA_OPTS);
+                    suggested.add(reduceOpt);
+                    advice.append("Reduce GC_TIME_MILLIS took too long. Please increase memory for reduce via ")
+                        .append(reduceOpt)
+                        .append(", or optimize your reducer class.\n");
+                }
+            }
+
+            return advice.length() == 0
+                ? null
+                : new Result.ProcessorResult(Result.RuleType.GC_TIME, Result.ResultLevel.INFO, advice.toString(), suggested);
+        } catch (NullPointerException ignored) {
+            // Counters can be missing when the job failed; no GC advice in that case.
+            return null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceJobSuggestionContext.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceJobSuggestionContext.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceJobSuggestionContext.java
new file mode 100644
index 0000000..1f4e548
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceJobSuggestionContext.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.MRJobTagName;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.TaskType;
+
+import java.util.regex.Pattern;
+
+import static org.apache.eagle.jpm.util.jobcounter.JobCounters.CounterName.MAP_OUTPUT_BYTES;
+import static org.apache.eagle.jpm.util.jobcounter.JobCounters.CounterName.MAP_OUTPUT_RECORDS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.NUM_MAPS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.NUM_REDUCES;
+
+/**
+ * Statistics about one analyzed MapReduce job that are shared by all suggestion processors.
+ * They are computed once, at construction, from the job configuration and its completed task attempts.
+ */
+public class MapReduceJobSuggestionContext {
+
+    private JobConf jobconf;
+    private MapReduceAnalyzerEntity job;
+
+    // Task counts as configured in the job conf (NUM_MAPS / NUM_REDUCES).
+    private long numMaps;
+    private long numReduces;
+
+    // Per-attempt averages in seconds; the reduce time excludes the shuffle phase.
+    private long avgMapTimeInSec;
+    private long avgReduceTimeInSec;
+    private long avgShuffleTimeInSec;
+    // worst* = longest phase duration, first* = earliest start time, last* = latest phase end.
+    private TaskAttemptExecutionAPIEntity worstMap;
+    private TaskAttemptExecutionAPIEntity worstReduce;
+    private TaskAttemptExecutionAPIEntity worstShuffle;
+    private TaskAttemptExecutionAPIEntity lastMap;
+    private TaskAttemptExecutionAPIEntity lastReduce;
+    private TaskAttemptExecutionAPIEntity lastShuffle;
+    private TaskAttemptExecutionAPIEntity firstMap;
+    private TaskAttemptExecutionAPIEntity firstReduce;
+    private TaskAttemptExecutionAPIEntity firstShuffle;
+
+    // Largest per-map-attempt estimate of the sort memory needed to avoid spilling. Despite the
+    // "min" prefix, this is the maximum over attempts of each attempt's minimum requirement.
+    private long minMapSpillMemBytes;
+
+    // Matches a JVM max-heap option such as -Xmx2048m: group 1 = number, group 2 = optional unit.
+    public static final Pattern MAX_HEAP_PATTERN = Pattern.compile("-Xmx([0-9]+)([kKmMgG]?)");
+
+    /**
+     * Builds the context for a job.
+     *
+     * @param job the analyzed job; its configuration is copied into a {@link JobConf}
+     */
+    public MapReduceJobSuggestionContext(MapReduceAnalyzerEntity job) {
+        this.job = job;
+        this.jobconf = new JobConf(job.getJobConf());
+        buildContext();
+    }
+
+    /**
+     * Scans the completed task attempts once and fills in every derived statistic.
+     */
+    private MapReduceJobSuggestionContext buildContext() {
+        numMaps = jobconf.getLong(NUM_MAPS, 0);
+        numReduces = jobconf.getLong(NUM_REDUCES, 0);
+
+        // Durations are summed in milliseconds and turned into per-attempt averages in seconds below.
+        long totalMapMillis = 0;
+        long totalShuffleMillis = 0;
+        long totalReduceMillis = 0;
+        long mapAttempts = 0;
+        long reduceAttempts = 0;
+
+        for (TaskAttemptExecutionAPIEntity attempt : job.getCompletedTaskAttemptsMap().values()) {
+            String taskType = getTaskType(attempt);
+            if (Constants.TaskType.MAP.toString().equalsIgnoreCase(taskType)) {
+                mapAttempts++;
+                long mapTime = attempt.getEndTime() - attempt.getStartTime();
+                totalMapMillis += mapTime;
+                if (firstMap == null || firstMap.getStartTime() > attempt.getStartTime()) {
+                    firstMap = attempt;
+                }
+                if (lastMap == null || lastMap.getEndTime() < attempt.getEndTime()) {
+                    lastMap = attempt;
+                }
+                if (worstMap == null || (worstMap.getEndTime() - worstMap.getStartTime()) < mapTime) {
+                    worstMap = attempt;
+                }
+                long tmpMem = getMinimumIOSortMemory(attempt);
+                if (tmpMem > minMapSpillMemBytes) {
+                    minMapSpillMemBytes = tmpMem;
+                }
+            } else if (TaskType.REDUCE.toString().equalsIgnoreCase(taskType)) {
+                reduceAttempts++;
+                // Shuffle phase: attempt start -> shuffle finish.
+                long shuffleTime = attempt.getShuffleFinishTime() - attempt.getStartTime();
+                totalShuffleMillis += shuffleTime;
+                if (firstShuffle == null || firstShuffle.getStartTime() > attempt.getStartTime()) {
+                    firstShuffle = attempt;
+                }
+                if (lastShuffle == null || lastShuffle.getShuffleFinishTime() < attempt.getShuffleFinishTime()) {
+                    lastShuffle = attempt;
+                }
+                if (worstShuffle == null || (worstShuffle.getShuffleFinishTime() - worstShuffle.getStartTime()) < shuffleTime) {
+                    worstShuffle = attempt;
+                }
+
+                // Reduce phase: shuffle finish -> attempt end.
+                long reduceTime = attempt.getEndTime() - attempt.getShuffleFinishTime();
+                totalReduceMillis += reduceTime;
+                if (firstReduce == null || firstReduce.getStartTime() > attempt.getStartTime()) {
+                    firstReduce = attempt;
+                }
+                if (lastReduce == null || lastReduce.getEndTime() < attempt.getEndTime()) {
+                    lastReduce = attempt;
+                }
+                if (worstReduce == null || (worstReduce.getEndTime() - worstReduce.getShuffleFinishTime()) < reduceTime) {
+                    worstReduce = attempt;
+                }
+            }
+        }
+
+        // Average over the attempts actually summed. A failed job can have fewer completed attempts than
+        // configured tasks, and dividing by the configured count would understate the averages (or, with
+        // a configured count of zero, leave raw millisecond totals in the *InSec fields).
+        avgMapTimeInSec = mapAttempts > 0 ? totalMapMillis / mapAttempts / DateTimeUtil.ONESECOND : 0;
+        avgShuffleTimeInSec = reduceAttempts > 0 ? totalShuffleMillis / reduceAttempts / DateTimeUtil.ONESECOND : 0;
+        avgReduceTimeInSec = reduceAttempts > 0 ? totalReduceMillis / reduceAttempts / DateTimeUtil.ONESECOND : 0;
+        return this;
+    }
+
+    private String getTaskType(TaskAttemptExecutionAPIEntity taskAttemptInfo) {
+        return taskAttemptInfo.getTags().get(MRJobTagName.TASK_TYPE.toString());
+    }
+
+    /**
+     * Estimates the sort-buffer bytes a map attempt needs to hold all of its output without spilling.
+     * The estimate is the serialized output bytes plus a 16-byte index entry per output record.
+     *
+     * @param attempt a completed map attempt
+     * @return the estimated minimum sort memory in bytes, or 0 when the attempt has no counters
+     */
+    private long getMinimumIOSortMemory(TaskAttemptExecutionAPIEntity attempt) {
+        // This runs from the constructor; a missing counter set must not make building the context throw.
+        if (attempt.getJobCounters() == null) {
+            return 0L;
+        }
+        long records = attempt.getJobCounters().getCounterValue(MAP_OUTPUT_RECORDS);
+        long outputBytes = attempt.getJobCounters().getCounterValue(MAP_OUTPUT_BYTES);
+        return outputBytes + records * 16;
+    }
+
+    public JobConf getJobconf() {
+        return jobconf;
+    }
+
+    public MapReduceAnalyzerEntity getJob() {
+        return job;
+    }
+
+    public long getNumMaps() {
+        return numMaps;
+    }
+
+    public long getNumReduces() {
+        return numReduces;
+    }
+
+    public long getAvgMapTimeInSec() {
+        return avgMapTimeInSec;
+    }
+
+    public long getAvgReduceTimeInSec() {
+        return avgReduceTimeInSec;
+    }
+
+    public long getAvgShuffleTimeInSec() {
+        return avgShuffleTimeInSec;
+    }
+
+    public TaskAttemptExecutionAPIEntity getWorstMap() {
+        return worstMap;
+    }
+
+    public TaskAttemptExecutionAPIEntity getWorstReduce() {
+        return worstReduce;
+    }
+
+    public TaskAttemptExecutionAPIEntity getWorstShuffle() {
+        return worstShuffle;
+    }
+
+    public TaskAttemptExecutionAPIEntity getLastMap() {
+        return lastMap;
+    }
+
+    public TaskAttemptExecutionAPIEntity getLastReduce() {
+        return lastReduce;
+    }
+
+    public TaskAttemptExecutionAPIEntity getLastShuffle() {
+        return lastShuffle;
+    }
+
+    public TaskAttemptExecutionAPIEntity getFirstMap() {
+        return firstMap;
+    }
+
+    public TaskAttemptExecutionAPIEntity getFirstReduce() {
+        return firstReduce;
+    }
+
+    public TaskAttemptExecutionAPIEntity getFirstShuffle() {
+        return firstShuffle;
+    }
+
+    /**
+     * @return the largest per-map-attempt estimate, in bytes, of the sort memory needed to avoid spilling
+     */
+    public long getMinMapSpillMemBytes() {
+        return minMapSpillMemBytes;
+    }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java
new file mode 100644
index 0000000..a1b57bf
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+
+/**
+ * Flags probable resource contention for jobs submitted by batch users.
+ *
+ * <p>Criterion, applied to the map phase and (if the job has reducers) the reduce phase:
+ * {@code phaseTimeInSec > avgTaskTimeInSec * ceil(numTasks / 500) * 20}.
+ * Assuming roughly 500 tasks can run at once, {@code avgTaskTimeInSec * ceil(numTasks / 500)}
+ * approximates an uncontended phase. A phase more than 20 times longer suggests the tasks were
+ * waiting for resources.
+ */
+public class MapReduceQueueResourceProcessor implements Processor<MapReduceAnalyzerEntity> {
+    private static final Logger LOG = LoggerFactory.getLogger(MapReduceQueueResourceProcessor.class);
+
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceQueueResourceProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    /**
+     * @param jobAnalysisEntity the analyzed job; not read here, statistics come from the context
+     * @return an INFO-level {@code RESOURCE_CONTENTION} result, or {@code null} when no contention is
+     *         detected, the user is not a batch user, or the analysis fails (the failure is logged)
+     */
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        try {
+            String userName = context.getJob().getUserId();
+            TaskAttemptExecutionAPIEntity lastMap = context.getLastMap();
+            TaskAttemptExecutionAPIEntity firstMap = context.getFirstMap();
+            TaskAttemptExecutionAPIEntity lastReduce = context.getLastReduce();
+            TaskAttemptExecutionAPIEntity firstShuffle = context.getFirstShuffle();
+
+            if (checkBatchUser(userName) && lastMap != null && firstMap != null) {
+                StringBuilder sb = new StringBuilder();
+
+                long tasksPerTime = 500; // better get it from RM
+                long mapPhaseTimeInSec = (lastMap.getEndTime() - firstMap.getStartTime()) / DateTimeUtil.ONESECOND;
+                // (n + tasksPerTime - 1) / tasksPerTime is ceil(n / tasksPerTime), the number of task waves.
+                if (mapPhaseTimeInSec > context.getAvgMapTimeInSec()
+                    * ((context.getNumMaps() + tasksPerTime - 1) / tasksPerTime) * 20) {
+                    sb.append("There appears to have been resource contention during the map phase of your job. Please ask for more resources if your job is SLA-bound,");
+                    sb.append(" or submit your job when the cluster is less busy.\n");
+                }
+
+                if (context.getNumReduces() > 0 && lastReduce != null && firstShuffle != null) {
+                    // The reduce phase is measured from the first shuffle start to the last reduce end.
+                    long reducePhaseTimeInSec = (lastReduce.getEndTime() - firstShuffle.getStartTime()) / DateTimeUtil.ONESECOND;
+                    if (reducePhaseTimeInSec > context.getAvgReduceTimeInSec()
+                        * ((context.getNumReduces() + tasksPerTime - 1) / tasksPerTime) * 20) {
+                        sb.append("There appears to have been resource contention during the reduce phase of your job. Please ask for more resources if your job is SLA-bound,");
+                        sb.append(" or submit your job when the cluster is less busy.\n");
+                    }
+                }
+
+                if (sb.length() > 0) {
+                    return new Result.ProcessorResult(Result.RuleType.RESOURCE_CONTENTION, Result.ResultLevel.INFO, sb.toString());
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn(e.getMessage(), e);
+        }
+        return null;
+    }
+
+    /**
+     * Returns whether the job owner is a batch account, recognized by the {@code b_} user-name prefix.
+     *
+     * @param userName the job owner; may be {@code null}
+     * @return {@code true} for a non-null name starting with {@code b_}
+     */
+    protected boolean checkBatchUser(String userName) {
+        return userName != null && userName.startsWith("b_");
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java
new file mode 100644
index 0000000..835b382
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+import static org.apache.eagle.jpm.analyzer.mr.suggestion.MapReduceJobSuggestionContext.MAX_HEAP_PATTERN;
+import static org.apache.hadoop.mapreduce.MRJobConfig.IO_SORT_MB;
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_JAVA_OPTS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.MAP_SORT_SPILL_PERCENT;
+
+/**
+ * Checks whether map or reduce tasks spilled records more than once, detected as more spilled records
+ * than output (map) or input (reduce) records.
+ *
+ * <p>For maps, it estimates the minimum sort-buffer memory needed to hold all output. From that it
+ * derives the io.sort.mb, spill-percent and heap settings to suggest.
+ */
+public class MapReduceSpillProcessor implements Processor<MapReduceAnalyzerEntity> {
+
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceSpillProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    /**
+     * @param jobAnalysisEntity the analyzed job; not read here, counters come from the context
+     * @return an INFO-level {@code SPILL} result with the suggested settings, or {@code null} when
+     *         nothing spilled more than once or the counters are unavailable
+     */
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        StringBuilder sb = new StringBuilder();
+        List<String> optSettings = new ArrayList<>();
+
+        // The two sides are analyzed independently, so missing counters on one side
+        // do not discard advice already found for the other.
+        sb.append(analyzeMapSpill(optSettings));
+        // Map-only jobs have no reduce side to check.
+        if (context.getNumReduces() > 0) {
+            sb.append(analyzeReduceSpill());
+        }
+
+        if (sb.length() > 0) {
+            return new Result.ProcessorResult(Result.RuleType.SPILL, Result.ResultLevel.INFO, sb.toString(), optSettings);
+        }
+        return null;
+    }
+
+    /**
+     * Builds the map-side spill advice. Settings are added to {@code optSettings} only when the
+     * analysis completes.
+     *
+     * @param optSettings receives the suggested {@code -D} settings
+     * @return the advice text, or an empty string when maps did not spill repeatedly or counters are missing
+     */
+    private String analyzeMapSpill(List<String> optSettings) {
+        StringBuilder sb = new StringBuilder();
+        List<String> settings = new ArrayList<>();
+        String setting;
+        try {
+            long outputRecords = context.getJob().getMapCounters().getCounterValue(JobCounters.CounterName.MAP_OUTPUT_RECORDS);
+            long spillRecords = context.getJob().getMapCounters().getCounterValue(JobCounters.CounterName.SPILLED_RECORDS);
+            // More spilled records than output records means the maps spilled more than once.
+            if (outputRecords >= spillRecords) {
+                return "";
+            }
+
+            sb.append("Total map output records: ").append(outputRecords);
+            sb.append(" Total map spilled records: ").append(spillRecords).append(". Please set");
+
+            long minMapSpillMemBytes = context.getMinMapSpillMemBytes();
+            double spillPercent = context.getJobconf().getDouble(MAP_SORT_SPILL_PERCENT, 0.8);
+            if (minMapSpillMemBytes > 512 * FileUtils.ONE_MB * spillPercent) {
+                // Large outputs: suggest filling the buffer completely before spilling,
+                // rather than inflating io.sort.mb by 1 / spillPercent.
+                if (Math.abs(1.0 - spillPercent) > 0.001) {
+                    setting = String.format("-D%s=1", MAP_SORT_SPILL_PERCENT);
+                    sb.append(" ").append(setting);
+                    settings.add(setting);
+                }
+            } else {
+                // The buffer spills once it is spillPercent full, so it must be larger by 1 / spillPercent.
+                minMapSpillMemBytes /= spillPercent;
+            }
+
+            // The smallest multiple of 10 MB strictly greater than the requirement.
+            long minMapSpillMemMB = (minMapSpillMemBytes / FileUtils.ONE_MB + 10) / 10 * 10;
+            if (minMapSpillMemMB >= 2047) {
+                // io.sort.mb cannot be raised this far, so the per-map input has to shrink instead.
+                sb.append("\nPlease reduce the block size of the input files and make sure they are splittable.");
+            } else {
+                setting = String.format("-D%s=%s", IO_SORT_MB, minMapSpillMemMB);
+                sb.append(" ").append(setting);
+                settings.add(setting);
+                // Keep the map heap at least 3x the sort buffer, rounded up to a whole GB.
+                long heapSize = getMaxHeapSize(context.getJobconf().get(MAP_JAVA_OPTS));
+                if (heapSize < 3 * minMapSpillMemMB) {
+                    long expectedHeapSizeMB = (minMapSpillMemMB * 3 + 1024) / 1024 * 1024;
+                    // No leading space: the string is also returned verbatim as an optimizer setting.
+                    setting = String.format("-D%s=-Xmx%sM", MAP_JAVA_OPTS, expectedHeapSizeMB);
+                    sb.append(" ").append(setting);
+                    settings.add(setting);
+                }
+            }
+            sb.append(" to avoid spilled records.\n");
+        } catch (NullPointerException e) {
+            // When the job failed its counters may be missing, so no map-side advice is produced.
+            return "";
+        }
+        optSettings.addAll(settings);
+        return sb.toString();
+    }
+
+    /**
+     * Builds the reduce-side spill advice.
+     *
+     * @return the advice text, or an empty string when reducers did not spill repeatedly or counters are missing
+     */
+    private String analyzeReduceSpill() {
+        try {
+            long reduceInputRecords = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.REDUCE_INPUT_RECORDS);
+            long spillRecords = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.SPILLED_RECORDS);
+            if (reduceInputRecords < spillRecords) {
+                return "Please add more memory (mapreduce.reduce.java.opts) to avoid spilled records."
+                    + " Total Reduce input records: " + reduceInputRecords
+                    + " Total Spilled Records: " + spillRecords
+                    + "\n";
+            }
+        } catch (NullPointerException e) {
+            // When the job failed its counters may be missing, so no reduce-side advice is produced.
+        }
+        return "";
+    }
+
+    /**
+     * Extracts the maximum heap size, in MB, from a JVM options string.
+     *
+     * @param s JVM options such as {@code "-Xmx2048m -XX:+UseG1GC"}; may be {@code null} when unset
+     * @return the last {@code -Xmx} value converted to MB, or 0 when there is none
+     */
+    private static long getMaxHeapSize(String s) {
+        if (s == null) {
+            // Option not set in the job configuration: heap size unknown.
+            return 0L;
+        }
+        Matcher m = MAX_HEAP_PATTERN.matcher(s);
+        long val = 0;
+        // The JVM honors the last -Xmx it is given, so scan to the final occurrence.
+        while (m.find()) {
+            val = Long.parseLong(m.group(1));
+            String unit = m.group(2);
+            if ("k".equalsIgnoreCase(unit)) {
+                val /= 1024;
+            } else if ("g".equalsIgnoreCase(unit)) {
+                val *= 1024;
+            } else if (unit.isEmpty()) {
+                // A bare number is a size in bytes.
+                val /= 1024 * 1024;
+            }
+        }
+        return val;
+    }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java
new file mode 100644
index 0000000..8eba468
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MapReduceSplitSettingProcessor implements Processor<MapReduceAnalyzerEntity> {
+
+    private MapReduceJobSuggestionContext context;
+
+    public MapReduceSplitSettingProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        StringBuilder sb = new StringBuilder();
+
+        if (context.getJobconf().getLong(FileInputFormat.SPLIT_MINSIZE, 0) > 1) {
+            sb.append("Best practice: don't set " + FileInputFormat.SPLIT_MINSIZE);
+            sb.append(", because it may lower data locality, hence maps will run slower.\n");
+            return new Result.ProcessorResult(Result.RuleType.SPLIT, Result.ResultLevel.INFO, sb.toString());
+        }
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java new file mode 100644 index 0000000..00d5cc9 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.eagle.jpm.analyzer.mr.suggestion;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.eagle.jpm.analyzer.Processor;
+import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.Result;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.mapreduce.MRJobConfig.NUM_REDUCES;
+
+/**
+ * Suggests a better number of map and reduce tasks for a MapReduce job, based on the
+ * average per-task input/output volume and the average task durations.
+ */
+public class MapReduceTaskNumProcessor implements Processor<MapReduceAnalyzerEntity> {
+    /** Binary size units: index i stands for 1024^i bytes; "E" keeps every long value in range. */
+    private static final String[] SIZE_UNITS = {"B", "K", "M", "G", "T", "P", "E"};
+    private final MapReduceJobSuggestionContext context;
+
+    public MapReduceTaskNumProcessor(MapReduceJobSuggestionContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Runs the reducer-count check followed by the mapper-count check.
+     *
+     * @param jobAnalysisEntity the analyzed job; the checks read everything they need from the context
+     * @return an INFO-level {@code TASK_NUMBER} result carrying the advice and any suggested
+     *     {@code -D} settings, or {@code null} when there is nothing to suggest
+     */
+    @Override
+    public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) {
+        StringBuilder sb = new StringBuilder();
+        List<String> optSettings = new ArrayList<>();
+        try {
+            sb.append(analyzeReduceTaskNum(optSettings));
+            sb.append(analyzeMapTaskNum(optSettings));
+
+            if (sb.length() > 0) {
+                return new Result.ProcessorResult(Result.RuleType.TASK_NUMBER, Result.ResultLevel.INFO, sb.toString(), optSettings);
+            }
+        } catch (NullPointerException e) {
+            // When job failed there may not have counters; the analysis is best-effort, so ignore it.
+        }
+        return null;
+    }
+
+    /**
+     * Checks whether reducers look too small/short or too large/long and, if so, proposes a
+     * reducer count computed by {@link #getReduceNum(long, long, long)}.
+     *
+     * @param optSettings receives the suggested {@code NUM_REDUCES} setting, if one is made
+     * @return newline-terminated advice, or an empty string
+     */
+    private String analyzeReduceTaskNum(List<String> optSettings) {
+        long numReduces = context.getNumReduces();
+        if (numReduces <= 0) {
+            return "";
+        }
+
+        long avgReduceTime = context.getAvgReduceTimeInSec();
+        long avgShuffleTime = context.getAvgShuffleTimeInSec();
+        long avgShuffleBytes = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.REDUCE_SHUFFLE_BYTES)
+            / numReduces;
+        long avgReduceOutput = context.getJob().getReduceCounters().getCounterValue(JobCounters.CounterName.HDFS_BYTES_WRITTEN)
+            / numReduces;
+        long avgReduceTotalTime = avgShuffleTime + avgReduceTime;
+
+        // Collect every reason that makes the current reducer count look wrong.
+        StringBuilder reasons = new StringBuilder();
+
+        boolean smallInput = avgShuffleBytes < 256 * FileUtils.ONE_MB && avgReduceTotalTime < 300
+            && avgReduceOutput < 256 * FileUtils.ONE_MB && numReduces > 1;
+        boolean largeInput = avgShuffleBytes > 10 * FileUtils.ONE_GB && avgReduceTotalTime > 1800;
+        if (smallInput || largeInput) {
+            reasons.append("average reduce input bytes is: ").append(bytesToHumanReadable(avgShuffleBytes)).append(", ");
+        }
+
+        if (avgReduceTotalTime < 60 && numReduces > 1) {
+            reasons.append("average reduce time is only ").append(avgReduceTotalTime).append(" seconds, ");
+        } else if (avgReduceTotalTime > 3600 && avgReduceTime > 1800) {
+            reasons.append("average reduce time is ").append(avgReduceTotalTime).append(" seconds, ");
+        }
+
+        String avgReduceOutputDisplaySize = bytesToHumanReadable(avgReduceOutput);
+        if (avgReduceOutput < 10 * FileUtils.ONE_MB && avgReduceTime < 300
+            && avgShuffleBytes < 2 * FileUtils.ONE_GB && numReduces > 1) {
+            reasons.append("average reduce output is only ").append(avgReduceOutputDisplaySize).append(", ");
+        } else if (avgReduceOutput > 10 * FileUtils.ONE_GB && avgReduceTime > 1800) {
+            reasons.append("average reduce output is ").append(avgReduceOutputDisplaySize).append(", ");
+        }
+
+        if (reasons.length() == 0) {
+            return "";
+        }
+
+        // getReduceNum never returns less than 1; advice that keeps the current count is useless.
+        long suggestReduces = getReduceNum(avgShuffleBytes, avgReduceOutput, avgReduceTime);
+        if (suggestReduces == numReduces) {
+            return "";
+        }
+
+        String setting = String.format("-D%s=%s", NUM_REDUCES, suggestReduces);
+        optSettings.add(setting);
+        return "Best practice: " + reasons + "please consider "
+            + (suggestReduces > numReduces ? "increasing the " : "decreasing the ")
+            + "reducer number. You could try " + setting + "\n";
+    }
+
+    /**
+     * Checks whether mappers read too little/too much input or run too briefly/too long.
+     *
+     * @param optSettings currently unused by this check (no concrete setting is proposed)
+     * @return newline-terminated advice, or an empty string
+     */
+    private String analyzeMapTaskNum(List<String> optSettings) {
+        long numMaps = context.getNumMaps();
+        if (numMaps <= 0) {
+            // No map tasks recorded: averaging would divide by zero and there is nothing to tune.
+            return "";
+        }
+
+        StringBuilder sb = new StringBuilder();
+        long avgMapTime = context.getAvgMapTimeInSec();
+        long avgMapInput = context.getJob().getMapCounters().getCounterValue(JobCounters.CounterName.HDFS_BYTES_READ)
+            / numMaps;
+        String avgMapInputDisplaySize = bytesToHumanReadable(avgMapInput);
+
+        if (avgMapInput < 5 * FileUtils.ONE_MB && avgMapTime < 30 && numMaps > 1) {
+            sb.append("Best practice: average map input bytes only have ").append(avgMapInputDisplaySize);
+            sb.append(". Please reduce the number of mappers by merging input files.\n");
+        } else if (avgMapInput > FileUtils.ONE_GB) {
+            sb.append("Best practice: average map input bytes have ").append(avgMapInputDisplaySize);
+            sb.append(". Please increase the number of mappers by using splittable compression, a container file format or a smaller block size.\n");
+        }
+
+        if (avgMapTime < 10 && numMaps > 1) {
+            sb.append("Best practice: average map time only have ").append(avgMapTime);
+            sb.append(" seconds. Please reduce the number of mappers by merging input files or by using a larger block size.\n");
+        } else if (avgMapTime > 600 && avgMapInput < FileUtils.ONE_GB) {
+            sb.append("Best practice: average map time is ").append(avgMapTime);
+            sb.append(" seconds. Please increase the number of mappers by using splittable compression, a container file format or a smaller block size.\n");
+        }
+
+        return sb.toString();
+    }
+
+    /**
+     * Estimates a total reducer count so that each reducer handles at most roughly 3 GB of
+     * shuffle input, 2 GB of output and 10 minutes of reduce time. Never returns less than 1.
+     */
+    private long getReduceNum(long avgInputBytes, long avgOutputBytes, long avgTime) {
+        long numReduces = context.getNumReduces();
+        long byInput = avgInputBytes * numReduces / (3 * FileUtils.ONE_GB);
+        long byOutput = avgOutputBytes * numReduces / (2 * FileUtils.ONE_GB);
+        long byTime = avgTime * numReduces / (10 * 60);
+        return Math.max(1, Math.max(byInput, Math.max(byOutput, byTime)));
+    }
+
+    /**
+     * Formats a byte count with binary units, e.g. 1610612736 -> "1G512M": the whole part uses
+     * the largest unit that keeps it below 1024, and a non-zero remainder is shown in the next
+     * smaller unit. Integer arithmetic keeps the remainder exact.
+     */
+    private static String bytesToHumanReadable(long bytes) {
+        int idx = 0;
+        long unit = 1;
+        while (idx < SIZE_UNITS.length - 1 && bytes / unit >= 1024) {
+            unit <<= 10;
+            idx++;
+        }
+        StringBuilder sb = new StringBuilder();
+        sb.append(bytes / unit).append(SIZE_UNITS[idx]);
+        if (idx >= 1) {
+            long remainder = (bytes % unit) / (unit >> 10);
+            if (remainder > 0) {
+                sb.append(remainder).append(SIZE_UNITS[idx - 1]);
+            }
+        }
+        return sb.toString();
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
index 6109704..0d7d2d7 100644
--- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
+++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java
@@ -18,23 +18,61 @@ package
org.apache.eagle.jpm.analyzer.publisher;
 
 import com.typesafe.config.Config;
-import org.apache.eagle.jpm.analyzer.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator;
+import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
 
+/**
+ * Persists analyzer alert entities through the Eagle service client, skipping results that
+ * the alert deduplicator reports as duplicates.
+ */
 public class EagleStorePublisher implements Publisher, Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(EagleStorePublisher.class);
     private Config config;
+    private AlertDeduplicator alertDeduplicator;
 
     public EagleStorePublisher(Config config) {
         this.config = config;
+        this.alertDeduplicator = new SimpleDeduplicator();
     }
 
     @Override
     public void publish(AnalyzerEntity analyzerJobEntity, Result result) {
+        if (result.getAlertMessages().isEmpty()) {
+            return;
+        }
+
+        LOG.info("EagleStorePublisher gets job {}", analyzerJobEntity.getJobDefId());
+        if (alertDeduplicator.dedup(analyzerJobEntity, result)) {
+            LOG.info("skip job {} alert because it is duplicated", analyzerJobEntity.getJobDefId());
+            return;
+        }
+
+        // The client lives only for this call: it is closed below, and keeping it in a field of
+        // this Serializable class would retain a closed client and presumably break serialization.
+        IEagleServiceClient client = null;
+        try {
+            client = new EagleServiceClientImpl(config);
+            for (Map.Entry<String, List<TaggedLogAPIEntity>> entry : result.getAlertEntities().entrySet()) {
+                client.create(entry.getValue());
+                LOG.info("successfully persist {} entities for evaluator {}", entry.getValue().size(), entry.getKey());
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        } finally {
+            if (client != null) {
+                try {
+                    client.close();
+                } catch (IOException e) {
+                    LOG.error(e.getMessage(), e);
+                }
+            }
+        }
     }
 }
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java index 4e49094..842e0ac 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java @@ -18,12 +18,10 @@ package org.apache.eagle.jpm.analyzer.publisher; import com.typesafe.config.Config; -import org.apache.commons.lang3.tuple.Pair; import org.apache.eagle.app.service.ApplicationEmailService; import org.apache.eagle.common.DateTimeUtil; -import org.apache.eagle.common.mail.AlertEmailConstants; import org.apache.eagle.common.mail.AlertEmailContext; -import org.apache.eagle.jpm.analyzer.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator; import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator; import org.apache.eagle.jpm.analyzer.util.Constants; @@ -31,7 +29,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; -import java.net.URLEncoder; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -69,19 +66,15 @@ public class EmailPublisher implements Publisher, Serializable { basic.put("end", analyzerJobEntity.getEndTime() == 0 ? 
"0" : DateTimeUtil.millisecondsToHumanDateWithSeconds(analyzerJobEntity.getEndTime())); - basic.put("progress", analyzerJobEntity.getProgress() + "%"); + double progress = analyzerJobEntity.getCurrentState().equalsIgnoreCase(org.apache.eagle.jpm.util.Constants.JobState.RUNNING.toString()) ? analyzerJobEntity.getProgress() : 100; + basic.put("progress", progress + "%"); basic.put("detail", getJobLink(analyzerJobEntity)); - - Map<String, Map<String, String>> extend = new HashMap<>(); - Map<String, List<Pair<Result.ResultLevel, String>>> alertMessages = result.getAlertMessages(); - for (String evaluator : alertMessages.keySet()) { - List<Pair<Result.ResultLevel, String>> messages = alertMessages.get(evaluator); - extend.put(evaluator, new HashMap<>()); - for (Pair<Result.ResultLevel, String> message : messages) { + Map<String, List<Result.ProcessorResult>> extend = result.getAlertMessages(); + for (String evaluator : extend.keySet()) { + for (Result.ProcessorResult message : extend.get(evaluator)) { LOG.info("Job [{}] Got Message [{}], Level [{}] By Evaluator [{}]", - analyzerJobEntity.getJobDefId(), message.getRight(), message.getLeft(), evaluator); - extend.get(evaluator).put(message.getRight(), message.getLeft().toString()); + analyzerJobEntity.getJobDefId(), message.getMessage(), message.getResultLevel(), evaluator); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java index 2f42bf9..1f42ef9 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java +++ 
b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java @@ -17,7 +17,7 @@ package org.apache.eagle.jpm.analyzer.publisher; -import org.apache.eagle.jpm.analyzer.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; public interface Publisher { void publish(AnalyzerEntity analyzerJobEntity, Result result);
