This is an automated email from the ASF dual-hosted git repository. xhsun pushed a commit to branch merger_fix in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit a0fe925973d3767535181682fb8d4d649ec8f915 Author: Xiaohui Sun <[email protected]> AuthorDate: Mon Aug 26 14:06:30 2019 -0700 [TE] Merger fix to consider anomalies generated earlier --- .../thirdeye/detection/algorithm/MergeWrapper.java | 42 ++++++++++++++-------- .../detection/algorithm/MergeWrapperTest.java | 32 +++++++++-------- 2 files changed, 44 insertions(+), 30 deletions(-) diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java index 2456230..0d4007e 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java @@ -63,10 +63,14 @@ public class MergeWrapper extends DetectionPipeline { protected static final Comparator<MergedAnomalyResultDTO> COMPARATOR = new Comparator<MergedAnomalyResultDTO>() { @Override public int compare(MergedAnomalyResultDTO o1, MergedAnomalyResultDTO o2) { - // earlier + // earlier for start time int res = Long.compare(o1.getStartTime(), o2.getStartTime()); if (res != 0) return res; + // later for end time + res = Long.compare(o2.getStartTime(), o1.getStartTime()); + if (res != 0) return res; + // pre-existing if (o1.getId() == null && o2.getId() != null) return 1; if (o1.getId() != null && o2.getId() == null) return -1; @@ -162,15 +166,15 @@ public class MergeWrapper extends DetectionPipeline { // Merge new anomalies into existing anomalies. Return the anomalies that need to update or add. // If it is existing anomaly and not updated then it is not returned. protected List<MergedAnomalyResultDTO> merge(Collection<MergedAnomalyResultDTO> anomalies) { - List<MergedAnomalyResultDTO> input = new ArrayList<>(enforceMaxDuration(anomalies)); - Collections.sort(input, COMPARATOR); + List<MergedAnomalyResultDTO> inputs = new ArrayList<>(anomalies); + Collections.sort(inputs, COMPARATOR); // stores all the existing anomalies that need to modified - Set<Long> modifiedExistingIds = new HashSet<>(); - List<MergedAnomalyResultDTO> output = new ArrayList<>(); + Set<MergedAnomalyResultDTO> modifiedExistingAnomalies = new HashSet<>(); + Set<MergedAnomalyResultDTO> retainedNewAnomalies = new HashSet<>(); Map<AnomalyKey, MergedAnomalyResultDTO> parents = new HashMap<>(); - for (MergedAnomalyResultDTO anomaly : input) { + for (MergedAnomalyResultDTO anomaly : inputs) { if (anomaly.isChild()) { continue; } @@ -185,7 +189,7 @@ public class MergeWrapper extends DetectionPipeline { // parents.put(key, anomaly); if (!isExistingAnomaly(anomaly)) { - output.add(anomaly); + retainedNewAnomalies.add(anomaly); } } else if (anomaly.getEndTime() <= parent.getEndTime() || anomaly.getEndTime() - parent.getStartTime() <= this.maxDuration) { // fully cover @@ -204,7 +208,15 @@ public class MergeWrapper extends DetectionPipeline { properties.putAll(anomaly.getProperties()); parent.setProperties(properties); if (isExistingAnomaly(parent)) { - modifiedExistingIds.add(parent.getId()); + modifiedExistingAnomalies.add(parent); + } else { + // merge existing anomaly to new anomaly, set id to new anomaly + // parent (new) |-------------------| + // anomaly (existing) |-------------| + if (isExistingAnomaly(anomaly)) { + parent.setId(anomaly.getId()); + anomaly.setId(null); + } } } else if (parent.getEndTime() >= anomaly.getStartTime()) { // mergeable but exceeds maxDuration, then truncate @@ -219,24 +231,24 @@ public class MergeWrapper extends DetectionPipeline { parents.put(key, anomaly); if (!isExistingAnomaly(anomaly)) { - output.add(anomaly); + retainedNewAnomalies.add(anomaly); } if (isExistingAnomaly(parent)) { - modifiedExistingIds.add(parent.getId()); + modifiedExistingAnomalies.add(parent); } } else { // default to new parent if merge not possible parents.put(key, anomaly); if (!isExistingAnomaly(anomaly)) { - output.add(anomaly); + retainedNewAnomalies.add(anomaly); } } } - // add modified existing anomalies into output - output.addAll(input.stream().filter(x -> x.getId()!= null && modifiedExistingIds.contains(x.getId())).collect(Collectors.toList())); - - return new ArrayList<>(output); + modifiedExistingAnomalies.addAll(retainedNewAnomalies); + Collection<MergedAnomalyResultDTO> splitAnomalies + = enforceMaxDuration(new ArrayList<>(modifiedExistingAnomalies)); + return new ArrayList<>(splitAnomalies); } /* diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java index 6e5acb0..5a008ab 100644 --- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java +++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java @@ -63,11 +63,11 @@ public class MergeWrapperTest { Here are the anomalies in the test. Existing anomalies: - 0 1000 1500 2000 + 100 1000 1500 2000 |-----------------| |----------| New anomalies: - 1100 1200 2200 2300 - |-----| |-----| + 50 1200 2200 2300 + |-------------------------| |-----| 1150 1250 2400 2800 |-----| |------------| */ @@ -100,12 +100,12 @@ public class MergeWrapperTest { List<MergedAnomalyResultDTO> existing = new ArrayList<>(); // For existing anomalies add ids. - existing.add(setAnomalyId(makeAnomaly(0, 1000), 0)); + existing.add(setAnomalyId(makeAnomaly(100, 1000), 0)); existing.add(setAnomalyId(makeAnomaly(1500, 2000), 1)); this.outputs = new ArrayList<>(); - this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(1100, 1200), makeAnomaly(2200, 2300)), 2900)); + this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(50, 1200), makeAnomaly(2200, 2300)), 2900)); this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(1150, 1250), makeAnomaly(2400, 2800)), 3000)); @@ -121,7 +121,7 @@ public class MergeWrapperTest { DetectionPipelineResult output = this.wrapper.run(); Assert.assertEquals(output.getAnomalies().size(), 3); - Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1100, 1250))); + Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 1250), 0L))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2300))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800))); Assert.assertEquals(output.getLastTimestamp(), 3000); @@ -130,6 +130,7 @@ public class MergeWrapperTest { @Test public void testMergerMaxGap() throws Exception { this.config.getProperties().put(PROP_MAX_GAP, 100); + this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(0, 1200)), 2900)); this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 3000); DetectionPipelineResult output = this.wrapper.run(); @@ -137,8 +138,8 @@ public class MergeWrapperTest { // anomaly [1500, 2000] is not modified Assert.assertEquals(output.getAnomalies().size(), 2); Assert.assertEquals(output.getLastTimestamp(), 3000); - // anomalies [1100, 1200] and [1150,1250] are merged into [0, 1000] - Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(0, 1250), 0))); + // anomalies [100, 1000] and [1150,1250] are merged into [50, 1200] + Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 1250), 0))); // anomalies [2200, 2300] and [2400, 2800] are merged Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2800))); } @@ -153,7 +154,7 @@ public class MergeWrapperTest { Assert.assertEquals(output.getAnomalies().size(), 3); Assert.assertEquals(output.getLastTimestamp(), 3000); - Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(0, 1250), 0))); + Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 1250), 0))); Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(1500, 2300), 1))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800))); } @@ -176,7 +177,7 @@ public class MergeWrapperTest { Assert.assertEquals(output.getAnomalies().size(), 4); Assert.assertEquals(output.getLastTimestamp(), 3700); - Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(0, 1250), 0))); + Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 1250), 0))); Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(1500, 2300), 1))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 3650))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(3650, 3800))); @@ -200,7 +201,7 @@ public class MergeWrapperTest { Assert.assertEquals(output.getAnomalies().size(), 4); Assert.assertEquals(output.getLastTimestamp(), 3700); - Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(0, 1250), 0))); + Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 1250), 0))); Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(1500, 2300), 1))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 3650))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(3650, 3800))); @@ -221,9 +222,10 @@ public class MergeWrapperTest { this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 4000); DetectionPipelineResult output = this.wrapper.run(); - Assert.assertEquals(output.getAnomalies().size(), 5); + Assert.assertEquals(output.getAnomalies().size(), 6); Assert.assertEquals(output.getLastTimestamp(), 3700); - Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1100, 1250))); + Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 1200), 0))); + Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1200, 1250))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2300))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2900))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2900, 3400))); @@ -284,7 +286,7 @@ public class MergeWrapperTest { Assert.assertEquals(output.getAnomalies().size(), 6); Assert.assertEquals(output.getLastTimestamp(), 3000); - Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(0, 1250), 0))); + Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 1250), 0))); Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(1500, 2300), 1))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800))); Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1150, 1300, Collections.singletonMap("key", "value")))); @@ -315,7 +317,7 @@ public class MergeWrapperTest { Assert.assertEquals(output.getAnomalies().size(), 1); Assert.assertEquals(output.getLastTimestamp(), 3700); - Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(0, 2800), 0))); + Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 2800), 0))); Assert.assertTrue(output.getAnomalies().get(0).getProperties().get(propertyKey).equals(propertyValue)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
