This is an automated email from the ASF dual-hosted git repository.
akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a969ede [TE] Add maxValueDaily and maxValueHourly data filters (#5528)
a969ede is described below
commit a969ede4f2fd5194ba615f1c7782dce462ef722e
Author: Akshay Rai <[email protected]>
AuthorDate: Mon Jun 22 00:37:50 2020 -0700
[TE] Add maxValueDaily and maxValueHourly data filters (#5528)
---
.../pinot/thirdeye/common/utils/MetricUtils.java | 40 ++++++++++++++++++++++
.../detection/algorithm/DimensionWrapper.java | 27 ++++++++++++---
.../components/ThresholdRuleAnomalyFilter.java | 16 ++++-----
.../detection/algorithm/DimensionWrapperTest.java | 40 ++++++++++------------
.../MergeDimensionThresholdIntegrationTest.java | 13 +++++--
5 files changed, 97 insertions(+), 39 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/common/utils/MetricUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/common/utils/MetricUtils.java
new file mode 100644
index 0000000..bf632e5
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/common/utils/MetricUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.common.utils;
+
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+
+
+/** Static helpers for ThirdEye metric configurations; not instantiable. */
+public final class MetricUtils {
+
+  private MetricUtils() {}
+
+  /**
+   * Checks whether the metric's default aggregation is cumulative (SUM or COUNT).
+   * @param metric metric config; must not be null
+   * @return true for SUM or COUNT, false otherwise (including a null default agg function)
+   */
+  public static boolean isAggCumulative(MetricConfigDTO metric) {
+    MetricAggFunction aggFunction = metric.getDefaultAggFunction();
+    return aggFunction == MetricAggFunction.SUM || aggFunction == MetricAggFunction.COUNT;
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
index d9d034d..4e3f0b5 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
@@ -34,6 +34,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections4.MapUtils;
+import org.apache.pinot.thirdeye.common.utils.MetricUtils;
import org.apache.pinot.thirdeye.dataframe.DataFrame;
import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
@@ -52,7 +53,6 @@ import org.apache.pinot.thirdeye.detection.PredictionResult;
import org.apache.pinot.thirdeye.detection.cache.CacheConfig;
import
org.apache.pinot.thirdeye.detection.spi.exception.DetectorDataInsufficientException;
import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
-import org.apache.pinot.thirdeye.detection.wrapper.AnomalyDetectorWrapper;
import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
import org.joda.time.DateTime;
@@ -101,7 +101,9 @@ public class DimensionWrapper extends DetectionPipeline {
private final double minContribution;
private final double minValue;
private final double minValueHourly;
+ private final double maxValueHourly;
private final double minValueDaily;
+ private final double maxValueDaily;
private final double minLiveZone;
private final double liveBucketPercentageThreshold;
private final Period lookback;
@@ -122,10 +124,13 @@ public class DimensionWrapper extends DetectionPipeline {
// the metric used in dimension exploration
this.metricUrn = MapUtils.getString(config.getProperties(), "metricUrn",
null);
+
this.minContribution = MapUtils.getDoubleValue(config.getProperties(),
"minContribution", Double.NaN);
this.minValue = MapUtils.getDoubleValue(config.getProperties(),
"minValue", Double.NaN);
this.minValueHourly = MapUtils.getDoubleValue(config.getProperties(),
"minValueHourly", Double.NaN);
+ this.maxValueHourly = MapUtils.getDoubleValue(config.getProperties(),
"maxValueHourly", Double.NaN);
this.minValueDaily = MapUtils.getDoubleValue(config.getProperties(),
"minValueDaily", Double.NaN);
+ this.maxValueDaily = MapUtils.getDoubleValue(config.getProperties(),
"maxValueDaily", Double.NaN);
this.k = MapUtils.getIntValue(config.getProperties(), "k", -1);
this.dimensions =
ConfigUtils.getList(config.getProperties().get("dimensions"));
this.lookback =
ConfigUtils.parsePeriod(MapUtils.getString(config.getProperties(), "lookback",
"1w"));
@@ -168,6 +173,7 @@ public class DimensionWrapper extends DetectionPipeline {
Period testPeriod = new Period(this.start, this.end);
MetricEntity metric = MetricEntity.fromURN(this.metricUrn);
+ MetricConfigDTO metricConfig =
this.provider.fetchMetrics(Collections.singleton(metric.getId())).get(metric.getId());
MetricSlice slice = MetricSlice.from(metric.getId(),
this.start.getMillis(), this.end.getMillis(), metric.getFilters());
// We can push down the top k filter if min contribution is not defined.
@@ -195,14 +201,25 @@ public class DimensionWrapper extends DetectionPipeline {
aggregates =
aggregates.filter(aggregates.getDoubles(COL_VALUE).gte(this.minValue)).dropNull();
}
+ double hourlyMultiplier = MetricUtils.isAggCumulative(metricConfig) ?
+ (TimeUnit.HOURS.toMillis(1) / (double)
testPeriod.toDurationFrom(start).getMillis()) : 1.0;
+ double dailyMultiplier = MetricUtils.isAggCumulative(metricConfig) ?
+ (TimeUnit.DAYS.toMillis(1) / (double)
testPeriod.toDurationFrom(start).getMillis()) : 1.0;
+
if (!Double.isNaN(this.minValueHourly)) {
- double multiplier = TimeUnit.HOURS.toMillis(1) / (double)
testPeriod.toDurationFrom(start).getMillis();
- aggregates =
aggregates.filter(aggregates.getDoubles(COL_VALUE).multiply(multiplier).gte(this.minValueHourly)).dropNull();
+ aggregates =
aggregates.filter(aggregates.getDoubles(COL_VALUE).multiply(hourlyMultiplier).gte(this.minValueHourly)).dropNull();
+ }
+
+ if (!Double.isNaN(this.maxValueHourly)) {
+ aggregates =
aggregates.filter(aggregates.getDoubles(COL_VALUE).multiply(hourlyMultiplier).lte(this.maxValueHourly)).dropNull();
}
if (!Double.isNaN(this.minValueDaily)) {
- double multiplier = TimeUnit.DAYS.toMillis(1) / (double)
testPeriod.toDurationFrom(start).getMillis();
- aggregates =
aggregates.filter(aggregates.getDoubles(COL_VALUE).multiply(multiplier).gte(this.minValueDaily)).dropNull();
+ aggregates =
aggregates.filter(aggregates.getDoubles(COL_VALUE).multiply(dailyMultiplier).gte(this.minValueDaily)).dropNull();
+ }
+
+ if (!Double.isNaN(this.maxValueDaily)) {
+ aggregates =
aggregates.filter(aggregates.getDoubles(COL_VALUE).multiply(dailyMultiplier).lte(this.maxValueDaily)).dropNull();
}
aggregates = aggregates.sortedBy(COL_VALUE).reverse();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java
index 68a1369..d59b19b 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java
@@ -21,7 +21,7 @@ package org.apache.pinot.thirdeye.detection.components;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
-import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.common.utils.MetricUtils;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
import org.apache.pinot.thirdeye.detection.InputDataFetcher;
@@ -57,11 +57,12 @@ public class ThresholdRuleAnomalyFilter implements AnomalyFilter<ThresholdRuleFi
double currentValue = anomaly.getAvgCurrentVal();
Interval anomalyInterval = new Interval(anomaly.getStartTime(),
anomaly.getEndTime());
+
// apply multiplier if the metric is aggregated by SUM or COUNT
- double hourlyMultiplier =
- isAdditive(metric) ? (TimeUnit.HOURS.toMillis(1) / (double)
anomalyInterval.toDurationMillis()) : 1.0;
- double dailyMultiplier =
- isAdditive(metric) ? (TimeUnit.DAYS.toMillis(1) / (double)
anomalyInterval.toDurationMillis()) : 1.0;
+ double hourlyMultiplier = MetricUtils.isAggCumulative(metric) ?
+ (TimeUnit.HOURS.toMillis(1) / (double)
anomalyInterval.toDurationMillis()) : 1.0;
+ double dailyMultiplier = MetricUtils.isAggCumulative(metric) ?
+ (TimeUnit.DAYS.toMillis(1) / (double)
anomalyInterval.toDurationMillis()) : 1.0;
if (!Double.isNaN(this.minValue) && currentValue < this.minValue
|| !Double.isNaN(this.maxValue) && currentValue > this.maxValue) {
@@ -82,11 +83,6 @@ public class ThresholdRuleAnomalyFilter implements AnomalyFilter<ThresholdRuleFi
return true;
}
- private boolean isAdditive(MetricConfigDTO metric) {
- MetricAggFunction aggFunction = metric.getDefaultAggFunction();
- return aggFunction.equals(MetricAggFunction.SUM) ||
aggFunction.equals(MetricAggFunction.COUNT);
- }
-
@Override
public void init(ThresholdRuleFilterSpec spec, InputDataFetcher dataFetcher)
{
this.minValueHourly = spec.getMinValueHourly();
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapperTest.java
index b1008c1..98e2462 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapperTest.java
@@ -17,6 +17,7 @@
package org.apache.pinot.thirdeye.detection.algorithm;
import java.util.concurrent.TimeUnit;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
import org.apache.pinot.thirdeye.dataframe.DataFrame;
import org.apache.pinot.thirdeye.dataframe.DoubleSeries;
import org.apache.pinot.thirdeye.dataframe.StringSeries;
@@ -76,10 +77,18 @@ public class DimensionWrapperTest {
private Map<String, Object> nestedProperties;
private Map<MetricSlice, DataFrame> aggregates;
+  private MetricConfigDTO createTestMetricConfig(long id) {
+    MetricConfigDTO testMetric = new MetricConfigDTO();
+    testMetric.setDataset("TEST");
+    testMetric.setId(id);
+    testMetric.setDefaultAggFunction(MetricAggFunction.SUM);
+    return testMetric;
+  }
+
@BeforeMethod
public void beforeMethod() {
this.aggregates = new HashMap<>();
- this.aggregates.put(MetricSlice.from(1, 10, 15),
+ this.aggregates.put(MetricSlice.from(2, 10, 15),
new DataFrame()
.addSeries("a", StringSeries.buildFrom("1", "1", "1", "1", "1",
"2", "2", "2", "2", "2"))
.addSeries("b", StringSeries.buildFrom("1", "2", "1", "2", "3",
"1", "2", "1", "2", "3"))
@@ -93,18 +102,10 @@ public class DimensionWrapperTest {
dataset.setDataset("TEST");
dataset.setNonAdditiveBucketSize(5);
dataset.setNonAdditiveBucketUnit(TimeUnit.MILLISECONDS);
- MetricConfigDTO metric1 = new MetricConfigDTO();
- metric1.setDataset("TEST");
- metric1.setId(2L);
- MetricConfigDTO metric2 = new MetricConfigDTO();
- metric2.setDataset("TEST");
- metric2.setId(10L);
- MetricConfigDTO metric3 = new MetricConfigDTO();
- metric3.setDataset("TEST");
- metric3.setId(11L);
- MetricConfigDTO metric4 = new MetricConfigDTO();
- metric4.setDataset("TEST");
- metric4.setId(12L);
+ MetricConfigDTO metric1 = createTestMetricConfig(2L);
+ MetricConfigDTO metric2 = createTestMetricConfig(10L);
+ MetricConfigDTO metric3 = createTestMetricConfig(11L);
+ MetricConfigDTO metric4 = createTestMetricConfig(12L);
this.provider = new MockDataProvider()
.setAggregates(this.aggregates)
.setMetrics(Arrays.asList(metric1, metric2, metric3, metric4))
@@ -117,7 +118,7 @@ public class DimensionWrapperTest {
this.nestedProperties.put("key", "value");
this.properties = new HashMap<>();
- this.properties.put(PROP_METRIC_URN, "thirdeye:metric:1");
+ this.properties.put(PROP_METRIC_URN, "thirdeye:metric:2");
this.properties.put(PROP_DIMENSIONS, Arrays.asList("a", "b"));
this.properties.put(PROP_NESTED_METRIC_URN_KEY,
PROP_NESTED_METRIC_URN_KEY_VALUE);
this.properties.put(PROP_NESTED_METRIC_URNS,
PROP_NESTED_METRIC_URN_VALUES);
@@ -238,16 +239,13 @@ public class DimensionWrapperTest {
dataset.setDataset("TEST");
dataset.setNonAdditiveBucketSize(5);
dataset.setNonAdditiveBucketUnit(TimeUnit.MILLISECONDS);
- MetricConfigDTO metric1 = new MetricConfigDTO();
- metric1.setDataset("TEST");
- metric1.setId(10L);
- MetricConfigDTO metric2 = new MetricConfigDTO();
- metric2.setDataset("TEST");
- metric2.setId(11L);
+ MetricConfigDTO metric0 = createTestMetricConfig(2L);
+ MetricConfigDTO metric1 = createTestMetricConfig(10L);
+ MetricConfigDTO metric2 = createTestMetricConfig(11L);
this.provider = new MockDataProvider()
.setAggregates(this.aggregates)
- .setMetrics(Arrays.asList(metric1, metric2))
+ .setMetrics(Arrays.asList(metric0, metric1, metric2))
.setDatasets(Collections.singletonList(dataset))
.setAnomalies(Collections.emptyList())
.setLoader(new MockPipelineLoader(this.runs, this.outputs));
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java
index ad490fa..af024f8 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java
@@ -45,7 +45,8 @@ import org.testng.annotations.Test;
public class MergeDimensionThresholdIntegrationTest {
private static final ObjectMapper MAPPER = new ObjectMapper();
- private static final String METRIC = "myMetric2";
+ private static final String METRIC1 = "myMetric1";
+ private static final String METRIC2 = "myMetric2";
private static final String DATASET = "myDataset2";
private static final Map<String, String> ONE_TWO = new HashMap<>();
static {
@@ -60,6 +61,7 @@ public class MergeDimensionThresholdIntegrationTest {
private DataFrame data1;
private DataFrame data2;
+ private MetricConfigDTO metric1;
private MetricConfigDTO metric2;
private DatasetConfigDTO dataset;
@@ -90,13 +92,18 @@ public class MergeDimensionThresholdIntegrationTest {
this.timeseries = new HashMap<>();
this.timeseries.put(MetricSlice.from(2, 0, 18000), this.data2);
+ this.metric1 = new MetricConfigDTO();
+ this.metric1.setId(1L);
+ this.metric1.setName(METRIC1);
+ this.metric1.setDataset(DATASET);
this.metric2 = new MetricConfigDTO();
this.metric2.setId(2L);
- this.metric2.setName(METRIC);
+ this.metric2.setName(METRIC2);
this.metric2.setDataset(DATASET);
this.metrics = new ArrayList<>();
this.metrics.add(this.metric2);
+ this.metrics.add(this.metric1);
this.dataset = new DatasetConfigDTO();
this.dataset.setId(3L);
@@ -144,7 +151,7 @@ public class MergeDimensionThresholdIntegrationTest {
dimensions.put(entry.getKey(), entry.getValue());
}
- MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(-1L,
start, end, METRIC, DATASET, dimensions);
+ MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(-1L,
start, end, METRIC2, DATASET, dimensions);
anomaly.setMetricUrn(metricUrn);
return anomaly;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]