apucher closed pull request #3496: [TE] rootcause - request chunking for 
aggregates and scores
URL: https://github.com/apache/incubator-pinot/pull/3496
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff once a foreign (forked) pull
request is merged, the diff is reproduced below for the sake of
provenance:

diff --git 
a/thirdeye/thirdeye-frontend/app/pods/services/rootcause-aggregates-cache/service.js
 
b/thirdeye/thirdeye-frontend/app/pods/services/rootcause-aggregates-cache/service.js
index b2c4e35825..6221342e0f 100644
--- 
a/thirdeye/thirdeye-frontend/app/pods/services/rootcause-aggregates-cache/service.js
+++ 
b/thirdeye/thirdeye-frontend/app/pods/services/rootcause-aggregates-cache/service.js
@@ -1,14 +1,17 @@
 import Service from '@ember/service';
 import { inject as service } from '@ember/service';
 import {
+  toOffsetUrn,
+  toMetricUrn,
   toAbsoluteUrn,
-  toMetricUrn
+  makeIterable
 } from 'thirdeye-frontend/utils/rca-utils';
 import { checkStatus } from 'thirdeye-frontend/utils/utils';
 import _ from 'lodash';
 
-const ROOTCAUSE_AGGREGATES_ENDPOINT = '/rootcause/metric/aggregate/batch';
+const ROOTCAUSE_AGGREGATES_ENDPOINT = '/rootcause/metric/aggregate/chunk';
 const ROOTCAUSE_AGGREGATES_PRIORITY = 20;
