http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java index 18089a9..e6b510a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/FloatSerializer.java @@ -1,11 +1,11 @@ package org.apache.eagle.alert.engine.serialization.impl; -import org.apache.eagle.alert.engine.serialization.Serializer; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.eagle.alert.engine.serialization.Serializer; + /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java index d2473a9..f784456 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/IntegerSerializer.java @@ -1,11 +1,11 @@ package org.apache.eagle.alert.engine.serialization.impl; -import org.apache.eagle.alert.engine.serialization.Serializer; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.eagle.alert.engine.serialization.Serializer; + /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java index 76d2294..39baf2b 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/JavaObjectSerializer.java @@ -16,14 +16,14 @@ */ package org.apache.eagle.alert.engine.serialization.impl; -import org.apache.commons.lang3.SerializationUtils; -import org.apache.eagle.alert.engine.serialization.Serializer; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; +import org.apache.commons.lang3.SerializationUtils; +import org.apache.eagle.alert.engine.serialization.Serializer; + public class JavaObjectSerializer implements Serializer<Object> { @Override public void serialize(Object value, DataOutput dataOutput) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java index 8d85c76..116b275 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/LongSerializer.java @@ -1,11 +1,11 @@ package org.apache.eagle.alert.engine.serialization.impl; -import org.apache.eagle.alert.engine.serialization.Serializer; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.eagle.alert.engine.serialization.Serializer; + /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java index 714920e..5a3d77d 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/PartitionedEventSerializerImpl.java @@ -16,8 +16,10 @@ */ package org.apache.eagle.alert.engine.serialization.impl; -import com.google.common.io.ByteArrayDataOutput; -import com.google.common.io.ByteStreams; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + import org.apache.eagle.alert.engine.coordinator.StreamPartition; import org.apache.eagle.alert.engine.model.PartitionedEvent; import org.apache.eagle.alert.engine.model.StreamEvent; @@ -26,7 +28,8 @@ import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider import org.apache.eagle.alert.engine.serialization.Serializer; import org.apache.eagle.alert.engine.utils.CompressionUtils; -import java.io.*; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; /** * Stream Metadata Cached Serializer http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java index 113816f..0fb686b 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamEventSerializer.java @@ -16,6 +16,11 @@ */ package org.apache.eagle.alert.engine.serialization.impl; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.BitSet; + import org.apache.eagle.alert.engine.coordinator.StreamColumn; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.model.StreamEvent; @@ -23,11 +28,6 @@ import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider import org.apache.eagle.alert.engine.serialization.Serializer; import org.apache.eagle.alert.engine.serialization.Serializers; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.BitSet; - /** * @see StreamEvent */ http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java index 1268cb8..f35da39 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionDigestSerializer.java @@ -16,13 +16,19 @@ */ package org.apache.eagle.alert.engine.serialization.impl; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.serialization.Serializer; - -import java.io.*; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.ObjectOutputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; -import java.util.*; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.eagle.alert.engine.coordinator.StreamPartition; +import org.apache.eagle.alert.engine.serialization.Serializer; /** * Don't serialize streamId http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java index 67de517..4105277 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StreamPartitionSerializer.java @@ -16,16 +16,16 @@ */ package org.apache.eagle.alert.engine.serialization.impl; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; -import org.apache.eagle.alert.engine.serialization.Serializer; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.eagle.alert.engine.coordinator.StreamPartition; +import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; +import org.apache.eagle.alert.engine.serialization.Serializer; + /** * Don't serialize streamId * http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java index 2a1541a..940024d 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/StringSerializer.java @@ -1,11 +1,11 @@ package org.apache.eagle.alert.engine.serialization.impl; -import org.apache.eagle.alert.engine.serialization.Serializer; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.eagle.alert.engine.serialization.Serializer; + /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java index cd23405..f54d5cd 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java @@ -304,7 +304,12 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic, String schemeClsName, SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) throws Exception{ String kafkaBrokerZkQuorum = config.getString("spout.kafkaBrokerZkQuorum"); - BrokerHosts hosts = new ZkHosts(kafkaBrokerZkQuorum); + BrokerHosts hosts = null; + if (config.hasPath("spout.kafkaBrokerZkBasePath")) { + hosts = new ZkHosts(kafkaBrokerZkQuorum, config.getString("spout.kafkaBrokerZkBasePath")); + } else { + hosts = new ZkHosts(kafkaBrokerZkQuorum); + } String transactionZkRoot = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT; if(config.hasPath("spout.stormKafkaTransactionZkPath")) { transactionZkRoot = config.getString("spout.stormKafkaTransactionZkPath"); @@ -335,10 +340,14 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener spoutConfig.startOffsetTime = config.getInt("spout.stormKafkaStartOffsetTime"); } - spoutConfig.scheme = new SchemeAsMultiScheme(SchemeBuilder.buildFromClsName(schemeClsName, topic)); + spoutConfig.scheme = new SchemeAsMultiScheme(SchemeBuilder.buildFromClsName(schemeClsName, topic, conf)); KafkaSpoutWrapper wrapper = new KafkaSpoutWrapper(spoutConfig, kafkaSpoutMetric); SpoutOutputCollectorWrapper collectorWrapper = new SpoutOutputCollectorWrapper(this, collector, topic, spoutSpec, numOfRouterBolts, sds,this.serializer); wrapper.open(conf, context, collectorWrapper); + + if (LOG.isInfoEnabled()) { + LOG.info("create and open kafka wrapper: topic {}, scheme class{} ", topic, schemeClsName); + } return wrapper; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java index a2c9219..7e0dd3f 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java @@ -21,23 +21,34 @@ package org.apache.eagle.alert.engine.spout; import java.util.Properties; import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; import org.slf4j.Logger; /** * normally this is used in unit test for convenience */ public class CreateTopicUtils { + private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CreateTopicUtils.class); + private static final int partitions = 2; private static final int replicationFactor = 1; - public static void ensureTopicReady(String zkQuorum, String topic){ + + public static void ensureTopicReady(String zkQuorum, String topic) { + ZkConnection zkConnection = new ZkConnection(zkQuorum); ZkClient zkClient = new ZkClient(zkQuorum, 10000, 10000, ZKStringSerializer$.MODULE$); - if(!AdminUtils.topicExists(zkClient, topic)) { - LOG.info("create topic " + topic + " with partitions " + partitions + ", and replicationFactor " + replicationFactor); - AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties()); + ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); + if (!AdminUtils.topicExists(zkUtils, topic)) { + LOG.info("create topic " + topic + " with partitions " + partitions + ", and replicationFactor " + + replicationFactor); + AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor, new Properties(), + RackAwareMode.Disabled$.MODULE$); } } + } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java index 223f1b5..bfd5da7 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java @@ -19,6 +19,8 @@ package org.apache.eagle.alert.engine.spout; +import java.util.Map; + import backtype.storm.spout.Scheme; @@ -28,8 +30,10 @@ import backtype.storm.spout.Scheme; * 2) has one constructor with topic name as parameter */ public class SchemeBuilder { - public static Scheme buildFromClsName(String clsName, String topic) throws Exception{ - Object o = Class.forName(clsName).getConstructor(String.class).newInstance(topic); + + @SuppressWarnings("rawtypes") + public static Scheme buildFromClsName(String clsName, String topic, Map conf) throws Exception{ + Object o = Class.forName(clsName).getConstructor(String.class, Map.class).newInstance(topic, conf); return (Scheme)o; } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java index b37f7b3..2f7cc68 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java @@ -18,7 +18,6 @@ */ package org.apache.eagle.alert.engine.spout; -import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; @@ -112,7 +111,7 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements Object streamId = convertedTuple.get(1); StreamDefinition sd = sds.get(streamId); - if(sd == null){ + if (sd == null) { LOG.warn("StreamDefinition {} is not found within {}, ignore this message", streamId, sds); spout.ack(newMessageId); return null; @@ -141,17 +140,17 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements } // send message to StreamRouterBolt PartitionedEvent pEvent = new PartitionedEvent(event, groupingStrategy.partition, hash); - if(this.serializer == null){ - delegate.emit(sid, Collections.singletonList(pEvent), newMessageId); - }else { + if (this.serializer == null) { + delegate.emit(sid, Collections.singletonList(pEvent), newMessageId); + } else { try { delegate.emit(sid, Collections.singletonList(serializer.serialize(pEvent)), newMessageId); - } catch (IOException e) { - LOG.error("Failed to serialize {}", pEvent, e); - throw new RuntimeException(e); + } catch (Exception e) { + LOG.error("Failed to serialize {}, this message would be ignored!", pEvent, e); + spout.ack(newMessageId); } } - }else{ + } else { // ******* short-cut ack ******** // we should simply ack those messages which are not processed in this topology because KafkaSpout implementation requires _pending is empty // before moving to next offsets. http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java index f526cad..0fbe7b3 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java @@ -16,12 +16,15 @@ */ package org.apache.eagle.alert.engine.utils; -import com.google.common.io.ByteStreams; - -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; +import com.google.common.io.ByteStreams; + public class CompressionUtils { public static byte[] compress(byte[] source) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf index bfb5f54..e3be1b7 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf @@ -24,7 +24,7 @@ "localMode" : "true" }, "spout" : { - "kafkaBrokerZkQuorum": "sandbox.hortonworks.com:2181", + "kafkaBrokerZkQuorum": "10.254.194.245:2181", "kafkaBrokerZkBasePath": "/brokers", "stormKafkaUseSameZkQuorumWithKafkaBroker": true, "stormKafkaTransactionZkQuorum": "", @@ -34,8 +34,8 @@ "stormKafkaFetchSizeBytes": 1048586, }, "zkConfig" : { - "zkQuorum" : "sandbox.hortonworks.com:2181", - "zkRoot" : "/alert", + "zkQuorum" : "10.254.194.245:2181", + "zkRoot" : "/kafka", "zkSessionTimeoutMs" : 10000, "connectionTimeoutMs" : 10000, "zkRetryTimes" : 3, @@ -47,21 +47,21 @@ }, "metadataService": { "context" : "/rest", - "host" : "localhost", - "port" : 58080 + "host" : "127.0.0.1", + "port" : 8080 }, "coordinatorService": { "host": "localhost", - "port": 58080, + "port": 9090, "context" : "/rest" } "metric":{ "sink": { -// "kafka": { -// "topic": "alert_metric" -// "bootstrap.servers": "localhost:6667" -// } - "stdout": {} + // "kafka": { + // "topic": "alert_metric" + // "bootstrap.servers": "localhost:6667" + // } + // "stdout": {} // "elasticsearch": { // "hosts": ["localhost:9200"] // "index": "alert_metric" http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties index af99e2c..5e3d3b1 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties @@ -21,6 +21,7 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n ##log4j.logger.org.apache.eagle.alert.engine.spout.CorrelationSpout=DEBUG +log4j.logger.org.apache.eagle.alert.metric=ERROR log4j.logger.org.apache.eagle.alert.engine.spout.SpoutOutputCollectorWrapper=DEBUG log4j.logger.org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector=DEBUG log4j.logger.org.apache.eagle.alert.engine.sorter.impl.StreamSortWindowHandlerImpl=DEBUG \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java index 057aa73..927dfd7 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java @@ -17,11 +17,20 @@ package org.apache.eagle.alert.engine.e2e; import java.util.List; -import java.util.concurrent.ExecutionException; +import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.apache.eagle.alert.config.ZKConfig; +import org.apache.eagle.alert.config.ZKConfigBuilder; import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; import org.apache.eagle.alert.coordination.model.internal.Topology; import org.apache.eagle.alert.engine.UnitTopologyMain; @@ -30,7 +39,10 @@ import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.service.IMetadataServiceClient; import org.apache.eagle.alert.service.MetadataServiceClientImpl; +import org.apache.eagle.alert.utils.KafkaEmbedded; +import org.junit.AfterClass; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; @@ -53,6 +65,7 @@ import com.typesafe.config.ConfigFactory; * */ public class Integration1 { + private static final String SIMPLE_CONFIG = "/simple/application-integration.conf"; private static final Logger LOG = LoggerFactory.getLogger(Integration1.class); private static final ObjectMapper om = new ObjectMapper(); @@ -61,16 +74,36 @@ public class Integration1 { inte.args = args; inte.test_simple_threshhold(); } - + private String[] args; - private ExecutorService executors = Executors.newFixedThreadPool(5); + private ExecutorService executors = Executors.newFixedThreadPool(5, new ThreadFactory() { + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(); + t.setDaemon(true); + return t; + } + }); + private static KafkaEmbedded kafka; + + @BeforeClass + public static void setup() { +// kafka = new KafkaEmbedded(9092, 2181); +// makeSureTopic("perfmon_metrics"); + } + + @AfterClass + public static void end() { + if (kafka != null) { + kafka.shutdown(); + } + } /** * Assumption: * <p> - * start metadata service 8080, better in docker - * <p> - * start coordinator service 9090, better in docker + * start metadata service 8080 /rest * <p> * datasources : perfmon_datasource * <p> @@ -84,20 +117,29 @@ public class Integration1 { * * @throws InterruptedException */ - @Ignore @Test public void test_simple_threshhold() throws Exception { - System.setProperty("config.resource", "/application-integration.conf"); + System.setProperty("config.resource", SIMPLE_CONFIG); ConfigFactory.invalidateCaches(); Config config = ConfigFactory.load(); System.out.println("loading metadatas..."); - loadMetadatas("/", config); + loadMetadatas("/simple/", config); System.out.println("loading metadatas done!"); + if (args == null) { + args = new String[] { "-f", "simple/application-integration.conf" }; + } + executors.submit(() -> SampleClient1.main(args)); - executors.submit(() -> UnitTopologyMain.main(args)); + executors.submit(() -> { + try { + UnitTopologyMain.main(args); + } catch (Exception e) { + e.printStackTrace(); + } + }); Utils.sleep(1000 * 5l); while (true) { @@ -107,40 +149,20 @@ public class Integration1 { } } - /** - * Test only run expected when there is a missed config in the config file. mark as ignored - * @throws InterruptedException - * @throws ExecutionException - */ - @Ignore - @Test(expected = ExecutionException.class) - public void test_typesafe_config() throws InterruptedException, ExecutionException { - System.setProperty("config.resource", "/application-integration.conf"); + public static void makeSureTopic(String topic) { + System.setProperty("config.resource", SIMPLE_CONFIG); ConfigFactory.invalidateCaches(); - Future<?> f = executors.submit(() -> { - UnitTopologyMain.main(null); - }); + Config config = ConfigFactory.load(); + ZKConfig zkconfig = ZKConfigBuilder.getZKConfig(config); - f.get(); + ZkClient zkClient = new ZkClient(zkconfig.zkQuorum, 10000, 10000, ZKStringSerializer$.MODULE$); + Properties topicConfiguration = new Properties(); + ZkConnection zkConnection = new ZkConnection(zkconfig.zkQuorum); + ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); + AdminUtils.createTopic(zkUtils, topic, 1, 1, topicConfiguration, RackAwareMode.Disabled$.MODULE$); } -// @Test -// private void makeSureTopic() { -// System.setProperty("config.resource", "/application-integration.conf"); -// ConfigFactory.invalidateCaches(); -// Config config = ConfigFactory.load(); -// ZKConfig zkconfig = ZKConfigBuilder.getZKConfig(config); -// -// CuratorFramework curator = CuratorFrameworkFactory.newClient( -// zkconfig.zkQuorum, -// zkconfig.zkSessionTimeoutMs, -// zkconfig.connectionTimeoutMs, -// new RetryNTimes(zkconfig.zkRetryTimes, zkconfig.zkRetryInterval) -// ); -// } - public static void proactive_schedule(Config config) throws Exception { - try (CoordinatorClient cc = new CoordinatorClient(config)) { try { String resp = cc.schedule(); @@ -205,7 +227,7 @@ public class Integration1 { public void testJson() throws Exception { { JavaType type = CollectionType.construct(List.class, SimpleType.construct(Topology.class)); - List<Topology> l = om.readValue(Integration1.class.getResourceAsStream("/topologies.json"), + List<Topology> l = om.readValue(Integration1.class.getResourceAsStream("/simple/topologies.json"), type); Topology t = (Topology) l.get(0); @@ -216,16 +238,16 @@ public class Integration1 { { JavaType type = CollectionType.construct(List.class, SimpleType.construct(Publishment.class)); // publishment - List<Publishment> l = om.readValue(Integration1.class.getResourceAsStream("/publishments.json"), type); + List<Publishment> l = om.readValue(Integration1.class.getResourceAsStream("/simple/publishments.json"), type); Publishment p = l.get(0); Assert.assertEquals("KAFKA", p.getType()); } - checkAll("/"); + checkAll("/simple/"); checkAll("/correlation/"); } - private void checkAll(String base) throws Exception { + public static void checkAll(String base) throws Exception { loadEntities(base + "datasources.json", Kafka2TupleMetadata.class); loadEntities(base + "policies.json", PolicyDefinition.class); loadEntities(base + "publishments.json", Publishment.class); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java index 7ea0e7e..a11cc66 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java @@ -67,7 +67,12 @@ public class Integration2 { Config config = ConfigFactory.load(); Integration1.loadMetadatas("/correlation/", config); - executors.submit(() -> UnitTopologyMain.main(args)); + executors.submit(() -> { + try { + UnitTopologyMain.main(args); + } catch (Exception e) { + e.printStackTrace(); + }}); executors.submit(() -> SampleClient2.main(args)); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java new file mode 100644 index 0000000..667d241 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.e2e; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.eagle.alert.engine.UnitTopologyMain; +import org.apache.eagle.alert.utils.KafkaEmbedded; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import backtype.storm.utils.Utils; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +/** + * Since 6/29/16. + */ +public class Integration4NoDataAlert { + private String[] args; + + private ExecutorService executors = Executors.newFixedThreadPool(5); + + private static KafkaEmbedded kafka; + + @BeforeClass + public static void setup() { + // FIXME : start local kafka + } + + @AfterClass + public static void end() { + if (kafka != null) { + kafka.shutdown(); + } + } + @Test + public void testTriggerNoData() throws Exception{ + System.setProperty("config.resource", "/nodata/application-nodata.conf"); + ConfigFactory.invalidateCaches(); + Config config = ConfigFactory.load(); + + System.out.println("loading metadatas..."); + Integration1.loadMetadatas("/nodata/", config); + System.out.println("loading metadatas done!"); + + + executors.submit(() -> { + try { + UnitTopologyMain.main(args); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + // wait 20 seconds for topology to bring up + try{ + Thread.sleep(20000); + }catch(Exception ex){} + + // send mock data + executors.submit(() -> { + try { + SampleClient4NoDataAlert.main(args); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + + Utils.sleep(1000 * 5l); + while (true) { + Integration1.proactive_schedule(config); + + Utils.sleep(1000 * 60l * 5); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java new file mode 100644 index 0000000..c044da0 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.alert.engine.e2e; + +import java.util.List; + +import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; +import org.apache.eagle.alert.coordination.model.internal.Topology; +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import org.apache.eagle.alert.engine.coordinator.Publishment; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import org.apache.eagle.alert.service.IMetadataServiceClient; +import org.apache.eagle.alert.service.MetadataServiceClientImpl; +import org.junit.Ignore; +import org.junit.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +public class MetadataServiceClientImpTest { + + @Test @Ignore + public void test() { + System.out.println("loading metadatas..."); + try { + System.setProperty("config.resource", "/application-integration.conf"); + ConfigFactory.invalidateCaches(); + Config config = ConfigFactory.load(); + loadMetadatas("/", config); + } catch (Exception e) { + e.printStackTrace(); + } + System.out.println("loading metadatas done!"); + } + + private void loadMetadatas(String base, Config config) throws Exception { + IMetadataServiceClient client = new MetadataServiceClientImpl(config); + client.clear(); + + List<Kafka2TupleMetadata> metadata = Integration1.loadEntities(base + "datasources.json", Kafka2TupleMetadata.class); + client.addDataSources(metadata); + + List<PolicyDefinition> policies = Integration1.loadEntities(base + "policies.json", PolicyDefinition.class); + client.addPolicies(policies); + + List<Publishment> pubs = Integration1.loadEntities(base + "publishments.json", Publishment.class); + client.addPublishments(pubs); + + List<StreamDefinition> defs = Integration1.loadEntities(base + "streamdefinitions.json", StreamDefinition.class); + client.addStreamDefinitions(defs); + + List<Topology> topos = Integration1.loadEntities(base + "topologies.json", Topology.class); + client.addTopologies(topos); + + client.close(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceTest.java deleted file mode 100644 index 12fbfea..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.e2e; - -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.type.CollectionType; -import com.fasterxml.jackson.databind.type.SimpleType; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.service.IMetadataServiceClient; -import org.apache.eagle.alert.service.MetadataServiceClientImpl; -import org.junit.Ignore; -import org.junit.Test; - -import java.util.List; - -public class MetadataServiceTest { - - @Ignore - @Test - public void test() { - System.out.println("loading metadatas..."); - try { - System.setProperty("config.resource", "/application-integration.conf"); - ConfigFactory.invalidateCaches(); - Config config = ConfigFactory.load(); - loadMetadatas("/", config); - } catch (Exception e) { - e.printStackTrace(); - } - System.out.println("loading metadatas done!"); - } - - private void loadMetadatas(String base, Config config) throws Exception { - IMetadataServiceClient client = new MetadataServiceClientImpl(config); - client.clear(); - - List<Kafka2TupleMetadata> metadata = loadEntities(base + "datasources.json", Kafka2TupleMetadata.class); - client.addDataSources(metadata); - - List<PolicyDefinition> policies = loadEntities(base + "policies.json", PolicyDefinition.class); - client.addPolicies(policies); - - List<Publishment> pubs = loadEntities(base + "publishments.json", Publishment.class); - client.addPublishments(pubs); - - List<StreamDefinition> defs = loadEntities(base + "streamdefinitions.json", StreamDefinition.class); - client.addStreamDefinitions(defs); - - List<Topology> topos = loadEntities(base + "topologies.json", Topology.class); - client.addTopologies(topos); - - client.close(); - } - - private <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception { - ObjectMapper om = new ObjectMapper(); - JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz)); - List<T> l = om.readValue(Integration1.class.getResourceAsStream(path), type); - return l; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java index 348bf78..6bff94b 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java @@ -28,6 +28,9 @@ import org.slf4j.LoggerFactory; import backtype.storm.utils.Utils; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + /** * @since May 9, 2016 * @@ -60,7 +63,8 @@ public class SampleClient1 { long base = System.currentTimeMillis(); AtomicLong msgCount = new AtomicLong(); - try (KafkaProducer<String, String> proceduer = createProceduer()) { + Config config = ConfigFactory.load(); + try (KafkaProducer<String, String> proceduer = createProceduer(config)) { while (true) { int hostIndex = 6; for (int i = 0; i < hostIndex; i++) { @@ -108,12 +112,10 @@ public class SampleClient1 { return Pair.of(base, JsonUtils.writeValueAsString(e)); } - public static KafkaProducer<String, String> createProceduer() { - + public static KafkaProducer<String, String> createProceduer(Config config) { + String servers = config.getString("kafkaProducer.bootstrapServers"); Properties configMap = new Properties(); - // String broker_list = zkconfig.zkQuorum; - // TODO: replace boot strap servers with new workable server - configMap.put("bootstrap.servers", "localhost:9092"); + configMap.put("bootstrap.servers", servers); // configMap.put("metadata.broker.list", broker_list); configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java index 06148cc..ad0079c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java @@ -26,6 +26,9 @@ import org.apache.kafka.clients.producer.ProducerRecord; import backtype.storm.utils.Utils; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + /** * @since May 10, 2016 * @@ -51,7 +54,6 @@ public class SampleClient2 { public String message; public String host; } - /** * @param args @@ -60,8 +62,10 @@ public class SampleClient2 { AtomicLong base1 = new AtomicLong(System.currentTimeMillis()); AtomicLong base2 = new AtomicLong(System.currentTimeMillis()); AtomicLong count = new AtomicLong(); + + Config config = ConfigFactory.load(); - try (KafkaProducer<String, String> proceduer = SampleClient1.createProceduer()) { + try (KafkaProducer<String, String> proceduer = SampleClient1.createProceduer(config)) { while (true) { nextUuid = String.format(instanceUuidTemp, UUID.randomUUID().toString()); nextReqId = String.format(reqIdTemp, UUID.randomUUID().toString()); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java new file mode 100644 index 0000000..f0e0d80 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.e2e; + +import java.util.Properties; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import backtype.storm.utils.Utils; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +/** + * Since 6/29/16. + */ +@SuppressWarnings({ "rawtypes", "unchecked"}) +public class SampleClient4NoDataAlert { + private static final Logger LOG = LoggerFactory.getLogger(SampleClient4NoDataAlert.class); + private static long currentTimestamp = 1467240000000L; + private static long interval = 3000L; + private static volatile boolean host1Muted = false; + private static volatile boolean host2Muted = false; + public static void main(String[] args) throws Exception { + System.setProperty("config.resource", "/nodata/application-nodata.conf"); + ConfigFactory.invalidateCaches(); + + Config config = ConfigFactory.load(); + KafkaProducer producer = createProducer(config); + ProducerRecord record = null; + Thread x = new MuteThread(); + x.start(); + while(true) { + if(!host1Muted) { + record = new ProducerRecord("noDataAlertTopic", createEvent("host1")); + producer.send(record); + } + if(!host2Muted) { + record = new ProducerRecord("noDataAlertTopic", createEvent("host2")); + producer.send(record); + } + record = new ProducerRecord("noDataAlertTopic", createEvent("host3")); + producer.send(record); + Utils.sleep(interval); + currentTimestamp += interval; + } + } + + private static class MuteThread extends Thread{ + @Override + public void run(){ + try { + // sleep 10 seconds + Thread.sleep(10000); + // mute host1 + LOG.info("mute host1"); + host1Muted = true; + // sleep 70 seconds for triggering no data alert + LOG.info("try to sleep 70 seconds for triggering no data alert"); + Thread.sleep(70000); + // unmute host1 + LOG.info("unmute host1"); + host1Muted = false; + Thread.sleep(10000); + // mute host2 + LOG.info("mute host2"); + host2Muted = true; + // sleep 70 seconds for triggering no data alert + LOG.info("try to sleep 70 seconds for triggering no data alert"); + Thread.sleep(70000); + LOG.info("unmute host2"); + host2Muted = false; + }catch(Exception ex){ + ex.printStackTrace(); + } + } + } + + private static class NoDataEvent{ + @JsonProperty + long timestamp; + @JsonProperty + String host; + @JsonProperty + double value; + + public String toString(){ + return "timestamp=" + timestamp + ",host=" + host + ",value=" + value; + } + } + + private static String createEvent(String host) throws Exception{ + NoDataEvent e = new NoDataEvent(); + long expectTS = currentTimestamp + interval; + // adjust back 1 second random + long adjust = Math.round(2*Math.random()); + e.timestamp = expectTS-adjust; + e.host = host; + e.value = 25.6; + LOG.info("sending event {} ", e); + ObjectMapper mapper = new ObjectMapper(); + String value = mapper.writeValueAsString(e); + return value; + } + + + public static KafkaProducer<String, String> createProducer(Config config) { + String servers = config.getString("kafkaProducer.bootstrapServers"); + Properties configMap = new Properties(); + configMap.put("bootstrap.servers", servers); + configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + configMap.put("request.required.acks", "1"); + configMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + configMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + KafkaProducer<String, String> proceduer = new KafkaProducer<String, String>(configMap); + return proceduer; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java new file mode 100644 index 0000000..f97b1a8 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.nodata; + +import java.util.Iterator; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.eagle.alert.engine.evaluator.impl.DistinctValuesInTimeWindow; +import org.junit.Test; + +/** + * Since 6/28/16. + */ +public class TestDistinctValuesInTimeWindow { + @Test + public void test(){ + DistinctValuesInTimeWindow window = new DistinctValuesInTimeWindow(60*1000); + window.send("1", 0); + window.send("2", 1000); + window.send("3", 1000); + window.send("1", 30000); + window.send("2", 50000); + window.send("1", 62000); + Map<Object, Long> values = window.distinctValues(); + System.out.println(values); + } + + @Test + public void testSort(){ + SortedMap<DistinctValuesInTimeWindow.ValueAndTime, DistinctValuesInTimeWindow.ValueAndTime> timeSortedMap = + new TreeMap<>(new DistinctValuesInTimeWindow.ValueAndTimeComparator()); + DistinctValuesInTimeWindow.ValueAndTime vt1 = new DistinctValuesInTimeWindow.ValueAndTime("1", 0); + timeSortedMap.put(vt1, vt1); + DistinctValuesInTimeWindow.ValueAndTime vt2 = new DistinctValuesInTimeWindow.ValueAndTime("2", 1000); + timeSortedMap.put(vt2, vt2); + DistinctValuesInTimeWindow.ValueAndTime vt3 = new DistinctValuesInTimeWindow.ValueAndTime("3", 1000); + timeSortedMap.put(vt3, vt3); + timeSortedMap.remove(new DistinctValuesInTimeWindow.ValueAndTime("1", 0)); + DistinctValuesInTimeWindow.ValueAndTime vt4 = new DistinctValuesInTimeWindow.ValueAndTime("1", 30000); + timeSortedMap.put(vt4, vt4); + Iterator<?> it = timeSortedMap.entrySet().iterator(); + while(it.hasNext()){ + System.out.println(it.next()); + } + timeSortedMap.remove(new DistinctValuesInTimeWindow.ValueAndTime("2", 1000)); + DistinctValuesInTimeWindow.ValueAndTime vt5 = new DistinctValuesInTimeWindow.ValueAndTime("2", 50000); + timeSortedMap.put(vt5, vt5); + DistinctValuesInTimeWindow.ValueAndTime vt6 = new DistinctValuesInTimeWindow.ValueAndTime("1", 62000); + timeSortedMap.put(vt6, vt6); + it = timeSortedMap.entrySet().iterator(); + while(it.hasNext()){ + System.out.println(it.next()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java new file mode 100644 index 0000000..4149e17 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.nodata; + +import org.junit.Test; +import org.wso2.siddhi.core.ExecutionPlanRuntime; +import org.wso2.siddhi.core.SiddhiManager; +import org.wso2.siddhi.core.event.Event; +import org.wso2.siddhi.core.stream.output.StreamCallback; +import org.wso2.siddhi.core.util.EventPrinter; + +/** + * Since 6/27/16. + */ +public class TestEventTable { + @Test + public void test() throws Exception{ + ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime( + "define stream expectStream (key string, src string);"+ + "define stream appearStream (key string, src string);"+ + "define table expectTable (key string, src string);"+ + "from expectStream insert into expectTable;" + + "from appearStream[(expectTable.key==key) in expectTable] insert into outputStream;" + ); + + runtime.addCallback("outputStream", new StreamCallback() { + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + } + }); + + runtime.start(); + runtime.getInputHandler("expectStream").send(System.currentTimeMillis(), new Object[]{"host1","expectStream"}); + Thread.sleep(2000); + runtime.getInputHandler("appearStream").send(System.currentTimeMillis(), new Object[]{"host2","expectStream"}); + Thread.sleep(2000); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java new file mode 100644 index 0000000..569a3b0 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.nodata; + +import org.junit.Test; +import org.wso2.siddhi.core.ExecutionPlanRuntime; +import org.wso2.siddhi.core.SiddhiManager; +import org.wso2.siddhi.core.event.Event; +import org.wso2.siddhi.core.stream.output.StreamCallback; +import org.wso2.siddhi.core.util.EventPrinter; + +/** + * Since 6/27/16. + */ +public class TestNoDataAlert { + @Test + public void test() throws Exception{ + String[] expectHosts = new String[]{"host_1","host_2","host_3","host_4","host_5","host_6","host_7","host_8"}; +// String[] appearHosts = new String[]{"host_6","host_7","host_8"}; +// String[] noDataHosts = new String[]{"host_1","host_2","host_3","host_4","host_5"}; + + ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime( + "define stream appearStream (key string, src string);"+ + "define stream expectStream (key string, src string);"+ + "define table expectTable (key string, src string);"+ + "define trigger fiveSecTriggerStream at every 1 sec;"+ + "define trigger initAppearTriggerStream at 'start';"+ + "from expectStream insert into expectTable;"+ + "from fiveSecTriggerStream join expectTable insert into triggerExpectStream;"+ + "from initAppearTriggerStream join expectTable insert into initAppearStream;" +// "from triggerExpectStream as l left outer join appearStream#window.time(5 sec) as r on l.key == r.key select l.key as k1,r.key as k2 insert current events into joinStream;" + +// "from joinStream[k2 is null] select k1 insert current events into missingStream;" + ); + +// ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime( +// "define stream appearStream (key string, src string);"+ +// "define stream expectStream (key string, src string);"+ +// "define table expectTable (key string, src string);"+ +// "from expectStream insert into expectTable;"+ +// "from appearStream#window.time(10 sec) as l right outer join expectTable as r on l.key == r.key select r.key as k2, l.key as k1 insert current events into joinStream;" + +// "from joinStream[k1 is null] select k2 insert current events into missingStream;" +//// "from joinStream insert into missingStream;" +// +// ); + + runtime.addCallback("initAppearStream", new StreamCallback() { + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + } + }); + + runtime.start(); + for(String host: expectHosts) { + runtime.getInputHandler("expectStream").send(System.currentTimeMillis(), new Object[]{host,"expectStream"}); + } + +// for(String host:appearHosts) { +// runtime.getInputHandler("appearStream").send(System.currentTimeMillis(), new Object[]{host,"inStream"}); +// } + + Thread.sleep(5000); + +// for(String host:appearHosts) { +// runtime.getInputHandler("appearStream").send(System.currentTimeMillis(), new Object[]{host,"inStream"}); +// } +// Thread.sleep(10000); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java new file mode 100644 index 0000000..6c48def --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.nodata; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.eagle.alert.engine.Collector; +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import org.apache.eagle.alert.engine.coordinator.StreamColumn; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext; +import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyHandler; +import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import org.apache.eagle.alert.engine.model.StreamEvent; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Since 6/29/16. + */ +public class TestNoDataPolicyHandler { + private static final Logger LOG = LoggerFactory.getLogger(TestNoDataPolicyHandler.class); + private static final String inputStream = "testInputStream"; + private static final String outputStream = "testOutputStream"; + + @Test + public void test() throws Exception{ + test(buildPolicyDef_provided()); + test(buildPolicyDef_dynamic()); + } + + @SuppressWarnings("unchecked") + public void test(PolicyDefinition pd) throws Exception{ + Map<String, StreamDefinition> sds = new HashMap<>(); + StreamDefinition sd = buildStreamDef(); + sds.put("testInputStream", sd); + NoDataPolicyHandler handler = new NoDataPolicyHandler(sds); + + PolicyHandlerContext context = new PolicyHandlerContext(); + context.setPolicyDefinition(pd); + handler.prepare(new TestCollector(), context); + + handler.send(buildStreamEvt(0, "host1", 12.5)); + handler.send(buildStreamEvt(0, "host2", 12.6)); + handler.send(buildStreamEvt(100, "host1", 20.9)); + handler.send(buildStreamEvt(120, "host2", 22.1)); + handler.send(buildStreamEvt(4000, "host2", 22.1)); + handler.send(buildStreamEvt(50000, "host2", 22.1)); + handler.send(buildStreamEvt(60150, "host2", 22.3)); + handler.send(buildStreamEvt(60450, "host2", 22.9)); + handler.send(buildStreamEvt(75000, "host1", 41.6)); + handler.send(buildStreamEvt(85000, "host2", 45.6)); + } + + @SuppressWarnings("rawtypes") + private static class TestCollector implements Collector{ + @Override + public void emit(Object o) { + AlertStreamEvent e = (AlertStreamEvent)o; + Object[] data = e.getData(); + Assert.assertEquals("host2", data[1]); + LOG.info(e.toString()); + } + } + + private PolicyDefinition buildPolicyDef_provided(){ + PolicyDefinition pd = new PolicyDefinition(); + PolicyDefinition.Definition def = new PolicyDefinition.Definition(); + def.setValue("PT1M,provided,1,host,host1,host2"); + def.setType("string"); + pd.setDefinition(def); + pd.setInputStreams(Arrays.asList(inputStream)); + pd.setOutputStreams(Arrays.asList(outputStream)); + pd.setName("nodataalert-test"); + return pd; + } + + private PolicyDefinition buildPolicyDef_dynamic(){ + PolicyDefinition pd = new PolicyDefinition(); + PolicyDefinition.Definition def = new PolicyDefinition.Definition(); + def.setValue("PT1M,dynamic,1,host"); + def.setType("string"); + pd.setDefinition(def); + pd.setInputStreams(Arrays.asList(inputStream)); + pd.setOutputStreams(Arrays.asList(outputStream)); + pd.setName("nodataalert-test"); + return pd; + } + private StreamDefinition buildStreamDef(){ + StreamDefinition sd = new StreamDefinition(); + StreamColumn tsColumn = new StreamColumn(); + tsColumn.setName("timestamp"); + tsColumn.setType(StreamColumn.Type.LONG); + + StreamColumn hostColumn = new StreamColumn(); + hostColumn.setName("host"); + hostColumn.setType(StreamColumn.Type.STRING); + + StreamColumn valueColumn = new StreamColumn(); + valueColumn.setName("value"); + valueColumn.setType(StreamColumn.Type.DOUBLE); + + sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn)); + sd.setDataSource("testDataSource"); + sd.setStreamId("testStreamId"); + return sd; + } + + private StreamEvent buildStreamEvt(long ts, String host, double value){ + StreamEvent e = new StreamEvent(); + e.setData(new Object[]{ts, host, value}); + e.setStreamId(inputStream); + e.setTimestamp(ts); + return e; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java index af79f96..23ddd69 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java @@ -18,10 +18,10 @@ package org.apache.eagle.alert.engine.router; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.type.CollectionType; -import com.fasterxml.jackson.databind.type.SimpleType; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.eagle.alert.coordination.model.PublishSpec; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.Publishment; @@ -35,24 +35,25 @@ import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.CollectionType; +import com.fasterxml.jackson.databind.type.SimpleType; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - /** * @Since 5/14/16. */ public class TestAlertPublisherBolt { + @SuppressWarnings("rawtypes") @Ignore @Test public void test() { Config config = ConfigFactory.load("application-test.conf"); AlertPublisher publisher = new AlertPublisherImpl("alertPublishBolt"); - publisher.init(config); + publisher.init(config, new HashMap()); PublishSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class); publisher.onPublishChange(spec.getPublishments(), null, null, null); AlertStreamEvent event = create("testAlertStream"); @@ -95,7 +96,7 @@ public class TestAlertPublisherBolt { @Test public void testAlertPublisher() throws Exception { AlertPublisher alertPublisher = new AlertPublisherImpl("alert-publisher-test"); - List<Publishment> oldPubs = loadEntities("/publishments.json", Publishment.class); + List<Publishment> oldPubs = loadEntities("/publishments1.json", Publishment.class); List<Publishment> newPubs = loadEntities("/publishments2.json", Publishment.class); alertPublisher.onPublishChange(oldPubs, null, null, null); alertPublisher.onPublishChange(null, null, newPubs, oldPubs); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java index 8c048cb..2de2073 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java @@ -16,16 +16,19 @@ */ package org.apache.eagle.alert.engine.runner; -import backtype.storm.metric.api.MultiCountMetric; -import backtype.storm.task.GeneralTopologyContext; -import backtype.storm.task.IOutputCollector; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.TupleImpl; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue; import org.apache.eagle.alert.coordination.model.RouterSpec; import org.apache.eagle.alert.coordination.model.StreamRouterSpec; @@ -45,12 +48,17 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.*; +import backtype.storm.metric.api.MultiCountMetric; +import backtype.storm.task.GeneralTopologyContext; +import backtype.storm.task.IOutputCollector; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.TupleImpl; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; public class TestStreamRouterBolt { private final static Logger LOG = LoggerFactory.getLogger(TestStreamRouterBolt.class); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java index 0347d50..a756ebe 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java @@ -16,14 +16,9 @@ */ package org.apache.eagle.alert.engine.serialization; -import backtype.storm.serialization.DefaultKryoFactory; -import backtype.storm.serialization.DefaultSerializationDelegate; -import com.esotericsoftware.kryo.*; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.google.common.io.ByteArrayDataInput; -import com.google.common.io.ByteArrayDataOutput; -import com.google.common.io.ByteStreams; +import java.io.IOException; +import java.util.BitSet; + import org.apache.commons.lang.time.StopWatch; import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory; import org.apache.eagle.alert.engine.model.PartitionedEvent; @@ -35,12 +30,20 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.BitSet; +import backtype.storm.serialization.DefaultKryoFactory; +import backtype.storm.serialization.DefaultSerializationDelegate; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; public class PartitionedEventSerializerTest { private final static Logger LOG = LoggerFactory.getLogger(PartitionedEventSerializerTest.class); + @SuppressWarnings("deprecation") @Test public void testPartitionEventSerialization() throws IOException { PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());; @@ -76,6 +79,7 @@ public class PartitionedEventSerializerTest { Assert.assertEquals(partitionedEvent,kryoDeserializedEvent); LOG.info("\nCached Stream:{}\nCompressed Cached Stream :{}\nCached Stream + Cached Partition: {}\nJava Native: {}\nKryo: {}\nKryo + Cached Stream: {}\nKryo + Cached Stream + Cached Partition: {}",serializedBytes.length,serializedBytesCompressed.length,serializedBytes2.length,javaSerialization.length,kryoBytes.length,kryoSerialize(serializedBytes).length,kryoSerialize(serializedBytes2).length); } + @SuppressWarnings("deprecation") @Test public void testPartitionEventSerializationEfficiency() throws IOException { PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream",System.currentTimeMillis());;
