[EAGLE-797] add job performance analysis for continuous development
Author: wujinhu <[email protected]> Closes #762 from wujinhu/EAGLE-797. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/b4695801 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/b4695801 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/b4695801 Branch: refs/heads/master Commit: b4695801f6c93ca0991fd9758c470b784aa8c781 Parents: 2adbbf5 Author: wujinhu <[email protected]> Authored: Thu Jan 12 10:23:02 2017 +0800 Committer: Zhao, Qingwen <[email protected]> Committed: Thu Jan 12 10:23:02 2017 +0800 ---------------------------------------------------------------------- .../app/spi/AbstractApplicationProvider.java | 3 +- ...adoopQueueRunningApplicationHealthCheck.java | 3 +- .../jpm/aggregation/mr/MRMetricAggregator.java | 3 + .../mr/MRMetricsAggregateContainer.java | 1 + eagle-jpm/eagle-jpm-analyzer/pom.xml | 38 ++++++ .../eagle/jpm/analyzer/AnalyzerEntity.java | 130 ++++++++++++++++++ .../apache/eagle/jpm/analyzer/Evaluator.java | 24 ++++ .../apache/eagle/jpm/analyzer/JobAnalyzer.java | 28 ++++ .../apache/eagle/jpm/analyzer/Processor.java | 24 ++++ .../analyzer/meta/MetaManagementService.java | 39 ++++++ .../impl/MetaManagementServiceJDBCImpl.java | 77 +++++++++++ .../impl/MetaManagementServiceMemoryImpl.java | 127 ++++++++++++++++++ .../jpm/analyzer/meta/model/JobMetaEntity.java | 85 ++++++++++++ .../analyzer/meta/model/PublisherEntity.java | 77 +++++++++++ .../analyzer/mr/MRJobPerformanceAnalyzer.java | 65 +++++++++ .../jpm/analyzer/mr/sla/SLAJobEvaluator.java | 67 ++++++++++ .../sla/processors/LongStuckJobProcessor.java | 43 ++++++ .../UnExpectedLongDurationJobProcessor.java | 120 +++++++++++++++++ .../mr/suggestion/JobSuggestionEvaluator.java | 44 +++++++ .../analyzer/publisher/EagleStorePublisher.java | 40 ++++++ .../jpm/analyzer/publisher/EmailPublisher.java | 109 +++++++++++++++ .../eagle/jpm/analyzer/publisher/Publisher.java | 24 ++++ .../eagle/jpm/analyzer/publisher/Result.java | 109 +++++++++++++++ .../publisher/dedup/AlertDeduplicator.java | 25 ++++ .../dedup/impl/SimpleDeduplicator.java | 59 +++++++++ .../jpm/analyzer/resource/AnalyzerResource.java | 131 +++++++++++++++++++ .../eagle/jpm/analyzer/util/Constants.java | 65 +++++++++ .../apache/eagle/jpm/analyzer/util/Utils.java | 74 +++++++++++ .../main/resources/AnalyzerReportTemplate.vm | 131 +++++++++++++++++++ .../src/main/resources/createTable.sql | 23 ++++ .../MRHistoryJobApplicationHealthCheck.java | 1 - ...JobConfigurationCreationServiceListener.java | 1 - .../JobEntityCreationEagleServiceListener.java | 3 +- .../parser/TaskAttemptCounterListener.java | 3 +- .../mr/history/parser/TaskFailureListener.java | 1 - eagle-jpm/eagle-jpm-mr-running/pom.xml | 10 ++ .../jpm/mr/running/MRRunningJobApplication.java | 4 +- .../MRRunningJobApplicationProvider.java | 11 ++ .../parser/MRJobEntityCreationHandler.java | 1 - .../jpm/mr/running/parser/MRJobParser.java | 25 +++- .../mr/running/storm/MRRunningJobParseBolt.java | 9 +- .../mr/running/MRRunningJobApplicationTest.java | 4 +- .../jpm/mr/running/parser/MRJobParserTest.java | 17 +-- .../SparkHistoryJobApplicationHealthCheck.java | 3 +- .../history/crawl/JHFSparkEventReader.java | 2 +- .../parser/SparkAppEntityCreationHandler.java | 2 +- .../java/org/apache/eagle/jpm/util/Utils.java | 6 +- eagle-jpm/pom.xml | 1 + eagle-server-assembly/src/main/conf/eagle.conf | 6 + .../TopologyCheckApplicationHealthCheck.java | 3 +- 50 files changed, 1868 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java index 2a8d7c0..e537643 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java @@ -19,6 +19,7 @@ package org.apache.eagle.app.spi; import com.google.common.base.Preconditions; import com.google.inject.AbstractModule; +import com.google.inject.Singleton; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.app.Application; import org.apache.eagle.app.service.ApplicationListener; @@ -130,7 +131,7 @@ public abstract class AbstractApplicationProvider<T extends Application> impleme currentRegistry.register(scope, new AbstractModule() { @Override protected void configure() { - bind(type).to(impl); + bind(type).to(impl).in(Singleton.class); } }); } http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java index 6471dfc..5a5d0ee 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApplicationHealthCheck.java @@ -52,7 +52,7 @@ public class HadoopQueueRunningApplicationHealthCheck extends ApplicationHealthC eagleServiceConfig.eagleService.username, eagleServiceConfig.eagleService.password); - client.getJerseyClient().setReadTimeout(60000); + client.setReadTimeout(60000); String message = ""; try { @@ -91,7 +91,6 @@ public class HadoopQueueRunningApplicationHealthCheck extends ApplicationHealthC } catch (Exception e) { return Result.unhealthy(printMessages(message, "An exception was caught when fetch application current process time: ", ExceptionUtils.getStackTrace(e))); } finally { - client.getJerseyClient().destroy(); try { client.close(); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java index f8840b2..54bd29b 100644 --- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java +++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricAggregator.java @@ -74,6 +74,8 @@ public class MRMetricAggregator implements MetricAggregator, Serializable { .endTime(endTime) .pageSize(Integer.MAX_VALUE) .send(); + + client.close(); } catch (Exception e) { LOG.warn("{}", e); return false; @@ -151,6 +153,7 @@ public class MRMetricAggregator implements MetricAggregator, Serializable { client.create(entities); LOG.info("finish flushing entities of total number " + entities.size()); entities.clear(); + client.close(); } catch (Exception e) { LOG.warn("{}", e); return false; http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java index 45bbcef..dd1980b 100644 --- a/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java +++ b/eagle-jpm/eagle-jpm-aggregation/src/main/java/org/apache/eagle/jpm/aggregation/mr/MRMetricsAggregateContainer.java @@ -74,6 +74,7 @@ public class MRMetricsAggregateContainer implements MetricsAggregateContainer, S List<Map<List<String>, List<Double>>> results = response.getObj(); long currentProcessTimeStamp = results.get(0).get("value").get(0).longValue(); + client.close(); return currentProcessTimeStamp; } catch (Exception e) { LOG.warn("{}", e); http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/pom.xml b/eagle-jpm/eagle-jpm-analyzer/pom.xml new file mode 100644 index 0000000..d6383b6 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/pom.xml @@ -0,0 +1,38 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>eagle-jpm-parent</artifactId> + <groupId>org.apache.eagle</groupId> + <version>0.5.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>eagle-jpm-analyzer</artifactId> + <name>Eagle::App::JPM::Analyzer</name> + <url>http://maven.apache.org</url> + + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-metadata-base</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-jpm-util</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-app-base</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java new file mode 100644 index 0000000..f9b7af0 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/AnalyzerEntity.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer; + +import java.util.HashMap; +import java.util.Map; + +/** + * will refactor later if other types of job needs this. + * AnalyzerEntity for each job needed to be analysised + */ +public class AnalyzerEntity { + private String jobDefId; + private String jobId; + private String siteId; + private String userId; + + private long startTime; + private long endTime; + private long durationTime; + private String currentState; + private double progress; + + private Map<String, Object> jobConfig = new HashMap<>(); + + private Map<String, Object> jobMeta = new HashMap<>(); + + public String getJobDefId() { + return jobDefId; + } + + public void setJobDefId(String jobDefId) { + this.jobDefId = jobDefId; + } + + public String getJobId() { + return jobId; + } + + public void setJobId(String jobId) { + this.jobId = jobId; + } + + public String getSiteId() { + return siteId; + } + + public void setSiteId(String siteId) { + this.siteId = siteId; + } + + public String getUserId() { + return userId; + } + + public void setUserId(String userId) { + this.userId = userId; + } + + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + public long getEndTime() { + return endTime; + } + + public void setEndTime(long endTime) { + this.endTime = endTime; + } + + public long getDurationTime() { + return durationTime; + } + + public void setDurationTime(long durationTime) { + this.durationTime = durationTime; + } + + public String getCurrentState() { + return currentState; + } + + public void setCurrentState(String currentState) { + this.currentState = currentState; + } + + public Map<String, Object> getJobConfig() { + return jobConfig; + } + + public void setJobConfig(Map<String, Object> jobConfig) { + this.jobConfig = jobConfig; + } + + public Map<String, Object> getJobMeta() { + return jobMeta; + } + + public void setJobMeta(Map<String, Object> jobMeta) { + this.jobMeta = jobMeta; + } + + public double getProgress() { + return progress; + } + + public void setProgress(double progress) { + this.progress = progress; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java new file mode 100644 index 0000000..6617916 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Evaluator.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer; + +import org.apache.eagle.jpm.analyzer.publisher.Result; + +public interface Evaluator { + Result.EvaluatorResult evaluate(AnalyzerEntity analyzerEntity); +} http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java new file mode 100644 index 0000000..6cda1cd --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/JobAnalyzer.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer; + +/** + * Each JobAnalyzer contains one or more Evaluators to analysis each job. + * Each Evaluator is a group of Processors + * Each Processor implements an algorithm or a model to analysis one dimension of a job + * + */ +public interface JobAnalyzer { + void analysis(AnalyzerEntity analyzerEntity) throws Exception; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java new file mode 100644 index 0000000..d5a8a74 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/Processor.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer; + +import org.apache.eagle.jpm.analyzer.publisher.Result; + +public interface Processor { + Result.ProcessorResult process(AnalyzerEntity jobAnalysisEntity); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java new file mode 100644 index 0000000..0935266 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.meta; + +import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; +import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity; + +import java.util.List; + +public interface MetaManagementService { + boolean addJobMeta(JobMetaEntity jobMetaEntity); + + boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity); + + List<JobMetaEntity> getJobMeta(String jobDefId); + + boolean deleteJobMeta(String jobDefId); + + boolean addPublisherMeta(PublisherEntity publisherEntity); + + boolean deletePublisherMeta(String userId); + + List<PublisherEntity> getPublisherMeta(String userId); +} http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java new file mode 100644 index 0000000..cfb5029 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.meta.impl; + +import com.google.inject.Inject; +import com.typesafe.config.Config; +import org.apache.eagle.jpm.analyzer.meta.MetaManagementService; +import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; +import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.List; + +public class MetaManagementServiceJDBCImpl implements MetaManagementService, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(MetaManagementServiceJDBCImpl.class); + + @Inject + Config config; + + @Override + public boolean addJobMeta(JobMetaEntity jobMetaEntity) { + + return true; + } + + @Override + public boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity) { + + return true; + } + + @Override + public List<JobMetaEntity> getJobMeta(String jobDefId) { + + return null; + } + + @Override + public boolean deleteJobMeta(String jobDefId) { + + return true; + } + + @Override + public boolean addPublisherMeta(PublisherEntity publisherEntity) { + + return true; + } + + @Override + public boolean deletePublisherMeta(String userId) { + + return true; + } + + @Override + public List<PublisherEntity> getPublisherMeta(String userId) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java new file mode 100644 index 0000000..85e8358 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.meta.impl; + +import com.google.inject.Inject; +import com.typesafe.config.Config; +import org.apache.eagle.jpm.analyzer.meta.MetaManagementService; +import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; +import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.*; + +public class MetaManagementServiceMemoryImpl implements MetaManagementService, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(MetaManagementServiceMemoryImpl.class); + + private final Map<String, JobMetaEntity> jobMetaEntities = new HashMap<>(); + private final Map<String, List<PublisherEntity>> publisherEntities = new HashMap<>(); + + @Inject + Config config; + + @Override + public boolean addJobMeta(JobMetaEntity jobMetaEntity) { + if (jobMetaEntities.containsKey(jobMetaEntity.getJobDefId())) { + LOG.warn("contains job {} already, add job meta failed", jobMetaEntity.getJobDefId()); + return false; + } + + jobMetaEntities.put(jobMetaEntity.getJobDefId(), jobMetaEntity); + LOG.info("Successfully add job {} meta", jobMetaEntity.getJobDefId()); + return true; + } + + @Override + public boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity) { + if (!jobMetaEntities.containsKey(jobMetaEntity.getJobDefId())) { + LOG.warn("does not contain job {}, update job meta failed", jobDefId); + return false; + } + + jobMetaEntities.put(jobDefId, jobMetaEntity); + LOG.info("Successfully update job {} meta", jobDefId); + return true; + } + + @Override + public List<JobMetaEntity> getJobMeta(String jobDefId) { + if (!jobMetaEntities.containsKey(jobDefId)) { + LOG.warn("does not contain job {}, get job meta failed", jobDefId); + return new ArrayList<>(); + } + + return Arrays.asList(jobMetaEntities.get(jobDefId)); + } + + @Override + public boolean deleteJobMeta(String jobDefId) { + if (!jobMetaEntities.containsKey(jobDefId)) { + LOG.warn("does not contain job {}, delete job meta failed", jobDefId); + return false; + } + + jobMetaEntities.remove(jobDefId); + LOG.info("Successfully delete job {} meta", jobDefId); + return true; + } + + @Override + public boolean addPublisherMeta(PublisherEntity publisherEntity) { + if (publisherEntities.containsKey(publisherEntity.getUserId())) { + for (PublisherEntity entity : publisherEntities.get(publisherEntity.getUserId())) { + if (entity.equals(publisherEntity)) { + LOG.warn("contains user {}, mailAddress {} already, add publisher failed", entity.getUserId(), entity.getMailAddress()); + return false; + } + } + } + + if (!publisherEntities.containsKey(publisherEntity.getUserId())) { + publisherEntities.put(publisherEntity.getUserId(), new ArrayList<>()); + } + + publisherEntities.get(publisherEntity.getUserId()).add(publisherEntity); + LOG.info("Successfully add publisher user {}, mailAddress {}", publisherEntity.getUserId(), publisherEntity.getMailAddress()); + return true; + } + + @Override + public boolean deletePublisherMeta(String userId) { + if (!publisherEntities.containsKey(userId)) { + LOG.warn("does not contain user {}, failed to delete publisher", userId); + return false; + } + + publisherEntities.remove(userId); + LOG.info("Successfully delete publisher user " + userId); + return true; + } + + @Override + public List<PublisherEntity> getPublisherMeta(String userId) { + if (!publisherEntities.containsKey(userId)) { + LOG.warn("does not contain user {}, failed to get publisher", userId); + return new ArrayList<>(); + } + + return publisherEntities.get(userId); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java new file mode 100644 index 0000000..2e15c17 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.meta.model; + +import org.apache.eagle.metadata.persistence.PersistenceEntity; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class JobMetaEntity extends PersistenceEntity { + private String jobDefId; + private String siteId; + private Map<String, Object> configuration = new HashMap<>(); + private Set<String> evaluators = new HashSet<>(); + + public JobMetaEntity() { + + } + + public JobMetaEntity(String jobDefId, + String siteId, + Map<String, Object> configuration, + Set<String> evaluators) { + this.jobDefId = jobDefId; + this.siteId = siteId; + this.configuration = configuration; + this.evaluators = evaluators; + } + + @Override + public String toString() { + return String.format("JobMetaEntity[jobDefId=%s, siteId=%s]", jobDefId, siteId); + } + + public String getJobDefId() { + return jobDefId; + } + + public void setJobDefId(String jobDefId) { + this.jobDefId = jobDefId; + } + + public String getSiteId() { + return siteId; + } + + public void setSiteId(String siteId) { + this.siteId = siteId; + } + + public Map<String, Object> getConfiguration() { + return configuration; + } + + public void setConfiguration(Map<String, Object> configuration) { + this.configuration = configuration; + } + + public Set<String> getEvaluators() { + return evaluators; + } + + public void setEvaluators(Set<String> evaluators) { + this.evaluators = evaluators; + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java new file mode 100644 index 0000000..bca7ab1 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.meta.model; + +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.eagle.metadata.persistence.PersistenceEntity; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class PublisherEntity extends PersistenceEntity { + private String userId; + private String mailAddress; + + public PublisherEntity(String userId, String mailAddress) { + this.userId = userId; + this.mailAddress = mailAddress; + } + + @Override + public String toString() { + return String.format("PublisherEntity[userId=%s, mailAddress=%s]", userId, mailAddress); + } + + public String getUserId() { + return userId; + } + + public void setUserId(String userId) { + this.userId = userId; + } + + public String getMailAddress() { + return mailAddress; + } + + public void setMailAddress(String mailAddress) { + this.mailAddress = mailAddress; + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(userId) + .append(mailAddress) + .build(); + } + + @Override + public boolean equals(Object that) { + if (that == this) { + return true; + } + + if (!(that instanceof PublisherEntity)) { + return false; + } + + PublisherEntity another = (PublisherEntity)that; + + return another.userId.equals(this.userId) && another.mailAddress.equals(this.mailAddress); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java new file mode 100644 index 0000000..e0e579a --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.mr; + +import com.typesafe.config.Config; +import org.apache.eagle.jpm.analyzer.*; +import org.apache.eagle.jpm.analyzer.Evaluator; +import org.apache.eagle.jpm.analyzer.mr.sla.SLAJobEvaluator; +import org.apache.eagle.jpm.analyzer.mr.suggestion.JobSuggestionEvaluator; +import org.apache.eagle.jpm.analyzer.publisher.EagleStorePublisher; +import org.apache.eagle.jpm.analyzer.publisher.EmailPublisher; +import org.apache.eagle.jpm.analyzer.publisher.Publisher; +import org.apache.eagle.jpm.analyzer.publisher.Result; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class MRJobPerformanceAnalyzer implements JobAnalyzer, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(MRJobPerformanceAnalyzer.class); + + private List<Evaluator> evaluators = new ArrayList<>(); + private List<Publisher> publishers = new ArrayList<>(); + + private Config config; + + public MRJobPerformanceAnalyzer(Config config) { + this.config = config; + evaluators.add(new SLAJobEvaluator(config)); + evaluators.add(new JobSuggestionEvaluator(config)); + + publishers.add(new EagleStorePublisher(config)); + publishers.add(new EmailPublisher(config)); + } + + @Override + public void analysis(AnalyzerEntity analyzerJobEntity) throws Exception { + Result result = new Result(); + + for (Evaluator evaluator : evaluators) { + result.addEvaluatorResult(evaluator.getClass(), evaluator.evaluate(analyzerJobEntity)); + } + + for (Publisher publisher : publishers) { + publisher.publish(analyzerJobEntity, result); + } + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java new file mode 100644 index 0000000..f10b68d --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.mr.sla; + +import com.typesafe.config.Config; +import org.apache.eagle.jpm.analyzer.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.Evaluator; +import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; +import org.apache.eagle.jpm.analyzer.mr.sla.processors.LongStuckJobProcessor; +import org.apache.eagle.jpm.analyzer.mr.sla.processors.UnExpectedLongDurationJobProcessor; +import org.apache.eagle.jpm.analyzer.Processor; +import org.apache.eagle.jpm.analyzer.publisher.Result; +import org.apache.eagle.jpm.analyzer.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class SLAJobEvaluator implements Evaluator, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(SLAJobEvaluator.class); + + private List<Processor> processors = new ArrayList<>(); + private Config config; + + public SLAJobEvaluator(Config config) { + this.config = config; + processors.add(new UnExpectedLongDurationJobProcessor(config)); + processors.add(new LongStuckJobProcessor(config)); + } + + @Override + public Result.EvaluatorResult evaluate(AnalyzerEntity analyzerJobEntity) { + Result.EvaluatorResult result = new Result.EvaluatorResult(); + + List<JobMetaEntity> jobMetaEntities = Utils.getJobMeta(config, analyzerJobEntity.getJobDefId()); + if (jobMetaEntities.size() == 0 + || !jobMetaEntities.get(0).getEvaluators().contains(this.getClass().getName())) { + LOG.info("SLAJobEvaluator skip job {}", analyzerJobEntity.getJobDefId()); + return result; + } + + analyzerJobEntity.setJobMeta(jobMetaEntities.get(0).getConfiguration()); + + for (Processor processor : processors) { + result.addProcessorResult(processor.getClass(), processor.process(analyzerJobEntity)); + } + + return result; + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java new file mode 100644 index 0000000..35f3b27 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/LongStuckJobProcessor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.mr.sla.processors; + +import com.typesafe.config.Config; +import org.apache.eagle.jpm.analyzer.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.Processor; +import org.apache.eagle.jpm.analyzer.publisher.Result; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +public class LongStuckJobProcessor implements Processor, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(LongStuckJobProcessor.class); + + private Config config; + + public LongStuckJobProcessor(Config config) { + this.config = config; + } + + @Override + public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) { + LOG.info("Job {} In LongStuckJobProcessor", analyzerJobEntity.getJobDefId()); + return new Result.ProcessorResult(Result.ResultLevel.NONE, ""); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java new file mode 100644 index 0000000..9d4ce2b --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.mr.sla.processors; + +import com.typesafe.config.Config; +import org.apache.eagle.jpm.analyzer.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.publisher.Result; +import org.apache.eagle.jpm.analyzer.Processor; +import org.apache.eagle.jpm.analyzer.util.Constants; +import org.apache.eagle.jpm.analyzer.util.Utils; +import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; +import org.apache.eagle.service.client.IEagleServiceClient; +import org.apache.eagle.service.client.impl.EagleServiceClientImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.net.URLEncoder; +import java.util.List; +import java.util.Map; + +public class UnExpectedLongDurationJobProcessor implements Processor, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(UnExpectedLongDurationJobProcessor.class); + + private Config config; + + public UnExpectedLongDurationJobProcessor(Config config) { + this.config = config; + } + + @Override + public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) { + LOG.info("Job {} In UnExpectedLongDurationJobProcessor", analyzerJobEntity.getJobDefId()); + + Map<String, Object> jobMetaData = analyzerJobEntity.getJobMeta(); + long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData); + if (avgDurationTime == 0L) { + return new Result.ProcessorResult(Result.ResultLevel.NONE, Constants.PROCESS_NONE); + } + + Map<Result.ResultLevel, Double> alertThreshold = Constants.DEFAULT_ALERT_THRESHOLD; + if (jobMetaData.containsKey(Constants.ALERT_THRESHOLD_KEY)) { + alertThreshold = (Map<Result.ResultLevel, Double>)jobMetaData.get(Constants.ALERT_THRESHOLD_KEY); + } + List<Map.Entry<Result.ResultLevel, Double>> sorted = Utils.sortByValue(alertThreshold); + + double expirePercent = (analyzerJobEntity.getDurationTime() - avgDurationTime) * 1.0 / avgDurationTime; + for (Map.Entry<Result.ResultLevel, Double> entry : sorted) { + if (expirePercent >= entry.getValue()) { + return new Result.ProcessorResult(entry.getKey(), String.format("Job duration exceeds average duration by %d%%, average duration is %ds", + (int)(expirePercent * 100), avgDurationTime / 1000)); + } + } + + return new Result.ProcessorResult(Result.ResultLevel.NONE, Constants.PROCESS_NONE); + } + + private long getAvgDuration(AnalyzerEntity mrJobAnalysisEntity, Map<String, Object> jobMetaData) { + IEagleServiceClient client = new EagleServiceClientImpl( + config.getString(Constants.HOST_PATH), + config.getInt(Constants.PORT_PATH), + config.getString(Constants.USERNAME_PATH), + config.getString(Constants.PASSWORD_PATH)); + + client.setReadTimeout(config.getInt(Constants.READ_TIMEOUT_PATH) * 1000); + + try { + int timeLength = Constants.DEFAULT_EVALUATOR_TIME_LENGTH; + try { + if (jobMetaData.containsKey(Constants.EVALUATOR_TIME_LENGTH_KEY)) { + timeLength = Integer.parseInt(jobMetaData.get(Constants.EVALUATOR_TIME_LENGTH_KEY).toString()); + } + } catch (Exception e) { + LOG.warn("exception found when parse timeLength {}, use default", e); + } + + String query = String.format("%s[@site=\"%s\" and @jobDefId=\"%s\"]<@site>{avg(durationTime)}", + org.apache.eagle.jpm.util.Constants.JPA_JOB_EXECUTION_SERVICE_NAME, + mrJobAnalysisEntity.getSiteId(), + URLEncoder.encode(mrJobAnalysisEntity.getJobDefId())); + + GenericServiceAPIResponseEntity response = client + .search(query) + .startTime(System.currentTimeMillis() - (timeLength + 1) * 24 * 60 * 60000L) + .endTime(System.currentTimeMillis() - 24 * 60 * 60000L) + .pageSize(10) + .send(); + + List<Map<List<String>, List<Double>>> results = response.getObj(); + if (results.size() == 0) { + return 0L; + } + return results.get(0).get("value").get(0).longValue(); + } catch (Exception e) { + LOG.warn("{}", e); + return 0L; + } finally { + try { + client.close(); + } catch (Exception e) { + LOG.warn("{}", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java new file mode 100644 index 0000000..79f5318 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.mr.suggestion; + +import com.typesafe.config.Config; +import org.apache.eagle.jpm.analyzer.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.Evaluator; +import org.apache.eagle.jpm.analyzer.publisher.Result; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +public class JobSuggestionEvaluator implements Evaluator, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(JobSuggestionEvaluator.class); + + private Config config; + + public JobSuggestionEvaluator(Config config) { + this.config = config; + } + + @Override + public Result.EvaluatorResult evaluate(AnalyzerEntity mrJobEntity) { + Result.EvaluatorResult result = new Result.EvaluatorResult(); + //TODO + return result; + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java new file mode 100644 index 0000000..6109704 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.publisher; + +import com.typesafe.config.Config; +import org.apache.eagle.jpm.analyzer.AnalyzerEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +public class EagleStorePublisher implements Publisher, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(EagleStorePublisher.class); + + private Config config; + + public EagleStorePublisher(Config config) { + this.config = config; + } + + @Override + public void publish(AnalyzerEntity analyzerJobEntity, Result result) { + + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java new file mode 100644 index 0000000..4e49094 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.publisher; + +import com.typesafe.config.Config; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.eagle.app.service.ApplicationEmailService; +import org.apache.eagle.common.DateTimeUtil; +import org.apache.eagle.common.mail.AlertEmailConstants; +import org.apache.eagle.common.mail.AlertEmailContext; +import org.apache.eagle.jpm.analyzer.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator; +import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator; +import org.apache.eagle.jpm.analyzer.util.Constants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.net.URLEncoder; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class EmailPublisher implements Publisher, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(EmailPublisher.class); + + private Config config; + private AlertDeduplicator alertDeduplicator; + + public EmailPublisher(Config config) { + this.config = config; + this.alertDeduplicator = new SimpleDeduplicator(); + } + + @Override + public void publish(AnalyzerEntity analyzerJobEntity, Result result) { + if (result.getAlertMessages().size() == 0) { + return; + } + + LOG.info("EmailPublisher gets job {}", analyzerJobEntity.getJobDefId()); + if (alertDeduplicator.dedup(analyzerJobEntity, result)) { + LOG.info("skip job {} alert because it is duplicated", analyzerJobEntity.getJobDefId()); + return; + } + + Map<String, String> basic = new HashMap<>(); + basic.put("site", analyzerJobEntity.getSiteId()); + basic.put("name", analyzerJobEntity.getJobDefId()); + basic.put("user", analyzerJobEntity.getUserId()); + basic.put("status", analyzerJobEntity.getCurrentState()); + basic.put("duration", analyzerJobEntity.getDurationTime() * 1.0 / 1000 + "s"); + basic.put("start", DateTimeUtil.millisecondsToHumanDateWithSeconds(analyzerJobEntity.getStartTime())); + basic.put("end", analyzerJobEntity.getEndTime() == 0 + ? "0" + : DateTimeUtil.millisecondsToHumanDateWithSeconds(analyzerJobEntity.getEndTime())); + basic.put("progress", analyzerJobEntity.getProgress() + "%"); + basic.put("detail", getJobLink(analyzerJobEntity)); + + + Map<String, Map<String, String>> extend = new HashMap<>(); + Map<String, List<Pair<Result.ResultLevel, String>>> alertMessages = result.getAlertMessages(); + for (String evaluator : alertMessages.keySet()) { + List<Pair<Result.ResultLevel, String>> messages = alertMessages.get(evaluator); + extend.put(evaluator, new HashMap<>()); + for (Pair<Result.ResultLevel, String> message : messages) { + LOG.info("Job [{}] Got Message [{}], Level [{}] By Evaluator [{}]", + analyzerJobEntity.getJobDefId(), message.getRight(), message.getLeft(), evaluator); + extend.get(evaluator).put(message.getRight(), message.getLeft().toString()); + } + } + + Map<String, Object> alertData = new HashMap<>(); + alertData.put(Constants.ANALYZER_REPORT_DATA_BASIC_KEY, basic); + alertData.put(Constants.ANALYZER_REPORT_DATA_EXTEND_KEY, extend); + + //TODO, override email config in job meta data + ApplicationEmailService emailService = new ApplicationEmailService(config, Constants.ANALYZER_REPORT_CONFIG_PATH); + String subject = String.format(Constants.ANALYZER_REPORT_SUBJECT, analyzerJobEntity.getJobDefId()); + AlertEmailContext alertContext = emailService.buildEmailContext(subject); + emailService.onAlert(alertContext, alertData); + } + + private String getJobLink(AnalyzerEntity analyzerJobEntity) { + return "http://" + + config.getString(Constants.HOST_PATH) + + ":" + + config.getInt(Constants.PORT_PATH) + + "/#/site/" + + analyzerJobEntity.getSiteId() + + "/jpm/detail/" + + analyzerJobEntity.getJobId(); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java new file mode 100644 index 0000000..2f42bf9 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Publisher.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.publisher; + +import org.apache.eagle.jpm.analyzer.AnalyzerEntity; + +public interface Publisher { + void publish(AnalyzerEntity analyzerJobEntity, Result result); +} http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java new file mode 100644 index 0000000..a12f589 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.publisher; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class Result { + //for EagleStorePublisher + private TaggedLogAPIEntity alertEntity = null;//TODO + //for EmailPublisher + private Map<String, List<Pair<ResultLevel, String>>> alertMessages = new HashMap<>(); + + public void addEvaluatorResult(Class<?> type, EvaluatorResult result) { + Map<Class<?>, ProcessorResult> processorResults = result.getProcessorResults(); + for (Class<?> processorType : processorResults.keySet()) { + ProcessorResult processorResult = processorResults.get(processorType); + if (processorResult.resultLevel.equals(ResultLevel.NONE)) { + continue; + } + + String typeName = type.getName(); + if (!alertMessages.containsKey(typeName)) { + alertMessages.put(typeName, new ArrayList<>()); + } + alertMessages.get(typeName).add(Pair.of(processorResult.getResultLevel(), processorResult.getMessage())); + } + } + + public TaggedLogAPIEntity getAlertEntity() { + return alertEntity; + } + + public Map<String, List<Pair<ResultLevel, String>>> getAlertMessages() { + return alertMessages; + } + + /** + * Processor result. + */ + + public enum ResultLevel { + NONE, + NOTICE, + WARNING, + CRITICAL + } + + public static class ProcessorResult { + private ResultLevel resultLevel; + private String message; + + public ProcessorResult(ResultLevel resultLevel, String message) { + this.resultLevel = resultLevel; + this.message = message; + } + + public ResultLevel getResultLevel() { + return resultLevel; + } + + public void setResultLevel(ResultLevel resultLevel) { + this.resultLevel = resultLevel; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + } + + /** + * Evaluator result. + */ + public static class EvaluatorResult { + private Map<Class<?>, ProcessorResult> processorResults = new HashMap<>(); + + public void addProcessorResult(Class<?> type, ProcessorResult result) { + this.processorResults.put(type, result); + } + + public Map<Class<?>, ProcessorResult> getProcessorResults() { + return this.processorResults; + } + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java new file mode 100644 index 0000000..4b18f7c --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/AlertDeduplicator.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.publisher.dedup; + +import org.apache.eagle.jpm.analyzer.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.publisher.Result; + +public interface AlertDeduplicator { + boolean dedup(AnalyzerEntity analyzerJobEntity, Result result); +} http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java new file mode 100644 index 0000000..09f1af6 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.publisher.dedup.impl; + +import org.apache.eagle.jpm.analyzer.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.publisher.Result; +import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator; +import org.apache.eagle.jpm.analyzer.util.Constants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** + * dedup by jobDefId. + */ +public class SimpleDeduplicator implements AlertDeduplicator, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(SimpleDeduplicator.class); + + private Map<String, Long> lastUpdateTime = new HashMap<>(); + + @Override + public boolean dedup(AnalyzerEntity analyzerJobEntity, Result result) { + long dedupInterval = Constants.DEFAULT_DEDUP_INTERVAL; + if (analyzerJobEntity.getJobMeta().containsKey(Constants.DEDUP_INTERVAL_KEY)) { + dedupInterval = (Long)analyzerJobEntity.getJobMeta().get(Constants.DEDUP_INTERVAL_KEY); + } + + dedupInterval = dedupInterval * 1000; + long currentTimeStamp = System.currentTimeMillis(); + if (lastUpdateTime.containsKey(analyzerJobEntity.getJobDefId())) { + if (lastUpdateTime.get(analyzerJobEntity.getJobDefId()) + dedupInterval > currentTimeStamp) { + return true; + } else { + return false; + } + } else { + lastUpdateTime.put(analyzerJobEntity.getJobDefId(), currentTimeStamp); + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java new file mode 100644 index 0000000..dc09202 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.resource; + +import com.google.inject.Inject; +import org.apache.eagle.jpm.analyzer.meta.MetaManagementService; +import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; +import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity; +import org.apache.eagle.metadata.resource.RESTResponse; + +import javax.ws.rs.*; +import javax.ws.rs.core.MediaType; +import java.util.List; + +import static org.apache.eagle.jpm.analyzer.util.Constants.*; + +@Path(ANALYZER_PATH) +public class AnalyzerResource { + @Inject + MetaManagementService metaManagementService; + + public AnalyzerResource() { + } + + @POST + @Path(META_PATH) + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public RESTResponse<Void> addJobMeta(JobMetaEntity jobMetaEntity) { + return RESTResponse.<Void>async((response) -> { + jobMetaEntity.ensureDefault(); + boolean ret = metaManagementService.addJobMeta(jobMetaEntity); + String message = "Successfully add job meta for " + jobMetaEntity.getJobDefId(); + if (!ret) { + message = "Failed to add job meta for " + jobMetaEntity.getJobDefId(); + } + response.success(ret).message(message); + }).get(); + } + + @POST + @Path(JOB_META_PATH) + @Produces(MediaType.APPLICATION_JSON) + public RESTResponse<Void> updateJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId, JobMetaEntity jobMetaEntity) { + return RESTResponse.<Void>async((response) -> { + jobMetaEntity.ensureDefault(); + boolean ret = metaManagementService.updateJobMeta(jobDefId, jobMetaEntity); + String message = "Successfully update job meta for " + jobDefId; + if (!ret) { + message = "Failed to update job meta for " + jobDefId; + } + response.success(ret).message(message); + }).get(); + } + + @GET + @Path(JOB_META_PATH) + @Produces(MediaType.APPLICATION_JSON) + public RESTResponse<List<JobMetaEntity>> getJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId) { + return RESTResponse.async(() -> metaManagementService.getJobMeta(jobDefId)).get(); + } + + @DELETE + @Path(JOB_META_PATH) + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public RESTResponse<Void> deleteJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId) { + return RESTResponse.<Void>async((response) -> { + boolean ret = metaManagementService.deleteJobMeta(jobDefId); + String message = "Successfully delete job meta for " + jobDefId; + if (!ret) { + message = "Failed to delete job meta for " + jobDefId; + } + + response.success(ret).message(message); + }).get(); + } + + @POST + @Path(PUBLISHER_PATH) + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public RESTResponse<Void> addPublisherMeta(PublisherEntity publisherEntity) { + return RESTResponse.<Void>async((response) -> { + publisherEntity.ensureDefault(); + boolean ret = metaManagementService.addPublisherMeta(publisherEntity); + String message = "Successfully add publisher meta for " + publisherEntity.getUserId(); + if (!ret) { + message = "Failed to add publisher meta for " + publisherEntity.getUserId(); + } + response.success(ret).message(message); + }).get(); + } + + @DELETE + @Path(PUBLISHER_META_PATH) + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public RESTResponse<Void> deletePublisherMeta(@PathParam(USER_PATH) String userId) { + return RESTResponse.<Void>async((response) -> { + boolean ret = metaManagementService.deletePublisherMeta(userId); + String message = "Successfully delete publisher meta for " + userId; + if (!ret) { + message = "Failed to delete publisher meta for " + userId; + } + response.success(ret).message(message); + }).get(); + } + + @GET + @Path(PUBLISHER_META_PATH) + @Produces(MediaType.APPLICATION_JSON) + public RESTResponse<List<PublisherEntity>> getPublisherMeta(@PathParam(USER_PATH) String userId) { + return RESTResponse.async(() -> metaManagementService.getPublisherMeta(userId)).get(); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java new file mode 100644 index 0000000..774e6d2 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.analyzer.util; + +import org.apache.eagle.jpm.analyzer.publisher.Result; + +import java.util.HashMap; +import java.util.Map; + +public class Constants { + public static final String HOST_PATH = "service.host"; + public static final String PORT_PATH = "service.port"; + public static final String USERNAME_PATH = "service.username"; + public static final String PASSWORD_PATH = "service.password"; + public static final String CONTEXT_PATH = "service.context"; + public static final String READ_TIMEOUT_PATH = "service.readTimeOutSeconds"; + + public static final String META_PATH = "/metadata"; + public static final String ANALYZER_PATH = "/job/analyzer"; + public static final String JOB_DEF_PATH = "jobDefId"; + public static final String JOB_META_PATH = META_PATH + "/{" + JOB_DEF_PATH + "}"; + + public static final String PUBLISHER_PATH = "/publisher"; + public static final String USER_PATH = "userId"; + public static final String PUBLISHER_META_PATH = PUBLISHER_PATH + "/{" + USER_PATH + "}"; + + public static final String PROCESS_NONE = "PROCESS_NONE"; + + public static final String EVALUATOR_TIME_LENGTH_KEY = "evaluator.timeLength"; + public static final int DEFAULT_EVALUATOR_TIME_LENGTH = 7;//7 days + + public static final String ALERT_THRESHOLD_KEY = "alert.threshold"; + public static final Map<Result.ResultLevel, Double> DEFAULT_ALERT_THRESHOLD = new HashMap<Result.ResultLevel, Double>() { + { + put(Result.ResultLevel.NOTICE, 0.1); + put(Result.ResultLevel.WARNING, 0.3); + put(Result.ResultLevel.CRITICAL, 0.5); + } + }; + + public static final String DEDUP_INTERVAL_KEY = "alert.dedupInterval"; //seconds + public static final int DEFAULT_DEDUP_INTERVAL = 300; + + public static final String ANALYZER_REPORT_CONFIG_PATH = "application.analyzerReport"; + public static final String ANALYZER_REPORT_SUBJECT = "Job Performance Alert For Job: %s"; + + public static final String ANALYZER_REPORT_DATA_BASIC_KEY = "basic"; + public static final String ANALYZER_REPORT_DATA_EXTEND_KEY = "extend"; +} http://git-wip-us.apache.org/repos/asf/eagle/blob/b4695801/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java new file mode 100644 index 0000000..66f7622 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.eagle.jpm.analyzer.util; + +import com.typesafe.config.Config; +import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; +import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils; +import org.apache.eagle.metadata.resource.RESTResponse; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.net.URLEncoder; +import java.util.*; + +public class Utils { + private static final Logger LOG = LoggerFactory.getLogger(Utils.class); + + private static final ObjectMapper OBJ_MAPPER = new ObjectMapper(); + + static { + OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true); + } + + public static List<JobMetaEntity> getJobMeta(Config config, String jobDefId) { + List<JobMetaEntity> result = new ArrayList<>(); + String url = "http://" + + config.getString(Constants.HOST_PATH) + + ":" + + config.getInt(Constants.PORT_PATH) + + config.getString(Constants.CONTEXT_PATH) + + Constants.ANALYZER_PATH + + Constants.META_PATH + + "/" + + URLEncoder.encode(jobDefId); + + InputStream is = null; + try { + is = InputStreamUtils.getInputStream(url, null, org.apache.eagle.jpm.util.Constants.CompressionType.NONE); + LOG.info("get job meta from {}", url); + result = (List<JobMetaEntity>)OBJ_MAPPER.readValue(is, RESTResponse.class).getData(); + } catch (Exception e) { + LOG.warn("failed to get job meta from {}", url, e); + } finally { + org.apache.eagle.jpm.util.Utils.closeInputStream(is); + return result; + } + } + + public static <K, V extends Comparable<? super V>> List<Map.Entry<K, V>> sortByValue(Map<K, V> map) { + List<Map.Entry<K, V>> list = new LinkedList<>(map.entrySet()); + Collections.sort(list, (e1, e2) -> e1.getValue().compareTo(e2.getValue())); + Collections.reverse(list); + return list; + } +}
