[EAGLE-935] add jdbc storage support for sla job meta Author: wujinhu <[email protected]>
Closes #854 from wujinhu/EAGLE-935. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/d766f681 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/d766f681 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/d766f681 Branch: refs/heads/master Commit: d766f68152d6cd120d3d0487907f7ee85f7a614a Parents: c75eadd Author: wujinhu <[email protected]> Authored: Tue Mar 7 10:07:01 2017 +0800 Committer: wujinhu <[email protected]> Committed: Tue Mar 7 10:07:01 2017 +0800 ---------------------------------------------------------------------- .../app/test/ApplicationSimulatorImpl.java | 40 ++-- eagle-jpm/eagle-jpm-analyzer/pom.xml | 5 + .../analyzer/meta/MetaManagementService.java | 16 +- .../impl/MetaManagementServiceJDBCImpl.java | 180 +++++++++++++-- .../impl/MetaManagementServiceMemoryImpl.java | 79 ++++--- .../meta/impl/orm/JobMetaEntityToRelation.java | 62 ++++++ .../meta/impl/orm/RelationToJobMetaEntity.java | 93 ++++++++ .../impl/orm/RelationToUserEmailEntity.java | 37 ++++ .../impl/orm/UserEmailEntityToRelation.java | 57 +++++ .../jpm/analyzer/meta/model/AnalyzerEntity.java | 6 +- .../jpm/analyzer/meta/model/JobMetaEntity.java | 15 +- .../analyzer/meta/model/PublisherEntity.java | 77 ------- .../analyzer/meta/model/UserEmailEntity.java | 91 ++++++++ .../analyzer/mr/MRJobPerformanceAnalyzer.java | 12 +- .../jpm/analyzer/mr/sla/SLAJobEvaluator.java | 12 +- .../UnExpectedLongDurationJobProcessor.java | 11 +- .../mr/suggestion/JobSuggestionEvaluator.java | 10 + .../MapReduceQueueResourceProcessor.java | 2 - .../MapReduceSplitSettingProcessor.java | 3 - .../analyzer/publisher/EagleStorePublisher.java | 6 - .../jpm/analyzer/publisher/EmailPublisher.java | 51 ++++- .../eagle/jpm/analyzer/publisher/Result.java | 31 ++- .../dedup/impl/SimpleDeduplicator.java | 29 +-- .../jpm/analyzer/resource/AnalyzerResource.java | 87 +++++--- .../eagle/jpm/analyzer/util/Constants.java | 20 +- .../apache/eagle/jpm/analyzer/util/Utils.java | 38 +++- .../main/resources/AnalyzerReportTemplate.vm | 219 +++++++++++++++---- .../src/main/resources/createTable.sql | 27 +-- .../MRHistoryJobApplicationProvider.java | 13 +- .../history/crawler/JHFCrawlerDriverImpl.java | 2 +- .../history/parser/JobSuggestionListener.java | 9 +- .../jpm/mr/history/storm/JobHistorySpout.java | 2 +- 32 files changed, 1012 insertions(+), 330 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java index a5f5a73..b10205f 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java @@ -26,12 +26,18 @@ import org.apache.eagle.metadata.model.SiteEntity; import org.apache.eagle.metadata.resource.SiteResource; import org.apache.eagle.metadata.service.ApplicationStatusUpdateService; import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class ApplicationSimulatorImpl extends ApplicationSimulator { + private static final Logger LOG = LoggerFactory.getLogger(ApplicationSimulatorImpl.class); + private final Config config; private final SiteResource siteResource; private final ApplicationResource applicationResource; @@ -74,26 +80,28 @@ public class ApplicationSimulatorImpl extends ApplicationSimulator { // Start application applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid())); statusUpdateService.updateApplicationEntityStatus(applicationEntity); - applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid())); - int attempt = 0; - while (attempt < 10) { - attempt++; - statusUpdateService.updateApplicationEntityStatus(applicationEntity); - if (applicationEntity.getStatus() == ApplicationEntity.Status.STOPPED - || applicationEntity.getStatus() == ApplicationEntity.Status.INITIALIZED) { - break; - } else { + Semaphore semp = new Semaphore(1); + Thread stopThread = new Thread(() -> { + applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid())); + while (applicationEntity.getStatus() != ApplicationEntity.Status.INITIALIZED + && applicationEntity.getStatus() != ApplicationEntity.Status.STOPPED) { + statusUpdateService.updateApplicationEntityStatus(applicationEntity); try { - Thread.sleep(500); - } catch (InterruptedException e) { - // Ignore + Thread.sleep(1000); + } catch (Exception e) { + LOG.warn("{}", e); } } + semp.release(); + }); + stopThread.start(); + try { + stopThread.join(60000L); + semp.tryAcquire(60, TimeUnit.SECONDS); + applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid())); + } catch (Exception e) { + throw new IllegalStateException("Application status didn't become STOPPED"); } - if (attempt >= 10 ) { - throw new IllegalStateException("Application status didn't become STOPPED in 10 attempts"); - } - applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid())); } @Override http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/pom.xml b/eagle-jpm/eagle-jpm-analyzer/pom.xml index 07f5766..a2943df 100644 --- a/eagle-jpm/eagle-jpm-analyzer/pom.xml +++ b/eagle-jpm/eagle-jpm-analyzer/pom.xml @@ -55,5 +55,10 @@ <artifactId>eagle-app-base</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.eagle</groupId> + <artifactId>eagle-metadata-jdbc</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java index 0935266..73b7b81 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java @@ -18,22 +18,24 @@ package org.apache.eagle.jpm.analyzer.meta; import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; -import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity; +import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity; import java.util.List; public interface MetaManagementService { boolean addJobMeta(JobMetaEntity jobMetaEntity); - boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity); + boolean updateJobMeta(JobMetaEntity jobMetaEntity); - List<JobMetaEntity> getJobMeta(String jobDefId); + List<JobMetaEntity> getJobMeta(String siteId, String jobDefId); - boolean deleteJobMeta(String jobDefId); + boolean deleteJobMeta(String siteId, String jobDefId); - boolean addPublisherMeta(PublisherEntity publisherEntity); + boolean addUserEmailMeta(UserEmailEntity userEmailEntity); - boolean deletePublisherMeta(String userId); + boolean updateUserEmailMeta(UserEmailEntity userEmailEntity); - List<PublisherEntity> getPublisherMeta(String userId); + boolean deleteUserEmailMeta(String siteId, String userId); + + List<UserEmailEntity> getUserEmailMeta(String siteId, String userId); } http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java index cfb5029..2048e97 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java @@ -17,61 +17,215 @@ package org.apache.eagle.jpm.analyzer.meta.impl; -import com.google.inject.Inject; import com.typesafe.config.Config; +import org.apache.commons.lang.StringUtils; import org.apache.eagle.jpm.analyzer.meta.MetaManagementService; +import org.apache.eagle.jpm.analyzer.meta.impl.orm.JobMetaEntityToRelation; +import org.apache.eagle.jpm.analyzer.meta.impl.orm.RelationToJobMetaEntity; +import org.apache.eagle.jpm.analyzer.meta.impl.orm.RelationToUserEmailEntity; +import org.apache.eagle.jpm.analyzer.meta.impl.orm.UserEmailEntityToRelation; import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; -import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity; +import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity; +import org.apache.eagle.metadata.store.jdbc.JDBCMetadataQueryService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.inject.Inject; import java.io.Serializable; +import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; public class MetaManagementServiceJDBCImpl implements MetaManagementService, Serializable { private static final Logger LOG = LoggerFactory.getLogger(MetaManagementServiceJDBCImpl.class); + private static final String addJobMetaSql = "INSERT INTO analysis_jobs(uuid, configuration, evaluators, createdtime, modifiedtime, siteId, jobDefId) VALUES (?, ?, ?, ?, ?, ?, ?)"; + private static final String addUserEmailSql = "INSERT INTO analysis_email(uuid, mailAddress, createdtime, modifiedtime, siteId, userId) VALUES (?, ?, ?, ?, ?, ?)"; + + private static final String getJobMetaSql = "SELECT * FROM analysis_jobs where siteId = ? and jobDefId = ?"; + private static final String getUserEmailSql = "SELECT * FROM analysis_email where siteId = ? and userId = ?"; + + private static final String deleteJobMetaSql = "DELETE FROM analysis_jobs where siteId = ? and jobDefId = ?"; + private static final String deleteUserEmailSql = "DELETE FROM analysis_email where siteId = ? and userId = ?"; + @Inject Config config; + @Inject + JDBCMetadataQueryService queryService; + @Override public boolean addJobMeta(JobMetaEntity jobMetaEntity) { - + if (getJobMeta(jobMetaEntity.getSiteId(), jobMetaEntity.getJobDefId()) != null) { + throw new IllegalArgumentException("Duplicated job meta: " + jobMetaEntity.getSiteId() + ": " + jobMetaEntity.getJobDefId()); + } + + List<JobMetaEntity> entities = new ArrayList<>(1); + entities.add(jobMetaEntity); + try { + queryService.insert(addJobMetaSql, entities, new JobMetaEntityToRelation()); + } catch (SQLException e) { + LOG.error("Error to insert JobMetaEntity: {}", jobMetaEntity, e); + return false; + } return true; } @Override - public boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity) { - + public boolean updateJobMeta(JobMetaEntity entity) { + String updateSql = "update analysis_jobs set "; + if (entity.getUuid() != null && !entity.getUuid().isEmpty()) { + updateSql += "uuid = ?, "; + } + if (entity.getConfiguration() != null) { + updateSql += "configuration = ?, "; + } + if (entity.getEvaluators() != null) { + updateSql += "evaluators = ?, "; + } + if (entity.getCreatedTime() > 0) { + updateSql += "createdtime = ?, "; + } + if (entity.getModifiedTime() > 0) { + updateSql += "modifiedtime = ?, "; + } + updateSql = updateSql.substring(0, updateSql.length() - 2); + if (StringUtils.isNotBlank(entity.getSiteId())) { + updateSql += " where siteId = ?"; + } + if (StringUtils.isNotBlank(entity.getJobDefId())) { + updateSql += " and jobDefId = ?"; + } + + try { + if (queryService.update(updateSql, entity, new JobMetaEntityToRelation()) == 0) { + LOG.warn("failed to execute {}", updateSql); + } + } catch (SQLException e) { + LOG.warn("failed to execute {}, {}", updateSql, e); + return false; + } return true; } @Override - public List<JobMetaEntity> getJobMeta(String jobDefId) { - - return null; + public List<JobMetaEntity> getJobMeta(String siteId, String jobDefId) { + JobMetaEntity jobMetaEntity = new JobMetaEntity(); + jobMetaEntity.setSiteId(siteId); + jobMetaEntity.setJobDefId(jobDefId); + + List<JobMetaEntity> results; + try { + results = queryService.queryWithCond(getJobMetaSql, jobMetaEntity, new JobMetaEntityToRelation(), new RelationToJobMetaEntity()); + } catch (SQLException e) { + LOG.error("Error to getJobMeta : {}", e); + return null; + } + if (results.isEmpty()) { + return null; + } + + return results; } @Override - public boolean deleteJobMeta(String jobDefId) { + public boolean deleteJobMeta(String siteId, String jobDefId) { + JobMetaEntity entity = new JobMetaEntity(); + entity.setSiteId(siteId); + entity.setJobDefId(jobDefId); + try { + queryService.update(deleteJobMetaSql, entity, new JobMetaEntityToRelation()); + } catch (SQLException e) { + LOG.error("Error to delete JobMetaEntity: {}", entity, e); + return false; + } return true; } @Override - public boolean addPublisherMeta(PublisherEntity publisherEntity) { + public boolean addUserEmailMeta(UserEmailEntity userEmailEntity) { + if (getUserEmailMeta(userEmailEntity.getSiteId(), userEmailEntity.getUserId()) != null) { + throw new IllegalArgumentException("Duplicated user meta: " + userEmailEntity.getSiteId() + ": " + userEmailEntity.getUserId()); + } + + List<UserEmailEntity> entities = new ArrayList<>(1); + entities.add(userEmailEntity); + try { + queryService.insert(addUserEmailSql, entities, new UserEmailEntityToRelation()); + } catch (SQLException e) { + LOG.error("Error to insert UserEmailEntity: {}", userEmailEntity, e); + return false; + } + return true; + } + @Override + public boolean updateUserEmailMeta(UserEmailEntity entity) { + String updateSql = "update analysis_email set "; + if (entity.getUuid() != null && !entity.getUuid().isEmpty()) { + updateSql += "uuid = ?, "; + } + if (entity.getMailAddress() != null && !entity.getMailAddress().isEmpty()) { + updateSql += "mailAddress = ?, "; + } + if (entity.getCreatedTime() > 0) { + updateSql += "createdtime = ?, "; + } + if (entity.getModifiedTime() > 0) { + updateSql += "modifiedtime = ?, "; + } + updateSql = updateSql.substring(0, updateSql.length() - 2); + if (StringUtils.isNotBlank(entity.getSiteId())) { + updateSql += " where siteId = ?"; + } + if (StringUtils.isNotBlank(entity.getUserId())) { + updateSql += " and userId = ?"; + } + + try { + if (queryService.update(updateSql, entity, new UserEmailEntityToRelation()) == 0) { + LOG.warn("failed to execute {}", updateSql); + } + } catch (SQLException e) { + LOG.warn("failed to execute {}, {}", updateSql, e); + return false; + } return true; } @Override - public boolean deletePublisherMeta(String userId) { + public boolean deleteUserEmailMeta(String siteId, String userId) { + UserEmailEntity entity = new UserEmailEntity(); + entity.setSiteId(siteId); + entity.setUserId(userId); + try { + queryService.update(deleteUserEmailSql, entity, new UserEmailEntityToRelation()); + } catch (SQLException e) { + LOG.error("Error to delete UserEmailEntity: {}", entity, e); + return false; + } return true; } @Override - public List<PublisherEntity> getPublisherMeta(String userId) { - return null; + public List<UserEmailEntity> getUserEmailMeta(String siteId, String userId) { + UserEmailEntity userEmailEntity = new UserEmailEntity(); + userEmailEntity.setSiteId(siteId); + userEmailEntity.setUserId(userId); + + List<UserEmailEntity> results; + try { + results = queryService.queryWithCond(getUserEmailSql, userEmailEntity, new UserEmailEntityToRelation(), new RelationToUserEmailEntity()); + } catch (SQLException e) { + LOG.error("Error to getJobMeta : {}", e); + return null; + } + if (results.isEmpty()) { + return null; + } + + return results; } } http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java index 85e8358..b7582c1 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java @@ -17,94 +17,105 @@ package org.apache.eagle.jpm.analyzer.meta.impl; -import com.google.inject.Inject; import com.typesafe.config.Config; import org.apache.eagle.jpm.analyzer.meta.MetaManagementService; +import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity; import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; -import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.inject.Inject; import java.io.Serializable; import java.util.*; public class MetaManagementServiceMemoryImpl implements MetaManagementService, Serializable { private static final Logger LOG = LoggerFactory.getLogger(MetaManagementServiceMemoryImpl.class); - private final Map<String, JobMetaEntity> jobMetaEntities = new HashMap<>(); - private final Map<String, List<PublisherEntity>> publisherEntities = new HashMap<>(); + private final Map<String, Map<String, JobMetaEntity>> jobMetaEntities = new HashMap<>(); + private final Map<String, Map<String, UserEmailEntity>> publisherEntities = new HashMap<>(); @Inject Config config; @Override public boolean addJobMeta(JobMetaEntity jobMetaEntity) { - if (jobMetaEntities.containsKey(jobMetaEntity.getJobDefId())) { - LOG.warn("contains job {} already, add job meta failed", jobMetaEntity.getJobDefId()); - return false; + if (!jobMetaEntities.containsKey(jobMetaEntity.getSiteId())) { + jobMetaEntities.put(jobMetaEntity.getSiteId(), new HashMap<>()); } - jobMetaEntities.put(jobMetaEntity.getJobDefId(), jobMetaEntity); + jobMetaEntities.get(jobMetaEntity.getSiteId()).put(jobMetaEntity.getJobDefId(), jobMetaEntity); LOG.info("Successfully add job {} meta", jobMetaEntity.getJobDefId()); return true; } @Override - public boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity) { - if (!jobMetaEntities.containsKey(jobMetaEntity.getJobDefId())) { - LOG.warn("does not contain job {}, update job meta failed", jobDefId); + public boolean updateJobMeta(JobMetaEntity jobMetaEntity) { + if (!jobMetaEntities.containsKey(jobMetaEntity.getSiteId())) { + LOG.warn("does not contain siteId {}, update job meta failed", jobMetaEntity.getSiteId()); return false; } - jobMetaEntities.put(jobDefId, jobMetaEntity); - LOG.info("Successfully update job {} meta", jobDefId); + jobMetaEntities.get(jobMetaEntity.getSiteId()).put(jobMetaEntity.getJobDefId(), jobMetaEntity); + LOG.info("Successfully update job {} meta", jobMetaEntity.getJobDefId()); return true; } @Override - public List<JobMetaEntity> getJobMeta(String jobDefId) { - if (!jobMetaEntities.containsKey(jobDefId)) { - LOG.warn("does not contain job {}, get job meta failed", jobDefId); + public List<JobMetaEntity> getJobMeta(String siteId, String jobDefId) { + if (!jobMetaEntities.containsKey(siteId)) { + LOG.warn("does not contain site {}, get job meta failed", siteId); return new ArrayList<>(); } - return Arrays.asList(jobMetaEntities.get(jobDefId)); + return Arrays.asList(jobMetaEntities.get(siteId).get(jobDefId)); } @Override - public boolean deleteJobMeta(String jobDefId) { - if (!jobMetaEntities.containsKey(jobDefId)) { - LOG.warn("does not contain job {}, delete job meta failed", jobDefId); + public boolean deleteJobMeta(String siteId, String jobDefId) { + if (!jobMetaEntities.containsKey(siteId)) { + LOG.warn("does not contain siteId {}, delete job meta failed", siteId); return false; } - jobMetaEntities.remove(jobDefId); + jobMetaEntities.get(siteId).remove(jobDefId); LOG.info("Successfully delete job {} meta", jobDefId); return true; } @Override - public boolean addPublisherMeta(PublisherEntity publisherEntity) { - if (publisherEntities.containsKey(publisherEntity.getUserId())) { - for (PublisherEntity entity : publisherEntities.get(publisherEntity.getUserId())) { - if (entity.equals(publisherEntity)) { + public boolean addUserEmailMeta(UserEmailEntity userEmailEntity) { + if (publisherEntities.containsKey(userEmailEntity.getSiteId())) { + for (UserEmailEntity entity : publisherEntities.get(userEmailEntity.getSiteId()).values()) { + if (entity.equals(userEmailEntity)) { LOG.warn("contains user {}, mailAddress {} already, add publisher failed", entity.getUserId(), entity.getMailAddress()); return false; } } } - if (!publisherEntities.containsKey(publisherEntity.getUserId())) { - publisherEntities.put(publisherEntity.getUserId(), new ArrayList<>()); + if (!publisherEntities.containsKey(userEmailEntity.getSiteId())) { + publisherEntities.put(userEmailEntity.getSiteId(), new HashMap<>()); } - publisherEntities.get(publisherEntity.getUserId()).add(publisherEntity); - LOG.info("Successfully add publisher user {}, mailAddress {}", publisherEntity.getUserId(), publisherEntity.getMailAddress()); + publisherEntities.get(userEmailEntity.getSiteId()).put(userEmailEntity.getUserId(), userEmailEntity); + LOG.info("Successfully add publisher user {}, mailAddress {}", userEmailEntity.getUserId(), userEmailEntity.getMailAddress()); return true; } @Override - public boolean deletePublisherMeta(String userId) { + public boolean updateUserEmailMeta(UserEmailEntity userEmailEntity) { + if (!publisherEntities.containsKey(userEmailEntity.getSiteId())) { + LOG.warn("does not contain siteId {}, update user email meta failed", userEmailEntity.getSiteId()); + return false; + } + + publisherEntities.get(userEmailEntity.getSiteId()).put(userEmailEntity.getUserId(), userEmailEntity); + LOG.info("Successfully update user {} meta", userEmailEntity.getUserId()); + return true; + } + + @Override + public boolean deleteUserEmailMeta(String siteId, String userId) { if (!publisherEntities.containsKey(userId)) { LOG.warn("does not contain user {}, failed to delete publisher", userId); return false; @@ -116,12 +127,12 @@ public class MetaManagementServiceMemoryImpl implements MetaManagementService, S } @Override - public List<PublisherEntity> getPublisherMeta(String userId) { - if (!publisherEntities.containsKey(userId)) { - LOG.warn("does not contain user {}, failed to get publisher", userId); + public List<UserEmailEntity> getUserEmailMeta(String siteId, String userId) { + if (!publisherEntities.containsKey(siteId)) { + LOG.warn("does not contain siteId {}, failed to get publisher", siteId); return new ArrayList<>(); } - return publisherEntities.get(userId); + return Arrays.asList(publisherEntities.get(siteId).get(userId)); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/JobMetaEntityToRelation.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/JobMetaEntityToRelation.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/JobMetaEntityToRelation.java new file mode 100644 index 0000000..5053b50 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/JobMetaEntityToRelation.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.meta.impl.orm; + +import org.apache.commons.lang.StringUtils; +import org.apache.eagle.common.function.ThrowableConsumer2; +import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class JobMetaEntityToRelation implements ThrowableConsumer2<PreparedStatement, JobMetaEntity, SQLException> { + @Override + public void accept(PreparedStatement statement, JobMetaEntity entity) throws SQLException { + int parameterIndex = 1; + if (entity.getUuid() != null && StringUtils.isNotBlank(entity.getUuid())) { + statement.setString(parameterIndex, entity.getUuid()); + parameterIndex++; + } + if (entity.getConfiguration() != null) { + statement.setString(parameterIndex, JSONObject.toJSONString(entity.getConfiguration())); + parameterIndex++; + } + if (entity.getEvaluators() != null) { + statement.setString(parameterIndex, JSONArray.toJSONString(entity.getEvaluators())); + parameterIndex++; + } + if (entity.getCreatedTime() > 0) { + statement.setLong(parameterIndex, entity.getCreatedTime()); + parameterIndex++; + } + if (entity.getModifiedTime() > 0) { + statement.setLong(parameterIndex, entity.getModifiedTime()); + parameterIndex++; + } + if (StringUtils.isNotBlank(entity.getSiteId())) { + statement.setString(parameterIndex, entity.getSiteId()); + parameterIndex++; + } + if (StringUtils.isNotBlank(entity.getJobDefId())) { + statement.setString(parameterIndex, entity.getJobDefId()); + parameterIndex++; + } + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToJobMetaEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToJobMetaEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToJobMetaEntity.java new file mode 100644 index 0000000..180eb8d --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToJobMetaEntity.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.meta.impl.orm; + +import org.apache.eagle.common.function.ThrowableFunction; +import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; + + +public class RelationToJobMetaEntity implements ThrowableFunction<ResultSet, JobMetaEntity, SQLException> { + private static final Logger LOG = LoggerFactory.getLogger(RelationToJobMetaEntity.class); + + @Override + public JobMetaEntity apply(ResultSet resultSet) throws SQLException { + JobMetaEntity jobMetaEntity = new JobMetaEntity(); + jobMetaEntity.setUuid(resultSet.getString(1)); + jobMetaEntity.setJobDefId(resultSet.getString(2)); + jobMetaEntity.setSiteId(resultSet.getString(3)); + jobMetaEntity.setConfiguration(parse(resultSet.getString(4))); + jobMetaEntity.setEvaluators(new ArrayList<>()); + try { + JSONArray jsonArray = new JSONArray(resultSet.getString(5)); + for (int i = 0; i < jsonArray.length(); ++i) { + jobMetaEntity.getEvaluators().add(jsonArray.getString(i)); + } + } catch (Exception e) { + LOG.warn("{}", e); + } + jobMetaEntity.setCreatedTime(resultSet.getLong(6)); + jobMetaEntity.setModifiedTime(resultSet.getLong(7)); + + return jobMetaEntity; + } + + private Map<String, Object> parse(String field) { + Map<String, Object> items = new java.util.HashMap<>(); + try { + JSONObject jsonObject = new JSONObject(field); + + Iterator<String> keyItemItr = jsonObject.keys(); + while (keyItemItr.hasNext()) { + String itemKey = keyItemItr.next(); + if (canParseToMap(jsonObject.getString(itemKey))) { + items.put(itemKey, parse(jsonObject.getString(itemKey))); + } else { + items.put(itemKey, jsonObject.get(itemKey)); + } + } + + } catch (Exception e) { + LOG.warn("{}", e); + } + + return items; + } + + private boolean canParseToMap(String field) { + try { + JSONObject jsonObject = new JSONObject(field); + Iterator<String> keyItemItr = jsonObject.keys(); + while (keyItemItr.hasNext()) { + keyItemItr.next(); + } + return true; + } catch (Exception e) { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToUserEmailEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToUserEmailEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToUserEmailEntity.java new file mode 100644 index 0000000..ec86506 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToUserEmailEntity.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.jpm.analyzer.meta.impl.orm; + +import org.apache.eagle.common.function.ThrowableFunction; +import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity; + +import java.sql.ResultSet; +import java.sql.SQLException; + +public class RelationToUserEmailEntity implements ThrowableFunction<ResultSet, UserEmailEntity, SQLException> { + @Override + public UserEmailEntity apply(ResultSet resultSet) throws SQLException { + UserEmailEntity userEmailEntity = new UserEmailEntity(); + userEmailEntity.setUuid(resultSet.getString(1)); + userEmailEntity.setUserId(resultSet.getString(2)); + userEmailEntity.setSiteId(resultSet.getString(3)); + userEmailEntity.setMailAddress(resultSet.getString(4)); + userEmailEntity.setCreatedTime(resultSet.getLong(5)); + userEmailEntity.setModifiedTime(resultSet.getLong(6)); + return userEmailEntity; + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/UserEmailEntityToRelation.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/UserEmailEntityToRelation.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/UserEmailEntityToRelation.java new file mode 100644 index 0000000..29958b0 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/UserEmailEntityToRelation.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.meta.impl.orm; + + +import org.apache.commons.lang.StringUtils; +import org.apache.eagle.common.function.ThrowableConsumer2; +import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class UserEmailEntityToRelation implements ThrowableConsumer2<PreparedStatement, UserEmailEntity, SQLException> { + @Override + public void accept(PreparedStatement statement, UserEmailEntity entity) throws SQLException { + int parameterIndex = 1; + if (StringUtils.isNotBlank(entity.getUuid())) { + statement.setString(parameterIndex, entity.getUuid()); + parameterIndex++; + } + if (StringUtils.isNotBlank(entity.getMailAddress())) { + statement.setString(parameterIndex, entity.getMailAddress()); + parameterIndex++; + } + if (entity.getCreatedTime() > 0) { + statement.setLong(parameterIndex, entity.getCreatedTime()); + parameterIndex++; + } + if (entity.getModifiedTime() > 0) { + statement.setLong(parameterIndex, entity.getModifiedTime()); + parameterIndex++; + } + if (StringUtils.isNotBlank(entity.getSiteId())) { + statement.setString(parameterIndex, entity.getSiteId()); + parameterIndex++; + } + if (StringUtils.isNotBlank(entity.getUserId())) { + statement.setString(parameterIndex, entity.getUserId()); + parameterIndex++; + } + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java index 189d85d..9497140 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java @@ -38,7 +38,7 @@ public class AnalyzerEntity { private Map<String, Object> jobConfig = new HashMap<>(); - private Map<String, Object> jobMeta = new HashMap<>(); + private JobMetaEntity jobMeta; public String getJobDefId() { return jobDefId; @@ -112,11 +112,11 @@ public class AnalyzerEntity { this.jobConfig = jobConfig; } - public Map<String, Object> getJobMeta() { + public JobMetaEntity getJobMeta() { return jobMeta; } - public void setJobMeta(Map<String, Object> jobMeta) { + public void setJobMeta(JobMetaEntity jobMeta) { this.jobMeta = jobMeta; } http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java index 2e15c17..8d4af8e 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java @@ -20,17 +20,14 @@ package org.apache.eagle.jpm.analyzer.meta.model; import org.apache.eagle.metadata.persistence.PersistenceEntity; import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import java.util.*; @JsonIgnoreProperties(ignoreUnknown = true) public class JobMetaEntity extends PersistenceEntity { private String jobDefId; private String siteId; - private Map<String, Object> configuration = new HashMap<>(); - private Set<String> evaluators = new HashSet<>(); + private Map<String, Object> configuration; + private List<String> evaluators; public JobMetaEntity() { @@ -39,7 +36,7 @@ public class JobMetaEntity extends PersistenceEntity { public JobMetaEntity(String jobDefId, String siteId, Map<String, Object> configuration, - Set<String> evaluators) { + List<String> evaluators) { this.jobDefId = jobDefId; this.siteId = siteId; this.configuration = configuration; @@ -75,11 +72,11 @@ public class JobMetaEntity extends PersistenceEntity { this.configuration = configuration; } - public Set<String> getEvaluators() { + public List<String> getEvaluators() { return evaluators; } - public void setEvaluators(Set<String> evaluators) { + public void setEvaluators(List<String> evaluators) { this.evaluators = evaluators; } } http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java deleted file mode 100644 index bca7ab1..0000000 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.jpm.analyzer.meta.model; - -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.eagle.metadata.persistence.PersistenceEntity; -import org.codehaus.jackson.annotate.JsonIgnoreProperties; - -@JsonIgnoreProperties(ignoreUnknown = true) -public class PublisherEntity extends PersistenceEntity { - private String userId; - private String mailAddress; - - public PublisherEntity(String userId, String mailAddress) { - this.userId = userId; - this.mailAddress = mailAddress; - } - - @Override - public String toString() { - return String.format("PublisherEntity[userId=%s, mailAddress=%s]", userId, mailAddress); - } - - public String getUserId() { - return userId; - } - - public void setUserId(String userId) { - this.userId = userId; - } - - public String getMailAddress() { - return mailAddress; - } - - public void setMailAddress(String mailAddress) { - this.mailAddress = mailAddress; - } - - @Override - public int hashCode() { - return new HashCodeBuilder() - .append(userId) - .append(mailAddress) - .build(); - } - - @Override - public boolean equals(Object that) { - if (that == this) { - return true; - } - - if (!(that instanceof PublisherEntity)) { - return false; - } - - PublisherEntity another = (PublisherEntity)that; - - return another.userId.equals(this.userId) && another.mailAddress.equals(this.mailAddress); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/UserEmailEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/UserEmailEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/UserEmailEntity.java new file mode 100644 index 0000000..cbac4d0 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/UserEmailEntity.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.meta.model; + +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.eagle.metadata.persistence.PersistenceEntity; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class UserEmailEntity extends PersistenceEntity { + private String userId; + private String siteId; + private String mailAddress; + + public UserEmailEntity() { + } + + public UserEmailEntity(String userId, String siteId, String mailAddress) { + this.userId = userId; + this.siteId = siteId; + this.mailAddress = mailAddress; + } + + @Override + public String toString() { + return String.format("UserEmailEntity[userId=%s, siteId=%s, mailAddress=%s]", userId, siteId, mailAddress); + } + + public String getUserId() { + return userId; + } + + public void setSiteId(String siteId) { + this.siteId = siteId; + } + + public String getSiteId() { + return siteId; + } + + public void setUserId(String userId) { + this.userId = userId; + } + + public String getMailAddress() { + return mailAddress; + } + + public void setMailAddress(String mailAddress) { + this.mailAddress = mailAddress; + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(userId) + .append(siteId) + .append(mailAddress) + .build(); + } + + @Override + public boolean equals(Object that) { + if (that == this) { + return true; + } + + if (!(that instanceof UserEmailEntity)) { + return false; + } + + UserEmailEntity another = (UserEmailEntity)that; + + return another.userId.equals(this.userId) && another.siteId.equals(this.siteId) && another.mailAddress.equals(this.mailAddress); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java index 57e1765..34365dc 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java @@ -27,6 +27,8 @@ import org.apache.eagle.jpm.analyzer.publisher.EagleStorePublisher; import org.apache.eagle.jpm.analyzer.publisher.EmailPublisher; import org.apache.eagle.jpm.analyzer.publisher.Publisher; import org.apache.eagle.jpm.analyzer.publisher.Result; +import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator; +import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +43,7 @@ public class MRJobPerformanceAnalyzer<T extends AnalyzerEntity> implements JobAn private List<Publisher> publishers = new ArrayList<>(); private Config config; + private AlertDeduplicator alertDeduplicator; public MRJobPerformanceAnalyzer(Config config) { this.config = config; @@ -48,7 +51,9 @@ public class MRJobPerformanceAnalyzer<T extends AnalyzerEntity> implements JobAn evaluators.add(new JobSuggestionEvaluator(config)); publishers.add(new EagleStorePublisher(config)); - //publishers.add(new EmailPublisher(config)); + publishers.add(new EmailPublisher(config)); + + this.alertDeduplicator = new SimpleDeduplicator(); } @Override @@ -62,6 +67,11 @@ public class MRJobPerformanceAnalyzer<T extends AnalyzerEntity> implements JobAn } } + if (alertDeduplicator.dedup(analyzerJobEntity, result)) { + LOG.info("skip publish job {} alert because it is duplicated", analyzerJobEntity.getJobDefId()); + return; + } + for (Publisher publisher : publishers) { publisher.publish(analyzerJobEntity, result); } http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java index a77e55d..ec7a641 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java @@ -48,20 +48,16 @@ public class SLAJobEvaluator implements Evaluator, Serializable { @Override public Result.EvaluatorResult evaluate(AnalyzerEntity analyzerJobEntity) { - if (!analyzerJobEntity.getCurrentState().equalsIgnoreCase(Constants.JobState.RUNNING.toString())) { - return null; - } - Result.EvaluatorResult result = new Result.EvaluatorResult(); - List<JobMetaEntity> jobMetaEntities = Utils.getJobMeta(config, analyzerJobEntity.getJobDefId()); - if (jobMetaEntities.size() == 0 - || !jobMetaEntities.get(0).getEvaluators().contains(this.getClass().getName())) { + List<JobMetaEntity> jobMetaEntities = Utils.getJobMeta(config, analyzerJobEntity.getSiteId(), analyzerJobEntity.getJobDefId()); + if (jobMetaEntities == null || jobMetaEntities.size() == 0 + || !jobMetaEntities.get(0).getEvaluators().contains(this.getClass().getSimpleName())) { LOG.info("SLAJobEvaluator skip job {}", analyzerJobEntity.getJobDefId()); return result; } - analyzerJobEntity.setJobMeta(jobMetaEntities.get(0).getConfiguration()); + analyzerJobEntity.setJobMeta(jobMetaEntities.get(0)); for (Processor processor : processors) { result.addProcessorResult(processor.getClass(), processor.process(analyzerJobEntity)); http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java index f7748f8..88e799d 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java @@ -47,22 +47,27 @@ public class UnExpectedLongDurationJobProcessor implements Processor, Serializab public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) { LOG.info("Job {} In UnExpectedLongDurationJobProcessor", analyzerJobEntity.getJobDefId()); - Map<String, Object> jobMetaData = analyzerJobEntity.getJobMeta(); + Map<String, Object> jobMetaData = analyzerJobEntity.getJobMeta().getConfiguration(); long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData); + if (avgDurationTime == 0L) { return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, Result.ResultLevel.NONE, Constants.PROCESS_NONE); } Map<Result.ResultLevel, Double> alertThreshold = Constants.DEFAULT_ALERT_THRESHOLD; if (jobMetaData.containsKey(Constants.ALERT_THRESHOLD_KEY)) { - alertThreshold = (Map<Result.ResultLevel, Double>)jobMetaData.get(Constants.ALERT_THRESHOLD_KEY); + Map<String, Double> alertThresholds = (Map<String, Double>)jobMetaData.get(Constants.ALERT_THRESHOLD_KEY); + for (String level : alertThresholds.keySet()) { + alertThreshold.put(Result.ResultLevel.fromString(level), alertThresholds.get(level)); + } } List<Map.Entry<Result.ResultLevel, Double>> sorted = Utils.sortByValue(alertThreshold); double expirePercent = (analyzerJobEntity.getDurationTime() - avgDurationTime) * 1.0 / avgDurationTime; for (Map.Entry<Result.ResultLevel, Double> entry : sorted) { if (expirePercent >= entry.getValue()) { - return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, entry.getKey(), String.format("Job duration exceeds average duration by %d%%, average duration is %ds", + return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, entry.getKey(), + String.format("Job duration exceeds average duration(calculated by historical executions of this job) by %d%%, average duration is %ds", (int)(expirePercent * 100), avgDurationTime / 1000)); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java index ea60ff9..e1a357a 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java @@ -63,6 +63,16 @@ public class JobSuggestionEvaluator implements Evaluator<MapReduceAnalyzerEntity return null; } + + if (analyzerEntity.getTotalCounters() == null) { + LOG.warn("Total counters of Job {} is null", analyzerEntity.getJobId()); + return null; + } + if (analyzerEntity.getMapCounters() == null && analyzerEntity.getReduceCounters() == null) { + LOG.warn("Map/Reduce task counters of Job {} are null", analyzerEntity.getJobId()); + return null; + } + MapReduceJobSuggestionContext jobContext = new MapReduceJobSuggestionContext(analyzerEntity); if (jobContext.getNumMaps() == 0) { return null; http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java index a1b57bf..a86eb72 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java @@ -26,8 +26,6 @@ import org.apache.eagle.jpm.mr.historyentity.TaskAttemptExecutionAPIEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; - /* * Criterion: (TimeElapsed / (numTasks / 500 * avgTaskTime)) > 20 */ http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java index 8eba468..28e1129 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java @@ -22,9 +22,6 @@ import org.apache.eagle.jpm.analyzer.meta.model.MapReduceAnalyzerEntity; import org.apache.eagle.jpm.analyzer.publisher.Result; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import java.util.ArrayList; -import java.util.List; - public class MapReduceSplitSettingProcessor implements Processor<MapReduceAnalyzerEntity> { private MapReduceJobSuggestionContext context; http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java index 0d7d2d7..1c5a033 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java @@ -37,11 +37,9 @@ public class EagleStorePublisher implements Publisher, Serializable { private Config config; private IEagleServiceClient client; - private AlertDeduplicator alertDeduplicator; public EagleStorePublisher(Config config) { this.config = config; - this.alertDeduplicator = new SimpleDeduplicator(); } @Override @@ -51,10 +49,6 @@ public class EagleStorePublisher implements Publisher, Serializable { } LOG.info("EagleStorePublisher gets job {}", analyzerJobEntity.getJobDefId()); - if (alertDeduplicator.dedup(analyzerJobEntity, result)) { - LOG.info("skip job {} alert because it is duplicated", analyzerJobEntity.getJobDefId()); - return; - } try { this.client = new EagleServiceClientImpl(config); http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java index 842e0ac..471dbf8 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java @@ -18,13 +18,16 @@ package org.apache.eagle.jpm.analyzer.publisher; import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.eagle.alert.engine.publisher.PublishConstants; import org.apache.eagle.app.service.ApplicationEmailService; import org.apache.eagle.common.DateTimeUtil; +import org.apache.eagle.common.mail.AlertEmailConstants; import org.apache.eagle.common.mail.AlertEmailContext; import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; -import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator; -import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator; +import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity; import org.apache.eagle.jpm.analyzer.util.Constants; +import org.apache.eagle.jpm.analyzer.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,25 +40,25 @@ public class EmailPublisher implements Publisher, Serializable { private static final Logger LOG = LoggerFactory.getLogger(EmailPublisher.class); private Config config; - private AlertDeduplicator alertDeduplicator; public EmailPublisher(Config config) { this.config = config; - this.alertDeduplicator = new SimpleDeduplicator(); } @Override + //will refactor, just work now public void publish(AnalyzerEntity analyzerJobEntity, Result result) { - if (result.getAlertMessages().size() == 0) { + if (!config.hasPath(Constants.ANALYZER_REPORT_CONFIG_PATH)) { + LOG.warn("no email configuration, skip send email"); return; } - LOG.info("EmailPublisher gets job {}", analyzerJobEntity.getJobDefId()); - if (alertDeduplicator.dedup(analyzerJobEntity, result)) { - LOG.info("skip job {} alert because it is duplicated", analyzerJobEntity.getJobDefId()); + if (result.getAlertMessages().size() == 0) { return; } + LOG.info("EmailPublisher gets job {}", analyzerJobEntity.getJobDefId()); + Map<String, String> basic = new HashMap<>(); basic.put("site", analyzerJobEntity.getSiteId()); basic.put("name", analyzerJobEntity.getJobDefId()); @@ -71,20 +74,29 @@ public class EmailPublisher implements Publisher, Serializable { basic.put("detail", getJobLink(analyzerJobEntity)); Map<String, List<Result.ProcessorResult>> extend = result.getAlertMessages(); + Map<String, Object> alertData = new HashMap<>(); for (String evaluator : extend.keySet()) { for (Result.ProcessorResult message : extend.get(evaluator)) { + setAlertLevel(alertData, message.getResultLevel()); LOG.info("Job [{}] Got Message [{}], Level [{}] By Evaluator [{}]", analyzerJobEntity.getJobDefId(), message.getMessage(), message.getResultLevel(), evaluator); } } - Map<String, Object> alertData = new HashMap<>(); alertData.put(Constants.ANALYZER_REPORT_DATA_BASIC_KEY, basic); alertData.put(Constants.ANALYZER_REPORT_DATA_EXTEND_KEY, extend); - - //TODO, override email config in job meta data - ApplicationEmailService emailService = new ApplicationEmailService(config, Constants.ANALYZER_REPORT_CONFIG_PATH); + Config cloneConfig = ConfigFactory.empty().withFallback(config); + if (analyzerJobEntity.getUserId() != null) { + List<UserEmailEntity> users = Utils.getUserMail(config, analyzerJobEntity.getSiteId(), analyzerJobEntity.getUserId()); + if (users != null && users.size() > 0) { + Map<String, String> additionalConfig = new HashMap<>(); + additionalConfig.put(Constants.ANALYZER_REPORT_CONFIG_PATH + "." + AlertEmailConstants.RECIPIENTS, users.get(0).getMailAddress()); + cloneConfig = ConfigFactory.parseMap(additionalConfig).withFallback(cloneConfig); + } + } + ApplicationEmailService emailService = new ApplicationEmailService(cloneConfig, Constants.ANALYZER_REPORT_CONFIG_PATH); String subject = String.format(Constants.ANALYZER_REPORT_SUBJECT, analyzerJobEntity.getJobDefId()); + alertData.put(PublishConstants.ALERT_EMAIL_SUBJECT, subject); AlertEmailContext alertContext = emailService.buildEmailContext(subject); emailService.onAlert(alertContext, alertData); } @@ -99,4 +111,19 @@ public class EmailPublisher implements Publisher, Serializable { + "/jpm/detail/" + analyzerJobEntity.getJobId(); } + + private void setAlertLevel(Map<String, Object> alertData, Result.ResultLevel level) { + if (!alertData.containsKey(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY)) { + alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, Result.ResultLevel.INFO.toString()); + } + + if (level.equals(Result.ResultLevel.CRITICAL)) { + alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, level.toString()); + } + + if (level.equals(Result.ResultLevel.WARNING) + && !alertData.get(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY).equals(Result.ResultLevel.CRITICAL.toString())) { + alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, level.toString()); + } + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java index 7d7442b..748a5d5 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java @@ -17,7 +17,6 @@ package org.apache.eagle.jpm.analyzer.publisher; -import org.apache.commons.lang3.StringUtils; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import java.util.ArrayList; @@ -47,7 +46,7 @@ public class Result { alertMessages.put(typeName, new ArrayList<>()); alertEntities.put(typeName, new ArrayList<>()); } - normalizeResult(processorResult); + //normalizeResult(processorResult); alertMessages.get(typeName).add(processorResult); alertEntities.get(typeName).add(processorEntities.get(processorType)); @@ -63,11 +62,11 @@ public class Result { } private void normalizeResult(ProcessorResult processorResult) { - String settingList = ""; + /*String settingList = ""; if (processorResult.getSettings() != null && !processorResult.getSettings().isEmpty()) { settingList = StringUtils.join(processorResult.getSettings(), "\n"); } - processorResult.setSettingList(settingList); + processorResult.setSettingList(settingList);*/ } /** @@ -77,9 +76,19 @@ public class Result { public enum ResultLevel { NONE, INFO, - NOTICE, WARNING, - CRITICAL + CRITICAL; + + private static final Map<String, ResultLevel> stringToLevels = new HashMap<>(); + static { + for (ResultLevel level : values()) { + stringToLevels.put(level.toString(), level); + } + } + + public static ResultLevel fromString(String levelString) { + return stringToLevels.get(levelString); + } } public enum RuleType { @@ -100,20 +109,20 @@ public class Result { private ResultLevel resultLevel; private String message; private List<String> settings; - private String settingList; + //private String settingList; public ProcessorResult(RuleType ruleType, ResultLevel resultLevel, String message, List<String> settings) { this.ruleType = ruleType; this.resultLevel = resultLevel; this.message = message; - this.settings = settings; + //this.settings = settings; } public ProcessorResult(RuleType ruleType, ResultLevel resultLevel, String message) { this.ruleType = ruleType; this.resultLevel = resultLevel; this.message = message; - this.settings = new ArrayList<>(); + //this.settings = new ArrayList<>(); } public RuleType getRuleType() { @@ -148,13 +157,13 @@ public class Result { this.settings = settings; } - public String getSettingList() { + /*public String getSettingList() { return settingList; } public void setSettingList(String settingList) { this.settingList = settingList; - } + }*/ } /** http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java index b139b3c..f8155f1 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java @@ -34,26 +34,29 @@ import java.util.Map; public class SimpleDeduplicator implements AlertDeduplicator, Serializable { private static final Logger LOG = LoggerFactory.getLogger(SimpleDeduplicator.class); - private Map<String, Long> lastUpdateTime = new HashMap<>(); + private static Map<String, Long> lastUpdateTime = new HashMap<>(); @Override public boolean dedup(AnalyzerEntity analyzerJobEntity, Result result) { - long dedupInterval = Constants.DEFAULT_DEDUP_INTERVAL; - if (analyzerJobEntity.getJobMeta().containsKey(Constants.DEDUP_INTERVAL_KEY)) { - dedupInterval = (Long)analyzerJobEntity.getJobMeta().get(Constants.DEDUP_INTERVAL_KEY); - } + synchronized (lastUpdateTime) { + long dedupInterval = Constants.DEFAULT_DEDUP_INTERVAL; + if (analyzerJobEntity.getJobMeta().getConfiguration().containsKey(Constants.DEDUP_INTERVAL_KEY)) { + dedupInterval = (Integer)analyzerJobEntity.getJobMeta().getConfiguration().get(Constants.DEDUP_INTERVAL_KEY); + } - dedupInterval = dedupInterval * 1000; - long currentTimeStamp = System.currentTimeMillis(); - if (lastUpdateTime.containsKey(analyzerJobEntity.getJobDefId())) { - if (lastUpdateTime.get(analyzerJobEntity.getJobDefId()) + dedupInterval > currentTimeStamp) { - return true; + dedupInterval = dedupInterval * 1000; + long currentTimeStamp = System.currentTimeMillis(); + if (lastUpdateTime.containsKey(analyzerJobEntity.getJobDefId())) { + if (lastUpdateTime.get(analyzerJobEntity.getJobDefId()) + dedupInterval > currentTimeStamp) { + return true; + } else { + lastUpdateTime.put(analyzerJobEntity.getJobDefId(), currentTimeStamp); + return false; + } } else { + lastUpdateTime.put(analyzerJobEntity.getJobDefId(), currentTimeStamp); return false; } - } else { - lastUpdateTime.put(analyzerJobEntity.getJobDefId(), currentTimeStamp); - return false; } } } http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java index 80d9fb7..a9c3171 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java @@ -20,8 +20,9 @@ package org.apache.eagle.jpm.analyzer.resource; import com.google.inject.Inject; import org.apache.eagle.common.rest.RESTResponse; import org.apache.eagle.jpm.analyzer.meta.MetaManagementService; +import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity; import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; -import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity; +import org.apache.eagle.jpm.analyzer.util.Constants; import javax.ws.rs.*; import javax.ws.rs.core.MediaType; @@ -38,16 +39,16 @@ public class AnalyzerResource { } @POST - @Path(META_PATH) + @Path(JOB_META_ROOT_PATH) @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) public RESTResponse<Void> addJobMeta(JobMetaEntity jobMetaEntity) { return RESTResponse.<Void>async((response) -> { jobMetaEntity.ensureDefault(); boolean ret = metaManagementService.addJobMeta(jobMetaEntity); - String message = "Successfully add job meta for " + jobMetaEntity.getJobDefId(); + String message = "Successfully add job meta for " + jobMetaEntity.getSiteId() + ": " + jobMetaEntity.getJobDefId(); if (!ret) { - message = "Failed to add job meta for " + jobMetaEntity.getJobDefId(); + message = "Failed to add job meta for " + jobMetaEntity.getSiteId() + ": " + jobMetaEntity.getJobDefId(); } response.success(ret).message(message); }).get(); @@ -56,13 +57,17 @@ public class AnalyzerResource { @POST @Path(JOB_META_PATH) @Produces(MediaType.APPLICATION_JSON) - public RESTResponse<Void> updateJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId, JobMetaEntity jobMetaEntity) { + public RESTResponse<Void> updateJobMeta(@PathParam(Constants.SITE_ID) String siteId, + @PathParam(Constants.JOB_DEF_ID) String jobDefId, + JobMetaEntity jobMetaEntity) { return RESTResponse.<Void>async((response) -> { - jobMetaEntity.ensureDefault(); - boolean ret = metaManagementService.updateJobMeta(jobDefId, jobMetaEntity); - String message = "Successfully update job meta for " + jobDefId; + jobMetaEntity.setModifiedTime(System.currentTimeMillis()); + jobMetaEntity.setSiteId(siteId); + jobMetaEntity.setJobDefId(jobDefId); + boolean ret = metaManagementService.updateJobMeta(jobMetaEntity); + String message = "Successfully update job meta for " + siteId + ":" + jobDefId; if (!ret) { - message = "Failed to update job meta for " + jobDefId; + message = "Failed to update job meta for " + siteId + ":" + jobDefId; } response.success(ret).message(message); }).get(); @@ -71,20 +76,22 @@ public class AnalyzerResource { @GET @Path(JOB_META_PATH) @Produces(MediaType.APPLICATION_JSON) - public RESTResponse<List<JobMetaEntity>> getJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId) { - return RESTResponse.async(() -> metaManagementService.getJobMeta(jobDefId)).get(); + public RESTResponse<List<JobMetaEntity>> getJobMeta(@PathParam(Constants.SITE_ID) String siteId, + @PathParam(Constants.JOB_DEF_ID) String jobDefId) { + return RESTResponse.async(() -> metaManagementService.getJobMeta(siteId, jobDefId)).get(); } @DELETE @Path(JOB_META_PATH) @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) - public RESTResponse<Void> deleteJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId) { + public RESTResponse<Void> deleteJobMeta(@PathParam(Constants.SITE_ID) String siteId, + @PathParam(Constants.JOB_DEF_ID) String jobDefId) { return RESTResponse.<Void>async((response) -> { - boolean ret = metaManagementService.deleteJobMeta(jobDefId); - String message = "Successfully delete job meta for " + jobDefId; + boolean ret = metaManagementService.deleteJobMeta(siteId, jobDefId); + String message = "Successfully delete job meta for " + siteId + ": " + jobDefId; if (!ret) { - message = "Failed to delete job meta for " + jobDefId; + message = "Failed to delete job meta for " + siteId + ": " + jobDefId; } response.success(ret).message(message); @@ -92,40 +99,62 @@ public class AnalyzerResource { } @POST - @Path(PUBLISHER_PATH) + @Path(USER_META_ROOT_PATH) + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public RESTResponse<Void> addEmailPublisherMeta(UserEmailEntity userEmailEntity) { + return RESTResponse.<Void>async((response) -> { + userEmailEntity.ensureDefault(); + boolean ret = metaManagementService.addUserEmailMeta(userEmailEntity); + String message = "Successfully add user meta for " + userEmailEntity.getSiteId() + ": " + userEmailEntity.getUserId(); + if (!ret) { + message = "Failed to add user meta for " + userEmailEntity.getSiteId() + ": " + userEmailEntity.getUserId(); + } + response.success(ret).message(message); + }).get(); + } + + @POST + @Path(USER_META_PATH) @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) - public RESTResponse<Void> addPublisherMeta(PublisherEntity publisherEntity) { + public RESTResponse<Void> updateEmailPublisherMeta(@PathParam(Constants.SITE_ID) String siteId, + @PathParam(Constants.USER_ID) String userId, + UserEmailEntity userEmailEntity) { return RESTResponse.<Void>async((response) -> { - publisherEntity.ensureDefault(); - boolean ret = metaManagementService.addPublisherMeta(publisherEntity); - String message = "Successfully add publisher meta for " + publisherEntity.getUserId(); + userEmailEntity.setSiteId(siteId); + userEmailEntity.setUserId(userId); + userEmailEntity.setModifiedTime(System.currentTimeMillis()); + boolean ret = metaManagementService.updateUserEmailMeta(userEmailEntity); + String message = "Successfully update user meta for " + userEmailEntity.getSiteId() + ": " + userEmailEntity.getUserId(); if (!ret) { - message = "Failed to add publisher meta for " + publisherEntity.getUserId(); + message = "Failed to update user meta for " + userEmailEntity.getSiteId() + ": " + userEmailEntity.getUserId(); } response.success(ret).message(message); }).get(); } @DELETE - @Path(PUBLISHER_META_PATH) + @Path(USER_META_PATH) @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) - public RESTResponse<Void> deletePublisherMeta(@PathParam(USER_PATH) String userId) { + public RESTResponse<Void> deleteEmailPublisherMeta(@PathParam(Constants.SITE_ID) String siteId, + @PathParam(Constants.USER_ID) String userId) { return RESTResponse.<Void>async((response) -> { - boolean ret = metaManagementService.deletePublisherMeta(userId); - String message = "Successfully delete publisher meta for " + userId; + boolean ret = metaManagementService.deleteUserEmailMeta(siteId, userId); + String message = "Successfully delete user meta for " + siteId + ":" + userId; if (!ret) { - message = "Failed to delete publisher meta for " + userId; + message = "Failed to delete user meta for " + siteId + ":" + userId; } response.success(ret).message(message); }).get(); } @GET - @Path(PUBLISHER_META_PATH) + @Path(USER_META_PATH) @Produces(MediaType.APPLICATION_JSON) - public RESTResponse<List<PublisherEntity>> getPublisherMeta(@PathParam(USER_PATH) String userId) { - return RESTResponse.async(() -> metaManagementService.getPublisherMeta(userId)).get(); + public RESTResponse<List<UserEmailEntity>> getEmailPublisherMeta(@PathParam(Constants.SITE_ID) String siteId, + @PathParam(Constants.USER_ID) String userId) { + return RESTResponse.async(() -> metaManagementService.getUserEmailMeta(siteId, userId)).get(); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/d766f681/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java index 4c6661a..4ddc27e 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java @@ -31,14 +31,16 @@ public class Constants { public static final String CONTEXT_PATH = "service.context"; public static final String READ_TIMEOUT_PATH = "service.readTimeOutSeconds"; - public static final String META_PATH = "/metadata"; - public static final String ANALYZER_PATH = "/job/analyzer"; - public static final String JOB_DEF_PATH = "jobDefId"; - public static final String JOB_META_PATH = META_PATH + "/{" + JOB_DEF_PATH + "}"; + public static final String ANALYZER_PATH = "/analyzer"; - public static final String PUBLISHER_PATH = "/publisher"; - public static final String USER_PATH = "userId"; - public static final String PUBLISHER_META_PATH = PUBLISHER_PATH + "/{" + USER_PATH + "}"; + public static final String SITE_ID = "siteId"; + public static final String JOB_META_ROOT_PATH = "/jobmeta"; + public static final String JOB_DEF_ID = "jobDefId"; + public static final String JOB_META_PATH = JOB_META_ROOT_PATH + "/{" + SITE_ID + "}/" + "{" + JOB_DEF_ID + "}"; + + public static final String USER_META_ROOT_PATH = "/usermeta"; + public static final String USER_ID = "userId"; + public static final String USER_META_PATH = USER_META_ROOT_PATH + "/{" + SITE_ID + "}/" + "{" + USER_ID + "}"; public static final String PROCESS_NONE = "PROCESS_NONE"; @@ -48,7 +50,7 @@ public class Constants { public static final String ALERT_THRESHOLD_KEY = "alert.threshold"; public static final Map<Result.ResultLevel, Double> DEFAULT_ALERT_THRESHOLD = new HashMap<Result.ResultLevel, Double>() { { - put(Result.ResultLevel.NOTICE, 0.1); + put(Result.ResultLevel.INFO, 0.1); put(Result.ResultLevel.WARNING, 0.3); put(Result.ResultLevel.CRITICAL, 0.5); } @@ -58,7 +60,7 @@ public class Constants { public static final int DEFAULT_DEDUP_INTERVAL = 300; public static final String ANALYZER_REPORT_CONFIG_PATH = "application.analyzerReport"; - public static final String ANALYZER_REPORT_SUBJECT = "Job Performance Alert For Job: %s"; + public static final String ANALYZER_REPORT_SUBJECT = "Performance Insights For %s"; public static final String ANALYZER_REPORT_DATA_BASIC_KEY = "basic"; public static final String ANALYZER_REPORT_DATA_EXTEND_KEY = "extend";
