Repository: eagle Updated Branches: refs/heads/master 015d57788 -> eae6e8f11
http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java index a12f589..a9f5132 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java @@ -17,7 +17,7 @@ package org.apache.eagle.jpm.analyzer.publisher; -import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.StringUtils; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import java.util.ArrayList; @@ -27,14 +27,17 @@ import java.util.Map; public class Result { //for EagleStorePublisher - private TaggedLogAPIEntity alertEntity = null;//TODO + private Map<String, List<TaggedLogAPIEntity>> alertEntities = new HashMap<>(); //for EmailPublisher - private Map<String, List<Pair<ResultLevel, String>>> alertMessages = new HashMap<>(); + private Map<String, List<ProcessorResult>> alertMessages = new HashMap<>(); public void addEvaluatorResult(Class<?> type, EvaluatorResult result) { Map<Class<?>, ProcessorResult> processorResults = result.getProcessorResults(); + Map<Class<?>, TaggedLogAPIEntity> processorEntities = result.getProcessorEntities(); + for (Class<?> processorType : processorResults.keySet()) { ProcessorResult processorResult = processorResults.get(processorType); + if (processorResult.resultLevel.equals(ResultLevel.NONE)) { continue; } @@ -42,17 +45,27 @@ public class Result { String typeName = type.getName(); if (!alertMessages.containsKey(typeName)) { alertMessages.put(typeName, new ArrayList<>()); + alertEntities.put(typeName, new ArrayList<>()); } - alertMessages.get(typeName).add(Pair.of(processorResult.getResultLevel(), processorResult.getMessage())); + normalizeResult(processorResult); + alertMessages.get(typeName).add(processorResult); + alertEntities.get(typeName).add(processorEntities.get(processorType)); + } } - public TaggedLogAPIEntity getAlertEntity() { - return alertEntity; + public Map<String, List<ProcessorResult>> getAlertMessages() { + return alertMessages; + } + + public Map<String, List<TaggedLogAPIEntity>> getAlertEntities() { + return alertEntities; } - public Map<String, List<Pair<ResultLevel, String>>> getAlertMessages() { - return alertMessages; + private void normalizeResult(ProcessorResult processorResult) { + if (processorResult.getSettings() != null && !processorResult.getSettings().isEmpty()) { + processorResult.setSettingList(StringUtils.join(processorResult.getSettings(), "\n")); + } } /** @@ -61,18 +74,52 @@ public class Result { public enum ResultLevel { NONE, + INFO, NOTICE, WARNING, CRITICAL } + public enum RuleType { + COMPRESS, + SPLIT, + SPILL, + TASK_NUMBER, + GC_TIME, + RESOURCE_CONTENTION, + DATA_SKEW, + + LONG_STUCK_JOB, + LONG_DURATION_JOB + } + public static class ProcessorResult { + private RuleType ruleType; private ResultLevel resultLevel; private String message; + private List<String> settings; + private String settingList; - public ProcessorResult(ResultLevel resultLevel, String message) { + public ProcessorResult(RuleType ruleType, ResultLevel resultLevel, String message, List<String> settings) { + this.ruleType = ruleType; this.resultLevel = resultLevel; this.message = message; + this.settings = settings; + } + + public ProcessorResult(RuleType ruleType, ResultLevel resultLevel, String message) { + this.ruleType = ruleType; + this.resultLevel = resultLevel; + this.message = message; + this.settings = new ArrayList<>(); + } + + public RuleType getRuleType() { + return ruleType; + } + + public void setRuleType(RuleType ruleType) { + this.ruleType = ruleType; } public ResultLevel getResultLevel() { @@ -90,6 +137,22 @@ public class Result { public void setMessage(String message) { this.message = message; } + + public List<String> getSettings() { + return settings; + } + + public void setSettings(List<String> settings) { + this.settings = settings; + } + + public String getSettingList() { + return settingList; + } + + public void setSettingList(String settingList) { + this.settingList = settingList; + } } /** @@ -97,13 +160,22 @@ public class Result { */ public static class EvaluatorResult { private Map<Class<?>, ProcessorResult> processorResults = new HashMap<>(); + private Map<Class<?>, TaggedLogAPIEntity> processorEntities = new HashMap<>(); public void addProcessorResult(Class<?> type, ProcessorResult result) { this.processorResults.put(type, result); } + public void addProcessorEntity(Class<?> type, TaggedLogAPIEntity entity) { + this.processorEntities.put(type, entity); + } + public Map<Class<?>, ProcessorResult> getProcessorResults() { return this.processorResults; } + + public Map<Class<?>, TaggedLogAPIEntity> getProcessorEntities() { + return processorEntities; + } } } http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java index 4b18f7c..6a51a76 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java @@ -17,7 +17,7 @@ package org.apache.eagle.jpm.analyzer.publisher.dedup; -import org.apache.eagle.jpm.analyzer.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; import org.apache.eagle.jpm.analyzer.publisher.Result; public interface AlertDeduplicator { http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java index 09f1af6..b139b3c 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java @@ -17,7 +17,7 @@ package org.apache.eagle.jpm.analyzer.publisher.dedup.impl; -import org.apache.eagle.jpm.analyzer.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; import org.apache.eagle.jpm.analyzer.publisher.Result; import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator; import org.apache.eagle.jpm.analyzer.util.Constants; http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java index 774e6d2..4c6661a 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java @@ -62,4 +62,5 @@ public class Constants { public static final String ANALYZER_REPORT_DATA_BASIC_KEY = "basic"; public static final String ANALYZER_REPORT_DATA_EXTEND_KEY = "extend"; + } http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm index 39cec68..996adba 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm @@ -115,13 +115,17 @@ <table class="body-wrap" style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; width: 100%; background-color: #f6f6f6; margin: 0;" bgcolor="#f6f6f6" border="1"> <caption><b>Analysis By $evaluator</b></caption> <tr> - <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="100"><b>level</b></td> + <th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="100"><b>type</b></th> <th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="250"><b>message</b></th> + <th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="250"><b>optimizer setting</b></th> + <th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="100"><b>level</b></th> </tr> - #foreach($message in ${elem["extend"].get($evaluator).keySet()}) + #foreach($result in ${elem["extend"].get($evaluator)}) <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;"> - <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;"><b>${elem["extend"].get($evaluator).get($message)}</b></td> - <th style="...">$message</th> + <td style="...">${result.ruleType}</td> + <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">${result.message}</td> + <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">${result.settingList}</td> + <td style="...">${result.resultLevel}</td> </tr> #end </table> http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java index cbbdad3..8c65adf 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JPAEntityRepository.java @@ -38,5 +38,6 @@ public class JPAEntityRepository extends EntityRepository { entitySet.add(JobProcessTimeStampEntity.class); entitySet.add(JobCountEntity.class); entitySet.add(TaskAttemptErrorCategoryEntity.class); + entitySet.add(JobSuggestionAPIEntity.class); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java new file mode 100644 index 0000000..3863a5d --- /dev/null +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/JobSuggestionAPIEntity.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.mr.historyentity; + +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.*; + +import java.util.List; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import static org.apache.eagle.jpm.util.Constants.JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME; + +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@Table("eaglejpa") +@ColumnFamily("f") +@Prefix("jsuggestion") +@Service(JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME) +@TimeSeries(true) +@Partition({"site"}) +@Indexes({ + @Index(name = "Index_1_jobId", columns = { "jobId" }, unique = true), + @Index(name = "Index_2_jobDefId", columns = { "jobDefId" }, unique = false) + }) +public class JobSuggestionAPIEntity extends TaggedLogAPIEntity { + @Column("a") + private String optimizerSuggestion; + @Column("b") + private List<String> optimizerSettings; + + public String getOptimizerSuggestion() { + return optimizerSuggestion; + } + + public void setOptimizerSuggestion(String optimizerSuggestion) { + this.optimizerSuggestion = optimizerSuggestion; + valueChanged("optimizerSuggestion"); + } + + public List<String> getOptimizerSettings() { + return optimizerSettings; + } + + public void setOptimizerSettings(List<String> optimizerSettings) { + this.optimizerSettings = optimizerSettings; + valueChanged("optimizerSettings"); + } + +} http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java index 8db7f5c..46fcf5e 100644 --- a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java +++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/mr/historyentity/TaskAttemptExecutionAPIEntity.java @@ -46,6 +46,40 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity { private String error; @Column("f") private JobCounters jobCounters; + // new added + @Column("g") + private long shuffleFinishTime; + @Column("h") + private long sortFinishTime; + @Column("i") + private long mapFinishTime; + + public long getShuffleFinishTime() { + return shuffleFinishTime; + } + + public void setShuffleFinishTime(long shuffleFinishTime) { + this.shuffleFinishTime = shuffleFinishTime; + valueChanged("shuffleFinishTime"); + } + + public long getSortFinishTime() { + return sortFinishTime; + } + + public void setSortFinishTime(long sortFinishTime) { + this.sortFinishTime = sortFinishTime; + valueChanged("sortFinishTime"); + } + + public long getMapFinishTime() { + return mapFinishTime; + } + + public void setMapFinishTime(long mapFinishTime) { + this.mapFinishTime = mapFinishTime; + valueChanged("mapFinishTime"); + } public String getTaskStatus() { return taskStatus; @@ -53,7 +87,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity { public void setTaskStatus(String taskStatus) { this.taskStatus = taskStatus; - pcs.firePropertyChange("taskStatus", null, null); + valueChanged("taskStatus"); } public long getStartTime() { @@ -62,7 +96,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity { public void setStartTime(long startTime) { this.startTime = startTime; - pcs.firePropertyChange("startTime", null, null); + valueChanged("startTime"); } public long getEndTime() { @@ -71,7 +105,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity { public void setEndTime(long endTime) { this.endTime = endTime; - pcs.firePropertyChange("endTime", null, null); + valueChanged("endTime"); } public long getDuration() { @@ -80,7 +114,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity { public void setDuration(long duration) { this.duration = duration; - pcs.firePropertyChange("duration", null, null); + valueChanged("duration"); } public String getError() { @@ -89,7 +123,7 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity { public void setError(String error) { this.error = error; - pcs.firePropertyChange("error", null, null); + valueChanged("error"); } public JobCounters getJobCounters() { @@ -98,6 +132,6 @@ public class TaskAttemptExecutionAPIEntity extends JobBaseAPIEntity { public void setJobCounters(JobCounters jobCounters) { this.jobCounters = jobCounters; - pcs.firePropertyChange("jobCounters", null, null); + valueChanged("jobCounters"); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/pom.xml b/eagle-jpm/eagle-jpm-mr-history/pom.xml index 349c489..2e3a1a8 100644 --- a/eagle-jpm/eagle-jpm-mr-history/pom.xml +++ b/eagle-jpm/eagle-jpm-mr-history/pom.xml @@ -89,6 +89,11 @@ <version>1.6</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-jpm-analyzer</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> <resources> http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java index 262054e..28ebf4e 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java @@ -21,6 +21,7 @@ package org.apache.eagle.jpm.mr.history.crawler; import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; import org.apache.eagle.jpm.mr.history.parser.JHFParserBase; import org.apache.eagle.jpm.mr.history.parser.JHFParserFactory; +import org.apache.eagle.jpm.util.MRJobTagName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +47,7 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback { @SuppressWarnings("serial") Map<String, String> baseTags = new HashMap<String, String>() { { - put("site", appConfig.getJobHistoryEndpointConfig().site); + put(MRJobTagName.SITE.toString(), appConfig.getJobHistoryEndpointConfig().site); } }; http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java index d89937e..d58eadc 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFEventReaderBase.java @@ -81,6 +81,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl private long sumReduceTaskDuration; private JobCounterMetricsGenerator jobCounterMetricsGenerator; + private JobSuggestionListener jobSuggestionListener; private MRHistoryJobConfig appConfig; @@ -127,6 +128,8 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl this.appConfig = appConfig; this.jobCounterMetricsGenerator = new JobCounterMetricsGenerator(appConfig.getEagleServiceConfig()); + this.jobSuggestionListener = new JobSuggestionListener(appConfig.getConfig()); + this.addListener(jobSuggestionListener); } public void register(HistoryJobEntityLifecycleListener lifecycleListener) { @@ -179,7 +182,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl this.jobType = jobType; } - protected void handleJob(EventType eventType, Map<Keys, String> values, Object totalCounters) throws Exception { + protected void handleJob(EventType eventType, Map<Keys, String> values, Object totalCounters, Object mapCounters, Object reduceCounters) throws Exception { String id = values.get(Keys.JOBID); if (jobId == null) { @@ -300,8 +303,15 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl this.jobCounterMetricsGenerator.setBaseTags(jobExecutionEntity.getTags()); formatDiagnostics(values.get(Keys.DIAGNOSTICS)); - entityCreated(jobExecutionEntity); + + if (configuration != null && totalCounters != null) { + JobCounters parsedTotalCounters = parseCounters(totalCounters); + JobCounters parsedMapCounters = parseCounters(mapCounters); + JobCounters parsedReduceCounters = parseCounters(reduceCounters); + jobSuggestionListener.jobCountersCreated(parsedTotalCounters, parsedMapCounters, parsedReduceCounters); + jobSuggestionListener.jobConfigCreated(configuration); + } } } @@ -332,7 +342,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl } } - super.notifiyListeners(entity); + super.notifyListeners(entity); } protected abstract JobCounters parseCounters(Object value) throws IOException; @@ -432,7 +442,7 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl // it is very likely that an attempt ID could be both succeeded and failed due to M/R system // in this case, we should ignore this attempt? if (taskAttemptStartTime.get(taskAttemptID) == null) { - LOG.warn("task attemp has consistency issue " + taskAttemptID); + LOG.warn("task attempt has consistency issue " + taskAttemptID); return; } entity.setStartTime(taskAttemptStartTime.get(taskAttemptID)); @@ -441,6 +451,15 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl entity.setDuration(entity.getEndTime() - entity.getStartTime()); entity.setTaskStatus(values.get(Keys.TASK_STATUS)); entity.setError(values.get(Keys.ERROR)); + if (values.containsKey(Keys.SHUFFLE_FINISHED)) { + entity.setShuffleFinishTime(Long.valueOf(values.get(Keys.SHUFFLE_FINISHED))); + } + if (values.containsKey(Keys.SORT_FINISHED)) { + entity.setSortFinishTime(Long.valueOf(values.get(Keys.SORT_FINISHED))); + } + if (values.containsKey(Keys.MAP_FINISH_TIME)) { + entity.setMapFinishTime(Long.valueOf(values.get(Keys.MAP_FINISH_TIME))); + } if (values.get(Keys.COUNTERS) != null || counters != null) { // when task is killed, COUNTERS does not exist //entity.setJobCounters(parseCounters(values.get(Keys.COUNTERS))); entity.setJobCounters(parseCounters(counters)); @@ -473,7 +492,6 @@ public abstract class JHFEventReaderBase extends JobEntityCreationPublisher impl taskAttemptErrorCategoryEntity.setTimestamp(entity.getTimestamp()); entityCreated(taskAttemptErrorCategoryEntity); } - taskAttemptStartTime.remove(taskAttemptID); } else { // silently ignore http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java index adeb41e..615e9ad 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFFormat.java @@ -18,6 +18,7 @@ package org.apache.eagle.jpm.mr.history.parser; +@Deprecated public enum JHFFormat { MRVer1, MRVer2 http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java index 8184f90..1903fc9 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFMRVer2EventReader.java @@ -185,7 +185,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase { if (js.getJobQueueName() != null) { values.put(Keys.JOB_QUEUE, js.getJobQueueName().toString()); } - handleJob(wrapper.getType(), values, null); + handleJob(wrapper.getType(), values, null, null, null); } private void handleJobInited(Event wrapper) throws Exception { @@ -209,7 +209,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase { if (js.getUberized() != null) { values.put(Keys.UBERISED, js.getUberized().toString()); } - handleJob(wrapper.getType(), values, null); + handleJob(wrapper.getType(), values, null, null, null); } private void handleJobFinished(Event wrapper) throws Exception { @@ -234,7 +234,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase { values.put(Keys.FAILED_REDUCES, js.getFailedReduces().toString()); } values.put(Keys.JOB_STATUS, EagleJobStatus.SUCCEEDED.name()); - handleJob(wrapper.getType(), values, js.getTotalCounters()); + handleJob(wrapper.getType(), values, js.getTotalCounters(), js.getMapCounters(), js.getReduceCounters()); } private void handleJobUnsuccessfulCompletion(Event wrapper) throws Exception { @@ -258,7 +258,7 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase { if (js.getDiagnostics() != null) { values.put(Keys.DIAGNOSTICS, js.getDiagnostics().toString()); } - handleJob(wrapper.getType(), values, null); + handleJob(wrapper.getType(), values, null, null, null); } private void handleTaskStarted(Event wrapper) throws Exception { @@ -539,6 +539,9 @@ public class JHFMRVer2EventReader extends JHFEventReaderBase { } protected JobCounters parseCounters(Object value) throws IOException { + if (value == null) { + return null; + } JobCounters jc = new JobCounters(); Map<String, Map<String, Long>> groups = new HashMap<>(); JhCounters counters = (JhCounters) value; http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java index ca49d9c..1d17640 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java @@ -38,11 +38,13 @@ public class JHFParserFactory { MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = appConfig.getEagleServiceConfig(); JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter, appConfig); + // add HistoryJobEntityCreationListener reader2.addListener(new JobEntityCreationEagleServiceListener(appConfig)); reader2.addListener(new TaskFailureListener(eagleServiceConfig)); reader2.addListener(new TaskAttemptCounterListener(eagleServiceConfig)); reader2.addListener(new JobConfigurationCreationServiceListener(eagleServiceConfig)); + // add HistoryJobEntityLifecycleListener reader2.register(new JobEntityLifecycleAggregator()); JHFParserBase parser = new JHFMRVer2Parser(reader2); return parser; http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java index a80462d..f2730fd 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationPublisher.java @@ -29,7 +29,7 @@ public class JobEntityCreationPublisher { listeners.add(l); } - public void notifiyListeners(JobBaseAPIEntity entity) throws Exception { + public void notifyListeners(JobBaseAPIEntity entity) throws Exception { for (HistoryJobEntityCreationListener l : listeners) { l.jobEntityCreated(entity); } http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java new file mode 100644 index 0000000..e5b0d2e --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.mr.history.parser; + +import com.typesafe.config.Config; +import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity; +import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer; +import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity; +import org.apache.eagle.jpm.mr.historyentity.JobExecutionAPIEntity; +import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity; +import org.apache.eagle.jpm.mr.historyentity.TaskExecutionAPIEntity; +import org.apache.eagle.jpm.util.MRJobTagName; +import org.apache.eagle.jpm.util.jobcounter.JobCounters; +import org.apache.eagle.jpm.mr.history.parser.JHFEventReaderBase.Keys; +import org.apache.hadoop.conf.Configuration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.eagle.jpm.util.MRJobTagName.TASK_ATTEMPT_ID; +import static org.apache.eagle.jpm.util.MRJobTagName.TASK_ID; + +/* + * JobEventCounterListener provides an interface to add job/task counter analyzers + */ +public class JobSuggestionListener implements HistoryJobEntityCreationListener { + private static final Logger LOG = LoggerFactory.getLogger(JobSuggestionListener.class); + + private MapReduceAnalyzerEntity info; + private MRJobPerformanceAnalyzer<MapReduceAnalyzerEntity> analyzer; + + public JobSuggestionListener(Config config) { + this.info = new MapReduceAnalyzerEntity(); + this.analyzer = new MRJobPerformanceAnalyzer<>(config); + } + + @Override + public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception { + if (entity instanceof TaskExecutionAPIEntity) { + info.getTasksMap().put(entity.getTags().get(TASK_ID.toString()), (TaskExecutionAPIEntity) entity); + } else if (entity instanceof TaskAttemptExecutionAPIEntity) { + info.getCompletedTaskAttemptsMap().put(entity.getTags().get(TASK_ATTEMPT_ID.toString()), (TaskAttemptExecutionAPIEntity) entity); + } else if (entity instanceof JobExecutionAPIEntity) { + JobExecutionAPIEntity jobExecutionAPIEntity = (JobExecutionAPIEntity) entity; + info.setCurrentState(jobExecutionAPIEntity.getCurrentState()); + info.setStartTime(jobExecutionAPIEntity.getStartTime()); + info.setEndTime(jobExecutionAPIEntity.getEndTime()); + info.setDurationTime(jobExecutionAPIEntity.getDurationTime()); + info.setUserId(jobExecutionAPIEntity.getTags().get(MRJobTagName.USER.toString())); + info.setJobId(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOB_ID.toString())); + info.setJobDefId(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOD_DEF_ID.toString())); + info.setSiteId(jobExecutionAPIEntity.getTags().get(MRJobTagName.SITE.toString())); + info.setJobName(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOB_NAME.toString())) ; + info.setJobQueueName(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOB_QUEUE.toString())); + info.setJobType(jobExecutionAPIEntity.getTags().get(MRJobTagName.JOB_TYPE.toString())); + info.setFinishedMaps(jobExecutionAPIEntity.getNumFinishedMaps()); + info.setFinishedReduces(jobExecutionAPIEntity.getNumFinishedReduces()); + info.setFailedReduces(jobExecutionAPIEntity.getNumFailedReduces()); + info.setFailedMaps(jobExecutionAPIEntity.getNumFailedMaps()); + info.setTotalMaps(jobExecutionAPIEntity.getNumTotalMaps()); + info.setTotalReduces(jobExecutionAPIEntity.getNumTotalReduces()); + } + } + + public void jobConfigCreated(Configuration configuration) { + info.setJobConf(configuration); + } + + public void jobCountersCreated(JobCounters totalCounters, JobCounters mapCounters, JobCounters reduceCounters) { + info.setTotalCounters(totalCounters); + info.setReduceCounters(reduceCounters); + info.setMapCounters(mapCounters); + } + + @Override + public void flush() throws Exception { + analyzer.analyze(info); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java index 856f051..24c734d 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java @@ -18,7 +18,6 @@ package org.apache.eagle.jpm.mr.history.parser; -import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; import org.apache.eagle.jpm.mr.historyentity.JobBaseAPIEntity; import org.apache.eagle.jpm.mr.historyentity.TaskAttemptCounterAPIEntity; import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity; http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf index e8d5311..3836e3a 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf @@ -49,7 +49,7 @@ "service": { "host": "sandbox.hortonworks.com", - "port": 9099, + "port": 9090, "username": "admin", "password": "secret", "readTimeOutSeconds" : 10, http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java index 120303e..6b33d31 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java @@ -19,7 +19,7 @@ package org.apache.eagle.jpm.mr.running.parser; import com.typesafe.config.Config; -import org.apache.eagle.jpm.analyzer.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer; import org.apache.eagle.jpm.mr.running.MRRunningJobConfig; import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager; @@ -173,7 +173,7 @@ public class MRJobParser implements Runnable { break; } } - mrJobPerformanceAnalyzer.analysis(convertToAnalysisEntity(mrJobEntityMap.get(jobId))); + mrJobPerformanceAnalyzer.analyze(convertToAnalysisEntity(mrJobEntityMap.get(jobId))); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java index 0ba6521..4ee58a1 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java @@ -117,6 +117,7 @@ public class Constants { public static final String JPA_RUNNING_TASK_EXECUTION_SERVICE_NAME = "RunningTaskExecutionService"; public static final String JPA_RUNNING_TASK_ATTEMPT_EXECUTION_SERVICE_NAME = "RunningTaskAttemptExecutionService"; public static final String JPA_JOB_PROCESS_TIME_STAMP_NAME = "JobProcessTimeStampService"; + public static final String JOB_OPTIMIZER_SUGGESTION_SERVICE_NAME = "JobOptimizerSuggestionService"; public static final String JOB_TASK_TYPE_TAG = "taskType"; http://git-wip-us.apache.org/repos/asf/eagle/blob/eae6e8f1/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java index 8def44f..bbb80cd 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/jobcounter/JobCounters.java @@ -48,7 +48,11 @@ public final class JobCounters implements Serializable { } public Long getCounterValue(CounterName counterName) { - return counters.get(counterName.group.name).get(counterName.name); + if (counters.get(counterName.group.name).containsKey(counterName.name)) { + return counters.get(counterName.group.name).get(counterName.name); + } else { + return 0L; + } } public static enum GroupName {
