This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a commit to branch merger_fix
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit a0fe925973d3767535181682fb8d4d649ec8f915
Author: Xiaohui Sun <[email protected]>
AuthorDate: Mon Aug 26 14:06:30 2019 -0700

    [TE] Merger fix to consider anomalies generated earlier
---
 .../thirdeye/detection/algorithm/MergeWrapper.java | 42 ++++++++++++++--------
 .../detection/algorithm/MergeWrapperTest.java      | 32 +++++++++--------
 2 files changed, 44 insertions(+), 30 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
index 2456230..0d4007e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
@@ -63,10 +63,14 @@ public class MergeWrapper extends DetectionPipeline {
   protected static final Comparator<MergedAnomalyResultDTO> COMPARATOR = new 
Comparator<MergedAnomalyResultDTO>() {
     @Override
     public int compare(MergedAnomalyResultDTO o1, MergedAnomalyResultDTO o2) {
-      // earlier
+      // earlier for start time
       int res = Long.compare(o1.getStartTime(), o2.getStartTime());
       if (res != 0) return res;
 
+      // later for end time
+      res = Long.compare(o2.getStartTime(), o1.getStartTime());
+      if (res != 0) return res;
+
       // pre-existing
       if (o1.getId() == null && o2.getId() != null) return 1;
       if (o1.getId() != null && o2.getId() == null) return -1;
@@ -162,15 +166,15 @@ public class MergeWrapper extends DetectionPipeline {
   // Merge new anomalies into existing anomalies. Return the anomalies that 
need to update or add.
   // If it is existing anomaly and not updated then it is not returned.
   protected List<MergedAnomalyResultDTO> 
merge(Collection<MergedAnomalyResultDTO> anomalies) {
-    List<MergedAnomalyResultDTO> input = new 
ArrayList<>(enforceMaxDuration(anomalies));
-    Collections.sort(input, COMPARATOR);
+    List<MergedAnomalyResultDTO> inputs = new ArrayList<>(anomalies);
+    Collections.sort(inputs, COMPARATOR);
 
     // stores all the existing anomalies that need to modified
-    Set<Long> modifiedExistingIds = new HashSet<>();
-    List<MergedAnomalyResultDTO> output = new ArrayList<>();
+    Set<MergedAnomalyResultDTO> modifiedExistingAnomalies = new HashSet<>();
+    Set<MergedAnomalyResultDTO> retainedNewAnomalies = new HashSet<>();
 
     Map<AnomalyKey, MergedAnomalyResultDTO> parents = new HashMap<>();
-    for (MergedAnomalyResultDTO anomaly : input) {
+    for (MergedAnomalyResultDTO anomaly : inputs) {
       if (anomaly.isChild()) {
         continue;
       }
@@ -185,7 +189,7 @@ public class MergeWrapper extends DetectionPipeline {
         //
         parents.put(key, anomaly);
         if (!isExistingAnomaly(anomaly)) {
-          output.add(anomaly);
+          retainedNewAnomalies.add(anomaly);
         }
       } else if (anomaly.getEndTime() <= parent.getEndTime() || 
anomaly.getEndTime() - parent.getStartTime() <= this.maxDuration) {
         // fully cover
@@ -204,7 +208,15 @@ public class MergeWrapper extends DetectionPipeline {
         properties.putAll(anomaly.getProperties());
         parent.setProperties(properties);
         if (isExistingAnomaly(parent)) {
-          modifiedExistingIds.add(parent.getId());
+          modifiedExistingAnomalies.add(parent);
+        } else {
+          // merge existing anomaly to new anomaly, set id to new anomaly
+          //  parent (new) |-------------------|
+          //         anomaly (existing) |-------------|
+          if (isExistingAnomaly(anomaly)) {
+            parent.setId(anomaly.getId());
+            anomaly.setId(null);
+          }
         }
       } else if (parent.getEndTime() >= anomaly.getStartTime()) {
         // mergeable but exceeds maxDuration, then truncate
@@ -219,24 +231,24 @@ public class MergeWrapper extends DetectionPipeline {
 
         parents.put(key, anomaly);
         if (!isExistingAnomaly(anomaly)) {
-          output.add(anomaly);
+          retainedNewAnomalies.add(anomaly);
         }
         if (isExistingAnomaly(parent)) {
-          modifiedExistingIds.add(parent.getId());
+          modifiedExistingAnomalies.add(parent);
         }
       } else {
         // default to new parent if merge not possible
         parents.put(key, anomaly);
         if (!isExistingAnomaly(anomaly)) {
-          output.add(anomaly);
+          retainedNewAnomalies.add(anomaly);
         }
       }
     }
 
-    // add modified existing anomalies into output
-    output.addAll(input.stream().filter(x -> x.getId()!= null && 
modifiedExistingIds.contains(x.getId())).collect(Collectors.toList()));
-
-    return new ArrayList<>(output);
+    modifiedExistingAnomalies.addAll(retainedNewAnomalies);
+    Collection<MergedAnomalyResultDTO> splitAnomalies
+        = enforceMaxDuration(new ArrayList<>(modifiedExistingAnomalies));
+    return new ArrayList<>(splitAnomalies);
   }
 
   /*
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
index 6e5acb0..5a008ab 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
@@ -63,11 +63,11 @@ public class MergeWrapperTest {
     Here are the anomalies in the test.
 
     Existing anomalies:
-    0                1000             1500       2000
+   100                1000             1500       2000
     |-----------------|                |----------|
     New  anomalies:
-                       1100  1200                     2200  2300
-                        |-----|                         |-----|
+   50                    1200                     2200  2300
+   |-------------------------|                        |-----|
                            1150 1250                            2400          
2800
                             |-----|                               
|------------|
   */
@@ -100,12 +100,12 @@ public class MergeWrapperTest {
 
     List<MergedAnomalyResultDTO> existing = new ArrayList<>();
     // For existing anomalies add ids.
-    existing.add(setAnomalyId(makeAnomaly(0, 1000), 0));
+    existing.add(setAnomalyId(makeAnomaly(100, 1000), 0));
     existing.add(setAnomalyId(makeAnomaly(1500, 2000), 1));
 
     this.outputs = new ArrayList<>();
 
-    this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(1100, 
1200), makeAnomaly(2200, 2300)), 2900));
+    this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(50, 
1200), makeAnomaly(2200, 2300)), 2900));
 
     this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(1150, 
1250), makeAnomaly(2400, 2800)), 3000));
 
