http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java index 2bca329..57529b6 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java @@ -19,6 +19,10 @@ package org.apache.eagle.alert.engine; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; import org.apache.eagle.alert.config.ZKConfig; import org.apache.eagle.alert.config.ZKConfigBuilder; import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService; @@ -40,19 +44,43 @@ import com.typesafe.config.ConfigFactory; */ public class UnitTopologyMain { - public static void main(String[] args) { + public static void main(String[] args) throws Exception { + // command line parse + Options options = new Options(); + options.addOption("c", true, + "config URL (valid file name) - defaults application.conf according to typesafe config default behavior."); + CommandLineParser parser = new DefaultParser(); + CommandLine cmd = parser.parse(options, args); + + if (cmd.hasOption("c")) { + String fileName = cmd.getOptionValue("c", "application.conf"); + System.setProperty("config.resource", fileName.startsWith("/") ? fileName : "/" + fileName); + ConfigFactory.invalidateCaches(); + } Config config = ConfigFactory.load(); - ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config); - String topologyId = config.getString("topology.name"); - ZKMetadataChangeNotifyService changeNotifyService = new ZKMetadataChangeNotifyService(zkConfig, topologyId); + // load config and start + String topologyId = config.getString("topology.name"); + ZKMetadataChangeNotifyService changeNotifyService = createZKNotifyService(config, topologyId); new UnitTopologyRunner(changeNotifyService).run(topologyId, config); } + + public static void runTopology(Config config, backtype.storm.Config stormConfig) { + // load config and start + String topologyId = config.getString("topology.name"); + ZKMetadataChangeNotifyService changeNotifyService = createZKNotifyService(config, topologyId); + new UnitTopologyRunner(changeNotifyService, stormConfig).run(topologyId, config); + } - public static StormTopology createTopology(Config config) { + private static ZKMetadataChangeNotifyService createZKNotifyService(Config config, String topologyId) { ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config); - String topologyId = config.getString("topology.name"); ZKMetadataChangeNotifyService changeNotifyService = new ZKMetadataChangeNotifyService(zkConfig, topologyId); + return changeNotifyService; + } + + public static StormTopology createTopology(Config config) { + String topologyId = config.getString("topology.name"); + ZKMetadataChangeNotifyService changeNotifyService = createZKNotifyService(config, topologyId); return new UnitTopologyRunner(changeNotifyService).buildTopology(topologyId, config); }
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java index 2aa70e8..e8f736c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyStreamHandlers.java @@ -20,13 +20,17 @@ import java.util.Map; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.evaluator.impl.SiddhiPolicyHandler; +import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyHandler; public class PolicyStreamHandlers { public static final String SIDDHI_ENGINE ="siddhi"; + public static final String NO_DATA_ALERT_ENGINE ="nodataalert"; public static PolicyStreamHandler createHandler(String type, Map<String, StreamDefinition> sds){ if(SIDDHI_ENGINE.equals(type)) { return new SiddhiPolicyHandler(sds); + }else if(NO_DATA_ALERT_ENGINE.equals(type)){ + return new NoDataPolicyHandler(sds); } throw new IllegalArgumentException("Illegal policy stream handler type: "+type); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java new file mode 100644 index 0000000..8a681da --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/DistinctValuesInTimeWindow.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.evaluator.impl; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.commons.lang.builder.HashCodeBuilder; + +/** + * Since 6/28/16. + * to get distinct values within a specified time window + * valueMaxTimeMap : each distinct value is associated with max timestamp it ever had + * timeSortedMap : map sorted by timestamp first and then value + * With the above 2 data structure, we can get distinct values in LOG(N) + */ +public class DistinctValuesInTimeWindow { + public static class ValueAndTime{ + Object value; + long timestamp; + public ValueAndTime(Object value, long timestamp){ + this.value = value; + this.timestamp = timestamp; + } + + public String toString(){ + return "[" + value + "," + timestamp + "]"; + } + + public int hashCode(){ + return new HashCodeBuilder().append(value).append(timestamp).toHashCode(); + } + + public boolean equals(Object that){ + if(!(that instanceof ValueAndTime)) + return false; + ValueAndTime another = (ValueAndTime)that; + return another.timestamp == this.timestamp && another.value.equals(this.value); + } + } + + public static class ValueAndTimeComparator implements Comparator<ValueAndTime>{ + @Override + public int compare(ValueAndTime o1, ValueAndTime o2) { + if(o1.timestamp != o2.timestamp) + return (o1.timestamp > o2.timestamp) ? 1 : -1; + if(o1.value.equals(o2.value)) + return 0; + else { + // this is not strictly correct, but I don't want to write too many comparators here :-) + if(o1.hashCode() > o2.hashCode()) + return 1; + else + return -1; + } + } + } + + /** + * map from value to max timestamp for this value + */ + private Map<Object, Long> valueMaxTimeMap = new HashMap<>(); + /** + * map sorted by time(max timestamp for the value) and then value + */ + private SortedMap<ValueAndTime, ValueAndTime> timeSortedMap = new TreeMap<>(new ValueAndTimeComparator()); + private long maxTimestamp = 0L; + private long window; + private boolean windowSlided; + + /** + * @param window - milliseconds + */ + public DistinctValuesInTimeWindow(long window){ + this.window = window; + } + + public void send(Object value, long timestamp){ + ValueAndTime vt = new ValueAndTime(value, timestamp); + + // todo think of time out of order + if(valueMaxTimeMap.containsKey(value)){ + // remove that entry with old timestamp in timeSortedMap + long oldTime = valueMaxTimeMap.get(value); + if(oldTime >= timestamp){ + // no any effect as the new timestamp is equal or even less than old timestamp + return; + } + timeSortedMap.remove(new ValueAndTime(value, oldTime)); + } + // insert entry with new timestamp in timeSortedMap + timeSortedMap.put(vt, vt); + // update new timestamp in valueMaxTimeMap + valueMaxTimeMap.put(value, timestamp); + + // evict old entries + // store max timestamp if possible + maxTimestamp = Math.max(maxTimestamp, timestamp); + + // check if some values should be evicted because of time window + Iterator<Map.Entry<ValueAndTime, ValueAndTime>> it = timeSortedMap.entrySet().iterator(); + while(it.hasNext()){ + Map.Entry<ValueAndTime, ValueAndTime> entry = it.next(); + if(entry.getKey().timestamp < maxTimestamp - window){ + // should remove the entry in valueMaxTimeMap and timeSortedMap + valueMaxTimeMap.remove(entry.getKey().value); + windowSlided = true; + + it.remove(); + }else { + break; + } + } + } + + public Map<Object, Long> distinctValues(){ + return valueMaxTimeMap; + } + + public boolean windowSlided(){ + return windowSlided; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java index f063618..8a1f04a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java @@ -90,7 +90,7 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator { policyStreamHandler.getValue().send(partitionedEvent.getEvent()); } catch (Exception e) { this.context.counter().scope("fail_count").incr(); - LOG.error("{} failed to handle {}",policyStreamHandler.getValue(), partitionedEvent.getEvent()); + LOG.error("{} failed to handle {}",policyStreamHandler.getValue(), partitionedEvent.getEvent(), e); } } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java index dfa5612..ed26408 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java @@ -76,7 +76,9 @@ public class SiddhiPolicyHandler implements PolicyStreamHandler { */ @Override public void receive(Event[] events) { - LOG.info("Generated {} alerts from policy '{}' in {}", events.length,context.getPolicyDefinition().getName(), context.getPolicyEvaluatorId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Generated {} alerts from policy '{}' in {}", events.length,context.getPolicyDefinition().getName(), context.getPolicyEvaluatorId()); + } for(Event e : events) { AlertStreamEvent event = new AlertStreamEvent(); event.setTimestamp(e.getTimestamp()); @@ -131,6 +133,10 @@ public class SiddhiPolicyHandler implements PolicyStreamHandler { if(inputHandler != null){ context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"eval_count")).incr(); inputHandler.send(event.getTimestamp(),event.getData()); + + if (LOG.isDebugEnabled()) { + LOG.debug("sent event to siddhi stream {} ", streamId); + } }else{ context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"drop_count")).incr(); LOG.warn("No input handler found for stream {}",streamId); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java new file mode 100644 index 0000000..ed13f71 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.evaluator.nodata; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.eagle.alert.engine.Collector; +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext; +import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler; +import org.apache.eagle.alert.engine.evaluator.impl.DistinctValuesInTimeWindow; +import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import org.apache.eagle.alert.engine.model.StreamEvent; +import org.apache.eagle.alert.utils.TimePeriodUtils; +import org.joda.time.Period; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Since 6/28/16. + * No Data Policy engine + * based on the following information + * 1. stream definition: group by columns + * 2. timestamp field: timestamp column + * 3. wiri safe time window: how long window is good for full set of wiri + * 4. wisb: full set + * + * No data policy definition should include + * fixed fields and dynamic fields + * fixed fields are leading fields : windowPeriod, type, numOfFields, f1_name, f2_name + * dynamic fields depend on wisb type. + */ +public class NoDataPolicyHandler implements PolicyStreamHandler{ + private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyHandler.class); + private Map<String, StreamDefinition> sds; + + // wisb(what is should be) set for expected full set value of multiple columns + @SuppressWarnings("rawtypes") + private volatile Set wisbValues = null; + private volatile List<Integer> wisbFieldIndices = new ArrayList<>(); + // reuse PolicyDefinition.defintion.value field to store full set of values separated by comma + private volatile PolicyDefinition policyDef; + private volatile DistinctValuesInTimeWindow distinctWindow; + private volatile Collector<AlertStreamEvent> collector; + private volatile PolicyHandlerContext context; + private volatile NoDataWisbType wisbType; + + public NoDataPolicyHandler(Map<String, StreamDefinition> sds){ + this.sds = sds; + } + @Override + public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception { + this.collector = collector; + this.context = context; + this.policyDef = context.getPolicyDefinition(); + List<String> inputStreams = policyDef.getInputStreams(); + // validate inputStreams has to contain only one stream + if(inputStreams.size() != 1) + throw new IllegalArgumentException("policy inputStream size has to be 1 for no data alert"); + // validate outputStream has to contain only one stream + if(policyDef.getOutputStreams().size() != 1) + throw new IllegalArgumentException("policy outputStream size has to be 1 for no data alert"); + + String is = inputStreams.get(0); + StreamDefinition sd = sds.get(is); + + String policyValue = policyDef.getDefinition().getValue(); + // assume that no data alert policy value consists of "windowPeriod, type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value, f2_value} + String[] segments = policyValue.split(","); + long windowPeriod = TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(segments[0])); + distinctWindow = new DistinctValuesInTimeWindow(windowPeriod); + this.wisbType = NoDataWisbType.valueOf(segments[1]); + // for provided wisb values, need to parse, for dynamic wisb values, it is computed through a window + if(wisbType == NoDataWisbType.provided) { + wisbValues = new NoDataWisbProvidedParser().parse(segments); + } + // populate wisb field names + int numOfFields = Integer.parseInt(segments[2]); + for(int i = 3; i < 3+numOfFields; i++){ + String fn = segments[i]; + wisbFieldIndices.add(sd.getColumnIndex(fn)); + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public void send(StreamEvent event) throws Exception { + Object[] data = event.getData(); + List<Object> columnValues = new ArrayList<>(); + for(int i=0; i<wisbFieldIndices.size(); i++){ + Object o = data[wisbFieldIndices.get(i)]; + // convert value to string + columnValues.add(o.toString()); + } + distinctWindow.send(columnValues, event.getTimestamp()); + Set wiriValues = distinctWindow.distinctValues().keySet(); + + LOG.debug("window slided: {}, with wiri: {}", distinctWindow.windowSlided(), distinctWindow.distinctValues()); + + if(distinctWindow.windowSlided()) { + compareAndEmit(wisbValues, wiriValues, event); + } + + if(wisbType == NoDataWisbType.dynamic) { + // deep copy + wisbValues = new HashSet<>(wiriValues); + } + } + + @SuppressWarnings("rawtypes") + private void compareAndEmit(Set wisb, Set wiri, StreamEvent event){ + // compare with wisbValues if wisbValues are already there for dynamic type + Collection noDataValues = CollectionUtils.subtract(wisb, wiri); + LOG.debug("nodatavalues:" + noDataValues + ", wisb: " + wisb + ", wiri: " + wiri); + if (noDataValues != null && noDataValues.size() > 0) { + LOG.info("No data alert is triggered with no data values {} and wisb {}", noDataValues, wisbValues); + AlertStreamEvent alertEvent = createAlertEvent(event.getTimestamp(), event.getData()); + collector.emit(alertEvent); + } + } + + private AlertStreamEvent createAlertEvent(long timestamp, Object[] triggerEvent){ + String is = policyDef.getInputStreams().get(0); + StreamDefinition sd = sds.get(is); + + AlertStreamEvent event = new AlertStreamEvent(); + event.setTimestamp(timestamp); + event.setData(triggerEvent); + event.setStreamId(policyDef.getOutputStreams().get(0)); + event.setPolicy(context.getPolicyDefinition()); + if (this.context.getParentEvaluator() != null) { + event.setCreatedBy(context.getParentEvaluator().getName()); + } + event.setCreatedTime(System.currentTimeMillis()); + event.setSchema(sd); + return event; + } + + @Override + public void close() throws Exception { + + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java new file mode 100644 index 0000000..fe06067 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbParser.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.evaluator.nodata; + +import java.util.List; +import java.util.Set; + +/** + * Since 6/29/16. + */ +public interface NoDataWisbParser { + /** + * parse policy definition and return WISB values for one or multiple fields + * for example host and data center are 2 fields for no data alert, then WISB is a list of two values + * @param args some information parsed from policy defintion + * @return list of list of field values + */ + Set<List<String>> parse(String[] args); +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java new file mode 100644 index 0000000..e13826a --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbProvidedParser.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.evaluator.nodata; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Since 6/29/16. + */ +public class NoDataWisbProvidedParser implements NoDataWisbParser{ + @Override + /** + * policy value consists of "windowPeriod, type, numOfFields, f1_name, f2_name, f1_value, f2_value, f1_value, f2_value" + */ + public Set<List<String>> parse(String[] args) { + int numOfFields = Integer.parseInt(args[2]); + Set<List<String>> wisbValues = new HashSet<>(); + int i = 3 + numOfFields; + while(i<args.length){ + List<String> fields = new ArrayList<>(); + for(int j=0; j<numOfFields; j++){ + fields.add(args[i+j]); + } + wisbValues.add(fields); + i += numOfFields; + } + return wisbValues; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java new file mode 100644 index 0000000..887d099 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataWisbType.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.evaluator.nodata; + +/** + * Since 6/29/16. + */ +public enum NoDataWisbType { + provided, + dynamic +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java index 644fe2b..d24bdb0 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java @@ -17,25 +17,29 @@ */ package org.apache.eagle.alert.engine.publisher; +import java.io.Closeable; +import java.util.Map; + import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.impl.PublishStatus; import com.typesafe.config.Config; -import java.io.Closeable; -import java.util.Map; - /** * Created on 2/10/16. * Notification Plug-in interface which provide abstraction layer to notify to different system */ public interface AlertPublishPlugin extends Closeable { /** - * for initialization + * + * @param config + * @param publishment + * @param configProperties - storm config that would be useful for some implementation * @throws Exception */ - void init(Config config, Publishment publishment) throws Exception; + @SuppressWarnings("rawtypes") + void init(Config config, Publishment publishment, Map configProperties) throws Exception; void update(String dedupIntervalMin, Map<String, String> pluginProperties); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java index 7a44009..5c0e597 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java @@ -2,6 +2,7 @@ package org.apache.eagle.alert.engine.publisher; import java.io.Serializable; +import java.util.Map; import org.apache.eagle.alert.engine.model.AlertStreamEvent; @@ -24,7 +25,8 @@ import com.typesafe.config.Config; * limitations under the License. */ public interface AlertPublisher extends AlertPublishListener, Serializable { - void init(Config config); + @SuppressWarnings("rawtypes") + void init(Config config, Map stormConfig); String getName(); void nextEvent(AlertStreamEvent event); void close(); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java new file mode 100644 index 0000000..bd21415 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.publisher.impl; + +import java.util.Map; + +import org.apache.eagle.alert.engine.codec.IEventSerializer; +import org.apache.eagle.alert.engine.coordinator.Publishment; +import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import org.apache.eagle.alert.engine.publisher.AlertDeduplicator; +import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; +import org.slf4j.Logger; + +import com.typesafe.config.Config; + +/** + * @since Jun 3, 2016 + * + */ +public abstract class AbstractPublishPlugin implements AlertPublishPlugin { + + protected AlertDeduplicator deduplicator; + protected PublishStatus status; + protected IEventSerializer serializer; + protected String pubName; + + @SuppressWarnings("rawtypes") + @Override + public void init(Config config, Publishment publishment, Map conf) throws Exception { + this.deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin()); + this.pubName = publishment.getName(); + String serializerClz = publishment.getSerializer(); + try { + Object obj = Class.forName(serializerClz).getConstructor(Map.class).newInstance(conf); + if (!(obj instanceof IEventSerializer)) { + throw new Exception(String.format("serializer %s of publishement %s is not subclass to %s!", + publishment.getSerializer(), + publishment.getName(), + IEventSerializer.class.getName())); + } + serializer = (IEventSerializer) obj; + } catch (Exception e) { + getLogger().error(String.format("initialized failed, use default StringEventSerializer, failure message : {}", e.getMessage()), e); + serializer = new StringEventSerializer(conf); + } + } + + @Override + public void update(String dedupIntervalMin, Map<String, String> pluginProperties) { + deduplicator.setDedupIntervalMin(dedupIntervalMin); + } + + @Override + public AlertStreamEvent dedup(AlertStreamEvent event) { + return deduplicator.dedup(event); + } + + @Override + public PublishStatus getStatus() { + return status; + } + + protected abstract Logger getLogger(); + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java deleted file mode 100644 index 2a4e332..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.publisher.impl; - -import java.util.List; - -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.apache.eagle.alert.service.IMetadataServiceClient; -import org.apache.eagle.alert.service.MetadataServiceClientImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.typesafe.config.Config; - -/** - * Alert API entity Persistor - */ -public class AlertEagleStorePersister { - private static Logger LOG = LoggerFactory.getLogger(AlertEagleStorePersister.class); - private Config config; - - public AlertEagleStorePersister(Config config) { - this.config = config; - } - - /** - * Persist passes list of Entities - * @param list - * @return - */ - public boolean doPersist(List<? extends StreamEvent> list) { - if (list.isEmpty()) return false; - LOG.info("Going to persist entities, type: " + " " + list.get(0).getClass().getSimpleName() + ", list size: " + list.size()); - try { - IMetadataServiceClient client = new MetadataServiceClientImpl(config); - // TODO: metadata service support - } - catch (Exception ex) { - LOG.error("Got an exception in persisting entities", ex); - return false; - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java deleted file mode 100644 index 807aacc..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.publisher.impl; - -import java.util.Arrays; -import java.util.Map; - -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.AlertDeduplicator; -import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.typesafe.config.Config; - -/** - * Plugin to persist alerts to Eagle Storage - */ -public class AlertEagleStorePublisher implements AlertPublishPlugin { - - private static final Logger LOG = LoggerFactory.getLogger(AlertEagleStorePublisher.class); - private PublishStatus status; - private AlertEagleStorePersister persist; - private AlertDeduplicator deduplicator; - - @Override - public void init(Config config, Publishment publishment) throws Exception { - this.persist = new AlertEagleStorePersister(config); - deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin()); - LOG.info("initialized plugin for EagleStorePlugin"); - } - - @Override - public void update(String dedupIntervalMin, Map<String, String> pluginProperties) { - deduplicator.setDedupIntervalMin(dedupIntervalMin); - } - - @Override - public PublishStatus getStatus() { - return this.status; - } - - @Override - public AlertStreamEvent dedup(AlertStreamEvent event) { - return deduplicator.dedup(event); - } - - /** - * Persist AlertEntity to alert_details table - * @param event - */ - @Override - public void onAlert(AlertStreamEvent event) { - LOG.info("write alert to eagle storage " + event); - event = dedup(event); - if(event == null) { - return; - } - PublishStatus status = new PublishStatus(); - try{ - boolean result = persist.doPersist(Arrays.asList(event)); - if(result) { - status.successful = true; - status.errorMessage = ""; - }else{ - status.successful = false; - status.errorMessage = ""; - } - }catch (Exception ex ){ - status.successful = false; - status.errorMessage = ex.getMessage(); - LOG.error("Fail writing alert entity to Eagle Store", ex); - } - this.status = status; - } - - @Override - public void close() { - - } - - @Override - public int hashCode(){ - return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode(); - } - - @Override - public boolean equals(Object o){ - if(o == this) - return true; - if(!(o instanceof AlertEagleStorePublisher)) - return false; - return true; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java index efe29bc..9d191c0 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java @@ -27,8 +27,6 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.AlertDeduplicator; -import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; import org.apache.eagle.alert.engine.publisher.PublishConstants; import org.apache.eagle.alert.engine.publisher.email.AlertEmailGenerator; import org.apache.eagle.alert.engine.publisher.email.AlertEmailGeneratorBuilder; @@ -37,27 +35,28 @@ import org.slf4j.LoggerFactory; import com.typesafe.config.Config; -public class AlertEmailPublisher implements AlertPublishPlugin { +public class AlertEmailPublisher extends AbstractPublishPlugin { private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPublisher.class); - private AlertEmailGenerator emailGenerator; - private AlertDeduplicator deduplicator; - private Map<String, String> emailConfig; private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4; private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8; private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute + + private AlertEmailGenerator emailGenerator; + private Map<String, String> emailConfig; + private transient ThreadPoolExecutor executorPool; - private PublishStatus status; @Override - public void init(Config config, Publishment publishment) throws Exception { + @SuppressWarnings("rawtypes") + public void init(Config config, Publishment publishment, Map conf) throws Exception { + super.init(config, publishment, conf); executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); LOG.info(" Creating Email Generator... "); if (publishment.getProperties() != null) { emailConfig = new HashMap<>(publishment.getProperties()); emailGenerator = createEmailGenerator(emailConfig); } - deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin()); } @Override @@ -84,7 +83,8 @@ public class AlertEmailPublisher implements AlertPublishPlugin { @Override public void update(String dedupIntervalMin, Map<String, String> pluginProperties) { - deduplicator.setDedupIntervalMin(dedupIntervalMin); + super.update(dedupIntervalMin, pluginProperties); + if (pluginProperties != null && ! emailConfig.equals(pluginProperties)) { emailConfig = new HashMap<>(pluginProperties); emailGenerator = createEmailGenerator(pluginProperties); @@ -96,16 +96,6 @@ public class AlertEmailPublisher implements AlertPublishPlugin { this.executorPool.shutdown(); } - @Override - public PublishStatus getStatus() { - return this.status; - } - - @Override - public AlertStreamEvent dedup(AlertStreamEvent event) { - return deduplicator.dedup(event); - } - /** * @param notificationConfig * @return @@ -148,4 +138,9 @@ public class AlertEmailPublisher implements AlertPublishPlugin { return false; return true; } + + @Override + protected Logger getLogger() { + return LOG; + } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java index ea65298..2566f79 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java @@ -26,8 +26,6 @@ import java.util.concurrent.TimeUnit; import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.AlertDeduplicator; -import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; import org.apache.eagle.alert.engine.publisher.PublishConstants; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -36,30 +34,30 @@ import org.slf4j.LoggerFactory; import com.typesafe.config.Config; -public class AlertKafkaPublisher implements AlertPublishPlugin { +public class AlertKafkaPublisher extends AbstractPublishPlugin { private static final Logger LOG = LoggerFactory.getLogger(AlertKafkaPublisher.class); - private AlertDeduplicator deduplicator; - private PublishStatus status; + private static final long MAX_TIMEOUT_MS = 60000; + @SuppressWarnings("rawtypes") private KafkaProducer producer; private String brokerList; private String topic; - private final static long MAX_TIMEOUT_MS =60000; - @Override - public void init(Config config, Publishment publishment) throws Exception { - deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin()); + @SuppressWarnings("rawtypes") + public void init(Config config, Publishment publishment, Map conf) throws Exception { + super.init(config, publishment, conf); + if (publishment.getProperties() != null) { Map<String, String> kafkaConfig = new HashMap<>(publishment.getProperties()); brokerList = kafkaConfig.get(PublishConstants.BROKER_LIST).trim(); - producer = KafkaProducerManager.INSTANCE.getProducer(brokerList); + producer = KafkaProducerManager.INSTANCE.getProducer(brokerList, kafkaConfig); topic = kafkaConfig.get(PublishConstants.TOPIC).trim(); } } - @SuppressWarnings("unchecked") + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public void onAlert(AlertStreamEvent event) throws Exception { if (producer == null) { @@ -72,7 +70,12 @@ public class AlertKafkaPublisher implements AlertPublishPlugin { } PublishStatus status = new PublishStatus(); try { - Future<?> future = producer.send(createRecord(event, topic)); + ProducerRecord record = createRecord(event, topic); + if (record == null) { + LOG.error(" Alert serialize return null, ignored message! "); + return; + } + Future<?> future = producer.send(record); future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS); status.successful = true; status.errorMessage = ""; @@ -89,6 +92,7 @@ public class AlertKafkaPublisher implements AlertPublishPlugin { this.status = status; } + @SuppressWarnings("rawtypes") @Override public void update(String dedupIntervalMin, Map<String, String> pluginProperties) { deduplicator.setDedupIntervalMin(dedupIntervalMin); @@ -99,7 +103,7 @@ public class AlertKafkaPublisher implements AlertPublishPlugin { brokerList = newBrokerList; KafkaProducer newProducer = null; try { - newProducer = KafkaProducerManager.INSTANCE.getProducer(brokerList); + newProducer = KafkaProducerManager.INSTANCE.getProducer(brokerList, pluginProperties); } catch (Exception e) { LOG.error("Create KafkaProducer failed with configurations: {}", pluginProperties); } @@ -113,24 +117,22 @@ public class AlertKafkaPublisher implements AlertPublishPlugin { producer.close(); } - /** - * To Create KafkaProducer Record - * @param event - * @return ProducerRecord - * @throws Exception - */ - private ProducerRecord<String, String> createRecord(AlertStreamEvent event, String topic) throws Exception { - ProducerRecord<String, String> record = new ProducerRecord<>(topic, event.toString()); - return record; + private ProducerRecord<String, Object> createRecord(AlertStreamEvent event, String topic) throws Exception { + Object o = serialzeEvent(event); + if (o != null) { + ProducerRecord<String, Object> record = new ProducerRecord<>(topic, o); + return record; + } else { + return null; + } } - @Override - public PublishStatus getStatus() { - return this.status; + private Object serialzeEvent(AlertStreamEvent event) { + return serializer.serialize(event); } @Override - public AlertStreamEvent dedup(AlertStreamEvent event) { - return this.deduplicator.dedup(event); + protected Logger getLogger() { + return LOG; } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java index f538088..82be5a0 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublishPluginsFactory.java @@ -18,6 +18,8 @@ package org.apache.eagle.alert.engine.publisher.impl; +import java.util.Map; + import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; import org.slf4j.Logger; @@ -32,12 +34,13 @@ public class AlertPublishPluginsFactory { private static final Logger LOG = LoggerFactory.getLogger(AlertPublishPluginsFactory.class); - public static AlertPublishPlugin createNotificationPlugin(Publishment publishment, Config config) { + @SuppressWarnings("rawtypes") + public static AlertPublishPlugin createNotificationPlugin(Publishment publishment, Config config, Map conf) { AlertPublishPlugin plugin = null; String publisherType = publishment.getType(); try { plugin = (AlertPublishPlugin) Class.forName(publisherType).newInstance(); - plugin.init(config, publishment); + plugin.init(config, publishment, conf); } catch (Exception ex) { LOG.error("Error in loading AlertPublisherPlugin class: ", ex); //throw new IllegalStateException(ex); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java index fce22f1..6baa616 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import com.typesafe.config.Config; +@SuppressWarnings("rawtypes") public class AlertPublisherImpl implements AlertPublisher { private static final long serialVersionUID = 4809983246198138865L; private final static Logger LOG = LoggerFactory.getLogger(AlertPublisherImpl.class); @@ -40,14 +41,16 @@ public class AlertPublisherImpl implements AlertPublisher { private volatile Map<String, List<String>> policyPublishPluginMapping = new ConcurrentHashMap<>(1); private volatile Map<String, AlertPublishPlugin> publishPluginMapping = new ConcurrentHashMap<>(1); private Config config; + private Map conf; public AlertPublisherImpl(String name) { this.name = name; } @Override - public void init(Config config) { + public void init(Config config, Map conf) { this.config = config; + this.conf = conf; } @Override @@ -84,6 +87,7 @@ public class AlertPublisherImpl implements AlertPublisher { publishPluginMapping.values().forEach(plugin -> plugin.close()); } + @SuppressWarnings("unchecked") @Override public void onPublishChange(List<Publishment> added, List<Publishment> removed, @@ -100,7 +104,7 @@ public class AlertPublisherImpl implements AlertPublisher { } for (Publishment publishment : added) { - AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config); + AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf); if(plugin != null) { publishPluginMapping.put(publishment.getName(), plugin); onPolicyAdded(publishment.getPolicyIds(), publishment.getName()); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java index e8964a8..6b7fc61 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java @@ -17,26 +17,56 @@ */ package org.apache.eagle.alert.engine.publisher.impl; +import java.util.Map; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; /** - * The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances. + * The producer is thread safe and sharing a single producer instance across threads will generally be faster than + * having multiple instances. */ public class KafkaProducerManager { - public static final KafkaProducerManager INSTANCE = new KafkaProducerManager(); - - public KafkaProducer<String, Object> getProducer(String brokerList) { - Properties configMap = new Properties(); - configMap.put("bootstrap.servers", brokerList); - configMap.put("metadata.broker.list", brokerList); - configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - configMap.put("request.required.acks", "1"); - configMap.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); - configMap.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); - KafkaProducer<String, Object> producer = new KafkaProducer<>(configMap); - return producer; - } + + private static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + private static final String VALUE_DESERIALIZER = "value.deserializer"; + private static final String KEY_DESERIALIZER = "key.deserializer"; + private static final String VALUE_SERIALIZER = "value.serializer"; + private static final String KEY_SERIALIZER = "key.serializer"; + + public static final KafkaProducerManager INSTANCE = new KafkaProducerManager(); + + public KafkaProducer<String, Object> getProducer(String brokerList, Map<String, String> kafkaConfig) { + Properties configMap = new Properties(); + configMap.put("bootstrap.servers", brokerList); + configMap.put("metadata.broker.list", brokerList); + + if (kafkaConfig.containsKey(KEY_SERIALIZER)) { + configMap.put(KEY_SERIALIZER, kafkaConfig.get(KEY_SERIALIZER)); + } else { + configMap.put(KEY_SERIALIZER, STRING_SERIALIZER); + } + + if (kafkaConfig.containsKey(VALUE_SERIALIZER)) { + configMap.put(VALUE_SERIALIZER, kafkaConfig.get(VALUE_SERIALIZER)); + } else { + configMap.put(VALUE_SERIALIZER, STRING_SERIALIZER); + } + configMap.put("request.required.acks", "1"); + + if (kafkaConfig.containsKey(KEY_DESERIALIZER)) { + configMap.put(KEY_DESERIALIZER, kafkaConfig.get(KEY_DESERIALIZER)); + } else { + configMap.put(KEY_DESERIALIZER, STRING_SERIALIZER); + } + + if (kafkaConfig.containsKey(VALUE_DESERIALIZER)) { + configMap.put(VALUE_DESERIALIZER, kafkaConfig.get(VALUE_DESERIALIZER)); + } else { + configMap.put(VALUE_DESERIALIZER, STRING_SERIALIZER); + } + + KafkaProducer<String, Object> producer = new KafkaProducer<>(configMap); + return producer; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java new file mode 100644 index 0000000..012ebaa --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/StringEventSerializer.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.publisher.impl; + +import java.util.Map; + +import org.apache.eagle.alert.engine.codec.IEventSerializer; +import org.apache.eagle.alert.engine.model.AlertStreamEvent; + +/** + * @since Jun 3, 2016 + * + */ +public class StringEventSerializer implements IEventSerializer { + + @SuppressWarnings("rawtypes") + public StringEventSerializer(Map stormConf) { + } + + @Override + public Object serialize(AlertStreamEvent event) { + return event.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java index db235a7..28d2d22 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java @@ -57,7 +57,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto private final static Logger LOG = LoggerFactory.getLogger(StreamRouterBoltOutputCollector.class); private final OutputCollector outputCollector; private final Object outputLock = new Object(); - private final List<String> outputStreamIds; +// private final List<String> outputStreamIds; private final StreamContext streamContext; private final PartitionedEventSerializer serializer; private volatile Map<StreamPartition,StreamRouterSpec> routeSpecMap; @@ -69,7 +69,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto this.outputCollector = outputCollector; this.routeSpecMap = new HashMap<>(); this.routePartitionerMap = new HashMap<>(); - this.outputStreamIds = outputStreamIds; +// this.outputStreamIds = outputStreamIds; this.streamContext = streamContext; this.serializer = serializer; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java index cc819ba..131d85a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AbstractStreamBolt.java @@ -40,6 +40,7 @@ public abstract class AbstractStreamBolt extends BaseRichBolt { private Config config; private List<String> outputStreamIds; protected OutputCollector collector; + protected Map stormConf; public AbstractStreamBolt(IMetadataChangeNotifyService changeNotifyService, Config config){ this.changeNotifyService = changeNotifyService; @@ -56,6 +57,7 @@ public abstract class AbstractStreamBolt extends BaseRichBolt { @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.stormConf = stormConf; Preconditions.checkNotNull(this.changeNotifyService, "IMetadataChangeNotifyService is not set yet"); this.collector = collector; internalPrepare(collector,this.changeNotifyService,this.config,context); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java index 30ff5f0..e53b0ba 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java @@ -134,7 +134,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen public void onAlertBoltSpecChange(AlertBoltSpec spec, Map<String, StreamDefinition> sds) { List<PolicyDefinition> newPolicies = spec.getBoltPoliciesMap().get(boltId); if(newPolicies == null) { - LOG.info("no policy with AlertBoltSpec {} for this bolt {}", spec, boltId); + LOG.info("no new policy with AlertBoltSpec {} for this bolt {}", spec, boltId); return; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java index 0a239e2..768cf48 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java @@ -53,13 +53,13 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli public AlertPublisherBolt(AlertPublisher alertPublisher, Config config, IMetadataChangeNotifyService coordinatorService){ super(coordinatorService, config); this.alertPublisher = alertPublisher; - this.alertPublisher.init(config); } @Override public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService coordinatorService, Config config, TopologyContext context) { coordinatorService.registerListener(this); coordinatorService.init(config, MetadataType.ALERT_PUBLISH_BOLT); + this.alertPublisher.init(config, stormConf); streamContext = new StreamContextImpl(config,context.registerMetric("eagle.publisher",new MultiCountMetric(),60),context); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java index 942ef97..85c2f73 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java @@ -16,18 +16,26 @@ */ package org.apache.eagle.alert.engine.runner; -import backtype.storm.metric.api.MultiCountMetric; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.Tuple; -import com.typesafe.config.Config; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.commons.collections.CollectionUtils; import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue; import org.apache.eagle.alert.coordination.model.RouterSpec; import org.apache.eagle.alert.coordination.model.StreamRouterSpec; import org.apache.eagle.alert.engine.StreamContext; import org.apache.eagle.alert.engine.StreamContextImpl; -import org.apache.eagle.alert.engine.coordinator.*; +import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService; +import org.apache.eagle.alert.engine.coordinator.MetadataType; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import org.apache.eagle.alert.engine.coordinator.StreamPartition; +import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; import org.apache.eagle.alert.engine.model.PartitionedEvent; import org.apache.eagle.alert.engine.router.StreamRouter; import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener; @@ -39,8 +47,12 @@ import org.apache.eagle.alert.utils.AlertConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.*; +import backtype.storm.metric.api.MultiCountMetric; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Tuple; + +import com.typesafe.config.Config; public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouterBoltSpecListener, SerializationMetadataProvider{ private final static Logger LOG = LoggerFactory.getLogger(StreamRouterBolt.class); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java index cef94c7..5a937f2 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService; +import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService; import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl; import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl; import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl; @@ -67,11 +68,17 @@ public class UnitTopologyRunner { public final static int DEFAULT_MESSAGE_TIMEOUT_SECS = 3600; private final IMetadataChangeNotifyService metadataChangeNotifyService; + private backtype.storm.Config givenStormConfig = null; public UnitTopologyRunner(IMetadataChangeNotifyService metadataChangeNotifyService){ this.metadataChangeNotifyService = metadataChangeNotifyService; } + public UnitTopologyRunner(ZKMetadataChangeNotifyService changeNotifyService, backtype.storm.Config stormConfig) { + this(changeNotifyService); + this.givenStormConfig = stormConfig; + } + public StormTopology buildTopology(String topologyId, int numOfSpoutTasks, int numOfRouterBolts, @@ -148,7 +155,7 @@ public class UnitTopologyRunner { return builder.createTopology(); } - public void run(String topologyId, + private void run(String topologyId, int numOfTotalWorkers, int numOfSpoutTasks, int numOfRouterBolts, @@ -156,7 +163,8 @@ public class UnitTopologyRunner { int numOfPublishTasks, Config config, boolean localMode) { - backtype.storm.Config stormConfig = new backtype.storm.Config(); + + backtype.storm.Config stormConfig = givenStormConfig == null ? new backtype.storm.Config() : givenStormConfig; // TODO: Configurable metric consumer instance number int messageTimeoutSecs = config.hasPath(MESSAGE_TIMEOUT_SECS)?config.getInt(MESSAGE_TIMEOUT_SECS) : DEFAULT_MESSAGE_TIMEOUT_SECS; @@ -186,11 +194,6 @@ public class UnitTopologyRunner { } } - public void run(Config config) { - String topologyId = config.getString("topology.name"); - run(topologyId,config); - } - public void run(String topologyId,Config config) { int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java index a3487d3..c1da90f 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java @@ -40,7 +40,8 @@ public class JsonScheme implements Scheme { private String topic; - public JsonScheme(String topic) { + @SuppressWarnings("rawtypes") + public JsonScheme(String topic, Map conf) { this.topic = topic; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java index 89d2e76..194b0c2 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java @@ -41,7 +41,8 @@ public class PlainStringScheme implements Scheme { private static final Logger LOG = LoggerFactory.getLogger(PlainStringScheme.class); private String topic; - public PlainStringScheme(String topic){ + @SuppressWarnings("rawtypes") + public PlainStringScheme(String topic, Map conf){ this.topic = topic; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java index 5ba1080..03c1dfb 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java @@ -16,16 +16,16 @@ */ package org.apache.eagle.alert.engine.serialization; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + import org.apache.eagle.alert.engine.coordinator.StreamPartition; import org.apache.eagle.alert.engine.model.PartitionedEvent; import org.apache.eagle.alert.engine.model.StreamEvent; import org.apache.eagle.alert.engine.serialization.impl.StreamEventSerializer; import org.apache.eagle.alert.engine.serialization.impl.StreamPartitionDigestSerializer; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - /** * TODO: Seams the complexity dosen't bring enough performance improve * http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java index c518e40..f653361 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java @@ -16,10 +16,10 @@ */ package org.apache.eagle.alert.engine.serialization; -import org.apache.eagle.alert.engine.model.PartitionedEvent; - import java.io.IOException; +import org.apache.eagle.alert.engine.model.PartitionedEvent; + public interface PartitionedEventSerializer { /** * http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java index a94604c..6be8f1a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializers.java @@ -16,12 +16,19 @@ */ package org.apache.eagle.alert.engine.serialization; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.serialization.impl.*; - import java.util.HashMap; import java.util.Map; +import org.apache.eagle.alert.engine.coordinator.StreamColumn; +import org.apache.eagle.alert.engine.serialization.impl.BooleanSerializer; +import org.apache.eagle.alert.engine.serialization.impl.DoubleSerializer; +import org.apache.eagle.alert.engine.serialization.impl.FloatSerializer; +import org.apache.eagle.alert.engine.serialization.impl.IntegerSerializer; +import org.apache.eagle.alert.engine.serialization.impl.JavaObjectSerializer; +import org.apache.eagle.alert.engine.serialization.impl.LongSerializer; +import org.apache.eagle.alert.engine.serialization.impl.PartitionedEventSerializerImpl; +import org.apache.eagle.alert.engine.serialization.impl.StringSerializer; + public class Serializers { private final static Map<StreamColumn.Type,Serializer<?>> COLUMN_TYPE_SER_MAPPING = new HashMap<>(); @@ -32,6 +39,7 @@ public class Serializers { COLUMN_TYPE_SER_MAPPING.put(type,serializer); } + @SuppressWarnings("unchecked") public static <T> Serializer<T> getColumnSerializer(StreamColumn.Type type){ if(COLUMN_TYPE_SER_MAPPING.containsKey(type)){ return (Serializer<T>) COLUMN_TYPE_SER_MAPPING.get(type); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java index 1e90569..db91a70 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/BooleanSerializer.java @@ -1,11 +1,11 @@ package org.apache.eagle.alert.engine.serialization.impl; -import org.apache.eagle.alert.engine.serialization.Serializer; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.eagle.alert.engine.serialization.Serializer; + /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java index ad5f53c..f2f5359 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/impl/DoubleSerializer.java @@ -1,11 +1,11 @@ package org.apache.eagle.alert.engine.serialization.impl; -import org.apache.eagle.alert.engine.serialization.Serializer; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.eagle.alert.engine.serialization.Serializer; + /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with
