Repository: incubator-eagle
Updated Branches:
  refs/heads/master 48a2c6f55 -> 020a5b3cf


EAGLE-550: periodically clean up dedup cache

Author: Li, Garrett
Reviewer: Zeng, Bryant; ralphsu

This closes #442


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/020a5b3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/020a5b3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/020a5b3c

Branch: refs/heads/master
Commit: 020a5b3cfb3dfe37db54db7f91343b925623ec49
Parents: 48a2c6f
Author: Xiancheng Li <xiancheng...@ebay.com>
Authored: Tue Sep 20 10:51:05 2016 +0800
Committer: Ralph, Su <suliang...@gmail.com>
Committed: Thu Sep 22 00:17:15 2016 -0700

----------------------------------------------------------------------
 .../provider/NodataMetadataGenerator.java       |   5 +-
 .../nodata/NoDataPolicyTimeBatchHandler.java    |  21 +--
 .../engine/publisher/dedup/DedupCache.java      |  34 +++++
 .../publisher/dedup/MongoDedupEventsStore.java  |   1 +
 .../publisher/dedup/TransformerUtils.java       |   3 +
 .../alert/engine/publisher/impl/EventUniq.java  |   5 +-
 .../TestNoDataPolicyTimeBatchHandler.java       |  14 +-
 .../publisher/dedup/DedupCacheStoreTest.java    | 142 +++++++++++++++++++
 8 files changed, 206 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/020a5b3c/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java
index 3cf5721..7769df1 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/NodataMetadataGenerator.java
@@ -213,11 +213,14 @@ public class NodataMetadataGenerator {
 
     private PolicyDefinition buildDynamicNodataPolicy(String streamName, 
String policyName,
                                                       String columnName, 
String expression, List<String> inputStream) {
-        PolicyDefinition pd = new PolicyDefinition();
         PolicyDefinition.Definition def = new PolicyDefinition.Definition();
         //expression, something like "PT5S,dynamic,1,host"
         def.setValue(expression);
         def.setType(NODATA_ALERT_AGGR_POLICY_TYPE);
+        Map<String, Object> properties = new HashMap<String, Object>();
+        properties.put("nodataColumnName", columnName);
+        def.setProperties(properties);
+        PolicyDefinition pd = new PolicyDefinition();
         pd.setDefinition(def);
         pd.setInputStreams(inputStream);
         pd.setOutputStreams(Arrays.asList(NODATA_ALERT_AGGR_STREAM));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/020a5b3c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
index 73ee9b2..e2c70a4 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyTimeBatchHandler.java
@@ -65,9 +65,6 @@ public class NoDataPolicyTimeBatchHandler implements 
PolicyStreamHandler {
             throw new IllegalArgumentException("policy outputStream size has 
to be 1 for no data alert");
         }
 
-        String is = inputStreams.get(0);
-        StreamDefinition sd = sds.get(is);
-
         String policyValue = policyDef.getDefinition().getValue();
         // assume that no data alert policy value consists of "windowPeriod,
         // type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value,
@@ -76,19 +73,23 @@ public class NoDataPolicyTimeBatchHandler implements 
PolicyStreamHandler {
         this.wisbType = NoDataWisbType.valueOf(segments[1]);
         // for provided wisb values, need to parse, for dynamic wisb values, it
         // is computed through a window
-        @SuppressWarnings("rawtypes")
-        Set wisbValues = null;
+        Set<String> wisbValues = new HashSet<String>();
         if (wisbType == NoDataWisbType.provided) {
-            wisbValues = new NoDataWisbProvidedParser().parse(segments);
+            for (int i = 2; i < segments.length; i++) {
+                wisbValues.add(segments[i]);
+            }
         }
+        
         long windowPeriod = 
TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(segments[0]));
         distinctWindow = new DistinctValuesInTimeBatchWindow(this, 
windowPeriod, wisbValues);
         // populate wisb field names
-        int numOfFields = Integer.parseInt(segments[2]);
-        for (int i = 3; i < 3 + numOfFields; i++) {
-            String fn = segments[i];
-            wisbFieldIndices.add(sd.getColumnIndex(fn));
+        String is = inputStreams.get(0);
+        StreamDefinition sd = sds.get(is);
+        String nodataColumnNameKey = "nodataColumnName";
+        if 
(!policyDef.getDefinition().getProperties().containsKey(nodataColumnNameKey)) {
+            throw new IllegalArgumentException("policy nodata column name has 
to be defined for no data alert");
         }
+        wisbFieldIndices.add(sd.getColumnIndex((String) 
policyDef.getDefinition().getProperties().get(nodataColumnNameKey)));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/020a5b3c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
index abd8004..473cbfc 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java
@@ -17,10 +17,15 @@
 package org.apache.eagle.alert.engine.publisher.dedup;
 
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.time.DateUtils;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
@@ -57,6 +62,28 @@ public class DedupCache {
         if (INSTANCE == null) {
             INSTANCE = new DedupCache();
             INSTANCE.config = config;
+
+            // Create a single daemon thread that periodically evicts dedup keys which are
+            // flagged removable and were created more than one day ago.
+            ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+                @Override
+                public Thread newThread(Runnable r) {
+                    // named so the cleaner is easy to spot in thread dumps
+                    Thread t = new Thread(r, "dedup-cache-cleaner");
+                    t.setDaemon(true);
+                    return t;
+                }
+            });
+            scheduleSrv.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    // scheduleAtFixedRate suppresses every later execution once one execution
+                    // throws, so failures are logged here instead of being propagated.
+                    try {
+                        long expireBefore = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
+                        // iterate over a snapshot of the keys; removeEvent is expected to modify the cache
+                        HashSet<EventUniq> eventUniqs = new HashSet<EventUniq>(INSTANCE.getEvents().keySet());
+                        for (EventUniq one : eventUniqs) {
+                            if (one.removable && one.createdTime < expireBefore) {
+                                INSTANCE.removeEvent(one);
+                                LOG.info("Remove dedup key {} from cache & db", one);
+                            }
+                        }
+                    } catch (Exception e) {
+                        LOG.error("Failed to clean up removable dedup keys, will retry on next run", e);
+                    }
+                }
+            }, 5, 60, TimeUnit.MINUTES);
         }
         return INSTANCE;
     }
