This is an automated email from the ASF dual-hosted git repository.
jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f815e2e [TE] detection - align metric slices (#3981)
f815e2e is described below
commit f815e2ea569dfe63d74e69e8dff69fe2f5a2b559
Author: Jihao Zhang <[email protected]>
AuthorDate: Mon Mar 18 17:30:32 2019 -0700
[TE] detection - align metric slices (#3981)
Align metric slices to the data granularity.
---
.../thirdeye/detection/DefaultDataProvider.java | 42 ++++++++++++++++++++--
.../detection/wrapper/AnomalyDetectorWrapper.java | 2 +-
2 files changed, 41 insertions(+), 3 deletions(-)
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
index a7e9026..935b629 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
@@ -39,6 +39,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
import org.apache.pinot.thirdeye.dataframe.DataFrame;
import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
@@ -51,10 +53,12 @@ import org.apache.pinot.thirdeye.datalayer.dto.EventDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.comparison.Row;
import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
import org.apache.pinot.thirdeye.detection.spi.model.EventSlice;
+import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -154,11 +158,15 @@ public class DefaultDataProvider implements DataProvider {
@Override
public Map<MetricSlice, DataFrame> fetchTimeseries(Collection<MetricSlice>
slices) {
try {
- Map<MetricSlice, DataFrame> cacheResult =
DETECTION_TIME_SERIES_CACHE.getAll(slices);
+ Map<MetricSlice, MetricSlice> alignedMetricSlicesToOriginalSlice = new
HashMap<>();
+ for (MetricSlice slice: slices) {
+ alignedMetricSlicesToOriginalSlice.put(alignSlice(slice), slice);
+ }
+ Map<MetricSlice, DataFrame> cacheResult =
DETECTION_TIME_SERIES_CACHE.getAll(alignedMetricSlicesToOriginalSlice.keySet());
Map<MetricSlice, DataFrame> timeseriesResult = new HashMap<>();
for (Map.Entry<MetricSlice, DataFrame> entry : cacheResult.entrySet()){
// make a copy of the result so that cache won't be contaminated by
client code
- timeseriesResult.put(entry.getKey(), entry.getValue().copy());
+
timeseriesResult.put(alignedMetricSlicesToOriginalSlice.get(entry.getKey()),
entry.getValue().copy());
}
return timeseriesResult;
} catch (Exception e) {
@@ -314,6 +322,36 @@ public class DefaultDataProvider implements DataProvider {
return diff > 0 ? diff : 0;
}
+ /**
+ * Aligns a metric slice based on its granularity, or the dataset
granularity.
+ *
+ * @param slice metric slice
+ * @return aligned metric slice
+ */
+ private MetricSlice alignSlice(MetricSlice slice) {
+ MetricConfigDTO metric = this.metricDAO.findById(slice.getMetricId());
+ if (metric == null) {
+ throw new IllegalArgumentException(String.format("Could not resolve
metric id %d", slice.getMetricId()));
+ }
+
+ DatasetConfigDTO dataset =
this.datasetDAO.findByDataset(metric.getDataset());
+ if (dataset == null) {
+ throw new IllegalArgumentException(String.format("Could not resolve
dataset '%s' for metric id %d", metric.getDataset(), slice.getMetricId()));
+ }
+
+ TimeGranularity granularity = dataset.bucketTimeGranularity();
+ if (!MetricSlice.NATIVE_GRANULARITY.equals(slice.getGranularity())) {
+ granularity = slice.getGranularity();
+ }
+
+ // align to time buckets and request time zone
+ long timeGranularity = granularity.toMillis();
+ long start = (slice.getStart() / timeGranularity) * timeGranularity;
+ long end = ((slice.getEnd() + timeGranularity - 1) / timeGranularity) *
timeGranularity;
+
+ return slice.withStart(start).withEnd(end).withGranularity(granularity);
+ }
+
public static void cleanCache() {
if (DETECTION_TIME_SERIES_CACHE != null) {
DETECTION_TIME_SERIES_CACHE.cleanUp();
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index 8450cf6..eb7117e 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -153,7 +153,7 @@ public class AnomalyDetectorWrapper extends
DetectionPipeline {
LOG.info("[New Pipeline] running detection for config {} metricUrn {}.
start time {}, end time {}", config.getId(), metricUrn, window.getStart(),
window.getEnd());
long ts = System.currentTimeMillis();
anomaliesForOneWindow = anomalyDetector.runDetection(window,
this.metricUrn);
- LOG.info("[New Pipeline] run anomaly detection for window {} - {} used
{} milliseconds", window.getStart(), window.getEnd(),
System.currentTimeMillis() - ts);
+ LOG.info("[New Pipeline] run anomaly detection for window {} - {} used
{} milliseconds, detected {} anomalies", window.getStart(), window.getEnd(),
System.currentTimeMillis() - ts, anomaliesForOneWindow.size());
} catch (Exception e) {
LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to
{} failed.", this.config.getId(), window.getStart(), window.getEnd(), e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]