Repository: incubator-eagle
Updated Branches:
  refs/heads/master a0005997c -> 3706dc7e6


EAGLE-544: Enhance dedup to support extended deduplicator

Author: Li, Garrett
Reviewer: ralphsu

This closes #437


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/3706dc7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/3706dc7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/3706dc7e

Branch: refs/heads/master
Commit: 3706dc7e628f5bf5375dd88638796f99f1db06e7
Parents: a000599
Author: Xiancheng Li <xiancheng...@ebay.com>
Authored: Sun Sep 18 09:55:42 2016 +0800
Committer: Ralph, Su <suliang...@gmail.com>
Committed: Sun Sep 18 15:40:06 2016 -0700

----------------------------------------------------------------------
 .../coordinator/OverrideDeduplicatorSpec.java   |  48 ++++++
 .../alert/engine/coordinator/Publishment.java   |  17 ++-
 .../engine/publisher/dedup/DedupCache.java      | 102 +++++++++++--
 .../publisher/dedup/ExtendedDeduplicator.java   |  70 +++++++++
 .../publisher/impl/AbstractPublishPlugin.java   |  46 +++++-
 .../publisher/impl/DefaultDeduplicator.java     |  89 ++---------
 .../engine/publisher/dedup/DedupCacheTest.java  | 147 +++++++++++++++++++
 .../dedup/DefaultDeduplicatorTest.java          |   3 +-
 .../dedup/ExtendedDeduplicatorTest.java         | 109 ++++++++++++++
 .../publisher/dedup/MongoDedupStoreTest.java    |   5 +
 .../publisher/dedup/TestDeduplicator.java       |  82 +++++++++++
 .../engine/router/TestAlertPublisherBolt.java   |   6 +-
 .../publishments-extended-deduplicator.json     |  31 ++++
 .../src/test/resources/router/publishments.json |   1 -
 14 files changed, 643 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3706dc7e/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpec.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpec.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpec.java