@@ -121,7 +121,7 @@ public class MergeWrapperTest {
     DetectionPipelineResult output = this.wrapper.run();
 
     Assert.assertEquals(output.getAnomalies().size(), 3);
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1100, 1250)));
+    
Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 
1250), 0L)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2300)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800)));
     Assert.assertEquals(output.getLastTimestamp(), 3000);
@@ -130,6 +130,7 @@ public class MergeWrapperTest {
   @Test
   public void testMergerMaxGap() throws Exception {
     this.config.getProperties().put(PROP_MAX_GAP, 100);
+    this.outputs.add(new MockPipelineOutput(Arrays.asList(makeAnomaly(0, 
1200)), 2900));
 
     this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 3000);
     DetectionPipelineResult output = this.wrapper.run();
@@ -137,8 +138,8 @@ public class MergeWrapperTest {
     // anomaly [1500, 2000] is not modified
     Assert.assertEquals(output.getAnomalies().size(), 2);
     Assert.assertEquals(output.getLastTimestamp(), 3000);
-    // anomalies [1100, 1200] and [1150,1250] are merged into [0, 1000]
-    
Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(0, 
1250), 0)));
+    // anomalies [100, 1000] and [1150,1250] are merged into [50, 1200]
+    
Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 
1250), 0)));
     // anomalies [2200, 2300] and [2400, 2800] are merged
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2800)));
   }
@@ -153,7 +154,7 @@ public class MergeWrapperTest {
 
     Assert.assertEquals(output.getAnomalies().size(), 3);
     Assert.assertEquals(output.getLastTimestamp(), 3000);
-    
Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(0, 
1250), 0)));
+    
Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 
1250), 0)));
     
Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(1500, 
2300), 1)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800)));
   }
@@ -176,7 +177,7 @@ public class MergeWrapperTest {
 
     Assert.assertEquals(output.getAnomalies().size(), 4);
     Assert.assertEquals(output.getLastTimestamp(), 3700);
-    
Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(0, 
1250), 0)));
+    
Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 
1250), 0)));
     
Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(1500, 
2300), 1)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 3650)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(3650, 3800)));
@@ -200,7 +201,7 @@ public class MergeWrapperTest {
 
     Assert.assertEquals(output.getAnomalies().size(), 4);
     Assert.assertEquals(output.getLastTimestamp(), 3700);
-    
Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(0, 
1250), 0)));
+    
Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 
1250), 0)));
     
Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(1500, 
2300), 1)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 3650)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(3650, 3800)));
@@ -221,9 +222,10 @@ public class MergeWrapperTest {
     this.wrapper = new MergeWrapper(this.provider, this.config, 1000, 4000);
     DetectionPipelineResult output = this.wrapper.run();
 
-    Assert.assertEquals(output.getAnomalies().size(), 5);
+    Assert.assertEquals(output.getAnomalies().size(), 6);
     Assert.assertEquals(output.getLastTimestamp(), 3700);
-    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1100, 1250)));
+    
Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 
1200), 0)));
+    Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1200, 1250)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2300)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2900)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2900, 3400)));
@@ -284,7 +286,7 @@ public class MergeWrapperTest {
 
     Assert.assertEquals(output.getAnomalies().size(), 6);
     Assert.assertEquals(output.getLastTimestamp(), 3000);
-    
Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(0, 
1250), 0)));
+    
Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 
1250), 0)));
     
Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(1500, 
2300), 1)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1150, 1300, 
Collections.singletonMap("key", "value"))));
@@ -315,7 +317,7 @@ public class MergeWrapperTest {
 
     Assert.assertEquals(output.getAnomalies().size(), 1);
     Assert.assertEquals(output.getLastTimestamp(), 3700);
-    
Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(0, 
2800), 0)));
+    
Assert.assertTrue(output.getAnomalies().contains(setAnomalyId(makeAnomaly(50, 
2800), 0)));
     
Assert.assertTrue(output.getAnomalies().get(0).getProperties().get(propertyKey).equals(propertyValue));
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to