@@ -164,6 +191,13 @@ public class DedupCache {
         }
     }
 
+    /**
+     * Writes the cached dedup values of the given key to the dedup events store, so that a
+     * change made to the key itself (for example its removable flag) is persisted.
+     *
+     * @param eventEniq the dedup key whose cached values are stored; nothing is stored when
+     *                  the key has no entry in the cache
+     */
+    public void persistUpdatedEventUniq(EventUniq eventEniq) {
+        // read the entry once so the stored and the logged values are the same object
+        ConcurrentLinkedDeque<DedupValue> dedupValues = events.get(eventEniq);
+        if (dedupValues == null) {
+            LOG.warn("Dedup key {} is not in cache, skip storing it to DB", eventEniq);
+            return;
+        }
+        DedupEventsStore accessor = DedupEventsStoreFactory.getStore(type, this.config);
+        accessor.add(eventEniq, dedupValues);
+        LOG.info("Store dedup key {}, value {} to DB", eventEniq, Joiner.on(",").join(dedupValues));
+    }
+
     private DedupValue updateCount(EventUniq eventEniq) {
         ConcurrentLinkedDeque<DedupValue> dedupValues = events.get(eventEniq);
         if (dedupValues == null || dedupValues.size() <= 0) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/020a5b3c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
index 19cb716..a536360 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/MongoDedupEventsStore.java
@@ -49,6 +49,7 @@ public class MongoDedupEventsStore implements 
DedupEventsStore {
     public static final String DEDUP_POLICY_ID = "policyId";
     public static final String DEDUP_CREATE_TIME = "createdTime";
     public static final String DEDUP_TIMESTAMP = "timestamp";
+    public static final String DEDUP_REMOVABLE = "removable";
     public static final String DEDUP_CUSTOM_FIELDS_VALUES = 
"customFieldValues";
     public static final String DEDUP_VALUES = "dedupValues";
     public static final String DEDUP_STATE_FIELD_VALUE = "stateFieldValue";

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/020a5b3c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
index aaa95db..e339240 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/TransformerUtils.java
@@ -23,6 +23,7 @@ import java.util.Map.Entry;
 
 import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
 import org.bson.BsonArray;
+import org.bson.BsonBoolean;
 import org.bson.BsonDocument;
 import org.bson.BsonInt64;
 import org.bson.BsonString;
@@ -48,6 +49,7 @@ public class TransformerUtils {
                     dedupCustomFieldValuesDoc.getString(MAP_VALUE).getValue());
             }
             EventUniq eventUniq = new EventUniq(streamId, policyId, timestamp, 
customFieldValues);
+            // documents stored before the removable field was introduced do not carry it;
+            // default to false instead of failing on the missing key
+            eventUniq.removable = doc.getBoolean(
+                MongoDedupEventsStore.DEDUP_REMOVABLE, BsonBoolean.FALSE).getValue();
             eventUniq.createdTime = doc.getInt64(
                 MongoDedupEventsStore.DEDUP_CREATE_TIME, new 
BsonInt64(0)).getValue();
             List<DedupValue> dedupValues = new ArrayList<DedupValue>();
@@ -77,6 +79,7 @@ public class TransformerUtils {
             doc.put(MongoDedupEventsStore.DEDUP_POLICY_ID, new 
BsonString(entity.getEventEniq().policyId));
             doc.put(MongoDedupEventsStore.DEDUP_CREATE_TIME, new 
BsonInt64(entity.getEventEniq().createdTime));
             doc.put(MongoDedupEventsStore.DEDUP_TIMESTAMP, new 
BsonInt64(entity.getEventEniq().timestamp));
+            doc.put(MongoDedupEventsStore.DEDUP_REMOVABLE, new 
BsonBoolean(entity.getEventEniq().removable));
 
             List<BsonDocument> dedupCustomFieldValues = new 
ArrayList<BsonDocument>();
             for (Entry<String, String> entry : 
entity.getEventEniq().customFieldValues.entrySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/020a5b3c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
index 5d7c67a..2a71411 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java
@@ -34,6 +34,7 @@ public class EventUniq {
     public Long timestamp;     // event's createTimestamp
     public long createdTime; // created time, for cache removal;
     public HashMap<String, String> customFieldValues;
+    public boolean removable = false;
 
     public EventUniq(String streamId, String policyId, long timestamp) {
         this.streamId = streamId;
@@ -75,7 +76,7 @@ public class EventUniq {
 
     @Override
     public String toString() {
-        return String.format("EventUniq[streamId: %s, policyId: %s, timestamp: 
%s, customFieldValues: %s]",
-            streamId, policyId, timestamp, 
Joiner.on(",").withKeyValueSeparator(">").join(customFieldValues));
+        return String.format("EventUniq[streamId: %s, policyId: %s, timestamp: 
%s, removable: %s, customFieldValues: %s]",
+            streamId, policyId, timestamp, removable, 
Joiner.on(",").withKeyValueSeparator(">").join(customFieldValues));
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/020a5b3c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
index 8821d3e..334db29 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java
@@ -16,6 +16,10 @@
  */
 package org.apache.eagle.alert.engine.nodata;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.eagle.alert.engine.Collector;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamColumn;
@@ -24,16 +28,11 @@ import 
org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
 import 
org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyTimeBatchHandler;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
 public class TestNoDataPolicyTimeBatchHandler {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TestNoDataPolicyTimeBatchHandler.class);
@@ -96,8 +95,11 @@ public class TestNoDataPolicyTimeBatchHandler {
     private PolicyDefinition buildPolicyDef_dynamic() {
         PolicyDefinition pd = new PolicyDefinition();
         PolicyDefinition.Definition def = new PolicyDefinition.Definition();
-        def.setValue("PT5S,dynamic,1,host");
+        def.setValue("PT5S,dynamic");
         def.setType("nodataalert");
+        Map<String, Object> properties = new HashMap<String, Object>();
+        properties.put("nodataColumnName", "host");
+        def.setProperties(properties);
         pd.setDefinition(def);
         pd.setInputStreams(Arrays.asList(inputStream));
         pd.setOutputStreams(Arrays.asList(outputStream));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/020a5b3c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
new file mode 100644
index 0000000..c50b582
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheStoreTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.dedup;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import 
org.apache.eagle.alert.engine.publisher.dedup.DedupEventsStoreFactory.DedupEventsStoreType;
+import org.apache.eagle.alert.engine.publisher.impl.EventUniq;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Checks that a change made to an {@link EventUniq} after it was cached reaches the Mongo
+ * dedup events store through {@link DedupCache#persistUpdatedEventUniq(EventUniq)}.
+ */
+public class DedupCacheStoreTest extends MongoDependencyBaseTest {
+
+       @Test
+       public void testPersistUpdatedEventUniq() throws Exception {
+               StreamDefinition stream = createStream();
+               PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");
+
+               AlertStreamEvent event = createEvent(stream, policy, new Object[] {
+                               System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0
+               });
+
+               HashMap<String, String> dedupFieldValues = new HashMap<String, String>();
+               dedupFieldValues.put("alertKey", (String) event.getData()[event.getSchema().getColumnIndex("alertKey")]);
+               EventUniq eventUniq = new EventUniq(event.getStreamId(), event.getPolicyId(), event.getCreatedTime(), dedupFieldValues);
+
+               System.setProperty("config.resource", "/application-mongo-statestore.conf");
+               Config config = ConfigFactory.load();
+               DedupCache cache = DedupCache.getInstance(config);
+               cache.addOrUpdate(eventUniq, (String) event.getData()[event.getSchema().getColumnIndex("state")]);
+
+               // a key that was just cached is expected in the store with removable == false
+               DedupEventsStore accessor = DedupEventsStoreFactory.getStore(DedupEventsStoreType.Mongo, config);
+               assertStoredRemovable(accessor, eventUniq, false);
+
+               eventUniq.removable = true;
+               cache.persistUpdatedEventUniq(eventUniq);
+
+               // after persisting, the stored copy must carry the updated flag
+               assertStoredRemovable(accessor, eventUniq, true);
+       }
+
+       /**
+        * Asserts that the store holds the given key and that the stored copy carries the
+        * expected removable flag. Fails when the key is missing, so the check cannot pass
+        * without having compared anything.
+        */
+       private void assertStoredRemovable(DedupEventsStore accessor, EventUniq expected, boolean removable) {
+               Map<EventUniq, ConcurrentLinkedDeque<DedupValue>> events = accessor.getEvents();
+               boolean found = false;
+               for (EventUniq one : events.keySet()) {
+                       if (one.equals(expected)) {
+                               found = true;
+                               Assert.assertEquals(removable, one.removable);
+                       }
+               }
+               Assert.assertTrue("dedup key not found in store: " + expected, found);
+       }
+
+       /** Builds an alert event on the given stream for the given policy, stamped with the current time. */
+       private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) {
+               AlertStreamEvent event = new AlertStreamEvent();
+               event.setPolicyId(policy.getName());
+               event.setSchema(stream);
+               event.setStreamId(stream.getStreamId());
+               event.setTimestamp(System.currentTimeMillis());
+               event.setCreatedTime(System.currentTimeMillis());
+               event.setData(data);
+               return event;
+       }
+
+       /**
+        * Builds the test stream with the columns timestamp, host, alertKey, state, dedupCount
+        * and dedupFirstOccurrence, in that order.
+        */
+       private StreamDefinition createStream() {
+               StreamDefinition sd = new StreamDefinition();
+               StreamColumn tsColumn = new StreamColumn();
+               tsColumn.setName("timestamp");
+               tsColumn.setType(StreamColumn.Type.LONG);
+
+               StreamColumn hostColumn = new StreamColumn();
+               hostColumn.setName("host");
+               hostColumn.setType(StreamColumn.Type.STRING);
+
+               StreamColumn alertKeyColumn = new StreamColumn();
+               alertKeyColumn.setName("alertKey");
+               alertKeyColumn.setType(StreamColumn.Type.STRING);
+
+               StreamColumn stateColumn = new StreamColumn();
+               stateColumn.setName("state");
+               stateColumn.setType(StreamColumn.Type.STRING);
+
+               // columns for the dedup bookkeeping values: dedupCount, dedupFirstOccurrence
+               StreamColumn dedupCountColumn = new StreamColumn();
+               dedupCountColumn.setName("dedupCount");
+               dedupCountColumn.setType(StreamColumn.Type.LONG);
+
+               StreamColumn dedupFirstOccurrenceColumn = new StreamColumn();
+               dedupFirstOccurrenceColumn.setName("dedupFirstOccurrence");
+               dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG);
+
+               sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn));
+               sd.setDataSource("testDatasource");
+               sd.setStreamId("testStream");
+               sd.setDescription("test stream");
+               return sd;
+       }
+
+       /** Builds a policy with the given name that is partitioned on the host column of the given stream. */
+       private PolicyDefinition createPolicy(String streamName, String policyName) {
+               PolicyDefinition pd = new PolicyDefinition();
+               PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+               // placeholder definition value; this test never reads it
+               def.setValue("test");
+               def.setType("siddhi");
+               pd.setDefinition(def);
+               pd.setInputStreams(Arrays.asList("inputStream"));
+               pd.setOutputStreams(Arrays.asList("outputStream"));
+               pd.setName(policyName);
+               pd.setDescription(String.format("Test policy for stream %s", streamName));
+
+               StreamPartition sp = new StreamPartition();
+               sp.setStreamId(streamName);
+               sp.setColumns(Arrays.asList("host"));
+               sp.setType(StreamPartition.Type.GROUPBY);
+               pd.addPartition(sp);
+               return pd;
+       }
+
+}

Reply via email to