new file mode 100644
index 0000000..0a65ee6
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/OverrideDeduplicatorSpec.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class OverrideDeduplicatorSpec implements Serializable {
+
+    private static final long serialVersionUID = 6087460266979449001L;
+
+    private String className;
+    private Map<String, String> properties;
+
+    public String getClassName() {
+        return className;
+    }
+
+    public void setClassName(String className) {
+        this.className = className;
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Map<String, String> properties) {
+        this.properties = properties;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3706dc7e/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
index 8176e03..8055144 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
@@ -16,13 +16,14 @@
  */
 package org.apache.eagle.alert.engine.coordinator;
 
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
 /**
  * @since Apr 11, 2016.
  */
@@ -35,7 +36,7 @@ public class Publishment {
     private String dedupIntervalMin;
     private List<String> dedupFields;
     private String dedupStateField;
-    private String dedupStateCloseValue;
+    private OverrideDeduplicatorSpec overrideDeduplicator;
     private Map<String, String> properties;
     // the class name to extend the IEventSerializer interface
     private String serializer;
@@ -56,12 +57,12 @@ public class Publishment {
         this.dedupStateField = dedupStateField;
     }
 
-    public String getDedupStateCloseValue() {
-        return dedupStateCloseValue;
+    public OverrideDeduplicatorSpec getOverrideDeduplicator() {
+        return overrideDeduplicator;
     }
 
-    public void setDedupStateCloseValue(String dedupStateCloseValue) {
-        this.dedupStateCloseValue = dedupStateCloseValue;
+    public void setOverrideDeduplicator(OverrideDeduplicatorSpec 
overrideDeduplicator) {
+        this.overrideDeduplicator = overrideDeduplicator;
     }
 
     public String getSerializer() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3706dc7e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
index 2036bca..abd8004 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
@@ -16,12 +16,15 @@
  */
 package org.apache.eagle.alert.engine.publisher.dedup;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 
 import org.apache.commons.lang.time.DateUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import 
org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStoreFactory.DedupEventsStoreType;
 import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
 import org.slf4j.Logger;
@@ -31,9 +34,6 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.typesafe.config.Config;
 
-/*
- * it is not thread safe, we need to handle concurrency issue out of this class
- */
 public class DedupCache {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DedupCache.class);
@@ -41,6 +41,9 @@ public class DedupCache {
     private static final long CACHE_MAX_EXPIRE_TIME_IN_DAYS = 30;
     private static final long CACHE_MAX_EVENT_QUEUE_SIZE = 10;
 
+    private static final String DEDUP_COUNT = "dedupCount";
+    private static final String DEDUP_FIRST_OCCURRENCE = 
"dedupFirstOccurrence";
+
     private static final DedupEventsStoreType type = 
DedupEventsStoreType.Mongo;
 
     private long lastUpdated = -1;
@@ -69,7 +72,55 @@ public class DedupCache {
         return events;
     }
 
-    public DedupValue[] add(EventUniq eventEniq, String stateFieldValue, 
String dedupStateCloseValue) {
+    public boolean contains(EventUniq eventEniq) {
+        return this.getEvents().containsKey(eventEniq);
+    }
+
+    public void removeEvent(EventUniq eventEniq) {
+        if (this.contains(eventEniq)) {
+            this.events.remove(eventEniq);
+
+            DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, 
this.config);
+            accessor.remove(eventEniq);
+        }
+    }
+
+    public List<AlertStreamEvent> dedup(AlertStreamEvent event, EventUniq 
eventEniq,
+                                        String dedupStateField, String 
stateFieldValue) {
+        DedupValue[] dedupValues = this.addOrUpdate(eventEniq, 
stateFieldValue);
+        if (dedupValues != null) {
+            // any of dedupValues won't be null
+            if (dedupValues.length == 2) {
+                // emit last event which includes count of dedup events & new 
state event
+                return Arrays.asList(
+                    this.mergeEventWithDedupValue(event, dedupValues[0], 
dedupStateField),
+                    this.mergeEventWithDedupValue(event, dedupValues[1], 
dedupStateField));
+            } else if (dedupValues.length == 1) {
+                //populate firstOccurrenceTime & count
+                return Arrays.asList(this.mergeEventWithDedupValue(event, 
dedupValues[0], dedupStateField));
+            }
+        }
+        // duplicated, will be ignored
+        return null;
+    }
+
+    public synchronized DedupValue[] addOrUpdate(EventUniq eventEniq, String 
stateFieldValue) {
+        Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = 
this.getEvents();
+        if (!events.containsKey(eventEniq)
+            || (events.containsKey(eventEniq)
+            && events.get(eventEniq).size() > 0
+            && !Objects.equal(stateFieldValue,
+            events.get(eventEniq).getLast().getStateFieldValue()))) {
+            DedupValue[] dedupValues = this.add(eventEniq, stateFieldValue);
+            return dedupValues;
+        } else {
+            // update count
+            this.updateCount(eventEniq);
+            return null;
+        }
+    }
+
+    private DedupValue[] add(EventUniq eventEniq, String stateFieldValue) {
         DedupValue dedupValue = null;
         DedupValue lastDedupValue = null;
         if (!events.containsKey(eventEniq)) {
@@ -91,19 +142,13 @@ public class DedupCache {
             if (dedupValues.size() > CACHE_MAX_EVENT_QUEUE_SIZE) {
                 dedupValues = new ConcurrentLinkedDeque<DedupValue>();
                 dedupValues.add(lastDedupValue);
+                LOG.info("Reset dedup key {} to value {} since meets maximum 
{}",
+                    eventEniq, dedupValue, CACHE_MAX_EVENT_QUEUE_SIZE);
             }
             dedupValues.add(dedupValue);
             LOG.info("Update dedup key {}, and value {}", eventEniq, 
dedupValue);
         }
         if (dedupValue != null) {
-            // reset the list if close state reached
-            if (StringUtils.isNotBlank(dedupStateCloseValue)
-                && Objects.equal(stateFieldValue, dedupStateCloseValue)) {
-                events.put(eventEniq, new ConcurrentLinkedDeque<DedupValue>());
-                events.get(eventEniq).add(dedupValue);
-                LOG.info("Reset dedup key {} to value {}", eventEniq, 
dedupValue);
-            }
-
             DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, 
this.config);
             accessor.add(eventEniq, events.get(eventEniq));
             LOG.info("Store dedup key {}, value {} to DB", eventEniq,
@@ -119,7 +164,7 @@ public class DedupCache {
         }
     }
 
-    public DedupValue updateCount(EventUniq eventEniq) {
+    private DedupValue updateCount(EventUniq eventEniq) {
         ConcurrentLinkedDeque<DedupValue> dedupValues = events.get(eventEniq);
         if (dedupValues == null || dedupValues.size() <= 0) {
             LOG.warn("No dedup values found for {}, cannot update count", 
eventEniq);
@@ -133,4 +178,33 @@ public class DedupCache {
         }
     }
 
+    private AlertStreamEvent mergeEventWithDedupValue(AlertStreamEvent 
originalEvent,
+                                                      DedupValue dedupValue, 
String dedupStateField) {
+        AlertStreamEvent event = new AlertStreamEvent();
+        Object[] newdata = new Object[originalEvent.getData().length];
+        for (int i = 0; i < originalEvent.getData().length; i++) {
+            newdata[i] = originalEvent.getData()[i];
+        }
+        event.setData(newdata);
+        event.setSchema(originalEvent.getSchema());
+        event.setPolicyId(originalEvent.getPolicyId());
+        event.setCreatedTime(originalEvent.getCreatedTime());
+        event.setCreatedBy(originalEvent.getCreatedBy());
+        event.setTimestamp(originalEvent.getTimestamp());
+        StreamDefinition streamDefinition = event.getSchema();
+        for (int i = 0; i < event.getData().length; i++) {
+            String colName = streamDefinition.getColumns().get(i).getName();
+            if (Objects.equal(colName, dedupStateField)) {
+                event.getData()[i] = dedupValue.getStateFieldValue();
+            }
+            if (Objects.equal(colName, DEDUP_COUNT)) {
+                event.getData()[i] = dedupValue.getCount();
+            }
+            if (Objects.equal(colName, DEDUP_FIRST_OCCURRENCE)) {
+                event.getData()[i] = dedupValue.getFirstOccurrence();
+            }
+        }
+        return event;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3706dc7e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicator.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicator.java
new file mode 100644
index 0000000..9faaed4
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
+
+import com.typesafe.config.Config;
+
+public abstract class ExtendedDeduplicator implements AlertDeduplicator {
+
+    private Map<String, String> properties;
+    private DedupCache dedupCache;
+    private List<String> customDedupFields;
+    private String dedupStateField;
+    private Config config;
+
+    public ExtendedDeduplicator(Config config,
+                                Map<String, String> properties,
+                                List<String> customDedupFields,
+                                String dedupStateField,
+                                DedupCache dedupCache) {
+        this.properties = properties;
+        if (this.properties == null) {
+            this.properties = new HashMap<String, String>();
+        }
+        this.dedupCache = dedupCache;
+        this.customDedupFields = customDedupFields;
+        this.dedupStateField = dedupStateField;
+        this.config = config;
+    }
+
+    public Map<String, String> getProperties() {
+        return this.properties;
+    }
+
+    public DedupCache getDedupCache() {
+        return this.dedupCache;
+    }
+
+    public List<String> getCustomDedupFields() {
+        return customDedupFields;
+    }
+
+    public String getDedupStateField() {
+        return dedupStateField;
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3706dc7e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
index 15e27a3..743ce91 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java
@@ -16,16 +16,23 @@
  */
 package org.apache.eagle.alert.engine.publisher.impl;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.codec.IEventSerializer;
+import org.apache.eagle.alert.engine.coordinator.OverrideDeduplicatorSpec;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
 import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
-import com.typesafe.config.Config;
+import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
+import org.apache.eagle.alert.engine.publisher.dedup.ExtendedDeduplicator;
 import org.slf4j.Logger;
 
-import java.util.List;
-import java.util.Map;
+import com.google.common.base.Joiner;
+import com.typesafe.config.Config;
 
 /**
  * @since Jun 3, 2016.
@@ -40,11 +47,34 @@ public abstract class AbstractPublishPlugin implements 
AlertPublishPlugin {
     @SuppressWarnings("rawtypes")
     @Override
     public void init(Config config, Publishment publishment, Map conf) throws 
Exception {
-        this.deduplicator = new 
DefaultDeduplicator(publishment.getDedupIntervalMin(),
-            publishment.getDedupFields(), publishment.getDedupStateField(),
-            publishment.getDedupStateCloseValue(),
-            config);
-        this.pubName = publishment.getName();
+        DedupCache dedupCache = DedupCache.getInstance(config);
+
+        OverrideDeduplicatorSpec spec = publishment.getOverrideDeduplicator();
+        if (spec != null && StringUtils.isNotBlank(spec.getClassName())) {
+            try {
+                this.deduplicator = (ExtendedDeduplicator) Class.forName(
+                    spec.getClassName()).getConstructor(
+                    Config.class,
+                    Map.class,
+                    List.class,
+                    String.class,
+                    DedupCache.class).newInstance(
+                    config,
+                    spec.getProperties(),
+                    publishment.getDedupFields(),
+                    publishment.getDedupStateField(),
+                    dedupCache);
+                getLogger().info("initiliazed extended deduplicator {} with 
properties {} successfully",
+                    spec.getClassName(), 
Joiner.on(",").withKeyValueSeparator(">").join(
+                        spec.getProperties() == null ? new HashMap<String, 
String>() : spec.getProperties()));
+            } catch (Throwable t) {
+                getLogger().error(String.format("initialize extended 
deduplicator %s failed", spec.getClassName()), t);
+            }
+        } else {
+            this.deduplicator = new 
DefaultDeduplicator(publishment.getDedupIntervalMin(),
+                publishment.getDedupFields(), 
publishment.getDedupStateField(), dedupCache);
+            this.pubName = publishment.getName();
+        }
         String serializerClz = publishment.getSerializer();
         try {
             Object obj = 
Class.forName(serializerClz).getConstructor(Map.class).newInstance(conf);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3706dc7e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
index de4ae09..0e79fca 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java
@@ -21,22 +21,16 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedDeque;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
 import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
-import org.apache.eagle.alert.engine.publisher.dedup.DedupValue;
-import org.apache.storm.guava.base.Objects;
 import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.typesafe.config.Config;
-
 public class DefaultDeduplicator implements AlertDeduplicator {
 
     private static Logger LOG = 
LoggerFactory.getLogger(DefaultDeduplicator.class);
@@ -45,11 +39,6 @@ public class DefaultDeduplicator implements 
AlertDeduplicator {
     private long dedupIntervalMin;
     private List<String> customDedupFields = new ArrayList<>();
     private String dedupStateField;
-    private String dedupStateCloseValue;
-    private Config config;
-
-    private static final String DEDUP_COUNT = "dedupCount";
-    private static final String DEDUP_FIRST_OCCURRENCE = 
"dedupFirstOccurrence";
 
     private DedupCache dedupCache;
 
@@ -66,7 +55,7 @@ public class DefaultDeduplicator implements AlertDeduplicator 
{
     }
 
     public DefaultDeduplicator(String intervalMin, List<String> 
customDedupFields,
-                               String dedupStateField, String 
dedupStateCloseValue, Config config) {
+                               String dedupStateField, DedupCache dedupCache) {
         setDedupIntervalMin(intervalMin);
         if (customDedupFields != null) {
             this.customDedupFields = customDedupFields;
@@ -74,11 +63,7 @@ public class DefaultDeduplicator implements 
AlertDeduplicator {
         if (StringUtils.isNotBlank(dedupStateField)) {
             this.dedupStateField = dedupStateField;
         }
-        if (StringUtils.isNotBlank(dedupStateCloseValue)) {
-            this.dedupStateCloseValue = dedupStateCloseValue;
-        }
-        this.config = config;
-        this.dedupCache = DedupCache.getInstance(this.config);
+        this.dedupCache = dedupCache;
     }
 
     /*
@@ -88,63 +73,10 @@ public class DefaultDeduplicator implements 
AlertDeduplicator {
     public List<AlertStreamEvent> checkDedup(AlertStreamEvent event, EventUniq 
key, String stateFiledValue) {
         if (StringUtils.isBlank(stateFiledValue)) {
             // without state field, we cannot determine whether it is 
duplicated
+            // without custom filed values, we cannot determine whether it is 
duplicated
             return Arrays.asList(event);
         }
-        synchronized (dedupCache) {
-            Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = 
dedupCache.getEvents();
-            if (!events.containsKey(key)
-                || (events.containsKey(key)
-                && events.get(key).size() > 0
-                && !Objects.equal(stateFiledValue,
-                        events.get(key).getLast().getStateFieldValue()))) {
-                DedupValue[] dedupValues = dedupCache.add(key, 
stateFiledValue, dedupStateCloseValue);
-                if (dedupValues != null) {
-                    // any of dedupValues won't be null
-                    if (dedupValues.length == 2) {
-                        // emit last event which includes count of dedup 
events & new state event
-                        return Arrays.asList(
-                            mergeEventWithDedupValue(event, dedupValues[0]),
-                            mergeEventWithDedupValue(event, dedupValues[1]));
-                    } else if (dedupValues.length == 1) {
-                        //populate firstOccurrenceTime & count
-                        return Arrays.asList(mergeEventWithDedupValue(event, 
dedupValues[0]));
-                    }
-                }
-            } else {
-                // update count
-                dedupCache.updateCount(key);
-            }
-        }
-        // duplicated, will be ignored
-        return null;
-    }
-
-    private AlertStreamEvent mergeEventWithDedupValue(AlertStreamEvent 
originalEvent, DedupValue dedupValue) {
-        AlertStreamEvent event = new AlertStreamEvent();
-        Object[] newdata = new Object[originalEvent.getData().length];
-        for (int i = 0; i < originalEvent.getData().length; i++) {
-            newdata[i] = originalEvent.getData()[i];
-        }
-        event.setData(newdata);
-        event.setSchema(originalEvent.getSchema());
-        event.setPolicyId(originalEvent.getPolicyId());
-        event.setCreatedTime(originalEvent.getCreatedTime());
-        event.setCreatedBy(originalEvent.getCreatedBy());
-        event.setTimestamp(originalEvent.getTimestamp());
-        StreamDefinition streamDefinition = event.getSchema();
-        for (int i = 0; i < event.getData().length; i++) {
-            String colName = streamDefinition.getColumns().get(i).getName();
-            if (Objects.equal(colName, dedupStateField)) {
-                event.getData()[i] = dedupValue.getStateFieldValue();
-            }
-            if (Objects.equal(colName, DEDUP_COUNT)) {
-                event.getData()[i] = dedupValue.getCount();
-            }
-            if (Objects.equal(colName, DEDUP_FIRST_OCCURRENCE)) {
-                event.getData()[i] = dedupValue.getFirstOccurrence();
-            }
-        }
-        return event;
+        return dedupCache.dedup(event, key, dedupStateField, stateFiledValue);
     }
 
     public List<AlertStreamEvent> dedup(AlertStreamEvent event) {
@@ -168,10 +100,15 @@ public class DefaultDeduplicator implements 
AlertDeduplicator {
                 stateFiledValue = event.getData()[i].toString();
             }
 
-            for (String field : customDedupFields) {
-                if (colName.equals(field)) {
-                    customFieldValues.put(field, 
event.getData()[i].toString());
-                    break;
+            // make all of the field as unique key if no custom dedup field 
provided
+            if (customDedupFields == null || customDedupFields.size() <= 0) {
+                customFieldValues.put(colName, event.getData()[i].toString());
+            } else {
+                for (String field : customDedupFields) {
+                    if (colName.equals(field)) {
+                        customFieldValues.put(field, 
event.getData()[i].toString());
+                        break;
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3706dc7e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
new file mode 100644
index 0000000..6da6d4a
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+public class DedupCacheTest {
+       
+       private DedupEventsStore store;
+       
+       @Before
+       public void setUp() {
+               store = Mockito.mock(DedupEventsStore.class);
+       DedupEventsStoreFactory.customizeStore(store);
+       }
+       
+       @After
+       public void tearDown() {
+        Mockito.reset(store);
+       }
+
+       @Test
+       public void testNormal() throws Exception {
+               Config config = ConfigFactory.load();
+               DedupCache dedupCache = DedupCache.getInstance(config);
+               
+               StreamDefinition stream = createStream();
+               PolicyDefinition policy = createPolicy(stream.getStreamId(), 
"testPolicy");
+               
+               String[] states = new String[] { "OPEN", "WARN", "CLOSE" };
+               Random random = new Random();
+               for (int i = 0; i < 20; i ++) {
+                       AlertStreamEvent event = createEvent(stream, policy, 
new Object[] {
+                                       System.currentTimeMillis(), "host1", 
"testPolicy-host1-01", states[random.nextInt(3)], 0, 0
+                       });
+                       HashMap<String, String> dedupFieldValues = new 
HashMap<String, String>();
+                       dedupFieldValues.put("alertKey", (String) 
event.getData()[event.getSchema().getColumnIndex("alertKey")]);
+                       List<AlertStreamEvent> result = dedupCache.dedup(event, 
+                                       new EventUniq(event.getStreamId(), 
event.getPolicyId(), event.getCreatedTime(), dedupFieldValues), 
+                                       "state", 
+                                       (String) 
event.getData()[event.getSchema().getColumnIndex("state")]);
+                       System.out.println((i + 1) + " >>>> " + 
ToStringBuilder.reflectionToString(result));
+               }
+               
+               Assert.assertTrue(true);
+       }
+       
+       private AlertStreamEvent createEvent(StreamDefinition stream, 
PolicyDefinition policy, Object[] data) {
+               AlertStreamEvent event = new AlertStreamEvent();
+               event.setPolicyId(policy.getName());
+               event.setSchema(stream);
+               event.setStreamId(stream.getStreamId());
+               event.setTimestamp(System.currentTimeMillis());
+               event.setCreatedTime(System.currentTimeMillis());
+               event.setData(data);
+               return event;
+       }
+       
+       private StreamDefinition createStream() {
+               StreamDefinition sd = new StreamDefinition();
+               StreamColumn tsColumn = new StreamColumn();
+               tsColumn.setName("timestamp");
+               tsColumn.setType(StreamColumn.Type.LONG);
+               
+               StreamColumn hostColumn = new StreamColumn();
+               hostColumn.setName("host");
+               hostColumn.setType(StreamColumn.Type.STRING);
+               
+               StreamColumn alertKeyColumn = new StreamColumn();
+               alertKeyColumn.setName("alertKey");
+               alertKeyColumn.setType(StreamColumn.Type.STRING);
+
+               StreamColumn stateColumn = new StreamColumn();
+               stateColumn.setName("state");
+               stateColumn.setType(StreamColumn.Type.STRING);
+               
+               // dedupCount, dedupFirstOccurrence
+               
+               StreamColumn dedupCountColumn = new StreamColumn();
+               dedupCountColumn.setName("dedupCount");
+               dedupCountColumn.setType(StreamColumn.Type.LONG);
+               
+               StreamColumn dedupFirstOccurrenceColumn = new StreamColumn();
+               dedupFirstOccurrenceColumn.setName("dedupFirstOccurrence");
+               dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG);
+               
+               sd.setColumns(Arrays.asList(tsColumn, hostColumn, 
alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn));
+               sd.setDataSource("testDatasource");
+               sd.setStreamId("testStream");
+               sd.setDescription("test stream");
+               return sd;
+       }
+       
+       private PolicyDefinition createPolicy(String streamName, String 
policyName) {
+               PolicyDefinition pd = new PolicyDefinition();
+               PolicyDefinition.Definition def = new 
PolicyDefinition.Definition();
+               //expression, something like "PT5S,dynamic,1,host"
+               def.setValue("test");
+               def.setType("siddhi");
+               pd.setDefinition(def);
+               pd.setInputStreams(Arrays.asList("inputStream"));
+               pd.setOutputStreams(Arrays.asList("outputStream"));
+               pd.setName(policyName);
+               pd.setDescription(String.format("Test policy for stream %s", 
streamName));
+               
+               StreamPartition sp = new StreamPartition();
+               sp.setStreamId(streamName);
+               sp.setColumns(Arrays.asList("host"));
+               sp.setType(StreamPartition.Type.GROUPBY);
+               pd.addPartition(sp);
+               return pd;
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3706dc7e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
index 4ec9b42..1afb745 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
@@ -41,8 +41,9 @@ public class DefaultDeduplicatorTest extends 
MongoDependencyBaseTest {
                // assume state: OPEN, WARN, CLOSE
                System.setProperty("config.resource", 
"/application-mongo-statestore.conf");
                Config config = ConfigFactory.load();
+               DedupCache dedupCache = DedupCache.getInstance(config);
                DefaultDeduplicator deduplicator = new DefaultDeduplicator(
-                               "PT1M", Arrays.asList(new String[] { "alertKey" 
}), "state", "CLOSE", config);
+                               "PT1M", Arrays.asList(new String[] { "alertKey" 
}), "state", dedupCache);
                
                StreamDefinition stream = createStream();
                PolicyDefinition policy = createPolicy(stream.getStreamId(), 
"testPolicy");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3706dc7e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java
new file mode 100644
index 0000000..52d4460
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
+import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory;
+import org.apache.eagle.alert.engine.router.TestAlertPublisherBolt;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+
+public class ExtendedDeduplicatorTest {
+
+       private DedupEventsStore store;
+       
+       @Before
+       public void setUp() {
+               store = Mockito.mock(DedupEventsStore.class);
+       DedupEventsStoreFactory.customizeStore(store);
+       }
+       
+       @After
+       public void tearDown() {
+        Mockito.reset(store);
+       }
+       
+       @Test
+       public void testNormal() throws Exception {
+               List<Publishment> pubs = 
loadEntities("/router/publishments-extended-deduplicator.json", 
Publishment.class);
+
+        AlertPublishPlugin plugin = 
AlertPublishPluginsFactory.createNotificationPlugin(pubs.get(0), null, null);
+        AlertStreamEvent event1 = createWithStreamDef("extended_dedup_host1", 
"extended_dedup_testapp1", "OPEN");
+        AlertStreamEvent event2 = createWithStreamDef("extended_dedup_host2", 
"extended_dedup_testapp1", "OPEN");
+        AlertStreamEvent event3 = createWithStreamDef("extended_dedup_host2", 
"extended_dedup_testapp2", "CLOSE");
+
+        Assert.assertNotNull(plugin.dedup(event1));
+        Assert.assertNull(plugin.dedup(event2));
+        Assert.assertNotNull(plugin.dedup(event3));
+        
+        Mockito.verify(store, Mockito.atLeastOnce()).add(Mockito.anyObject(), 
Mockito.anyObject());
+       }
+       
+       private <T> List<T> loadEntities(String path, Class<T> tClz) throws 
Exception {
+        ObjectMapper objectMapper = new ObjectMapper();
+        JavaType type = CollectionType.construct(List.class, 
SimpleType.construct(tClz));
+        List<T> l = 
objectMapper.readValue(TestAlertPublisherBolt.class.getResourceAsStream(path), 
type);
+        return l;
+    }
+
+    private AlertStreamEvent createWithStreamDef(String hostname, String 
appName, String state) {
+        AlertStreamEvent alert = new AlertStreamEvent();
+        PolicyDefinition policy = new PolicyDefinition();
+        policy.setName("perfmon_cpu_host_check");
+        alert.setPolicyId(policy.getName());
+        alert.setCreatedTime(System.currentTimeMillis());
+        alert.setData(new Object[] {appName, hostname, state});
+        alert.setStreamId("testAlertStream");
+        alert.setCreatedBy(this.toString());
+
+        // build stream definition
+        StreamDefinition sd = new StreamDefinition();
+        StreamColumn appColumn = new StreamColumn();
+        appColumn.setName("appname");
+        appColumn.setType(StreamColumn.Type.STRING);
+
+        StreamColumn hostColumn = new StreamColumn();
+        hostColumn.setName("hostname");
+        hostColumn.setType(StreamColumn.Type.STRING);
+        
+        StreamColumn stateColumn = new StreamColumn();
+        stateColumn.setName("state");
+        stateColumn.setType(StreamColumn.Type.STRING);
+
+        sd.setColumns(Arrays.asList(appColumn, hostColumn, stateColumn));
+
+        alert.setSchema(sd);
+        return alert;
+    }
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3706dc7e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
index bdd5e0b..dc05342 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupStoreTest.java
@@ -56,6 +56,11 @@ public class MongoDedupStoreTest extends 
MongoDependencyBaseTest {
                Assert.assertEquals(streamId, entry.getKey().streamId);
                Assert.assertEquals(1, entry.getValue().size());
                Assert.assertEquals(2, entry.getValue().getLast().getCount());
+               
+               store.remove(events.keySet().iterator().next());
+               events = store.getEvents();
+               Assert.assertNotNull(events);
+               Assert.assertEquals(0, events.size());
        }
     
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3706dc7e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
new file mode 100644
index 0000000..7e175c8
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Ignore
+public class TestDeduplicator extends ExtendedDeduplicator {
+
+    public TestDeduplicator(Config config, Map<String, String> properties, 
List<String> customDedupFields,
+                       String dedupStateField, DedupCache dedupCache) {
+               super(config, properties, customDedupFields, dedupStateField, 
dedupCache);
+       }
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(TestDeduplicator.class);
+
+    @Override
+    public List<AlertStreamEvent> dedup(AlertStreamEvent event) {
+        StreamDefinition streamDefinition = event.getSchema();
+        HashMap<String, String> customFieldValues = new HashMap<>();
+        String stateFiledValue = null;
+        for (int i = 0; i < event.getData().length; i++) {
+            if (i > streamDefinition.getColumns().size()) {
+                continue;
+            }
+            String colName = streamDefinition.getColumns().get(i).getName();
+
+            if (colName.equals(this.getDedupStateField())) {
+                stateFiledValue = event.getData()[i].toString();
+            }
+
+            // make all of the field as unique key if no custom dedup field 
provided
+            if (this.getCustomDedupFields() == null || 
this.getCustomDedupFields().size() <= 0) {
+                customFieldValues.put(colName, event.getData()[i].toString());
+            } else {
+                for (String field : this.getCustomDedupFields()) {
+                    if (colName.equals(field)) {
+                        customFieldValues.put(field, 
event.getData()[i].toString());
+                        break;
+                    }
+                }
+            }
+        }
+        LOG.info("event: " + event);
+        EventUniq eventkey = new EventUniq(event.getStreamId(), 
event.getPolicyId(), event.getCreatedTime(), customFieldValues);
+        LOG.info("event key: " + eventkey);
+        LOG.info("dedup field: " + this.getDedupStateField());
+        LOG.info("dedup value: " + stateFiledValue);
+        List<AlertStreamEvent> result = this.getDedupCache().dedup(event, 
eventkey, this.getDedupStateField(), stateFiledValue);
+        return result;
+    }
+
+    @Override
+    public void setDedupIntervalMin(String intervalMin) {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3706dc7e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
index 6ebae63..3432496 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java
@@ -22,7 +22,6 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedDeque;
 
 import org.apache.eagle.alert.coordination.model.PublishSpec;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
@@ -34,10 +33,8 @@ import 
org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
 import org.apache.eagle.alert.engine.publisher.AlertPublisher;
 import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStore;
 import org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStoreFactory;
-import org.apache.eagle.alert.engine.publisher.dedup.DedupValue;
 import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory;
 import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
-import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
 import org.apache.eagle.alert.engine.runner.AlertPublisherBolt;
 import org.apache.eagle.alert.engine.runner.MapComparator;
 import org.apache.eagle.alert.engine.utils.MetadataSerDeser;
@@ -217,7 +214,6 @@ public class TestAlertPublisherBolt {
         return alert;
     }
 
-       @SuppressWarnings("unchecked")
        @Test
     public void testCustomFieldDedupEvent() throws Exception {
         List<Publishment> pubs = loadEntities("/router/publishments.json", 
Publishment.class);
@@ -241,7 +237,7 @@ public class TestAlertPublisherBolt {
 
         AlertPublishPlugin plugin = 
AlertPublishPluginsFactory.createNotificationPlugin(pubs.get(0), null, null);
         AlertStreamEvent event1 = createWithStreamDef("host1", "testapp1", 
"OPEN");
-        AlertStreamEvent event2 = createWithStreamDef("host2", "testapp2", 
"OPEN");
+        AlertStreamEvent event2 = createWithStreamDef("host1", "testapp1", 
"OPEN");
 
         Assert.assertNotNull(plugin.dedup(event1));
         Assert.assertNull(plugin.dedup(event2));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3706dc7e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments-extended-deduplicator.json
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments-extended-deduplicator.json
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments-extended-deduplicator.json
new file mode 100644
index 0000000..5e14e78
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments-extended-deduplicator.json
@@ -0,0 +1,31 @@
+[
+  {
+    "name": "test-stream-output",
+    "type": "org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+    "policyIds": [
+      "perfmon_cpu_host_check",
+      "perfmon_cpu_pool_check"
+    ],
+    "properties": {
+      "subject": "Eagle Test Alert",
+      "template": "",
+      "sender": "sen...@corp.com",
+      "recipients": "recei...@corp.com",
+      "smtp.server": "mailhost.com",
+      "connection": "plaintext",
+      "smtp.port": "25"
+    },
+    "dedupIntervalMin": "PT1M",
+    "dedupFields": [
+      "appname"
+    ],
+    "dedupStateField": "state",
+    "overrideDeduplicator": {
+               "className": 
"org.apache.eagle.alert.engine.publisher.dedup.TestDeduplicator",
+               "properties": {
+                       "docIdField": "docId"
+               }
+       },
+    "serializer": 
"org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3706dc7e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments.json
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments.json
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments.json
index d93b8e2..1f4ddcc 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments.json
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/router/publishments.json
@@ -20,7 +20,6 @@
       "appname"
     ],
     "dedupStateField": "state",
-       "dedupStateCloseValue": "CLOSE",
     "serializer": 
"org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
   }
 ]
\ No newline at end of file

Reply via email to