This is an automated email from the ASF dual-hosted git repository.
apucher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 34d0e12 [TE] rootcause - request chunking for aggregates and scores
(#3496)
34d0e12 is described below
commit 34d0e12cecbb6f86301eb2d421bd5e0518168f9a
Author: Alexander Pucher <[email protected]>
AuthorDate: Mon Nov 19 14:22:57 2018 -0800
[TE] rootcause - request chunking for aggregates and scores (#3496)
The RCA dashboard currently loads multiple offsets per metric at once.
Unfortunately, this is still insufficient for very large dashboards with 1000+
metrics. This PR adds batching of both metrics and offsets within a single
request. Additionally, it enables batching for on-demand scoring of metrics for
outliers. This trades off responsiveness for a larger pipe — ultimately this
should be superseded by a WebSocket-like implementation.
---
.../services/rootcause-aggregates-cache/service.js | 83 ++++++++++++++------
.../app/pods/services/rootcause-fetcher/service.js | 2 +
.../services/rootcause-scores-cache/service.js | 10 ++-
.../resources/v2/RootCauseMetricResource.java | 89 +++++++++++++++++++---
4 files changed, 145 insertions(+), 39 deletions(-)
diff --git
a/thirdeye/thirdeye-frontend/app/pods/services/rootcause-aggregates-cache/service.js
b/thirdeye/thirdeye-frontend/app/pods/services/rootcause-aggregates-cache/service.js
index b2c4e35..6221342 100644
---
a/thirdeye/thirdeye-frontend/app/pods/services/rootcause-aggregates-cache/service.js
+++
b/thirdeye/thirdeye-frontend/app/pods/services/rootcause-aggregates-cache/service.js
@@ -1,14 +1,17 @@
import Service from '@ember/service';
import { inject as service } from '@ember/service';
import {
+ toOffsetUrn,
+ toMetricUrn,
toAbsoluteUrn,
- toMetricUrn
+ makeIterable
} from 'thirdeye-frontend/utils/rca-utils';
import { checkStatus } from 'thirdeye-frontend/utils/utils';
import _ from 'lodash';
-const ROOTCAUSE_AGGREGATES_ENDPOINT = '/rootcause/metric/aggregate/batch';
+const ROOTCAUSE_AGGREGATES_ENDPOINT = '/rootcause/metric/aggregate/chunk';
const ROOTCAUSE_AGGREGATES_PRIORITY = 20;
+const ROOTCAUSE_AGGREGATES_CHUNK_SIZE = 10;
export default Service.extend({
aggregates: null, // {}
@@ -61,43 +64,64 @@ export default Service.extend({
return;
}
- const metricUrnToOffestAndUrn = {};
- missing.forEach(urn => {
- const metricUrn = toMetricUrn(urn);
- const offsetsAndUrns = metricUrnToOffestAndUrn[metricUrn] || [];
- offsetsAndUrns.push([toAbsoluteUrn(urn,
requestContext.compareMode).split(':')[2].toLowerCase(), urn]);
- metricUrnToOffestAndUrn[metricUrn] = offsetsAndUrns;
- });
-
- [...Object.keys(metricUrnToOffestAndUrn)].sort().forEach((metricUrn, i) =>
{
- this._fetchRowSlice(metricUrn, requestContext, metricUrnToOffestAndUrn,
i);
- });
+ // group by metrics and offsets
+ const groupedByUrn = [...missing]
+ .map(urn => toAbsoluteUrn(urn, requestContext.compareMode))
+ .map(urn => { return { urn, base: toMetricUrn(urn), offset:
urn.split(':')[2] }; })
+ .reduce((agg, obj) => {
+ agg[obj.base] = (agg[obj.base] || new Set());
+ agg[obj.base].add(obj.offset);
+ return agg;
+ }, {});
+
+ // workaround for JS conversion of key values to strings
+ const setsOfOffsets = Object.keys(groupedByUrn)
+ .reduce((agg, urn) => {
+ const offsets = [...groupedByUrn[urn]].sort();
+ const key = offsets.join('_');
+ agg[key] = new Set(offsets);
+ return agg;
+ }, {});
+
+ // hack baseline translation
+ const baselineOffset = requestContext.compareMode === 'WoW' ? 'wo1w' :
requestContext.compareMode.toLowerCase();
+
+ Object.values(setsOfOffsets)
+ .forEach(offsets => {
+ const urns = Object.keys(groupedByUrn).filter(urn =>
_.isEqual(groupedByUrn[urn], offsets));
+ const chunks = _.chunk(urns.sort(), ROOTCAUSE_AGGREGATES_CHUNK_SIZE);
+ chunks.forEach((urns, i) => {
+ this._fetchChunk(urns, [...offsets].sort(), baselineOffset,
requestContext, i);
+ });
+ });
},
/**
* Fetch the metric data for a row of the metric table
*
- * @param {String} metricUrn Metric urn
- * @param {Object} context Context
- * @param {Object} metricUrnToOffestAndUrn Hash map from metric urn to
offset and urn
+ * @param {Array} metricUrns Metric urns
+ * @param {Array} offsets time offsets
+ * @param {string} baselineOffset offset for baseline translation
+ * @param {Object} requestContext Context
* @returns {undefined}
*/
- async _fetchRowSlice(metricUrn, requestContext, metricUrnToOffestAndUrn,
index) {
+ async _fetchChunk(metricUrns, offsets, baselineOffset, requestContext,
index) {
const fetcher = this.get('fetcher');
const [ start, end ] = requestContext.anomalyRange;
- const offsets = metricUrnToOffestAndUrn[metricUrn].map(tuple => tuple[0]);
- const urns = metricUrnToOffestAndUrn[metricUrn].map(tuple => tuple[1]);
const timezone = 'America/Los_Angeles';
- const url =
`${ROOTCAUSE_AGGREGATES_ENDPOINT}?urn=${metricUrn}&start=${start}&end=${end}&offsets=${offsets}&timezone=${timezone}`;
+ const url =
`${ROOTCAUSE_AGGREGATES_ENDPOINT}?urns=${encodeURIComponent(metricUrns)}&start=${start}&end=${end}&offsets=${offsets}&timezone=${timezone}`;
try {
const payload = await fetcher.fetch(url, ROOTCAUSE_AGGREGATES_PRIORITY,
index);
const json = await checkStatus(payload);
- const aggregates = this._extractAggregatesBatch(json, urns);
+ const aggregates = this._extractAggregatesChunk(json, metricUrns,
offsets, baselineOffset);
this._complete(requestContext, aggregates);
} catch (error) {
+ const urns = metricUrns.reduce((agg, metricUrn) => {
+ return agg.concat([...offsets, 'baseline'].map(offset =>
toOffsetUrn(metricUrn, offset)));
+ }, []);
this._handleErrorBatch(urns, error);
}
},
@@ -106,11 +130,20 @@ export default Service.extend({
urns.forEach(urn => this._handleError(urn, error));
},
- _extractAggregatesBatch(incoming, urns) {
+ _extractAggregatesChunk(incoming, metricUrns, offsets, baselineOffset) {
const aggregates = {};
- for (var i = 0; i < urns.length; i++) {
- aggregates[urns[i]] = incoming[i];
- }
+ metricUrns.forEach(metricUrn => {
+ offsets.forEach((offset, i) => {
+ const urn = toOffsetUrn(metricUrn, offset);
+ aggregates[urn] = incoming[metricUrn][i];
+
+ // duplicate absolute offset as baseline value
+ if (offset === baselineOffset) {
+ const baselineUrn = toOffsetUrn(metricUrn, 'baseline');
+ aggregates[baselineUrn] = aggregates[urn];
+ }
+ });
+ });
return aggregates;
},
diff --git
a/thirdeye/thirdeye-frontend/app/pods/services/rootcause-fetcher/service.js
b/thirdeye/thirdeye-frontend/app/pods/services/rootcause-fetcher/service.js
index cadcdbb..aaabb85 100644
--- a/thirdeye/thirdeye-frontend/app/pods/services/rootcause-fetcher/service.js
+++ b/thirdeye/thirdeye-frontend/app/pods/services/rootcause-fetcher/service.js
@@ -3,6 +3,8 @@ import { get, getProperties, set, setProperties } from
'@ember/object';
import fetch from 'fetch';
import RSVP from 'rsvp';
+// TODO halt loading while not on RCA page
+
/**
* Comparator for of MyPromise for priority queue
*/
diff --git
a/thirdeye/thirdeye-frontend/app/pods/services/rootcause-scores-cache/service.js
b/thirdeye/thirdeye-frontend/app/pods/services/rootcause-scores-cache/service.js
index e04d909..6e1d4b5 100644
---
a/thirdeye/thirdeye-frontend/app/pods/services/rootcause-scores-cache/service.js
+++
b/thirdeye/thirdeye-frontend/app/pods/services/rootcause-scores-cache/service.js
@@ -10,6 +10,7 @@ import _ from 'lodash';
const ROOTCAUSE_SCORES_ENDPOINT = '/rootcause/query';
const ROOTCAUSE_SCORES_PRIORITY = 20;
+const ROOTCAUSE_SCORES_CHUNK_SIZE = 10;
export default Service.extend({
scores: null, // {}
@@ -65,14 +66,15 @@ export default Service.extend({
// metrics
const fetcher = this.get('fetcher');
+ const chunks = _.chunk([...missing].sort(), ROOTCAUSE_SCORES_CHUNK_SIZE);
- [...missing].sort().forEach((urn, i) => {
- const url = this._makeUrl('metricAnalysis', requestContext, [urn]);
+ chunks.forEach((urns, i) => {
+ const url = this._makeUrl('metricAnalysis', requestContext, urns);
fetcher.fetch(url, ROOTCAUSE_SCORES_PRIORITY, i)
.then(checkStatus)
- .then(res => this._extractScores(res, [urn]))
+ .then(res => this._extractScores(res, urns))
.then(res => this._complete(requestContext, res))
- .catch(error => this._handleError([urn], error));
+ .catch(error => this._handleError(urns, error));
});
},
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/RootCauseMetricResource.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/RootCauseMetricResource.java
index a8b2b3f..11e9e2e 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/RootCauseMetricResource.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/dashboard/resources/v2/RootCauseMetricResource.java
@@ -16,6 +16,8 @@
package com.linkedin.thirdeye.dashboard.resources.v2;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
import com.linkedin.thirdeye.api.Constants;
import com.linkedin.thirdeye.api.TimeGranularity;
import com.linkedin.thirdeye.dataframe.DataFrame;
@@ -33,6 +35,7 @@ import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -51,6 +54,7 @@ import javax.ws.rs.core.MediaType;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;
@@ -154,7 +158,7 @@ public class RootCauseMetricResource {
}
/**
- * Returns a list of aggregate value for the specified metric and time
range, and (optionally) a list of offset.
+ * Returns a list of aggregate value for the specified metric and time
range, and a list of offset.
* Aligns time stamps if necessary and returns NaN if no data is available
for the given time range.
*
* @param urn metric urn
@@ -180,21 +184,16 @@ public class RootCauseMetricResource {
throws Exception {
List<Double> aggregateValues = new ArrayList<>();
+ if (StringUtils.isBlank(timezone)) {
+ timezone = TIMEZONE_DEFAULT;
+ }
+
offsets = ResourceUtils.parseListParams(offsets);
List<MetricSlice> slices = new ArrayList<>();
Map<String, MetricSlice> offsetToBaseSlice = new HashMap<>();
Map<String, Baseline> offsetToRange = new HashMap<>();
for (String offset : offsets) {
- // Put all metric slices together
- if (StringUtils.isBlank(offset)) {
- offset = OFFSET_DEFAULT;
- }
-
- if (StringUtils.isBlank(timezone)) {
- timezone = TIMEZONE_DEFAULT;
- }
-
MetricSlice baseSlice = alignSlice(makeSlice(urn, start, end), timezone);
offsetToBaseSlice.put(offset, baseSlice);
@@ -223,6 +222,76 @@ public class RootCauseMetricResource {
}
/**
+ * Returns a map of lists of aggregate values for the specified metrics and
time range, and a list of offsets.
+ * Aligns time stamps if necessary and returns NaN if no data is available
for the given time range.
+ *
+ * @param urns metric urns
+ * @param start start time (in millis)
+ * @param end end time (in millis)
+ * @param offsets A list of offset identifiers (e.g. "current", "wo2w")
+ * @param timezone timezone identifier (e.g. "America/Los_Angeles")
+ *
+ * @see BaselineParsingUtils#parseOffset(String, String) supported offsets
+ *
+ * @return map of lists (keyed by urn) of aggregate values, or NaN if data
not available
+ * @throws Exception on catch-all execution failure
+ */
+ @GET
+ @Path("/aggregate/chunk")
+ @ApiOperation(value = "Returns a map of lists (keyed by urn) of aggregate
value for the specified metrics and time range, and offsets.")
+ public Map<String, Collection<Double>> getAggregateChunk(
+ @ApiParam(value = "metric urns", required = true) @QueryParam("urns")
@NotNull List<String> urns,
+ @ApiParam(value = "start time (in millis)", required = true)
@QueryParam("start") @NotNull long start,
+ @ApiParam(value = "end time (in millis)", required = true)
@QueryParam("end") @NotNull long end,
+ @ApiParam(value = "A list of offset identifier separated by comma (e.g.
\"current\", \"wo2w\")") @QueryParam("offsets") List<String> offsets,
+ @ApiParam(value = "timezone identifier (e.g. \"America/Los_Angeles\")")
@QueryParam("timezone") String timezone)
+ throws Exception {
+ ListMultimap<String, Double> aggregateValues = ArrayListMultimap.create();
+
+ if (StringUtils.isBlank(timezone)) {
+ timezone = TIMEZONE_DEFAULT;
+ }
+
+ urns = ResourceUtils.parseListParams(urns);
+ offsets = ResourceUtils.parseListParams(offsets);
+ List<MetricSlice> slices = new ArrayList<>();
+
+ Map<String, MetricSlice> offsetToBaseSlice = new HashMap<>();
+ Map<Pair<String, String>, Baseline> tupleToRange = new HashMap<>();
+ for (String urn : urns) {
+ for (String offset : offsets) {
+ MetricSlice baseSlice = alignSlice(makeSlice(urn, start, end),
timezone);
+ offsetToBaseSlice.put(offset, baseSlice);
+
+ Baseline range = parseOffset(offset, timezone);
+ tupleToRange.put(Pair.of(urn, offset), range);
+
+ List<MetricSlice> currentSlices = range.scatter(baseSlice);
+
+ slices.addAll(currentSlices);
+ logSlices(baseSlice, currentSlices);
+ }
+ }
+
+ // Fetch all aggregates
+ Map<MetricSlice, DataFrame> data = fetchAggregates(slices);
+
+ // Pick the results
+ for (String urn : urns) {
+ for (String offset : offsets) {
+ DataFrame result = tupleToRange.get(Pair.of(urn,
offset)).gather(offsetToBaseSlice.get(offset), data);
+ if (result.isEmpty()) {
+ aggregateValues.put(urn, Double.NaN);
+ } else {
+ aggregateValues.put(urn, result.getDouble(COL_VALUE, 0));
+ }
+ }
+ }
+
+ return aggregateValues.asMap();
+ }
+
+ /**
* Returns a breakdown (de-aggregation) of the specified metric and time
range, and (optionally) offset.
* Aligns time stamps if necessary and omits null values.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]