This is an automated email from the ASF dual-hosted git repository.

apucher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 34d0e12  [TE] rootcause - request chunking for aggregates and scores (#3496)
34d0e12 is described below

commit 34d0e12cecbb6f86301eb2d421bd5e0518168f9a
Author: Alexander Pucher <[email protected]>
AuthorDate: Mon Nov 19 14:22:57 2018 -0800

    [TE] rootcause - request chunking for aggregates and scores (#3496)
    
    The RCA dashboard currently loads multiple offsets per metric at once.
    Unfortunately, this is still insufficient for very large dashboards with
    1000+ metrics. This PR adds batching of both metrics and offsets within a
    single request. Additionally, it enables batching for on-demand scoring of
    metrics for outliers. This trades off responsiveness for a larger pipe -
    ultimately this should be superseded by a websocket-like implementation.
---
 .../services/rootcause-aggregates-cache/service.js | 83 ++++++++++++++------
 .../app/pods/services/rootcause-fetcher/service.js |  2 +
 .../services/rootcause-scores-cache/service.js     | 10 ++-
 .../resources/v2/RootCauseMetricResource.java      | 89 +++++++++++++++++++---
 4 files changed, 145 insertions(+), 39 deletions(-)

diff --git 
a/thirdeye/thirdeye-frontend/app/pods/services/rootcause-aggregates-cache/service.js
 
b/thirdeye/thirdeye-frontend/app/pods/services/rootcause-aggregates-cache/service.js
index b2c4e35..6221342 100644
--- 
a/thirdeye/thirdeye-frontend/app/pods/services/rootcause-aggregates-cache/service.js
+++ 
b/thirdeye/thirdeye-frontend/app/pods/services/rootcause-aggregates-cache/service.js
@@ -1,14 +1,17 @@
 import Service from '@ember/service';
 import { inject as service } from '@ember/service';
 import {
+  toOffsetUrn,
+  toMetricUrn,
   toAbsoluteUrn,
-  toMetricUrn
+  makeIterable
 } from 'thirdeye-frontend/utils/rca-utils';
 import { checkStatus } from 'thirdeye-frontend/utils/utils';
 import _ from 'lodash';
 
-const ROOTCAUSE_AGGREGATES_ENDPOINT = '/rootcause/metric/aggregate/batch';
+const ROOTCAUSE_AGGREGATES_ENDPOINT = '/rootcause/metric/aggregate/chunk';
 const ROOTCAUSE_AGGREGATES_PRIORITY = 20;
+const ROOTCAUSE_AGGREGATES_CHUNK_SIZE = 10;
 
 export default Service.extend({
   aggregates: null, // {}
@@ -61,43 +64,64 @@ export default Service.extend({
       return;
     }
 
-    const metricUrnToOffestAndUrn = {};
-    missing.forEach(urn => {
-      const metricUrn = toMetricUrn(urn);
-      const offsetsAndUrns = metricUrnToOffestAndUrn[metricUrn] || [];
-      offsetsAndUrns.push([toAbsoluteUrn(urn, 
requestContext.compareMode).split(':')[2].toLowerCase(), urn]);
-      metricUrnToOffestAndUrn[metricUrn] = offsetsAndUrns;
-    });
-
-    [...Object.keys(metricUrnToOffestAndUrn)].sort().forEach((metricUrn, i) => 
{
-      this._fetchRowSlice(metricUrn, requestContext, metricUrnToOffestAndUrn, 
i);
-    });
+    // group by metrics and offsets
+    const groupedByUrn = [...missing]
+      .map(urn => toAbsoluteUrn(urn, requestContext.compareMode))
+      .map(urn => { return { urn, base: toMetricUrn(urn), offset: 
urn.split(':')[2] }; })
+      .reduce((agg, obj) => {
+        agg[obj.base] = (agg[obj.base] || new Set());
+        agg[obj.base].add(obj.offset);
+        return agg;
+      }, {});
+
+    // workaround for JS conversion of key values to strings
+    const setsOfOffsets = Object.keys(groupedByUrn)
+      .reduce((agg, urn) => {
+        const offsets = [...groupedByUrn[urn]].sort();
+        const key = offsets.join('_');
+        agg[key] = new Set(offsets);
+        return agg;
+      }, {});
+
+    // hack baseline translation
+    const baselineOffset = requestContext.compareMode === 'WoW' ? 'wo1w' : 
requestContext.compareMode.toLowerCase();
+
+    Object.values(setsOfOffsets)
+      .forEach(offsets => {
+        const urns = Object.keys(groupedByUrn).filter(urn => 
_.isEqual(groupedByUrn[urn], offsets));
+        const chunks = _.chunk(urns.sort(), ROOTCAUSE_AGGREGATES_CHUNK_SIZE);
+        chunks.forEach((urns, i) => {
+          this._fetchChunk(urns, [...offsets].sort(), baselineOffset, 
requestContext, i);
+        });
+      });
   },
 
   /**
    * Fetch the metric data for a row of the metric table
    *
-   * @param {String} metricUrn Metric urn
-   * @param {Object} context Context
-   * @param {Object} metricUrnToOffestAndUrn Hash map from metric urn to 
offset and urn
+   * @param {Array} metricUrns Metric urns
+   * @param {Array} offsets time offsets
+   * @param {string} baselineOffset offset for baseline translation
+   * @param {Object} requestContext Context
    * @returns {undefined}
    */
-  async _fetchRowSlice(metricUrn, requestContext, metricUrnToOffestAndUrn, 
index) {
+  async _fetchChunk(metricUrns, offsets, baselineOffset, requestContext, 
index) {
     const fetcher = this.get('fetcher');
 
     const [ start, end ] = requestContext.anomalyRange;
-    const offsets = metricUrnToOffestAndUrn[metricUrn].map(tuple => tuple[0]);
-    const urns = metricUrnToOffestAndUrn[metricUrn].map(tuple => tuple[1]);
     const timezone = 'America/Los_Angeles';
 
-    const url = 
`${ROOTCAUSE_AGGREGATES_ENDPOINT}?urn=${metricUrn}&start=${start}&end=${end}&offsets=${offsets}&timezone=${timezone}`;
+    const url = 
`${ROOTCAUSE_AGGREGATES_ENDPOINT}?urns=${encodeURIComponent(metricUrns)}&start=${start}&end=${end}&offsets=${offsets}&timezone=${timezone}`;
     try {
       const payload = await fetcher.fetch(url, ROOTCAUSE_AGGREGATES_PRIORITY, 
index);
       const json = await checkStatus(payload);
-      const aggregates = this._extractAggregatesBatch(json, urns);
+      const aggregates = this._extractAggregatesChunk(json, metricUrns, 
offsets, baselineOffset);
       this._complete(requestContext, aggregates);
 
     } catch (error) {
+      const urns = metricUrns.reduce((agg, metricUrn) => {
+        return agg.concat([...offsets, 'baseline'].map(offset => 
toOffsetUrn(metricUrn, offset)));
+      }, []);
       this._handleErrorBatch(urns, error);
     }
   },
@@ -106,11 +130,20 @@ export default Service.extend({
     urns.forEach(urn => this._handleError(urn, error));
   },
 
-  _extractAggregatesBatch(incoming, urns) {
+  _extractAggregatesChunk(incoming, metricUrns, offsets, baselineOffset) {
     const aggregates = {};
-    for (var i = 0; i < urns.length; i++) {
-      aggregates[urns[i]] = incoming[i];
-    }
+    metricUrns.forEach(metricUrn => {
+      offsets.forEach((offset, i) => {
+        const urn = toOffsetUrn(metricUrn, offset);
+        aggregates[urn] = incoming[metricUrn][i];
+
+        // duplicate absolute offset as baseline value
+        if (offset === baselineOffset) {
+          const baselineUrn = toOffsetUrn(metricUrn, 'baseline');
+          aggregates[baselineUrn] = aggregates[urn];
+        }
+      });
+    });
 
     return aggregates;
   },
diff --git 
a/thirdeye/thirdeye-frontend/app/pods/services/rootcause-fetcher/service.js 
b/thirdeye/thirdeye-frontend/app/pods/services/rootcause-fetcher/service.js
index cadcdbb..aaabb85 100644
--- a/thirdeye/thirdeye-frontend/app/pods/services/rootcause-fetcher/service.js
+++ b/thirdeye/thirdeye-frontend/app/pods/services/rootcause-fetcher/service.js
@@ -3,6 +3,8 @@ import { get, getProperties, set, setProperties } from 
'@ember/object';
 import fetch from 'fetch';
 import RSVP from 'rsvp';
 
+// TODO halt loading while not on RCA page
+
 /**
  * Comparator for of MyPromise for priority queue
  */
diff --git 
a/thirdeye/thirdeye-frontend/app/pods/services/rootcause-scores-cache/service.js
 
b/thirdeye/thirdeye-frontend/app/pods/services/rootcause-scores-cache/service.js
index e04d909..6e1d4b5 100644
--- 
a/thirdeye/thirdeye-frontend/app/pods/services/rootcause-scores-cache/service.js
+++ 
b/thirdeye/thirdeye-frontend/app/pods/services/rootcause-scores-cache/service.js
@@ -10,6 +10,7 @@ import _ from 'lodash';
 
 const ROOTCAUSE_SCORES_ENDPOINT = '/rootcause/query';
 const ROOTCAUSE_SCORES_PRIORITY = 20;
+const ROOTCAUSE_SCORES_CHUNK_SIZE = 10;
 
 export default Service.extend({
   scores: null, // {}
@@ -65,14 +66,15 @@ export default Service.extend({
 
     // metrics
     const fetcher = this.get('fetcher');
+    const chunks = _.chunk([...missing].sort(), ROOTCAUSE_SCORES_CHUNK_SIZE);
 
-    [...missing].sort().forEach((urn, i) => {
-      const url = this._makeUrl('metricAnalysis', requestContext, [urn]);
+    chunks.forEach((urns, i) => {
+      const url = this._makeUrl('metricAnalysis', requestContext, urns);
       fetcher.fetch(url, ROOTCAUSE_SCORES_PRIORITY, i)
         .then(checkStatus)
-        .then(res => this._extractScores(res, [urn]))
+        .then(res => this._extractScores(res, urns))
         .then(res => this._complete(requestContext, res))
-        .catch(error => this._handleError([urn], error));
+        .catch(error => this._handleError(urns, error));
     });
   },
 
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/RootCauseMetricResource.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/RootCauseMetricResource.java
index a8b2b3f..11e9e2e 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/RootCauseMetricResource.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/RootCauseMetricResource.java
@@ -16,6 +16,8 @@
 
 package com.linkedin.thirdeye.dashboard.resources.v2;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 import com.linkedin.thirdeye.api.Constants;
 import com.linkedin.thirdeye.api.TimeGranularity;
 import com.linkedin.thirdeye.dataframe.DataFrame;
@@ -33,6 +35,7 @@ import com.wordnik.swagger.annotations.Api;
 import com.wordnik.swagger.annotations.ApiOperation;
 import com.wordnik.swagger.annotations.ApiParam;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -51,6 +54,7 @@ import javax.ws.rs.core.MediaType;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
@@ -154,7 +158,7 @@ public class RootCauseMetricResource {
   }
 
   /**
-   * Returns a list of aggregate value for the specified metric and time 
range, and (optionally) a list of offset.
+   * Returns a list of aggregate value for the specified metric and time 
range, and a list of offset.
    * Aligns time stamps if necessary and returns NaN if no data is available 
for the given time range.
    *
    * @param urn metric urn
@@ -180,21 +184,16 @@ public class RootCauseMetricResource {
       throws Exception {
     List<Double> aggregateValues = new ArrayList<>();
 
+    if (StringUtils.isBlank(timezone)) {
+      timezone = TIMEZONE_DEFAULT;
+    }
+
     offsets = ResourceUtils.parseListParams(offsets);
     List<MetricSlice> slices = new ArrayList<>();
 
     Map<String, MetricSlice> offsetToBaseSlice = new HashMap<>();
     Map<String, Baseline> offsetToRange = new HashMap<>();
     for (String offset : offsets) {
-      // Put all metric slices together
-      if (StringUtils.isBlank(offset)) {
-        offset = OFFSET_DEFAULT;
-      }
-
-      if (StringUtils.isBlank(timezone)) {
-        timezone = TIMEZONE_DEFAULT;
-      }
-
       MetricSlice baseSlice = alignSlice(makeSlice(urn, start, end), timezone);
       offsetToBaseSlice.put(offset, baseSlice);
 
@@ -223,6 +222,76 @@ public class RootCauseMetricResource {
   }
 
   /**
+   * Returns a map of lists of aggregate values for the specified metrics and 
time range, and a list of offsets.
+   * Aligns time stamps if necessary and returns NaN if no data is available 
for the given time range.
+   *
+   * @param urns metric urns
+   * @param start start time (in millis)
+   * @param end end time (in millis)
+   * @param offsets A list of offset identifiers (e.g. "current", "wo2w")
+   * @param timezone timezone identifier (e.g. "America/Los_Angeles")
+   *
+   * @see BaselineParsingUtils#parseOffset(String, String) supported offsets
+   *
+   * @return map of lists (keyed by urn) of aggregate values, or NaN if data 
not available
+   * @throws Exception on catch-all execution failure
+   */
+  @GET
+  @Path("/aggregate/chunk")
+  @ApiOperation(value = "Returns a map of lists (keyed by urn) of aggregate 
value for the specified metrics and time range, and offsets.")
+  public Map<String, Collection<Double>> getAggregateChunk(
+      @ApiParam(value = "metric urns", required = true) @QueryParam("urns") 
@NotNull List<String> urns,
+      @ApiParam(value = "start time (in millis)", required = true) 
@QueryParam("start") @NotNull long start,
+      @ApiParam(value = "end time (in millis)", required = true) 
@QueryParam("end") @NotNull long end,
+      @ApiParam(value = "A list of offset identifier separated by comma (e.g. 
\"current\", \"wo2w\")") @QueryParam("offsets") List<String> offsets,
+      @ApiParam(value = "timezone identifier (e.g. \"America/Los_Angeles\")") 
@QueryParam("timezone") String timezone)
+      throws Exception {
+    ListMultimap<String, Double> aggregateValues = ArrayListMultimap.create();
+
+    if (StringUtils.isBlank(timezone)) {
+      timezone = TIMEZONE_DEFAULT;
+    }
+
+    urns = ResourceUtils.parseListParams(urns);
+    offsets = ResourceUtils.parseListParams(offsets);
+    List<MetricSlice> slices = new ArrayList<>();
+
+    Map<String, MetricSlice> offsetToBaseSlice = new HashMap<>();
+    Map<Pair<String, String>, Baseline> tupleToRange = new HashMap<>();
+    for (String urn : urns) {
+      for (String offset : offsets) {
+        MetricSlice baseSlice = alignSlice(makeSlice(urn, start, end), 
timezone);
+        offsetToBaseSlice.put(offset, baseSlice);
+
+        Baseline range = parseOffset(offset, timezone);
+        tupleToRange.put(Pair.of(urn, offset), range);
+
+        List<MetricSlice> currentSlices = range.scatter(baseSlice);
+
+        slices.addAll(currentSlices);
+        logSlices(baseSlice, currentSlices);
+      }
+    }
+
+    // Fetch all aggregates
+    Map<MetricSlice, DataFrame> data = fetchAggregates(slices);
+
+    // Pick the results
+    for (String urn : urns) {
+      for (String offset : offsets) {
+        DataFrame result = tupleToRange.get(Pair.of(urn, 
offset)).gather(offsetToBaseSlice.get(offset), data);
+        if (result.isEmpty()) {
+          aggregateValues.put(urn, Double.NaN);
+        } else {
+          aggregateValues.put(urn, result.getDouble(COL_VALUE, 0));
+        }
+      }
+    }
+
+    return aggregateValues.asMap();
+  }
+
+  /**
    * Returns a breakdown (de-aggregation) of the specified metric and time 
range, and (optionally) offset.
    * Aligns time stamps if necessary and omits null values.
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to