This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a174d81  [TE] Update AnomalyFlattenResource (#4505)
a174d81 is described below

commit a174d812011f5d6a6aa7834007809a67e9bd100c
Author: Yungyu Chung <[email protected]>
AuthorDate: Fri Aug 9 11:22:20 2019 -0700

    [TE] Update AnomalyFlattenResource (#4505)
    
    * [TE] Update AnomalyFlattenResource
    
    - Update api interface
    - Generate table from metric ids and dimension assignments
    - Update test class
    
    * [TE] Multithread Aggregated Metric Fetching and update comments
    
    - updates comments
    - multithread data frame fetching
    
    * [TE] Clean up code and API
    
    - Use List instead of concatenated string as API input
    - Remove unused code.
    
    * [TE] Retain adding order to the returned map
    
    - Use ListOrderedMap as the map container
    
    * [TE] Clean up code per comments
    
    - Simplify an if-else statement to a single line of code
---
 .../dashboard/ThirdEyeDashboardApplication.java    |   3 +-
 .../resources/AnomalyFlattenResource.java          | 203 ++++++++++++++++-----
 .../resources/TestAnomalyFlattenResource.java      |  73 +++++---
 3 files changed, 214 insertions(+), 65 deletions(-)

diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
index e68606f..a08b0d4 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/ThirdEyeDashboardApplication.java
@@ -182,7 +182,8 @@ public class ThirdEyeDashboardApplication
     env.jersey().register(new ConfigResource(DAO_REGISTRY.getConfigDAO()));
     env.jersey().register(new 
CustomizedEventResource(DAO_REGISTRY.getEventDAO()));
     env.jersey().register(new TimeSeriesResource());
-    env.jersey().register(new 
AnomalyFlattenResource(DAO_REGISTRY.getMergedAnomalyResultDAO()));
+    env.jersey().register(new 
AnomalyFlattenResource(DAO_REGISTRY.getMergedAnomalyResultDAO(),
+        DAO_REGISTRY.getDatasetConfigDAO(), 
DAO_REGISTRY.getMetricConfigDAO()));
     env.jersey().register(new UserDashboardResource(
         DAO_REGISTRY.getMergedAnomalyResultDAO(), 
DAO_REGISTRY.getMetricConfigDAO(), DAO_REGISTRY.getDatasetConfigDAO(),
         DAO_REGISTRY.getDetectionConfigManager(), 
DAO_REGISTRY.getDetectionAlertConfigManager()));
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/AnomalyFlattenResource.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/AnomalyFlattenResource.java
index ef48a9e..147200a 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/AnomalyFlattenResource.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/AnomalyFlattenResource.java
@@ -24,86 +24,205 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import javax.validation.constraints.NotNull;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import org.apache.commons.collections4.map.ListOrderedMap;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.thirdeye.anomalydetection.context.AnomalyFeedback;
 import org.apache.pinot.thirdeye.api.Constants;
 import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MetricConfigManager;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
 
 
 /**
- * Flatten the anomaly results for UI purpose.
- * Convert a list of anomalies to rows of columns so that UI can directly 
convert the anomalies to table
+ * Provide a table combining metrics data and anomaly feedback for UI 
representation
  */
 @Path("thirdeye/table")
 @Api(tags = {Constants.ANOMALY_TAG})
 public class AnomalyFlattenResource {
-  private MergedAnomalyResultManager mergedAnomalyResultDAO;
-  public static final String ANOMALY_ID = "anomalyId";
+  private final MergedAnomalyResultManager mergedAnomalyResultDAO;
+  private final MetricConfigManager metricConfigDAO;
+  private final DatasetConfigManager datasetConfgDAO;
+
+  private final ExecutorService executor;
+
   public static final String ANOMALY_COMMENT = "comment";
+  private static final String DATAFRAME_VALUE = "value";
+  private static final String DEFAULT_COMMENT_FORMAT = "#%d is %s";
+  private static final int DEFAULT_THREAD_POOLS_SIZE = 5;
+  private static final long DEFAULT_TIME_OUT_IN_MINUTES = 1;
 
-  public AnomalyFlattenResource(MergedAnomalyResultManager 
mergedAnomalyResultDAO) {
+  public AnomalyFlattenResource(MergedAnomalyResultManager 
mergedAnomalyResultDAO, DatasetConfigManager datasetConfgDAO,
+      MetricConfigManager metricConfigDAO) {
     this.mergedAnomalyResultDAO = mergedAnomalyResultDAO;
+    this.datasetConfgDAO = datasetConfgDAO;
+    this.metricConfigDAO = metricConfigDAO;
+    this.executor = Executors.newFixedThreadPool(DEFAULT_THREAD_POOLS_SIZE);
   }
 
+
   /**
-   * Returns a list of formatted merged anomalies for UI to render a table
-   * @param detectionConfigId detectionConfigId
+   * Returns a list of formatted metric values and anomaly comments for UI to 
generate a table
+   * @param metricIds a list of metric ids
    * @param start start time in epoc milliseconds
    * @param end end time in epoc milliseconds
-   * @param dimensionKeys a list of keys in dimensions; if null, will return 
all  dimension keys for the anomaly
-   * @return a list of formatted anomalies
+   * @param dimensionKeys a list of keys in dimensions
+   * @return a list of formatted metric info and anomaly comments
    */
   @GET
-  @ApiOperation(value = "Returns a list of formatted merged anomalies ")
-  public List<Map<String, Object>> flatAnomalyResults(
-      @ApiParam("detection config id") @QueryParam("detectionConfigId") long 
detectionConfigId,
+  @ApiOperation(value = "View a collection of metrics and anonalies feedback 
in a list of maps")
+  @Produces("Application/json")
+  public List<Map<String, Object>> listDimensionValues(
+      @ApiParam("metric config id") @NotNull @QueryParam("metricIds") 
List<Long> metricIds,
       @ApiParam("start time for anomalies") @QueryParam("start") long start,
       @ApiParam("end time for anomalies") @QueryParam("end") long end,
-      @ApiParam("dimension keys") @QueryParam("dimensionKeys") List<String> 
dimensionKeys) {
-    // Retrieve anomalies
-    List<MergedAnomalyResultDTO> anomalies = mergedAnomalyResultDAO.
-        findByStartTimeInRangeAndDetectionConfigId(start, end, 
detectionConfigId);
+      @ApiParam("dimension keys") @NotNull @QueryParam("dimensionKeys") 
List<String> dimensionKeys) throws Exception {
+    Preconditions.checkArgument(!metricIds.isEmpty());
+    List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
+    List<MetricConfigDTO> metrics = new ArrayList<>();
+    for (long id : metricIds) {
+      metrics.add(metricConfigDAO.findById(id));
+      
anomalies.addAll(mergedAnomalyResultDAO.findAnomaliesByMetricIdAndTimeRange(id, 
start, end));
+    }
 
-    // flatten anomaly result information
-    List<Map<String, Object>> resultList = new ArrayList<>();
-    for (MergedAnomalyResultDTO result : anomalies) {
-      resultList.add(flatAnomalyResult(result, dimensionKeys));
+    Map<String, DataFrame> metricDataFrame = new HashMap<>();
+
+    AggregationLoader aggregationLoader =
+        new DefaultAggregationLoader(metricConfigDAO, datasetConfgDAO,
+            ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+    Map<String, Future<DataFrame>> futureMap = new HashMap<String, 
Future<DataFrame>>() {{
+      for (MetricConfigDTO metricDTO : metrics) {
+        Future<DataFrame> future = fetchAggregatedMetric(aggregationLoader, 
metricDTO.getId(), start, end,
+            dimensionKeys);
+        put(metricDTO.getName(), future);
+      }
+    }};
+    for (String metric : futureMap.keySet()) {
+      Future<DataFrame> future = futureMap.get(metric);
+      metricDataFrame.put(metric, future.get(DEFAULT_TIME_OUT_IN_MINUTES, 
TimeUnit.MINUTES));
     }
-    return resultList;
+
+    return reformatDataFrameAndAnomalies(metricDataFrame, anomalies, 
dimensionKeys);
+  }
+
+  /**
+   * Return a Future thread with aggregated metric value as return
+   * @param aggregationLoader an aggregation loader
+   * @param metricId the metric id
+   * @param start the start time in epoch time
+   * @param end the end time in epoch time
+   * @param dimensionKeys the list of dimension keys
+   * @return a Future thread with aggregated metric value
+   */
+  private Future<DataFrame> fetchAggregatedMetric(AggregationLoader 
aggregationLoader, long metricId, long start,
+      long end, List<String> dimensionKeys) {
+    return executor.submit(() -> {
+      MetricSlice metricSlice = MetricSlice.from(metricId, start, end);
+      DataFrame resultDataFrame = null;
+      try {
+        resultDataFrame = aggregationLoader.loadAggregate(metricSlice, 
dimensionKeys, -1);
+      } catch (Exception e) {
+        throw new ExecutionException(e);
+      }
+      return resultDataFrame;
+    });
   }
 
   /**
-   * Flat an anomaly to a flat map structure
-   * @param anomalyResult an instance of MergedAnomalyResultDTO
-   * @param tableKeys a list of keys in dimensions; if null, use all keys in 
dimension
-   * @return a map of information in the anomaly result with the required keys
+   * Reformat the rows in data frame and anomaly information into a list of map
+   * @param metricDataFrame a map from metric name to dataframe with 
aggregated time series
+   * @param anomalies a list of anomalies within the same time interval as 
dataframe
+   * @param dimensions a list of dimensions
+   * @return a list of maps with dataframe and anomaly information
    */
-  public static Map<String, Object> flatAnomalyResult(MergedAnomalyResultDTO 
anomalyResult,
-      List<String> tableKeys) {
-    Preconditions.checkNotNull(anomalyResult);
-    Map<String, Object> flatMap = new HashMap<>();
-    flatMap.put(ANOMALY_ID, anomalyResult.getId());
-    DimensionMap dimension = anomalyResult.getDimensions();
-    if (tableKeys == null) {
-      tableKeys = new ArrayList<>(dimension.keySet());
+  public static List<Map<String, Object>> 
reformatDataFrameAndAnomalies(Map<String, DataFrame> metricDataFrame,
+      List<MergedAnomalyResultDTO> anomalies, List<String> dimensions) {
+    List<Map<String, Object>> resultList = new ArrayList<>();
+    Map<DimensionMap, Map<Long, String>> anomalyCommentMap = 
extractComments(anomalies);
+    List<String> metrics = new ArrayList<>(metricDataFrame.keySet());
+    Map<DimensionMap, Map<String, Double>> metricValues = new HashMap<>();
+    for (String metric : metrics) {
+      DataFrame dataFrame = metricDataFrame.get(metric);
+      for (int i = 0; i < dataFrame.size(); i++) {
+        DimensionMap dimensionMap = new DimensionMap();
+        for (String dimensionKey : dimensions) {
+          String dimensionValue = dataFrame.getString(dimensionKey, i);
+          dimensionMap.put(dimensionKey, dimensionValue);
+        }
+        if (!metricValues.containsKey(dimensionMap)) {
+          metricValues.put(dimensionMap, new HashMap<>());
+        }
+        metricValues.get(dimensionMap).put(metric, 
dataFrame.getDouble(DATAFRAME_VALUE, i));
+      }
     }
-    for (String key : tableKeys) {
-      flatMap.put(key, dimension.getOrDefault(key, ""));
+
+    for (DimensionMap dimensionMap : metricValues.keySet()) {
+      Map<String, Object> resultMap = new ListOrderedMap<>();
+      for (String key : dimensionMap.keySet()) {
+        resultMap.put(key, dimensionMap.get(key));
+      }
+      for (String metric : metrics) {
+        resultMap.put(metric, metricValues.getOrDefault(dimensionMap, 
Collections.emptyMap()).getOrDefault(metric, Double.NaN));
+      }
+      Map<Long, String> comment = Collections.emptyMap();
+      if (anomalyCommentMap.containsKey(dimensionMap)) {
+        comment = anomalyCommentMap.get(dimensionMap);
+      }
+      resultMap.put(ANOMALY_COMMENT, comment);
+
+      resultList.add(resultMap);
     }
-    AnomalyFeedback feedback = anomalyResult.getFeedback();
-    if (feedback != null) {
-      flatMap.put(ANOMALY_COMMENT, feedback.getComment());
-    } else {
-      flatMap.put(ANOMALY_COMMENT, "");
+
+    return resultList;
+  }
+
+  /**
+   * Extract comments from anomalies and format it as a map
+   * @param anomalies a list of merged anomaly results
+   * @return a map from dimension map to a map of anomaly id to its comment
+   */
+  public static Map<DimensionMap, Map<Long, String>> extractComments 
(List<MergedAnomalyResultDTO> anomalies) {
+    Map<DimensionMap, Map<Long, String>> resultMap = new HashMap<>();
+    for (MergedAnomalyResultDTO anomaly : anomalies) {
+      DimensionMap dimensions = anomaly.getDimensions();
+      if (!resultMap.containsKey(dimensions)) {
+        resultMap.put(dimensions, new HashMap<>());
+      }
+      long anomalyId = anomaly.getId();
+      String comment = String.format(DEFAULT_COMMENT_FORMAT, anomalyId, "Not 
Reviewed Yet");
+      AnomalyFeedback feedback = anomaly.getFeedback();
+      if (feedback != null) {
+        if (StringUtils.isNoneBlank(feedback.getComment())) {
+          comment = feedback.getComment();
+        } else {
+          comment = String.format(DEFAULT_COMMENT_FORMAT, anomalyId, 
feedback.getFeedbackType().toString());
+        }
+      }
+      resultMap.get(dimensions).put(anomalyId, comment);
     }
-    flatMap.put(anomalyResult.getMetric(), anomalyResult.getAvgCurrentVal());
-    return flatMap;
+    return resultMap;
   }
 }
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datasource/resources/TestAnomalyFlattenResource.java
 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datasource/resources/TestAnomalyFlattenResource.java
index f501248..658e601 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datasource/resources/TestAnomalyFlattenResource.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datasource/resources/TestAnomalyFlattenResource.java
@@ -23,14 +23,15 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
 import org.apache.pinot.thirdeye.dashboard.resources.AnomalyFlattenResource;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
 import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
 import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
 import org.apache.pinot.thirdeye.datalayer.bao.TestMergedAnomalyResultManager;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import org.apache.pinot.thirdeye.datasource.DAORegistry;
-import org.joda.time.DateTime;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -63,30 +64,40 @@ public class TestAnomalyFlattenResource {
   }
 
   @Test
-  public void testFlatAnomalyResults() {
-    AnomalyFlattenResource resource = new 
AnomalyFlattenResource(this.mergedAnomalyResultDAO);
-    List<Map<String, Object>> actualResults = 
resource.flatAnomalyResults(this.detectionConfigId, new DateTime(2019, 1, 1, 0, 
0).getMillis(), new DateTime(2019, 1, 3, 0, 0).getMillis(), null);
+  public void testReformatDataFrameAndAnomalies() {
+    Map<String, DataFrame> metricDataFrame = new HashMap<String, DataFrame>() 
{{
+      put("metric", mockDataFrame());
+    }};
+
+    List<MergedAnomalyResultDTO> anomalies = 
TestMergedAnomalyResultManager.mockAnomalies(detectionConfigId);
+    for (int i = 0; i < anomalies.size(); i++) {
+      anomalies.get(i).setId((long) i + 1);
+    }
+    List<Map<String, Object>> actualResults = 
AnomalyFlattenResource.reformatDataFrameAndAnomalies(metricDataFrame,
+        anomalies, Arrays.asList("what", "where", "when", "how"));
     List<Map<String, Object>> expectedResults = Arrays.asList(
         new HashMap<String, Object>() {
           {
-            put(ANOMALY_ID, 2L);
             put("what", "a");
             put("where", "b");
             put("when", "c");
             put("how", "d");
-            put("metric", 0d);
-            put(ANOMALY_COMMENT, "");
+            put("metric", 0.1d);
+            put(ANOMALY_COMMENT, new HashMap<Long, String>(){{
+              put(1l, "#1 is Not Reviewed Yet");
+            }});
           }
         },
         new HashMap<String, Object>() {
           {
-            put(ANOMALY_ID, 3L);
             put("what", "e");
             put("where", "f");
             put("when", "g");
             put("how", "h");
-            put("metric", 0d);
-            put(ANOMALY_COMMENT, "");
+            put("metric", 0.2d);
+            put(ANOMALY_COMMENT, new HashMap<Long, String>(){{
+              put(2l, "#2 is Not Reviewed Yet");
+            }});
           }
         }
     );
@@ -94,22 +105,40 @@ public class TestAnomalyFlattenResource {
   }
 
   @Test
-  public void testFlatAnomalyResult() {
+  public void testExtractComments() {
     List<MergedAnomalyResultDTO> anomalies = 
TestMergedAnomalyResultManager.mockAnomalies(detectionConfigId);
-    MergedAnomalyResultDTO anomaly = anomalies.get(0);
-    anomaly.setId(1L);
-    Map<String, Object> actualMap = 
AnomalyFlattenResource.flatAnomalyResult(anomaly, null);
-    Map<String, Object> expectMap = new HashMap<String, Object>() {
+    for (int i = 0; i < anomalies.size(); i++) {
+      anomalies.get(i).setId((long) i + 1);
+    }
+    Map<DimensionMap, Map<Long, String>> actualMap = 
AnomalyFlattenResource.extractComments(anomalies);
+    Map<DimensionMap, Map<Long, String>> expectMap = new HashMap<DimensionMap, 
Map<Long, String>>() {
       {
-        put(ANOMALY_ID, anomaly.getId());
-        put("what", "a");
-        put("where", "b");
-        put("when", "c");
-        put("how", "d");
-        put("metric", 0d);
-        put(ANOMALY_COMMENT, "");
+        DimensionMap dm = new DimensionMap();
+        dm.put("what", "a");
+        dm.put("where", "b");
+        dm.put("when", "c");
+        dm.put("how", "d");
+        put(dm, new HashMap<Long, String>(){
+          {
+            put(1l, "#1 is Not Reviewed Yet");
+          }});
+
+        dm = new DimensionMap();
+        dm.put("what", "e");
+        dm.put("where", "f");
+        dm.put("when", "g");
+        dm.put("how", "h");
+        put(dm, new HashMap<Long, String>(){
+          {
+            put(2l, "#2 is Not Reviewed Yet");
+          }});
       }
     };
     Assert.assertEquals(actualMap, expectMap);
   }
+
+  private DataFrame mockDataFrame() {
+    return DataFrame.builder("what", "where", "when", "how", "value")
+        .append("a", "b", "c", "d", 0.1d).append("e", "f", "g", "h", 
0.2d).build();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to