This is an automated email from the ASF dual-hosted git repository.
xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a174d81 [TE] Update AnomalyFlattenResource (#4505)
a174d81 is described below
commit a174d812011f5d6a6aa7834007809a67e9bd100c
Author: Yungyu Chung <[email protected]>
AuthorDate: Fri Aug 9 11:22:20 2019 -0700
[TE] Update AnomalyFlattenResource (#4505)
* [TE] Update AnomalyFlattenResource
- Update api interface
- Generate table from metric ids and dimensions assignments
- Update test class
* [TE] Multithread Aggregated Metric Fetching and update comments
- updates comments
- multithread data frame fetching
* [TE] Clean up code and API
- Use List instead of a concatenated string as API input
- Remove unused code.
* [TE] Retain adding order to the returned map
- Use ListOrderedMap as the map container
* [TE] Clean up code per comments
- Simplify an if-else statement into a single line of code
---
.../dashboard/ThirdEyeDashboardApplication.java | 3 +-
.../resources/AnomalyFlattenResource.java | 203 ++++++++++++++++-----
.../resources/TestAnomalyFlattenResource.java | 73 +++++---
3 files changed, 214 insertions(+), 65 deletions(-)
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
index e68606f..a08b0d4 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
@@ -182,7 +182,8 @@ public class ThirdEyeDashboardApplication
env.jersey().register(new ConfigResource(DAO_REGISTRY.getConfigDAO()));
env.jersey().register(new
CustomizedEventResource(DAO_REGISTRY.getEventDAO()));
env.jersey().register(new TimeSeriesResource());
- env.jersey().register(new
AnomalyFlattenResource(DAO_REGISTRY.getMergedAnomalyResultDAO()));
+ env.jersey().register(new
AnomalyFlattenResource(DAO_REGISTRY.getMergedAnomalyResultDAO(),
+ DAO_REGISTRY.getDatasetConfigDAO(),
DAO_REGISTRY.getMetricConfigDAO()));
env.jersey().register(new UserDashboardResource(
DAO_REGISTRY.getMergedAnomalyResultDAO(),
DAO_REGISTRY.getMetricConfigDAO(), DAO_REGISTRY.getDatasetConfigDAO(),
DAO_REGISTRY.getDetectionConfigManager(),
DAO_REGISTRY.getDetectionAlertConfigManager()));
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/AnomalyFlattenResource.java
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/AnomalyFlattenResource.java
index ef48a9e..147200a 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/AnomalyFlattenResource.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/AnomalyFlattenResource.java
@@ -24,86 +24,205 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import javax.validation.constraints.NotNull;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import org.apache.commons.collections4.map.ListOrderedMap;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.thirdeye.anomalydetection.context.AnomalyFeedback;
import org.apache.pinot.thirdeye.api.Constants;
import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MetricConfigManager;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
/**
- * Flatten the anomaly results for UI purpose.
- * Convert a list of anomalies to rows of columns so that UI can directly
convert the anomalies to table
+ * Provide a table combining metrics data and anomaly feedback for UI
representation
*/
@Path("thirdeye/table")
@Api(tags = {Constants.ANOMALY_TAG})
public class AnomalyFlattenResource {
- private MergedAnomalyResultManager mergedAnomalyResultDAO;
- public static final String ANOMALY_ID = "anomalyId";
+ private final MergedAnomalyResultManager mergedAnomalyResultDAO;
+ private final MetricConfigManager metricConfigDAO;
+ private final DatasetConfigManager datasetConfgDAO;
+
+ private final ExecutorService executor;
+
public static final String ANOMALY_COMMENT = "comment";
+ private static final String DATAFRAME_VALUE = "value";
+ private static final String DEFAULT_COMMENT_FORMAT = "#%d is %s";
+ private static final int DEFAULT_THREAD_POOLS_SIZE = 5;
+ private static final long DEFAULT_TIME_OUT_IN_MINUTES = 1;
- public AnomalyFlattenResource(MergedAnomalyResultManager
mergedAnomalyResultDAO) {
+ public AnomalyFlattenResource(MergedAnomalyResultManager
mergedAnomalyResultDAO, DatasetConfigManager datasetConfgDAO,
+ MetricConfigManager metricConfigDAO) {
this.mergedAnomalyResultDAO = mergedAnomalyResultDAO;
+ this.datasetConfgDAO = datasetConfgDAO;
+ this.metricConfigDAO = metricConfigDAO;
+ this.executor = Executors.newFixedThreadPool(DEFAULT_THREAD_POOLS_SIZE);
}
+
/**
- * Returns a list of formatted merged anomalies for UI to render a table
- * @param detectionConfigId detectionConfigId
+ * Returns a list of formatted metric values and anomaly comments for UI to
generate a table
+ * @param metricIds a list of metric ids
* @param start start time in epoc milliseconds
* @param end end time in epoc milliseconds
- * @param dimensionKeys a list of keys in dimensions; if null, will return
all dimension keys for the anomaly
- * @return a list of formatted anomalies
+ * @param dimensionKeys a list of keys in dimensions
+ * @return a list of formatted metric info and anomaly comments
*/
@GET
- @ApiOperation(value = "Returns a list of formatted merged anomalies ")
- public List<Map<String, Object>> flatAnomalyResults(
- @ApiParam("detection config id") @QueryParam("detectionConfigId") long
detectionConfigId,
+ @ApiOperation(value = "View a collection of metrics and anonalies feedback
in a list of maps")
+ @Produces("Application/json")
+ public List<Map<String, Object>> listDimensionValues(
+ @ApiParam("metric config id") @NotNull @QueryParam("metricIds")
List<Long> metricIds,
@ApiParam("start time for anomalies") @QueryParam("start") long start,
@ApiParam("end time for anomalies") @QueryParam("end") long end,
- @ApiParam("dimension keys") @QueryParam("dimensionKeys") List<String>
dimensionKeys) {
- // Retrieve anomalies
- List<MergedAnomalyResultDTO> anomalies = mergedAnomalyResultDAO.
- findByStartTimeInRangeAndDetectionConfigId(start, end,
detectionConfigId);
+ @ApiParam("dimension keys") @NotNull @QueryParam("dimensionKeys")
List<String> dimensionKeys) throws Exception {
+ Preconditions.checkArgument(!metricIds.isEmpty());
+ List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
+ List<MetricConfigDTO> metrics = new ArrayList<>();
+ for (long id : metricIds) {
+ metrics.add(metricConfigDAO.findById(id));
+
anomalies.addAll(mergedAnomalyResultDAO.findAnomaliesByMetricIdAndTimeRange(id,
start, end));
+ }
- // flatten anomaly result information
- List<Map<String, Object>> resultList = new ArrayList<>();
- for (MergedAnomalyResultDTO result : anomalies) {
- resultList.add(flatAnomalyResult(result, dimensionKeys));
+ Map<String, DataFrame> metricDataFrame = new HashMap<>();
+
+ AggregationLoader aggregationLoader =
+ new DefaultAggregationLoader(metricConfigDAO, datasetConfgDAO,
+ ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+ ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+ Map<String, Future<DataFrame>> futureMap = new HashMap<String,
Future<DataFrame>>() {{
+ for (MetricConfigDTO metricDTO : metrics) {
+ Future<DataFrame> future = fetchAggregatedMetric(aggregationLoader,
metricDTO.getId(), start, end,
+ dimensionKeys);
+ put(metricDTO.getName(), future);
+ }
+ }};
+ for (String metric : futureMap.keySet()) {
+ Future<DataFrame> future = futureMap.get(metric);
+ metricDataFrame.put(metric, future.get(DEFAULT_TIME_OUT_IN_MINUTES,
TimeUnit.MINUTES));
}
- return resultList;
+
+ return reformatDataFrameAndAnomalies(metricDataFrame, anomalies,
dimensionKeys);
+ }
+
+ /**
+ * Return a Future thread with aggregated metric value as return
+ * @param aggregationLoader an aggregation loader
+ * @param metricId the metric id
+ * @param start the start time in epoch time
+ * @param end the end time in epoch time
+ * @param dimensionKeys the list of dimension keys
+ * @return a Future thread with aggregated metric value
+ */
+ private Future<DataFrame> fetchAggregatedMetric(AggregationLoader
aggregationLoader, long metricId, long start,
+ long end, List<String> dimensionKeys) {
+ return executor.submit(() -> {
+ MetricSlice metricSlice = MetricSlice.from(metricId, start, end);
+ DataFrame resultDataFrame = null;
+ try {
+ resultDataFrame = aggregationLoader.loadAggregate(metricSlice,
dimensionKeys, -1);
+ } catch (Exception e) {
+ throw new ExecutionException(e);
+ }
+ return resultDataFrame;
+ });
}
/**
- * Flat an anomaly to a flat map structure
- * @param anomalyResult an instance of MergedAnomalyResultDTO
- * @param tableKeys a list of keys in dimensions; if null, use all keys in
dimension
- * @return a map of information in the anomaly result with the required keys
+ * Reformat the rows in data frame and anomaly information into a list of map
+ * @param metricDataFrame a map from metric name to dataframe with
aggregated time series
+ * @param anomalies a list of anomalies within the same time interval as
dataframe
+ * @param dimensions a list of dimensions
+ * @return a list of maps with dataframe and anomaly information
*/
- public static Map<String, Object> flatAnomalyResult(MergedAnomalyResultDTO
anomalyResult,
- List<String> tableKeys) {
- Preconditions.checkNotNull(anomalyResult);
- Map<String, Object> flatMap = new HashMap<>();
- flatMap.put(ANOMALY_ID, anomalyResult.getId());
- DimensionMap dimension = anomalyResult.getDimensions();
- if (tableKeys == null) {
- tableKeys = new ArrayList<>(dimension.keySet());
+ public static List<Map<String, Object>>
reformatDataFrameAndAnomalies(Map<String, DataFrame> metricDataFrame,
+ List<MergedAnomalyResultDTO> anomalies, List<String> dimensions) {
+ List<Map<String, Object>> resultList = new ArrayList<>();
+ Map<DimensionMap, Map<Long, String>> anomalyCommentMap =
extractComments(anomalies);
+ List<String> metrics = new ArrayList<>(metricDataFrame.keySet());
+ Map<DimensionMap, Map<String, Double>> metricValues = new HashMap<>();
+ for (String metric : metrics) {
+ DataFrame dataFrame = metricDataFrame.get(metric);
+ for (int i = 0; i < dataFrame.size(); i++) {
+ DimensionMap dimensionMap = new DimensionMap();
+ for (String dimensionKey : dimensions) {
+ String dimensionValue = dataFrame.getString(dimensionKey, i);
+ dimensionMap.put(dimensionKey, dimensionValue);
+ }
+ if (!metricValues.containsKey(dimensionMap)) {
+ metricValues.put(dimensionMap, new HashMap<>());
+ }
+ metricValues.get(dimensionMap).put(metric,
dataFrame.getDouble(DATAFRAME_VALUE, i));
+ }
}
- for (String key : tableKeys) {
- flatMap.put(key, dimension.getOrDefault(key, ""));
+
+ for (DimensionMap dimensionMap : metricValues.keySet()) {
+ Map<String, Object> resultMap = new ListOrderedMap<>();
+ for (String key : dimensionMap.keySet()) {
+ resultMap.put(key, dimensionMap.get(key));
+ }
+ for (String metric : metrics) {
+ resultMap.put(metric, metricValues.getOrDefault(dimensionMap,
Collections.emptyMap()).getOrDefault(metric, Double.NaN));
+ }
+ Map<Long, String> comment = Collections.emptyMap();
+ if (anomalyCommentMap.containsKey(dimensionMap)) {
+ comment = anomalyCommentMap.get(dimensionMap);
+ }
+ resultMap.put(ANOMALY_COMMENT, comment);
+
+ resultList.add(resultMap);
}
- AnomalyFeedback feedback = anomalyResult.getFeedback();
- if (feedback != null) {
- flatMap.put(ANOMALY_COMMENT, feedback.getComment());
- } else {
- flatMap.put(ANOMALY_COMMENT, "");
+
+ return resultList;
+ }
+
+ /**
+ * Extract comments from anomalies and format it as a map
+ * @param anomalies a list of merged anomaly results
+ * @return a map from dimension map to a map of anomaly id to its comment
+ */
+ public static Map<DimensionMap, Map<Long, String>> extractComments
(List<MergedAnomalyResultDTO> anomalies) {
+ Map<DimensionMap, Map<Long, String>> resultMap = new HashMap<>();
+ for (MergedAnomalyResultDTO anomaly : anomalies) {
+ DimensionMap dimensions = anomaly.getDimensions();
+ if (!resultMap.containsKey(dimensions)) {
+ resultMap.put(dimensions, new HashMap<>());
+ }
+ long anomalyId = anomaly.getId();
+ String comment = String.format(DEFAULT_COMMENT_FORMAT, anomalyId, "Not
Reviewed Yet");
+ AnomalyFeedback feedback = anomaly.getFeedback();
+ if (feedback != null) {
+ if (StringUtils.isNoneBlank(feedback.getComment())) {
+ comment = feedback.getComment();
+ } else {
+ comment = String.format(DEFAULT_COMMENT_FORMAT, anomalyId,
feedback.getFeedbackType().toString());
+ }
+ }
+ resultMap.get(dimensions).put(anomalyId, comment);
}
- flatMap.put(anomalyResult.getMetric(), anomalyResult.getAvgCurrentVal());
- return flatMap;
+ return resultMap;
}
}
diff --git
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datasource/resources/TestAnomalyFlattenResource.java
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datasource/resources/TestAnomalyFlattenResource.java
index f501248..658e601 100644
---
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datasource/resources/TestAnomalyFlattenResource.java
+++
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datasource/resources/TestAnomalyFlattenResource.java
@@ -23,14 +23,15 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
import org.apache.pinot.thirdeye.dashboard.resources.AnomalyFlattenResource;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
import org.apache.pinot.thirdeye.datalayer.bao.TestMergedAnomalyResultManager;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import org.apache.pinot.thirdeye.datasource.DAORegistry;
-import org.joda.time.DateTime;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -63,30 +64,40 @@ public class TestAnomalyFlattenResource {
}
@Test
- public void testFlatAnomalyResults() {
- AnomalyFlattenResource resource = new
AnomalyFlattenResource(this.mergedAnomalyResultDAO);
- List<Map<String, Object>> actualResults =
resource.flatAnomalyResults(this.detectionConfigId, new DateTime(2019, 1, 1, 0,
0).getMillis(), new DateTime(2019, 1, 3, 0, 0).getMillis(), null);
+ public void testReformatDataFrameAndAnomalies() {
+ Map<String, DataFrame> metricDataFrame = new HashMap<String, DataFrame>()
{{
+ put("metric", mockDataFrame());
+ }};
+
+ List<MergedAnomalyResultDTO> anomalies =
TestMergedAnomalyResultManager.mockAnomalies(detectionConfigId);
+ for (int i = 0; i < anomalies.size(); i++) {
+ anomalies.get(i).setId((long) i + 1);
+ }
+ List<Map<String, Object>> actualResults =
AnomalyFlattenResource.reformatDataFrameAndAnomalies(metricDataFrame,
+ anomalies, Arrays.asList("what", "where", "when", "how"));
List<Map<String, Object>> expectedResults = Arrays.asList(
new HashMap<String, Object>() {
{
- put(ANOMALY_ID, 2L);
put("what", "a");
put("where", "b");
put("when", "c");
put("how", "d");
- put("metric", 0d);
- put(ANOMALY_COMMENT, "");
+ put("metric", 0.1d);
+ put(ANOMALY_COMMENT, new HashMap<Long, String>(){{
+ put(1l, "#1 is Not Reviewed Yet");
+ }});
}
},
new HashMap<String, Object>() {
{
- put(ANOMALY_ID, 3L);
put("what", "e");
put("where", "f");
put("when", "g");
put("how", "h");
- put("metric", 0d);
- put(ANOMALY_COMMENT, "");
+ put("metric", 0.2d);
+ put(ANOMALY_COMMENT, new HashMap<Long, String>(){{
+ put(2l, "#2 is Not Reviewed Yet");
+ }});
}
}
);
@@ -94,22 +105,40 @@ public class TestAnomalyFlattenResource {
}
@Test
- public void testFlatAnomalyResult() {
+ public void testExtractComments() {
List<MergedAnomalyResultDTO> anomalies =
TestMergedAnomalyResultManager.mockAnomalies(detectionConfigId);
- MergedAnomalyResultDTO anomaly = anomalies.get(0);
- anomaly.setId(1L);
- Map<String, Object> actualMap =
AnomalyFlattenResource.flatAnomalyResult(anomaly, null);
- Map<String, Object> expectMap = new HashMap<String, Object>() {
+ for (int i = 0; i < anomalies.size(); i++) {
+ anomalies.get(i).setId((long) i + 1);
+ }
+ Map<DimensionMap, Map<Long, String>> actualMap =
AnomalyFlattenResource.extractComments(anomalies);
+ Map<DimensionMap, Map<Long, String>> expectMap = new HashMap<DimensionMap,
Map<Long, String>>() {
{
- put(ANOMALY_ID, anomaly.getId());
- put("what", "a");
- put("where", "b");
- put("when", "c");
- put("how", "d");
- put("metric", 0d);
- put(ANOMALY_COMMENT, "");
+ DimensionMap dm = new DimensionMap();
+ dm.put("what", "a");
+ dm.put("where", "b");
+ dm.put("when", "c");
+ dm.put("how", "d");
+ put(dm, new HashMap<Long, String>(){
+ {
+ put(1l, "#1 is Not Reviewed Yet");
+ }});
+
+ dm = new DimensionMap();
+ dm.put("what", "e");
+ dm.put("where", "f");
+ dm.put("when", "g");
+ dm.put("how", "h");
+ put(dm, new HashMap<Long, String>(){
+ {
+ put(2l, "#2 is Not Reviewed Yet");
+ }});
}
};
Assert.assertEquals(actualMap, expectMap);
}
+
+ private DataFrame mockDataFrame() {
+ return DataFrame.builder("what", "where", "when", "how", "value")
+ .append("a", "b", "c", "d", 0.1d).append("e", "f", "g", "h",
0.2d).build();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]