Repository: eagle Updated Branches: refs/heads/master 2adbbf59f -> b4695801f
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm new file mode 100644 index 0000000..39cec68 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm @@ -0,0 +1,131 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> +<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> +<html xmlns="http://www.w3.org/1999/xhtml"> +<head> + <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/> + <meta name="viewport" content="width=device-width"/> + <style type="text/css"> + img { + max-width: 100%; + } + + body { + -webkit-font-smoothing: antialiased; + -webkit-text-size-adjust: none; + width: 100% !important; + height: 100%; + line-height: 1.6em; + } + + body { + background-color: #f6f6f6; + } + + @media only screen and (max-width: 640px) { + body { + padding: 0 !important; + } + + h1 { + font-weight: 800 !important; + margin: 20px 0 5px !important; + } + + h2 { + font-weight: 800 !important; + margin: 20px 0 5px !important; + } + + h3 { + font-weight: 800 !important; + margin: 20px 0 5px !important; + } + + h4 { + font-weight: 800 !important; + margin: 20px 0 5px !important; + } + + h1 { + font-size: 22px !important; + } + + h2 { + font-size: 18px !important; + } + + h3 { + font-size: 14px !important; + } + + .container { + padding: 0 !important; + width: 100% !important; + } + + .content { + padding: 0 !important; + } + + .content-wrap { + padding: 10px !important; + } + + .invoice { + width: 100% !important; + } + } + </style> +</head> +<body> + #set ( $elem = $alertList[0] ) + +<p><b>Basic Information: </b></p> + +<ul> + <li>Site: ${elem["basic"].get("site")}</li> + <li>Job Name: ${elem["basic"].get("name")}</li> + <li>User: ${elem["basic"].get("user")}</li> + <li>Job Status: ${elem["basic"].get("status")}</li> + <li>Start Time: ${elem["basic"].get("start")}</li> + <li>End Time: ${elem["basic"].get("end")}</li> + <li>Duration Time: ${elem["basic"].get("duration")}</li> + <li>Progress: ${elem["basic"].get("progress")}</li> + <li>Job Detail: <a href="${elem["basic"].get("detail")}">${elem["basic"].get("detail")}</a></li> #* href value is quoted: the XHTML Strict doctype requires quoted attribute values, and an unquoted URL breaks on spaces or '>' *# +</ul> + +<p><b>Analyzer Results: </b></p> + +#foreach($evaluator in ${elem["extend"].keySet()}) 
+<table class="body-wrap" style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; width: 100%; background-color: #f6f6f6; margin: 0;" bgcolor="#f6f6f6" border="1"> + <caption><b>Analysis By $evaluator</b></caption> + <tr> + <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="100"><b>level</b></td> + <th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;" width="250"><b>message</b></th> + </tr> + #foreach($message in ${elem["extend"].get($evaluator).keySet()}) + <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;"> + <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;"><b>${elem["extend"].get($evaluator).get($message)}</b></td> + <th style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">$message</th> #* message cell carries the same inline style as the other cells; the former "..." placeholder was not valid CSS *# + </tr> + #end +</table> +#end + +</body> +</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql new file mode 100644 index 0000000..3d790d0 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql @@ -0,0 +1,23 @@ +CREATE TABLE IF NOT EXISTS jobs ( + jobDefId VARCHAR(50) NOT NULL, + configuration MEDIUMTEXT NOT NULL, + createdtime bigint(20) DEFAULT NULL, + modifiedtime bigint(20) DEFAULT NULL, + PRIMARY KEY (jobDefId) +); + +CREATE TABLE IF NOT EXISTS job_evaluators ( + jobDefId VARCHAR(50) NOT NULL, + evaluator VARCHAR(100) NOT NULL, + createdtime bigint(20) DEFAULT NULL, + modifiedtime bigint(20) DEFAULT NULL, + PRIMARY KEY (jobDefId, 
evaluator) +); + +CREATE TABLE IF NOT EXISTS job_publishments ( + userId VARCHAR(100) NOT NULL, /* keyed once, by the table-level PRIMARY KEY below; a second column-level PRIMARY KEY is rejected by MySQL */ + mailAddress mediumtext NOT NULL, + createdtime bigint(20) DEFAULT NULL, + modifiedtime bigint(20) DEFAULT NULL, + PRIMARY KEY (userId) +); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java index 6f337e7..c3d08d4 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationHealthCheck.java @@ -89,7 +89,6 @@ public class MRHistoryJobApplicationHealthCheck extends ApplicationHealthCheckBa } catch (Exception e) { return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e))); } finally { - client.getJerseyClient().destroy(); try { client.close(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java index 6dc5791..01f98c0 100644 --- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobConfigurationCreationServiceListener.java @@ -84,7 +84,6 @@ public class JobConfigurationCreationServiceListener implements HistoryJobEntity } finally { list.clear(); jobConfigurationEntity = null; - client.getJerseyClient().destroy(); client.close(); } tried++; http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java index 80b4049..2f77456 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java @@ -97,7 +97,7 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr eagleServiceConfig.username, eagleServiceConfig.password); - client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); + client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); logger.info("start flushing entities of total number " + list.size()); List<GenericMetricEntity> metricEntities = new ArrayList<>(); for (int i = 0; i < list.size(); i++) { @@ -167,7 +167,6 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr logger.info("finish flushing entities of total number " + list.size()); list.clear(); - 
client.getJerseyClient().destroy(); client.close(); } http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java index 666b3db..856f051 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskAttemptCounterListener.java @@ -120,7 +120,7 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe eagleServiceConfig.username, eagleServiceConfig.password); - client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); + client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); List<TaskAttemptCounterAPIEntity> list = new ArrayList<>(); logger.info("start flushing TaskAttemptCounter entities of total number " + counters.size()); // create entity @@ -149,7 +149,6 @@ public class TaskAttemptCounterListener implements HistoryJobEntityCreationListe logger.info("end flushing TaskAttemptCounter entities of total number " + counters.size()); counters.clear(); list.clear(); - client.getJerseyClient().destroy(); client.close(); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java index 40e6432..794f992 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/TaskFailureListener.java @@ -139,7 +139,6 @@ public class TaskFailureListener implements HistoryJobEntityCreationListener { } tried++; } - client.getJerseyClient().destroy(); client.close(); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/pom.xml b/eagle-jpm/eagle-jpm-mr-running/pom.xml index f6f1be5..02f2bc4 100644 --- a/eagle-jpm/eagle-jpm-mr-running/pom.xml +++ b/eagle-jpm/eagle-jpm-mr-running/pom.xml @@ -79,6 +79,16 @@ <version>${powermock.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-jpm-analyzer</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-metadata-jdbc</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> <resources> http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java index de0d846..309146e 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java +++ 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java @@ -18,6 +18,7 @@ package org.apache.eagle.jpm.mr.running; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; +import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer; import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout; import org.apache.eagle.jpm.mr.running.storm.MRRunningJobParseBolt; import org.apache.eagle.jpm.util.Constants; @@ -67,7 +68,8 @@ public class MRRunningJobApplication extends StormApplication { mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getZkStateConfig(), confKeyKeys, - config), + config, + new MRJobPerformanceAnalyzer(config)), tasks).setNumTasks(tasks).fieldsGrouping(spoutName, new Fields("appId")); return topologyBuilder.createTopology(); } http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java index 5a57aca..6670282 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationProvider.java @@ -18,6 +18,11 @@ package org.apache.eagle.jpm.mr.running; import org.apache.eagle.app.service.ApplicationListener; import org.apache.eagle.app.spi.AbstractApplicationProvider; +import org.apache.eagle.jpm.analyzer.meta.MetaManagementService; +import org.apache.eagle.jpm.analyzer.meta.impl.MetaManagementServiceJDBCImpl; +import 
org.apache.eagle.jpm.analyzer.meta.impl.MetaManagementServiceMemoryImpl; +import org.apache.eagle.metadata.service.memory.MemoryMetadataStore; +import org.apache.eagle.metadata.store.jdbc.JDBCMetadataStore; import java.util.Optional; @@ -31,4 +36,10 @@ public class MRRunningJobApplicationProvider extends AbstractApplicationProvider public Optional<ApplicationListener> getApplicationListener() { return Optional.of(new MRRunningJobApplicationListener()); } + + @Override + protected void onRegister() { + bind(MemoryMetadataStore.class, MetaManagementService.class, MetaManagementServiceMemoryImpl.class); + bind(JDBCMetadataStore.class, MetaManagementService.class, MetaManagementServiceJDBCImpl.class); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java index c2cbbe5..2bc1648 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java @@ -85,7 +85,6 @@ public class MRJobEntityCreationHandler { LOG.warn("exception found when flush entities, {}", e); return false; } finally { - client.getJerseyClient().destroy(); try { client.close(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java ---------------------------------------------------------------------- diff --git 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java index 52c1866..c21eaf1 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java @@ -19,6 +19,8 @@ package org.apache.eagle.jpm.mr.running.parser; import com.typesafe.config.Config; +import org.apache.eagle.jpm.analyzer.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer; import org.apache.eagle.jpm.mr.running.MRRunningJobConfig; import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager; import org.apache.eagle.jpm.mr.runningentity.JobConfig; @@ -82,6 +84,7 @@ public class MRJobParser implements Runnable { private static final int FLUSH_TASKS_EVERY_TIME = 5; private static final int MAX_TASKS_PERMIT = 5000; private Config config; + private MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer; static { OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true); @@ -92,7 +95,8 @@ public class MRJobParser implements Runnable { AppInfo app, Map<String, JobExecutionAPIEntity> mrJobMap, MRRunningJobManager runningJobManager, ResourceFetcher rmResourceFetcher, List<String> configKeys, - Config config) { + Config config, + MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer) { this.app = app; if (mrJobMap == null) { this.mrJobEntityMap = new HashMap<>(); @@ -112,6 +116,7 @@ public class MRJobParser implements Runnable { this.finishedTaskIds = new HashSet<>(); this.configKeys = configKeys; this.config = config; + this.mrJobPerformanceAnalyzer = mrJobPerformanceAnalyzer; } public void setAppInfo(AppInfo app) { @@ -168,6 +173,7 @@ public class MRJobParser implements Runnable { break; } } + 
mrJobPerformanceAnalyzer.analysis(convertToAnalysisEntity(mrJobEntityMap.get(jobId))); } } @@ -586,4 +592,21 @@ public class MRJobParser implements Runnable { } } } + + private AnalyzerEntity convertToAnalysisEntity(JobExecutionAPIEntity jobExecutionAPIEntity) { + AnalyzerEntity mrJobAnalysisEntity = new AnalyzerEntity(); + Map<String, String> tags = jobExecutionAPIEntity.getTags(); + mrJobAnalysisEntity.setJobDefId(tags.get(MRJobTagName.JOD_DEF_ID.toString())); + mrJobAnalysisEntity.setJobId(tags.get(MRJobTagName.JOB_ID.toString())); + mrJobAnalysisEntity.setSiteId(tags.get(MRJobTagName.SITE.toString())); + mrJobAnalysisEntity.setUserId(tags.get(MRJobTagName.USER.toString())); + + mrJobAnalysisEntity.setStartTime(jobExecutionAPIEntity.getStartTime()); + mrJobAnalysisEntity.setEndTime(jobExecutionAPIEntity.getEndTime()); + mrJobAnalysisEntity.setDurationTime(jobExecutionAPIEntity.getDurationTime()); + mrJobAnalysisEntity.setCurrentState(jobExecutionAPIEntity.getInternalState()); + mrJobAnalysisEntity.setJobConfig(new HashMap<>(jobExecutionAPIEntity.getJobConfig())); + mrJobAnalysisEntity.setProgress(this.app.getProgress()); + return mrJobAnalysisEntity; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java index 8ec2dec..915df8a 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java @@ -19,6 +19,7 @@ package 
org.apache.eagle.jpm.mr.running.storm; import com.typesafe.config.Config; +import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer; import org.apache.eagle.jpm.mr.running.MRRunningJobConfig; import org.apache.eagle.jpm.mr.running.parser.MRJobParser; import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager; @@ -51,18 +52,21 @@ public class MRRunningJobParseBolt extends BaseRichBolt { private ResourceFetcher resourceFetcher; private List<String> configKeys; private Config config; + private MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer; public MRRunningJobParseBolt(MRRunningJobConfig.EagleServiceConfig eagleServiceConfig, MRRunningJobConfig.EndpointConfig endpointConfig, MRRunningJobConfig.ZKStateConfig zkStateConfig, List<String> configKeys, - Config config) { + Config config, + MRJobPerformanceAnalyzer mrJobPerformanceAnalyzer) { this.eagleServiceConfig = eagleServiceConfig; this.endpointConfig = endpointConfig; this.runningMRParsers = new HashMap<>(); this.zkStateConfig = zkStateConfig; this.configKeys = configKeys; this.config = config; + this.mrJobPerformanceAnalyzer = mrJobPerformanceAnalyzer; } @Override @@ -83,7 +87,8 @@ public class MRRunningJobParseBolt extends BaseRichBolt { MRJobParser applicationParser; if (!runningMRParsers.containsKey(appInfo.getId())) { applicationParser = new MRJobParser(endpointConfig, eagleServiceConfig, - appInfo, mrJobs, runningJobManager, this.resourceFetcher, configKeys, this.config); + appInfo, mrJobs, runningJobManager, this.resourceFetcher, configKeys, this.config, + mrJobPerformanceAnalyzer); runningMRParsers.put(appInfo.getId(), applicationParser); LOG.info("create application parser for {}", appInfo.getId()); } else { http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java ---------------------------------------------------------------------- diff --git 
a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java index 787c9ac..fc674d6 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplicationTest.java @@ -23,6 +23,7 @@ import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer; import org.apache.eagle.jpm.mr.running.parser.MRJobParser; import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager; import org.apache.eagle.jpm.mr.running.storm.MRRunningJobFetchSpout; @@ -88,7 +89,8 @@ public class MRRunningJobApplicationTest { mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getZkStateConfig(), confKeyKeys, - config); + config, + new MRJobPerformanceAnalyzer(config)); MRRunningJobManager mrRunningJobManager = mock(MRRunningJobManager.class); PowerMockito.whenNew(MRRunningJobManager.class).withArguments(mrRunningJobConfig.getZkStateConfig()).thenReturn(mrRunningJobManager); mrRunningJobParseBolt.prepare(null, null, null); http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java index e0b5533..7046f8b 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java +++ 
b/eagle-jpm/eagle-jpm-mr-running/src/test/java/org/apache/eagle/jpm/mr/running/parser/MRJobParserTest.java @@ -24,6 +24,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.TestingServer; +import org.apache.eagle.jpm.analyzer.mr.MRJobPerformanceAnalyzer; import org.apache.eagle.jpm.mr.running.MRRunningJobConfig; import org.apache.eagle.jpm.mr.running.parser.metrics.JobExecutionMetricsCreationListener; import org.apache.eagle.jpm.mr.running.recover.MRRunningJobManager; @@ -131,7 +132,7 @@ public class MRJobParserTest { MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig()); RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls); MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(), - app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config); + app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config)); Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser); @@ -186,7 +187,7 @@ public class MRJobParserTest { MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig()); RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls); MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(), - app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config); + app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config)); Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser); @@ -228,7 +229,7 @@ public class MRJobParserTest { 
MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig()); RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls); MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(), - app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config); + app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config)); Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser); @@ -272,7 +273,7 @@ public class MRJobParserTest { MRRunningJobManager runningJobManager = new MRRunningJobManager(mrRunningJobConfig.getZkStateConfig()); RMResourceFetcher resourceFetcher = new RMResourceFetcher(mrRunningJobConfig.getEndpointConfig().rmUrls); MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(), - app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config); + app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config)); Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser); @@ -318,7 +319,7 @@ public class MRJobParserTest { RMResourceFetcher resourceFetcher = mock(RMResourceFetcher.class); when(resourceFetcher.getResource(any())).thenReturn(Collections.emptyList()); MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(), - app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config); + app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config)); Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser); @@ -357,7 +358,7 @@ public class MRJobParserTest { eagleServiceConfig.username, 
eagleServiceConfig.password).thenReturn(client); when(client.create(any())).thenThrow(Exception.class).thenReturn(null); - when(client.getJerseyClient()).thenReturn(new Client()); + //when(client.getJerseyClient()).thenReturn(new Client()); mockInputJobSteam("/mrjob_30784.json", JOB_URL); mockInputJobSteamWithException(JOB_COUNT_URL); mockGetConnection("/mrconf_30784.xml"); @@ -377,7 +378,7 @@ public class MRJobParserTest { RMResourceFetcher resourceFetcher = mock(RMResourceFetcher.class); when(resourceFetcher.getResource(any())).thenReturn(Collections.emptyList()); MRJobParser mrJobParser = new MRJobParser(mrRunningJobConfig.getEndpointConfig(), mrRunningJobConfig.getEagleServiceConfig(), - app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config); + app1, mrJobs, runningJobManager, resourceFetcher, confKeyKeys, config, new MRJobPerformanceAnalyzer(config)); Map<String, JobExecutionAPIEntity> jobIdToJobExecutionAPIEntity = getMrJobs(mrJobParser); @@ -401,7 +402,7 @@ public class MRJobParserTest { Assert.assertTrue(curator.checkExists().forPath(ZK_APP_PATH) != null); Assert.assertTrue(entities.isEmpty()); verify(client, times(2)).create(any()); - verify(client, times(1)).getJerseyClient(); + //verify(client, times(1)).getJerseyClient(); verify(client, times(1)).close(); } http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java index fdfcaad..8127aa2 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java +++ 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobApplicationHealthCheck.java @@ -50,7 +50,7 @@ public class SparkHistoryJobApplicationHealthCheck extends ApplicationHealthChec eagleServiceConfig.username, eagleServiceConfig.password); - client.getJerseyClient().setReadTimeout(eagleServiceConfig.timeout * 1000); + client.setReadTimeout(eagleServiceConfig.timeout * 1000); String message = ""; try { @@ -88,7 +88,6 @@ public class SparkHistoryJobApplicationHealthCheck extends ApplicationHealthChec } catch (Exception e) { return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e))); } finally { - client.getJerseyClient().destroy(); try { client.close(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java index 05e35e4..2ef1bd9 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java @@ -708,7 +708,7 @@ public class JHFSparkEventReader { config.eagleInfo.username, config.eagleInfo.password); int timeout = config.eagleInfo.timeout; - client.getJerseyClient().setReadTimeout(timeout * 1000); + client.setReadTimeout(timeout * 1000); return client; } 
http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java index 92adfa8..adef27f 100644 --- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java +++ b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkAppEntityCreationHandler.java @@ -57,7 +57,7 @@ public class SparkAppEntityCreationHandler { eagleServiceConfig.eagleServicePort, eagleServiceConfig.username, eagleServiceConfig.password)) { - client.getJerseyClient().setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); + client.setReadTimeout(eagleServiceConfig.readTimeoutSeconds * 1000); LOG.info("start to flush spark app entities, size {}", entities.size()); client.create(entities); LOG.info("finish flushing spark app entities, size {}", entities.size()); http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java index 9025d36..a065373 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Utils.java @@ -39,7 +39,7 @@ public class Utils { try { is.close(); } catch (Exception e) { - e.printStackTrace(); + LOG.warn("{}", e); } 
} } @@ -48,7 +48,7 @@ public class Utils { try { Thread.sleep(seconds * 1000); } catch (Exception e) { - e.printStackTrace(); + LOG.warn("{}", e); } } @@ -60,7 +60,7 @@ public class Utils { Date parsedDate = dateFormat.parse(date); timestamp = parsedDate.getTime(); } catch (ParseException e) { - e.printStackTrace(); + LOG.warn("{}", e); } if (timestamp == 0L) { http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/pom.xml b/eagle-jpm/pom.xml index 1ffdb03..e7ae3c3 100644 --- a/eagle-jpm/pom.xml +++ b/eagle-jpm/pom.xml @@ -32,6 +32,7 @@ <packaging>pom</packaging> <modules> + <module>eagle-jpm-analyzer</module> <module>eagle-jpm-spark-running</module> <module>eagle-jpm-spark-history</module> <module>eagle-jpm-mr-history</module> http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-server-assembly/src/main/conf/eagle.conf ---------------------------------------------------------------------- diff --git a/eagle-server-assembly/src/main/conf/eagle.conf b/eagle-server-assembly/src/main/conf/eagle.conf index af7e14a..a889914 100644 --- a/eagle-server-assembly/src/main/conf/eagle.conf +++ b/eagle-server-assembly/src/main/conf/eagle.conf @@ -108,6 +108,12 @@ application { recipients: "[email protected]" template: "JobReportTemplate.vm" } + analyzerReport { + sender: "[email protected]" + recipients: "[email protected]" + template: "AnalyzerReportTemplate.vm" + cc: "[email protected]" + } } # --------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java ---------------------------------------------------------------------- diff --git a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java 
b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java index bf5e695..950bb04 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java +++ b/eagle-topology-check/eagle-topology-app/src/main/java/org/apache/eagle/topology/TopologyCheckApplicationHealthCheck.java @@ -50,7 +50,7 @@ public class TopologyCheckApplicationHealthCheck extends ApplicationHealthCheckB topologyCheckAppConfig.getConfig().getString("service.username"), topologyCheckAppConfig.getConfig().getString("service.password")); - client.getJerseyClient().setReadTimeout(topologyCheckAppConfig.getConfig().getInt("service.readTimeOutSeconds") * 1000); + client.setReadTimeout(topologyCheckAppConfig.getConfig().getInt("service.readTimeOutSeconds") * 1000); String message = ""; try { @@ -80,7 +80,6 @@ public class TopologyCheckApplicationHealthCheck extends ApplicationHealthCheckB } catch (Exception e) { return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e))); } finally { - client.getJerseyClient().destroy(); try { client.close(); } catch (Exception e) {
