Repository: incubator-eagle
Updated Branches:
  refs/heads/master d61d34698 -> 89d8e3a09


EAGLE-576: Change the dedup DB store to be scoped per publishment rather than global

Author: Li, Garrett
Reviewer: ralphsu

This closes #463


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/89d8e3a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/89d8e3a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/89d8e3a0

Branch: refs/heads/master
Commit: 89d8e3a0935ac9d6dd04d8b9d9ccf6e3410c732e
Parents: d61d346
Author: Xiancheng Li <xiancheng...@ebay.com>
Authored: Wed Sep 28 18:31:55 2016 +0800
Committer: Ralph, Su <suliang...@gmail.com>
Committed: Wed Sep 28 08:38:32 2016 -0700

----------------------------------------------------------------------
 .../alert/engine/publisher/dedup/DedupCache.java   | 17 ++++++++++-------
 .../alert/engine/publisher/dedup/DedupEntity.java  | 15 +++++++++++++--
 .../publisher/dedup/DedupEventsStoreFactory.java   |  4 ++--
 .../publisher/dedup/MongoDedupEventsStore.java     | 16 +++++++++++-----
 .../engine/publisher/dedup/TransformerUtils.java   | 12 ++++++++++--
 .../publisher/impl/AbstractPublishPlugin.java      |  2 +-
 .../publisher/dedup/DedupCacheStoreTest.java       | 13 +++++++++++--
 .../engine/publisher/dedup/DedupCacheTest.java     |  2 +-
 .../dedup/DefaultDedupWithoutStateTest.java        |  2 +-
 .../publisher/dedup/DefaultDeduplicatorTest.java   |  2 +-
 .../publisher/dedup/MongoDependencyBaseTest.java   |  2 +-
 11 files changed, 62 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
index 503a1ce..b15f93c 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
@@ -52,8 +52,11 @@ public class DedupCache {
 
     private Config config;
 
-    public DedupCache(Config config) {
+    private String publishName;
+    
+    public DedupCache(Config config, String publishName) {
         this.config = config;
+        this.publishName = publishName;
         // only happens during startup, won't introduce perf issue here
         synchronized (caches) {
             if (caches.size() == 0) {
@@ -94,7 +97,7 @@ public class DedupCache {
             || System.currentTimeMillis() - lastUpdated > 
CACHE_MAX_EXPIRE_TIME_IN_DAYS * DateUtils.MILLIS_PER_DAY
             || events.size() <= 0) {
             lastUpdated = System.currentTimeMillis();
-            DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, 
this.config);
+            DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, 
this.config, this.publishName);
             events = accessor.getEvents();
         }
         return events;
@@ -108,7 +111,7 @@ public class DedupCache {
         if (this.contains(eventEniq)) {
             this.events.remove(eventEniq);
 
-            DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, 
this.config);
+            DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, 
this.config, this.publishName);
             accessor.remove(eventEniq);
         }
     }
@@ -177,7 +180,7 @@ public class DedupCache {
             LOG.info("Update dedup key {}, and value {}", eventEniq, 
dedupValue);
         }
         if (dedupValue != null) {
-            DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, 
this.config);
+            DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, 
this.config, this.publishName);
             accessor.add(eventEniq, events.get(eventEniq));
             LOG.info("Store dedup key {}, value {} to DB", eventEniq,
                 Joiner.on(",").join(events.get(eventEniq)));
@@ -193,11 +196,11 @@ public class DedupCache {
     }
 
     public void persistUpdatedEventUniq(EventUniq eventEniq) {
-        DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, 
this.config);
+        DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, 
this.config, this.publishName);
         accessor.add(eventEniq, events.get(eventEniq));
         LOG.info("Store dedup key {}, value {} to DB", eventEniq,
             Joiner.on(",").join(events.get(eventEniq)));