+const ROOTCAUSE_AGGREGATES_CHUNK_SIZE = 10;
 
 export default Service.extend({
   aggregates: null, // {}
@@ -61,43 +64,64 @@ export default Service.extend({
       return;
     }
 
-    const metricUrnToOffestAndUrn = {};
-    missing.forEach(urn => {
-      const metricUrn = toMetricUrn(urn);
-      const offsetsAndUrns = metricUrnToOffestAndUrn[metricUrn] || [];
-      offsetsAndUrns.push([toAbsoluteUrn(urn, 
requestContext.compareMode).split(':')[2].toLowerCase(), urn]);
-      metricUrnToOffestAndUrn[metricUrn] = offsetsAndUrns;
-    });
-
-    [...Object.keys(metricUrnToOffestAndUrn)].sort().forEach((metricUrn, i) => 
{
-      this._fetchRowSlice(metricUrn, requestContext, metricUrnToOffestAndUrn, 
i);
-    });
+    // group by metrics and offsets
+    const groupedByUrn = [...missing]
+      .map(urn => toAbsoluteUrn(urn, requestContext.compareMode))
+      .map(urn => { return { urn, base: toMetricUrn(urn), offset: 
urn.split(':')[2] }; })
+      .reduce((agg, obj) => {
+        agg[obj.base] = (agg[obj.base] || new Set());
+        agg[obj.base].add(obj.offset);
+        return agg;
+      }, {});
+
+    // workaround for JS conversion of key values to strings
+    const setsOfOffsets = Object.keys(groupedByUrn)
+      .reduce((agg, urn) => {
+        const offsets = [...groupedByUrn[urn]].sort();
+        const key = offsets.join('_');
+        agg[key] = new Set(offsets);
+        return agg;
+      }, {});
+
+    // hack baseline translation
+    const baselineOffset = requestContext.compareMode === 'WoW' ? 'wo1w' : 
requestContext.compareMode.toLowerCase();
+
+    Object.values(setsOfOffsets)
+      .forEach(offsets => {
+        const urns = Object.keys(groupedByUrn).filter(urn => 
_.isEqual(groupedByUrn[urn], offsets));
+        const chunks = _.chunk(urns.sort(), ROOTCAUSE_AGGREGATES_CHUNK_SIZE);
+        chunks.forEach((urns, i) => {
+          this._fetchChunk(urns, [...offsets].sort(), baselineOffset, 
requestContext, i);
+        });
+      });
   },
 
   /**
    * Fetch the metric data for a row of the metric table
    *
-   * @param {String} metricUrn Metric urn
-   * @param {Object} context Context
-   * @param {Object} metricUrnToOffestAndUrn Hash map from metric urn to 
offset and urn
+   * @param {Array} metricUrns Metric urns
+   * @param {Array} offsets time offsets
+   * @param {string} baselineOffset offset for baseline translation
+   * @param {Object} requestContext Context
    * @returns {undefined}
    */
-  async _fetchRowSlice(metricUrn, requestContext, metricUrnToOffestAndUrn, 
index) {
+  async _fetchChunk(metricUrns, offsets, baselineOffset, requestContext, 
index) {
     const fetcher = this.get('fetcher');
 
     const [ start, end ] = requestContext.anomalyRange;
-    const offsets = metricUrnToOffestAndUrn[metricUrn].map(tuple => tuple[0]);
-    const urns = metricUrnToOffestAndUrn[metricUrn].map(tuple => tuple[1]);
     const timezone = 'America/Los_Angeles';
 
-    const url = 
`${ROOTCAUSE_AGGREGATES_ENDPOINT}?urn=${metricUrn}&start=${start}&end=${end}&offsets=${offsets}&timezone=${timezone}`;
+    const url = 
`${ROOTCAUSE_AGGREGATES_ENDPOINT}?urns=${encodeURIComponent(metricUrns)}&start=${start}&end=${end}&offsets=${offsets}&timezone=${timezone}`;
     try {
       const payload = await fetcher.fetch(url, ROOTCAUSE_AGGREGATES_PRIORITY, 
index);
       const json = await checkStatus(payload);
-      const aggregates = this._extractAggregatesBatch(json, urns);
+      const aggregates = this._extractAggregatesChunk(json, metricUrns, 
offsets, baselineOffset);
       this._complete(requestContext, aggregates);
 
     } catch (error) {
+      const urns = metricUrns.reduce((agg, metricUrn) => {
+        return agg.concat([...offsets, 'baseline'].map(offset => 
toOffsetUrn(metricUrn, offset)));
+      }, []);
       this._handleErrorBatch(urns, error);
     }
   },
@@ -106,11 +130,20 @@ export default Service.extend({
     urns.forEach(urn => this._handleError(urn, error));
   },
 
-  _extractAggregatesBatch(incoming, urns) {
+  _extractAggregatesChunk(incoming, metricUrns, offsets, baselineOffset) {
     const aggregates = {};
-    for (var i = 0; i < urns.length; i++) {
-      aggregates[urns[i]] = incoming[i];
-    }
+    metricUrns.forEach(metricUrn => {
+      offsets.forEach((offset, i) => {
+        const urn = toOffsetUrn(metricUrn, offset);
+        aggregates[urn] = incoming[metricUrn][i];
+
+        // duplicate absolute offset as baseline value
+        if (offset === baselineOffset) {
+          const baselineUrn = toOffsetUrn(metricUrn, 'baseline');
+          aggregates[baselineUrn] = aggregates[urn];
+        }
+      });
+    });
 
     return aggregates;
   },
diff --git 
a/thirdeye/thirdeye-frontend/app/pods/services/rootcause-fetcher/service.js 
b/thirdeye/thirdeye-frontend/app/pods/services/rootcause-fetcher/service.js
index cadcdbb551..aaabb85e8d 100644
--- a/thirdeye/thirdeye-frontend/app/pods/services/rootcause-fetcher/service.js
+++ b/thirdeye/thirdeye-frontend/app/pods/services/rootcause-fetcher/service.js
@@ -3,6 +3,8 @@ import { get, getProperties, set, setProperties } from 
'@ember/object';
 import fetch from 'fetch';
 import RSVP from 'rsvp';
 
+// TODO halt loading while not on RCA page
+
 /**
  * Comparator for of MyPromise for priority queue
  */
diff --git 
a/thirdeye/thirdeye-frontend/app/pods/services/rootcause-scores-cache/service.js
 
b/thirdeye/thirdeye-frontend/app/pods/services/rootcause-scores-cache/service.js
index e04d9092bf..6e1d4b5494 100644
--- 
a/thirdeye/thirdeye-frontend/app/pods/services/rootcause-scores-cache/service.js
+++ 
b/thirdeye/thirdeye-frontend/app/pods/services/rootcause-scores-cache/service.js
@@ -10,6 +10,7 @@ import _ from 'lodash';
 
 const ROOTCAUSE_SCORES_ENDPOINT = '/rootcause/query';
 const ROOTCAUSE_SCORES_PRIORITY = 20;
+const ROOTCAUSE_SCORES_CHUNK_SIZE = 10;
 
 export default Service.extend({
   scores: null, // {}
@@ -65,14 +66,15 @@ export default Service.extend({
 
     // metrics
     const fetcher = this.get('fetcher');
+    const chunks = _.chunk([...missing].sort(), ROOTCAUSE_SCORES_CHUNK_SIZE);
 
-    [...missing].sort().forEach((urn, i) => {
-      const url = this._makeUrl('metricAnalysis', requestContext, [urn]);
+    chunks.forEach((urns, i) => {
+      const url = this._makeUrl('metricAnalysis', requestContext, urns);
       fetcher.fetch(url, ROOTCAUSE_SCORES_PRIORITY, i)
         .then(checkStatus)
-        .then(res => this._extractScores(res, [urn]))
+        .then(res => this._extractScores(res, urns))
         .then(res => this._complete(requestContext, res))
-        .catch(error => this._handleError([urn], error));
+        .catch(error => this._handleError(urns, error));
     });
   },
 
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/RootCauseMetricResource.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/RootCauseMetricResource.java
index a8b2b3fc68..11e9e2e257 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/RootCauseMetricResource.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/RootCauseMetricResource.java
@@ -16,6 +16,8 @@
 
 package com.linkedin.thirdeye.dashboard.resources.v2;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 import com.linkedin.thirdeye.api.Constants;
 import com.linkedin.thirdeye.api.TimeGranularity;
 import com.linkedin.thirdeye.dataframe.DataFrame;
@@ -33,6 +35,7 @@
 import com.wordnik.swagger.annotations.ApiOperation;
 import com.wordnik.swagger.annotations.ApiParam;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -51,6 +54,7 @@
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
@@ -154,7 +158,7 @@ public double getAggregate(@ApiParam(value = "metric urn", 
required = true) @Que
   }
 
   /**
-   * Returns a list of aggregate value for the specified metric and time 
range, and (optionally) a list of offset.
+   * Returns a list of aggregate value for the specified metric and time 
range, and a list of offset.
    * Aligns time stamps if necessary and returns NaN if no data is available 
for the given time range.
    *
    * @param urn metric urn
@@ -180,21 +184,16 @@ public double getAggregate(@ApiParam(value = "metric 
urn", required = true) @Que
       throws Exception {
     List<Double> aggregateValues = new ArrayList<>();
 
+    if (StringUtils.isBlank(timezone)) {
+      timezone = TIMEZONE_DEFAULT;
+    }
+
     offsets = ResourceUtils.parseListParams(offsets);
     List<MetricSlice> slices = new ArrayList<>();
 
     Map<String, MetricSlice> offsetToBaseSlice = new HashMap<>();
     Map<String, Baseline> offsetToRange = new HashMap<>();
     for (String offset : offsets) {
-      // Put all metric slices together
-      if (StringUtils.isBlank(offset)) {
-        offset = OFFSET_DEFAULT;
-      }
-
-      if (StringUtils.isBlank(timezone)) {
-        timezone = TIMEZONE_DEFAULT;
-      }
-
       MetricSlice baseSlice = alignSlice(makeSlice(urn, start, end), timezone);
       offsetToBaseSlice.put(offset, baseSlice);
 
@@ -222,6 +221,76 @@ public double getAggregate(@ApiParam(value = "metric urn", 
required = true) @Que
     return aggregateValues;
   }
 
+  /**
+   * Returns a map of lists of aggregate values for the specified metrics and 
time range, and a list of offsets.
+   * Aligns time stamps if necessary and returns NaN if no data is available 
for the given time range.
+   *
+   * @param urns metric urns
+   * @param start start time (in millis)
+   * @param end end time (in millis)
+   * @param offsets A list of offset identifiers (e.g. "current", "wo2w")
+   * @param timezone timezone identifier (e.g. "America/Los_Angeles")
+   *
+   * @see BaselineParsingUtils#parseOffset(String, String) supported offsets
+   *
+   * @return map of lists (keyed by urn) of aggregate values, or NaN if data 
not available
+   * @throws Exception on catch-all execution failure
+   */
+  @GET
+  @Path("/aggregate/chunk")
+  @ApiOperation(value = "Returns a map of lists (keyed by urn) of aggregate 
value for the specified metrics and time range, and offsets.")
+  public Map<String, Collection<Double>> getAggregateChunk(
+      @ApiParam(value = "metric urns", required = true) @QueryParam("urns") 
@NotNull List<String> urns,
+      @ApiParam(value = "start time (in millis)", required = true) 
@QueryParam("start") @NotNull long start,
+      @ApiParam(value = "end time (in millis)", required = true) 
@QueryParam("end") @NotNull long end,
+      @ApiParam(value = "A list of offset identifier separated by comma (e.g. 
\"current\", \"wo2w\")") @QueryParam("offsets") List<String> offsets,
+      @ApiParam(value = "timezone identifier (e.g. \"America/Los_Angeles\")") 
@QueryParam("timezone") String timezone)
+      throws Exception {
+    ListMultimap<String, Double> aggregateValues = ArrayListMultimap.create();
+
+    if (StringUtils.isBlank(timezone)) {
+      timezone = TIMEZONE_DEFAULT;
+    }
+
+    urns = ResourceUtils.parseListParams(urns);
+    offsets = ResourceUtils.parseListParams(offsets);
+    List<MetricSlice> slices = new ArrayList<>();
+
+    Map<String, MetricSlice> offsetToBaseSlice = new HashMap<>();
+    Map<Pair<String, String>, Baseline> tupleToRange = new HashMap<>();
+    for (String urn : urns) {
+      for (String offset : offsets) {
+        MetricSlice baseSlice = alignSlice(makeSlice(urn, start, end), 
timezone);
+        offsetToBaseSlice.put(offset, baseSlice);
+
+        Baseline range = parseOffset(offset, timezone);
+        tupleToRange.put(Pair.of(urn, offset), range);
+
+        List<MetricSlice> currentSlices = range.scatter(baseSlice);
+
+        slices.addAll(currentSlices);
+        logSlices(baseSlice, currentSlices);
+      }
+    }
+
+    // Fetch all aggregates
+    Map<MetricSlice, DataFrame> data = fetchAggregates(slices);
+
+    // Pick the results
+    for (String urn : urns) {
+      for (String offset : offsets) {
+        DataFrame result = tupleToRange.get(Pair.of(urn, 
offset)).gather(offsetToBaseSlice.get(offset), data);
+        if (result.isEmpty()) {
+          aggregateValues.put(urn, Double.NaN);
+        } else {
+          aggregateValues.put(urn, result.getDouble(COL_VALUE, 0));
+        }
+      }
+    }
+
+    return aggregateValues.asMap();
+  }
+
   /**
    * Returns a breakdown (de-aggregation) of the specified metric and time 
range, and (optionally) offset.
    * Aligns time stamps if necessary and omits null values.


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to