This is an automated email from the ASF dual-hosted git repository. akshayrai09 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 0675aee [TE] Fix bugs in user dashboard endpoint (#3853) 0675aee is described below commit 0675aee34f00d7684579c3b18a3e4dd6bff7acc1 Author: Akshay Rai <akshayra...@gmail.com> AuthorDate: Tue Feb 19 16:18:32 2019 -0800 [TE] Fix bugs in user dashboard endpoint (#3853) --- .../api/user/dashboard/UserDashboardResource.java | 221 ++++++++++++--------- .../detection/DetectionMigrationResource.java | 2 + .../resource/v2/UserDashboardResourceTest.java | 13 +- 3 files changed, 134 insertions(+), 102 deletions(-) diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java index 1e889b0..bbf2e9e 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java @@ -42,7 +42,6 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; import org.apache.commons.lang.StringUtils; import org.apache.pinot.thirdeye.api.Constants; import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType; @@ -65,11 +64,14 @@ import org.apache.pinot.thirdeye.datalayer.util.Predicate; import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry; import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader; import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader; +import org.apache.pinot.thirdeye.detection.ConfigUtils; import org.apache.pinot.thirdeye.detection.CurrentAndBaselineLoader; import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.pinot.thirdeye.detection.yaml.YamlDetectionAlertConfigTranslator.*; + /** * Endpoints for user-customized dashboards (currently alerts only) @@ -167,8 +169,6 @@ public class UserDashboardResource { @QueryParam("fetchTrueAnomaly") @DefaultValue("false") boolean fetchTrueAnomaly, @ApiParam(value = "max number of results") @QueryParam("limit") Integer limit) throws Exception { - Map<String, String> responseMessage = new HashMap<>(); - List<Predicate> predicates = new ArrayList<>(); LOG.info("[USER DASHBOARD] Fetching anomalies with filters. Start: " + start + " end: " + end + " metric: " + metric + " dataset: " + dataset + " owner: " + owner + " application: " + application + " group: " + group + " fetchTrueAnomaly: " + fetchTrueAnomaly + " limit: " + limit); @@ -178,98 +178,22 @@ public class UserDashboardResource { LOG.warn("No upper limit specified while fetching anomalies. Defaulting to " + ANOMALIES_LIMIT_DEFAULT); limit = ANOMALIES_LIMIT_DEFAULT; } - - // Filter by metric and dataset - if (metric != null) { - predicates.add(Predicate.EQ("metric", metric)); - } - if (dataset != null) { - predicates.add(Predicate.EQ("collection", dataset)); - } - - // anomaly window start and end Preconditions.checkNotNull(start, "Please specify the start time of the anomaly retrieval window"); - predicates.add(Predicate.GE("endTime", start)); - if (end != null) { - predicates.add(Predicate.LT("startTime", end)); - } // TODO support index select on user-reported anomalies // predicates.add(Predicate.OR( // Predicate.EQ("notified", true), // Predicate.EQ("anomalyResultSource", AnomalyResultSource.USER_LABELED_ANOMALY))); - // application (indirect) - Set<Long> applicationFunctionIds = new HashSet<>(); - if (StringUtils.isNotBlank(application)) { - List<AnomalyFunctionDTO> functions = this.functionDAO.findAllByApplication(application); - for (AnomalyFunctionDTO function : functions) { - if (function.getIsActive()) { - applicationFunctionIds.add(function.getId()); - } - } - } - - // TODO: deprecate after migration - // Support for partially migrated alerts. - List<DetectionAlertConfigDTO> notifications = detectionAlertDAO.findByPredicate(Predicate.EQ("application", application)); - for (DetectionAlertConfigDTO notification : notifications) { - applicationFunctionIds.addAll(notification.getVectorClocks().keySet()); - } - - // group (indirect) - Set<Long> groupFunctionIds = new HashSet<>(); - if (StringUtils.isNotBlank(group)) { - AlertConfigDTO alert = this.alertDAO.findWhereNameEquals(group); - if (alert != null) { - groupFunctionIds.addAll(alert.getEmailConfig().getFunctionIds()); - } - } - - // owner (indirect) - Set<Long> ownerFunctionIds = new HashSet<>(); - if (StringUtils.isNotBlank(owner)) { - // TODO: replace database scan with targeted select - List<AnomalyFunctionDTO> functions = this.functionDAO.findAll(); - for (AnomalyFunctionDTO function : functions) { - if (Objects.equals(function.getCreatedBy(), owner)) { - ownerFunctionIds.add(function.getId()); - } - } - } - - // anomaly function ids - List<Predicate> oldPredicates = new ArrayList<>(predicates); - if (StringUtils.isNotBlank(application) || StringUtils.isNotBlank(group) || StringUtils.isNotBlank(owner)) { - Set<Long> functionIds = new HashSet<>(); - functionIds.addAll(applicationFunctionIds); - functionIds.addAll(groupFunctionIds); - functionIds.addAll(ownerFunctionIds); - - oldPredicates.add(Predicate.IN("functionId", functionIds.toArray())); - } - - // fetch legacy anomalies via predicatesprincipal - List<MergedAnomalyResultDTO> anomalies = this.anomalyDAO.findByPredicate(Predicate.AND(oldPredicates.toArray(new Predicate[oldPredicates.size()]))); - // filter (un-notified && non-user-reported) anomalies - // TODO remove once index select on user-reported anomalies available - Iterator<MergedAnomalyResultDTO> itAnomaly = anomalies.iterator(); - while (itAnomaly.hasNext()) { - MergedAnomalyResultDTO anomaly = itAnomaly.next(); - if (!anomaly.isNotified() && - !AnomalyResultSource.USER_LABELED_ANOMALY.equals(anomaly.getAnomalyResultSource())) { - itAnomaly.remove(); - } - } - - // fetch new detection framework anomalies by group - anomalies.addAll(fetchFrameworkAnomaliesByGroup(start, end, group)); - - // fetch new detection framework anomalies by application - anomalies.addAll(fetchFrameworkAnomaliesByApplication(start, end, application)); - - // fetch new detection framework anomalies by metric and/or dataset - anomalies.addAll(fetchFrameworkAnomaliesByMetricDataset(predicates)); + // TODO: Prefer to have intersection of anomalies rather than union + List<MergedAnomalyResultDTO> anomalies = new ArrayList<>(); + anomalies.addAll(fetchLegacyAnomaliesByFunctionId(start, end, group, application, owner)); + // Fetch anomalies by group + anomalies.addAll(fetchAnomaliesBySubsGroup(start, end, group)); + // Fetch anomalies by application + anomalies.addAll(fetchAnomaliesByApplication(start, end, application)); + // Fetch anomalies by metric and/or dataset + anomalies.addAll(fetchAnomaliesByMetricDataset(start, end, metric, dataset)); // sort descending by start time Collections.sort(anomalies, new Comparator<MergedAnomalyResultDTO>() { @@ -370,13 +294,118 @@ public class UserDashboardResource { return output; } - private Collection<? extends MergedAnomalyResultDTO> fetchFrameworkAnomaliesByMetricDataset( - List<Predicate> predicates) { - predicates.add(Predicate.NEQ("detectionConfigId", 0)); + @Deprecated + private Collection<MergedAnomalyResultDTO> fetchLegacyAnomaliesByFunctionId(Long start, Long end, String group, String application, String owner) { + // Find functionIds which belong to application, subscription group and owner. + List<Predicate> predicates = new ArrayList<>(); + Set<Long> functionIds = new HashSet<>(); + + // application (indirect) + if (StringUtils.isNotBlank(application)) { + List<AnomalyFunctionDTO> functions = this.functionDAO.findAllByApplication(application); + for (AnomalyFunctionDTO function : functions) { + if (function.getIsActive()) { + functionIds.add(function.getId()); + } + } + } + // Support for partially migrated alerts. + List<DetectionAlertConfigDTO> notifications = detectionAlertDAO.findByPredicate(Predicate.EQ("application", application)); + for (DetectionAlertConfigDTO notification : notifications) { + for (long id : ConfigUtils.getLongs(notification.getProperties().get(PROP_DETECTION_CONFIG_IDS))) { + AnomalyFunctionDTO function = this.functionDAO.findById(id); + if (function != null && function.getIsActive()) { + functionIds.add(id); + } + } + } + + // group (indirect) + Set<Long> groupFunctionIds = new HashSet<>(); + if (StringUtils.isNotBlank(group)) { + AlertConfigDTO alert = this.alertDAO.findWhereNameEquals(group); + if (alert != null) { + for (long id : alert.getEmailConfig().getFunctionIds()) { + AnomalyFunctionDTO function = this.functionDAO.findById(id); + if (function != null && function.getIsActive()) { + groupFunctionIds.add(id); + } + } + } + } + if (!groupFunctionIds.isEmpty()) { + if (functionIds.isEmpty()) { + functionIds = groupFunctionIds; + } else { + functionIds.retainAll(groupFunctionIds); + } + } + + // owner (indirect) + Set<Long> ownerFunctionIds = new HashSet<>(); + if (StringUtils.isNotBlank(owner)) { + // TODO: replace database scan with targeted select + List<AnomalyFunctionDTO> functions = this.functionDAO.findAll(); + for (AnomalyFunctionDTO function : functions) { + if (function.getIsActive() && Objects.equals(function.getCreatedBy(), owner)) { + ownerFunctionIds.add(function.getId()); + } + } + } + if (!ownerFunctionIds.isEmpty()) { + if (functionIds.isEmpty()) { + functionIds = ownerFunctionIds; + } else { + functionIds.retainAll(ownerFunctionIds); + } + } + + // Predicate on start time end time and function Id. + predicates.add(Predicate.IN("functionId", functionIds.toArray())); + predicates.add(Predicate.GE("endTime", start)); + if (end != null) { + predicates.add(Predicate.LT("startTime", end)); + } + + // Fetch legacy anomalies via predicates + List<MergedAnomalyResultDTO> anomalies = this.anomalyDAO.findByPredicate(Predicate.AND(predicates.toArray(new Predicate[predicates.size()]))); + // filter (un-notified && non-user-reported) anomalies + // TODO remove once index select on user-reported anomalies available + Iterator<MergedAnomalyResultDTO> itAnomaly = anomalies.iterator(); + while (itAnomaly.hasNext()) { + MergedAnomalyResultDTO anomaly = itAnomaly.next(); + if (!anomaly.isNotified() && + !AnomalyResultSource.USER_LABELED_ANOMALY.equals(anomaly.getAnomalyResultSource())) { + itAnomaly.remove(); + } + } + + return anomalies; + } + + private Collection<MergedAnomalyResultDTO> fetchAnomaliesByMetricDataset(Long start, Long end, String metric, String dataset) { + if (StringUtils.isBlank(metric) && StringUtils.isBlank(dataset)) { + return Collections.emptyList(); + } + + List<Predicate> predicates = new ArrayList<>(); + predicates.add(Predicate.GE("endTime", start)); + if (end != null) { + predicates.add(Predicate.LT("startTime", end)); + } + + // Filter by metric and dataset + if (metric != null) { + predicates.add(Predicate.EQ("metric", metric)); + } + if (dataset != null) { + predicates.add(Predicate.EQ("collection", dataset)); + } + return this.anomalyDAO.findByPredicate(Predicate.AND(predicates.toArray(new Predicate[predicates.size()]))); } - private Collection<MergedAnomalyResultDTO> fetchFrameworkAnomaliesByApplication(Long start, Long end, String application) throws Exception { + private Collection<MergedAnomalyResultDTO> fetchAnomaliesByApplication(Long start, Long end, String application) throws Exception { if (StringUtils.isBlank(application)) { return Collections.emptyList(); } @@ -389,10 +418,10 @@ public class UserDashboardResource { detectionConfigIds.addAll(alertConfigDTO.getVectorClocks().keySet()); } - return fetchFrameworkAnomaliesByConfigIds(start, end, detectionConfigIds); + return fetchAnomaliesByConfigIds(start, end, detectionConfigIds); } - private Collection<MergedAnomalyResultDTO> fetchFrameworkAnomaliesByGroup(Long start, Long end, String group) throws Exception { + private Collection<MergedAnomalyResultDTO> fetchAnomaliesBySubsGroup(Long start, Long end, String group) throws Exception { if (StringUtils.isBlank(group)) { return Collections.emptyList(); } @@ -405,10 +434,10 @@ public class UserDashboardResource { detectionConfigIds.addAll(alertConfigDTO.getVectorClocks().keySet()); } - return fetchFrameworkAnomaliesByConfigIds(start, end, detectionConfigIds); + return fetchAnomaliesByConfigIds(start, end, detectionConfigIds); } - private Collection<MergedAnomalyResultDTO> fetchFrameworkAnomaliesByConfigIds(Long start, Long end, Set<Long> detectionConfigIds) throws Exception { + private Collection<MergedAnomalyResultDTO> fetchAnomaliesByConfigIds(Long start, Long end, Set<Long> detectionConfigIds) throws Exception { if (detectionConfigIds.isEmpty()) { return Collections.emptyList(); } diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java index d7d728a..2b9269f 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java @@ -617,6 +617,8 @@ public class DetectionMigrationResource { } @POST + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) @ApiOperation("migrate all applications") @Path("/applications") public Response migrateApplication() { diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/dashboard/resource/v2/UserDashboardResourceTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/dashboard/resource/v2/UserDashboardResourceTest.java index dc26f0c..3f97f7c 100644 --- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/dashboard/resource/v2/UserDashboardResourceTest.java +++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/dashboard/resource/v2/UserDashboardResourceTest.java @@ -75,6 +75,7 @@ public class UserDashboardResourceTest { this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(100, 500, this.functionIds.get(0), "test_metric", "test_dataset"))); // func A this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(800, 1200, this.functionIds.get(0), "test_metric", "test_dataset"))); // func A this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(300, 1500, this.functionIds.get(1), "test_metric", "test_dataset"))); // func B + this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(300, 1600, this.functionIds.get(2), "test_metric", "test_dataset"))); // func C this.anomalyIds.add(this.anomalyDAO.save(makeAnomaly(300, 1600, this.functionIds.get(2), "test_metric_2", "test_dataset"))); // func C for (Long id : this.anomalyIds) { @@ -124,8 +125,8 @@ public class UserDashboardResourceTest { @Test public void testAnomaliesByGroup() throws Exception { List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, null, "myAlertB", null, null, false, null); - Assert.assertEquals(anomalies.size(), 1); - Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(3))); + Assert.assertEquals(anomalies.size(), 2); + Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(3), this.anomalyIds.get(4))); } @Test @@ -144,15 +145,15 @@ public class UserDashboardResourceTest { @Test public void testAnomaliesByMetric() throws Exception { List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, null, null, "test_metric", "test_dataset", false, null); - Assert.assertEquals(anomalies.size(), 2); - Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(1), this.anomalyIds.get(2))); + Assert.assertEquals(anomalies.size(), 3); + Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(1), this.anomalyIds.get(2), this.anomalyIds.get(3))); } @Test public void testAnomaliesByDataset() throws Exception { List<AnomalySummary> anomalies = this.resource.queryAnomalies(1000L, null, null, null, null, null, "test_dataset", false, null); - Assert.assertEquals(anomalies.size(), 3); - Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(1), this.anomalyIds.get(2), this.anomalyIds.get(3))); + Assert.assertEquals(anomalies.size(), 4); + Assert.assertEquals(extractIds(anomalies), makeSet(this.anomalyIds.get(1), this.anomalyIds.get(2), this.anomalyIds.get(3), this.anomalyIds.get(4))); } private MergedAnomalyResultDTO makeAnomaly(long start, long end, Long functionId, String metric, String dataset) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org