-    }
+    } 
 
     private DedupValue updateCount(EventUniq eventEniq) {
         ConcurrentLinkedDeque<DedupValue> dedupValues = events.get(eventEniq);
@@ -212,7 +215,7 @@ public class DedupCache {
                 dedupValue.getStateFieldValue(), dedupValue.getCount());
             if (dedupValue.getCount() > 0 && dedupValue.getCount() % 100 == 0) 
{
                 LOG.info(updateMsg);
-                DedupEventsStore accessor = 
DedupEventsStoreFactory.getStore(type, this.config);
+                DedupEventsStore accessor = 
DedupEventsStoreFactory.getStore(type, this.config, this.publishName);
                 accessor.add(eventEniq, dedupValues);
             } else if (LOG.isDebugEnabled()) {
                 LOG.debug(updateMsg);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
index 1989c45..86bc9b3 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java
@@ -24,19 +24,30 @@ import 
org.apache.eagle.alert.engine.publisher.impl.EventUniq;
 
 public class DedupEntity {
 
+    private String publishName;
     private EventUniq eventEniq;
     private List<DedupValue> dedupValues = new ArrayList<DedupValue>();
 
-    public DedupEntity(EventUniq eventEniq, ConcurrentLinkedDeque<DedupValue> 
dedupValues) {
+    public DedupEntity(String publishName, EventUniq eventEniq, 
ConcurrentLinkedDeque<DedupValue> dedupValues) {
+        this.publishName = publishName;
         this.eventEniq = eventEniq;
         this.dedupValues.addAll(dedupValues);
     }
 
-    public DedupEntity(EventUniq eventEniq, List<DedupValue> dedupValues) {
+    public DedupEntity(String publishName, EventUniq eventEniq, 
List<DedupValue> dedupValues) {
+        this.publishName = publishName;
         this.eventEniq = eventEniq;
         this.dedupValues = dedupValues;
     }
 
+    public String getPublishName() {
+        return publishName;
+    }
+
+    public void setPublishName(String publishName) {
+        this.publishName = publishName;
+    }
+
     public EventUniq getEventEniq() {
         return eventEniq;
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
index 0a3b206..75d8e53 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEventsStoreFactory.java
@@ -34,14 +34,14 @@ public class DedupEventsStoreFactory {
         customizedStore = store;
     }
 
-    public static DedupEventsStore getStore(DedupEventsStoreType type, Config 
config) {
+    public static DedupEventsStore getStore(DedupEventsStoreType type, Config 
config, String publishName) {
         if (customizedStore != null) {
             return customizedStore;
         }
         switch (type) {
             case Mongo:
                 if (accessor == null) {
-                    accessor = new MongoDedupEventsStore(config);
+                    accessor = new MongoDedupEventsStore(config, publishName);
                 }
                 break;
             case ElasticSearch:

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
index 46a01b0..4140793 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
@@ -25,6 +25,7 @@ import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
 import org.bson.BsonDocument;
 import org.bson.BsonInt32;
 import org.bson.BsonInt64;
+import org.bson.BsonString;
 import org.bson.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@ public class MongoDedupEventsStore implements 
DedupEventsStore {
     public static final String DEDUP_STATE_FIELD_VALUE = "stateFieldValue";
     public static final String DEDUP_COUNT = "count";
     public static final String DEDUP_FIRST_OCCURRENCE = "firstOccurrence";
+    public static final String DEDUP_PUBLISH_ID = "publishId";
 
     private static final ObjectMapper mapper = new ObjectMapper();
 
@@ -67,12 +69,14 @@ public class MongoDedupEventsStore implements 
DedupEventsStore {
     private MongoClient client;
     private MongoDatabase db;
     private MongoCollection<Document> stateCollection;
+    private String publishName;
 
     private static final String DB_NAME = "ump_alert_dedup";
     private static final String ALERT_STATE_COLLECTION = "alert_dedup";
 
-    public MongoDedupEventsStore(Config config) {
+    public MongoDedupEventsStore(Config config, String publishName) {
         this.config = config;
+        this.publishName = publishName;
         this.connection = this.config.getString("connection");
         try {
             this.client = new MongoClient(new MongoClientURI(this.connection));
@@ -96,7 +100,9 @@ public class MongoDedupEventsStore implements 
DedupEventsStore {
     public Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> getEvents() {
         try {
             Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> result = new 
ConcurrentHashMap<EventUniq, ConcurrentLinkedDeque<DedupValue>>();
-            stateCollection.find().forEach(new Block<Document>() {
+            BsonDocument filter = new BsonDocument();
+            filter.append(DEDUP_PUBLISH_ID, new BsonString(this.publishName));
+            stateCollection.find(filter).forEach(new Block<Document>() {
                 @Override
                 public void apply(final Document doc) {
                     DedupEntity entity = 
TransformerUtils.transform(DedupEntity.class, BsonDocument.parse(doc.toJson()));
@@ -116,9 +122,9 @@ public class MongoDedupEventsStore implements 
DedupEventsStore {
     @Override
     public void add(EventUniq eventEniq, ConcurrentLinkedDeque<DedupValue> 
dedupStateValues) {
         try {
-            BsonDocument doc = TransformerUtils.transform(new 
DedupEntity(eventEniq, dedupStateValues));
+            BsonDocument doc = TransformerUtils.transform(new 
DedupEntity(this.publishName, eventEniq, dedupStateValues));
             BsonDocument filter = new BsonDocument();
-            filter.append(DEDUP_ID, new BsonInt64(eventEniq.hashCode()));
+            filter.append(DEDUP_ID, new 
BsonInt64(TransformerUtils.getUniqueId(this.publishName, eventEniq)));
             Document returnedDoc = stateCollection.findOneAndReplace(filter, 
Document.parse(doc.toJson()));
             if (returnedDoc == null) {
                 InsertOneOptions option = new InsertOneOptions();
@@ -133,7 +139,7 @@ public class MongoDedupEventsStore implements 
DedupEventsStore {
     public void remove(EventUniq eventEniq) {
         try {
             BsonDocument filter = new BsonDocument();
-            filter.append(DEDUP_ID, new BsonInt64(eventEniq.hashCode()));
+            filter.append(DEDUP_ID, new 
BsonInt64(TransformerUtils.getUniqueId(this.publishName, eventEniq)));
             stateCollection.deleteOne(filter);
         } catch (Exception e) {
             LOG.error("delete dedup state failed, but the state in memory is 
good, could be ingored.", e);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
index e339240..5c18867 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
 
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
 import org.bson.BsonArray;
 import org.bson.BsonBoolean;
@@ -65,7 +66,8 @@ public class TransformerUtils {
                     MongoDedupEventsStore.DEDUP_FIRST_OCCURRENCE).getValue());
                 dedupValues.add(dedupValue);
             }
-            return (T) new DedupEntity(eventUniq, dedupValues);
+            String publishId = 
doc.getString(MongoDedupEventsStore.DEDUP_PUBLISH_ID).getValue();
+            return (T) new DedupEntity(publishId, eventUniq, dedupValues);
         }
         throw new RuntimeException(String.format("Unknow object type %s, 
cannot transform", klass.getName()));
     }
@@ -74,8 +76,9 @@ public class TransformerUtils {
         if (obj instanceof DedupEntity) {
             BsonDocument doc = new BsonDocument();
             DedupEntity entity = (DedupEntity) obj;
-            doc.put(MongoDedupEventsStore.DEDUP_ID, new 
BsonInt64(entity.getEventEniq().hashCode()));
+            doc.put(MongoDedupEventsStore.DEDUP_ID, new 
BsonInt64(getUniqueId(entity.getPublishName(), entity.getEventEniq())));
             doc.put(MongoDedupEventsStore.DEDUP_STREAM_ID, new 
BsonString(entity.getEventEniq().streamId));
+            doc.put(MongoDedupEventsStore.DEDUP_PUBLISH_ID, new 
BsonString(entity.getPublishName()));
             doc.put(MongoDedupEventsStore.DEDUP_POLICY_ID, new 
BsonString(entity.getEventEniq().policyId));
             doc.put(MongoDedupEventsStore.DEDUP_CREATE_TIME, new 
BsonInt64(entity.getEventEniq().createdTime));
             doc.put(MongoDedupEventsStore.DEDUP_TIMESTAMP, new 
BsonInt64(entity.getEventEniq().timestamp));
@@ -107,4 +110,9 @@ public class TransformerUtils {
         throw new RuntimeException(String.format("Unknow object type %s, 
cannot transform", obj.getClass().getName()));
     }
 
+    public static int getUniqueId(String publishName, EventUniq eventEniq) {
+        HashCodeBuilder builder = new 
HashCodeBuilder().append(eventEniq).append(publishName);
+        return builder.build();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
index 3e293cd..87aac29 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -47,7 +47,7 @@ public abstract class AbstractPublishPlugin implements 
AlertPublishPlugin {
     @SuppressWarnings("rawtypes")
     @Override
     public void init(Config config, Publishment publishment, Map conf) throws 
Exception {
-        DedupCache dedupCache = new DedupCache(config);
+        DedupCache dedupCache = new DedupCache(config, publishment.getName());
         OverrideDeduplicatorSpec spec = publishment.getOverrideDeduplicator();
         if (spec != null && StringUtils.isNotBlank(spec.getClassName())) {
             try {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
index 5e56bb7..54aedb8 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
@@ -25,12 +25,14 @@ import 
org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import 
org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStoreFactory.DedupEventsStoreType;
 import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+import org.apache.storm.guava.base.Joiner;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentLinkedDeque;
 
 public class DedupCacheStoreTest extends MongoDependencyBaseTest {
@@ -50,16 +52,19 @@ public class DedupCacheStoreTest extends 
MongoDependencyBaseTest {
                
                System.setProperty("config.resource", 
"/application-mongo-statestore.conf");
                Config config = ConfigFactory.load();
-               DedupCache cache = new DedupCache(config);
+               DedupCache cache = new DedupCache(config, "testPublishment");
                cache.addOrUpdate(eventUniq, (String) 
event.getData()[event.getSchema().getColumnIndex("state")]);
                
-               DedupEventsStore accessor = 
DedupEventsStoreFactory.getStore(DedupEventsStoreType.Mongo, config);
+               DedupEventsStore accessor = 
DedupEventsStoreFactory.getStore(DedupEventsStoreType.Mongo, config, 
"testPublishment");
                Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = 
accessor.getEvents();
                for (EventUniq one : events.keySet()) {
                        if (one.equals(eventUniq)) {
                                Assert.assertEquals(false, one.removable);
                        }
                }
+               for (Entry<EventUniq, ConcurrentLinkedDeque<DedupValue>> entry 
: events.entrySet()) {
+                       System.out.println(entry.getKey() + " >>> " + 
Joiner.on("\n\t").join(entry.getValue()));
+               }
                
                eventUniq.removable = true;
                cache.persistUpdatedEventUniq(eventUniq);
@@ -70,6 +75,10 @@ public class DedupCacheStoreTest extends 
MongoDependencyBaseTest {
                                Assert.assertEquals(true, one.removable);
                        }
                }
+               
+               for (Entry<EventUniq, ConcurrentLinkedDeque<DedupValue>> entry 
: events.entrySet()) {
+                       System.out.println(entry.getKey() + " >>> " + 
Joiner.on("\n\t").join(entry.getValue()));
+               }
        }
        
        private AlertStreamEvent createEvent(StreamDefinition stream, 
PolicyDefinition policy, Object[] data) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
index e996376..d3dc717 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
@@ -55,7 +55,7 @@ public class DedupCacheTest {
        @Test
        public void testNormal() throws Exception {
                Config config = ConfigFactory.load();
-               DedupCache dedupCache = new DedupCache(config);
+               DedupCache dedupCache = new DedupCache(config, 
"testPublishment");
                
                StreamDefinition stream = createStream();
                PolicyDefinition policy = createPolicy(stream.getStreamId(), 
"testPolicy");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
index 57c113b..456b1ea 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
@@ -42,7 +42,7 @@ public class DefaultDedupWithoutStateTest {
                // assume state: OPEN, WARN, CLOSE
                System.setProperty("config.resource", 
"/application-mongo-statestore.conf");
                Config config = ConfigFactory.load();
-               DedupCache dedupCache = new DedupCache(config);
+               DedupCache dedupCache = new DedupCache(config, 
"testPublishment");
                DefaultDeduplicator deduplicator = new DefaultDeduplicator(
                                "PT10S", Arrays.asList(new String[] { 
"alertKey" }), null, dedupCache);
                

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
index 3a17e53..0556e3d 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
@@ -40,7 +40,7 @@ public class DefaultDeduplicatorTest extends 
MongoDependencyBaseTest {
                // assume state: OPEN, WARN, CLOSE
                System.setProperty("config.resource", 
"/application-mongo-statestore.conf");
                Config config = ConfigFactory.load();
-               DedupCache dedupCache = new DedupCache(config);
+               DedupCache dedupCache = new DedupCache(config, 
"testPublishment");
                DefaultDeduplicator deduplicator = new DefaultDeduplicator(
                                "PT1M", Arrays.asList(new String[] { "alertKey" 
}), "state", dedupCache);
                

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/89d8e3a0/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
index 75de384..b7d7613 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDependencyBaseTest.java
@@ -54,7 +54,7 @@ public abstract class MongoDependencyBaseTest {
         ConfigFactory.invalidateCaches();
         config = ConfigFactory.load();
         
-        store = new MongoDedupEventsStore(config);
+        store = new MongoDedupEventsStore(config, "testPublishment");
     }
 
     @AfterClass

Reply via email to