http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java deleted file mode 100755 index 3aa079e..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.impl; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.eagle.alert.engine.AlertStreamCollector; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.task.OutputCollector; - -/** - * <h2>Thread Safe Mechanism</h2> - * <ul> - * <li> - * emit() method is thread-safe enough to be called anywhere asynchronously in multi-thread - * </li> - * <li> - * flush() method must be called synchronously, because Storm OutputCollector is not thread-safe - * </li> - * </ul> - */ -public class AlertBoltOutputCollectorThreadSafeWrapper implements AlertStreamCollector { - private final OutputCollector delegate; - private final LinkedBlockingQueue<AlertStreamEvent> queue; - private final static Logger LOG = LoggerFactory.getLogger(AlertBoltOutputCollectorThreadSafeWrapper.class); - private final AtomicLong lastFlushTime = new AtomicLong(System.currentTimeMillis()); - private final AutoAlertFlusher flusher; - private final static int MAX_ALERT_DELAY_SECS = 10; - - public AlertBoltOutputCollectorThreadSafeWrapper(OutputCollector outputCollector){ - this.delegate = outputCollector; - this.queue = new LinkedBlockingQueue<>(); - this.flusher = new AutoAlertFlusher(this); - this.flusher.setName(Thread.currentThread().getName()+"-alertFlusher"); - this.flusher.start(); - } - - private static class AutoAlertFlusher extends Thread{ - private final AlertBoltOutputCollectorThreadSafeWrapper collector; - private boolean stopped = false; - private final static Logger LOG = LoggerFactory.getLogger(AutoAlertFlusher.class); - - private AutoAlertFlusher(AlertBoltOutputCollectorThreadSafeWrapper collector){ - this.collector = collector; - } - - @Override - public void run() { - LOG.info("Starting"); - while(!this.stopped){ - if(System.currentTimeMillis() - collector.lastFlushTime.get() >= MAX_ALERT_DELAY_SECS * 1000L){ - this.collector.flush(); - } - try { - Thread.sleep(5000); - } catch (InterruptedException ignored) {} - } - LOG.info("Stopped"); - } - public void shutdown(){ - LOG.info("Stopping"); - this.stopped = true; - } - } - - /** - * Emit method can be called in multi-thread - * @param event - */ - @Override - public void emit(AlertStreamEvent event) { - try { - queue.put(event); - } catch (InterruptedException e) { - LOG.error(e.getMessage(),e); - } - } - - /** - * Flush will be called in synchronous way like StormBolt.execute() as Storm OutputCollector is not thread-safe - */ - @Override - public void flush() { - if(!queue.isEmpty()) { - List<AlertStreamEvent> events = new ArrayList<>(); - queue.drainTo(events); - events.forEach((event) -> delegate.emit(Arrays.asList(event.getStreamId(), event))); - LOG.info("Flushed {} alerts", events.size()); - } - lastFlushTime.set(System.currentTimeMillis()); - } - - @Override - public void close() { - this.flusher.shutdown(); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java deleted file mode 100755 index 8d8a4d2..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.impl; - -import java.util.Arrays; - -import org.apache.eagle.alert.engine.AlertStreamCollector; -import org.apache.eagle.alert.engine.StreamContext; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; - -import backtype.storm.task.OutputCollector; - -public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector { - private final OutputCollector delegate; - private final Object outputLock; - private final StreamContext streamContext; - - public AlertBoltOutputCollectorWrapper(OutputCollector outputCollector, Object outputLock, StreamContext streamContext){ - this.delegate = outputCollector; - this.outputLock = outputLock; - this.streamContext = streamContext; - } - - @Override - public void emit(AlertStreamEvent event) { - synchronized (outputLock) { - streamContext.counter().scope("alert_count").incr(); - delegate.emit(Arrays.asList(event.getStreamId(), event)); - } - } - - @Override - public void flush() { - // do nothing - } - - @Override - public void close() { - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java deleted file mode 100644 index f063618..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java +++ /dev/null @@ -1,172 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.impl; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.eagle.alert.engine.AlertStreamCollector; -import org.apache.eagle.alert.engine.StreamContext; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator; -import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext; -import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler; -import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator { - private static final long serialVersionUID = -5499413193675985288L; - - private final static Logger LOG = LoggerFactory.getLogger(PolicyGroupEvaluatorImpl.class); - - private AlertStreamCollector collector; - // mapping from policy name to PolicyDefinition - private volatile Map<String,PolicyDefinition> policyDefinitionMap = new HashMap<>(); - // mapping from policy name to PolicyStreamHandler - private volatile Map<String,PolicyStreamHandler> policyStreamHandlerMap = new HashMap<>(); - private String policyEvaluatorId; - private StreamContext context; - - public PolicyGroupEvaluatorImpl(String policyEvaluatorId){ - this.policyEvaluatorId = policyEvaluatorId; - } - - public void init(StreamContext context, AlertStreamCollector collector) { - this.collector = collector; - this.policyStreamHandlerMap = new HashMap<>(); - this.context = context; - Thread.currentThread().setName(policyEvaluatorId); - } - - public void nextEvent(PartitionedEvent event) { - this.context.counter().scope("receive_count").incr(); - dispatch(event); - } - - @Override - public String getName() { - return this.policyEvaluatorId; - } - - public void close() { - for(PolicyStreamHandler handler: policyStreamHandlerMap.values()){ - try { - handler.close(); - } catch (Exception e) { - LOG.error("Failed to close handler {}",handler.toString(),e); - } - } - } - - /** - * fixme make selection of PolicyStreamHandler to be more efficient - * @param partitionedEvent PartitionedEvent - */ - private void dispatch(PartitionedEvent partitionedEvent){ - boolean handled = false; - for(Map.Entry<String,PolicyStreamHandler> policyStreamHandler: policyStreamHandlerMap.entrySet()){ - if(isAcceptedByPolicy(partitionedEvent,policyDefinitionMap.get(policyStreamHandler.getKey()))){ - try { - handled = true; - this.context.counter().scope("eval_count").incr(); - policyStreamHandler.getValue().send(partitionedEvent.getEvent()); - } catch (Exception e) { - this.context.counter().scope("fail_count").incr(); - LOG.error("{} failed to handle {}",policyStreamHandler.getValue(), partitionedEvent.getEvent()); - } - } - } - if(!handled){ - this.context.counter().scope("drop_count").incr(); - LOG.warn("Drop stream non-matched event {}, which should not be sent to evaluator", partitionedEvent); - } else { - this.context.counter().scope("accept_count").incr(); - } - } - - private static boolean isAcceptedByPolicy(PartitionedEvent event, PolicyDefinition policy){ - return policy.getInputStreams() != null - && policy.getInputStreams().contains(event.getEvent().getStreamId()) - && policy.getPartitionSpec().contains(event.getPartition()); - } - - @Override - public void onPolicyChange(List<PolicyDefinition> added, List<PolicyDefinition> removed, List<PolicyDefinition> modified, Map<String, StreamDefinition> sds) { - Map<String,PolicyDefinition> copyPolicies = new HashMap<>(policyDefinitionMap); - Map<String,PolicyStreamHandler> copyHandlers = new HashMap<>(policyStreamHandlerMap); - for(PolicyDefinition pd : added){ - inplaceAdd(copyPolicies, copyHandlers, pd, sds); - } - for(PolicyDefinition pd : removed){ - inplaceRemove(copyPolicies, copyHandlers, pd); - } - for(PolicyDefinition pd : modified){ - inplaceRemove(copyPolicies, copyHandlers, pd); - inplaceAdd(copyPolicies, copyHandlers, pd, sds); - } - - // logging - LOG.info("Policy metadata updated with added={}, removed={}, modified={}", added, removed, modified); - - // switch reference - this.policyDefinitionMap = copyPolicies; - this.policyStreamHandlerMap = copyHandlers; - } - - private void inplaceAdd(Map<String, PolicyDefinition> policies, Map<String, PolicyStreamHandler> handlers, PolicyDefinition policy, Map<String, StreamDefinition> sds) { - if(handlers.containsKey(policy.getName())){ - LOG.error("metadata calculation error, try to add existing PolicyDefinition " + policy); - }else { - policies.put(policy.getName(), policy); - PolicyStreamHandler handler = PolicyStreamHandlers.createHandler(policy.getDefinition().type, sds); - try { - PolicyHandlerContext context = new PolicyHandlerContext(); - context.setPolicyCounter(this.context.counter()); - context.setPolicyDefinition(policy); - context.setParentEvaluator(this); - context.setPolicyEvaluatorId(policyEvaluatorId); - handler.prepare(collector, context); - handlers.put(policy.getName(), handler); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - policies.remove(policy.getName()); - handlers.remove(policy.getName()); - } - } - } - - private void inplaceRemove(Map<String, PolicyDefinition> policies, Map<String, PolicyStreamHandler> handlers, PolicyDefinition policy) { - if(handlers.containsKey(policy.getName())) { - PolicyStreamHandler handler = handlers.get(policy.getName()); - try { - handler.close(); - } catch (Exception e) { - LOG.error("Failed to close handler {}",handler,e); - }finally { - policies.remove(policy.getName()); - handlers.remove(policy.getName()); - LOG.info("Removed policy: {}",policy); - } - } else { - LOG.error("metadata calculation error, try to remove nonexisting PolicyDefinition: "+policy); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java deleted file mode 100644 index 5b83abb..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.impl; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang3.StringUtils; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wso2.siddhi.query.api.definition.AbstractDefinition; -import org.wso2.siddhi.query.api.definition.Attribute; - -import com.google.common.base.Preconditions; - -public class SiddhiDefinitionAdapter { - private final static Logger LOG = LoggerFactory.getLogger(SiddhiDefinitionAdapter.class); - public final static String DEFINE_STREAM_TEMPLATE = "define stream %s ( %s );"; - - public static String buildStreamDefinition(StreamDefinition streamDefinition){ - List<String> columns = new ArrayList<>(); - Preconditions.checkNotNull(streamDefinition,"StreamDefinition is null"); - if(streamDefinition.getColumns()!=null) { - for (StreamColumn column : streamDefinition.getColumns()) { - columns.add(String.format("%s %s", column.getName(), convertToSiddhiAttributeType(column.getType()).toString().toLowerCase())); - } - }else{ - LOG.warn("No columns found for stream {}"+streamDefinition.getStreamId()); - } - return String.format(DEFINE_STREAM_TEMPLATE, streamDefinition.getStreamId(),StringUtils.join(columns,",")); - } - - public static Attribute.Type convertToSiddhiAttributeType(StreamColumn.Type type){ - if(_EAGLE_SIDDHI_TYPE_MAPPING.containsKey(type)){ - return _EAGLE_SIDDHI_TYPE_MAPPING.get(type); - } - - throw new IllegalArgumentException("Unknown stream type: "+type); - } - - public static Class<?> convertToJavaAttributeType(StreamColumn.Type type){ - if(_EAGLE_JAVA_TYPE_MAPPING.containsKey(type)){ - return _EAGLE_JAVA_TYPE_MAPPING.get(type); - } - - throw new IllegalArgumentException("Unknown stream type: "+type); - } - - public static StreamColumn.Type convertFromJavaAttributeType(Class<?> type){ - if(_JAVA_EAGLE_TYPE_MAPPING.containsKey(type)){ - return _JAVA_EAGLE_TYPE_MAPPING.get(type); - } - - throw new IllegalArgumentException("Unknown stream type: "+type); - } - - public static StreamColumn.Type convertFromSiddhiAttributeType(Attribute.Type type){ - if(_SIDDHI_EAGLE_TYPE_MAPPING.containsKey(type)){ - return _SIDDHI_EAGLE_TYPE_MAPPING.get(type); - } - - throw new IllegalArgumentException("Unknown siddhi type: "+type); - } - - /** - * public enum Type { - * STRING, INT, LONG, FLOAT, DOUBLE, BOOL, OBJECT - * } - */ - private final static Map<StreamColumn.Type,Attribute.Type> _EAGLE_SIDDHI_TYPE_MAPPING = new HashMap<>(); - private final static Map<StreamColumn.Type,Class<?>> _EAGLE_JAVA_TYPE_MAPPING = new HashMap<>(); - private final static Map<Class<?>,StreamColumn.Type> _JAVA_EAGLE_TYPE_MAPPING = new HashMap<>(); - private final static Map<Attribute.Type,StreamColumn.Type> _SIDDHI_EAGLE_TYPE_MAPPING = new HashMap<>(); - - static { - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.STRING,Attribute.Type.STRING); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.INT,Attribute.Type.INT); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.LONG,Attribute.Type.LONG); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.FLOAT,Attribute.Type.FLOAT); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE,Attribute.Type.DOUBLE); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.BOOL,Attribute.Type.BOOL); - _EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.OBJECT,Attribute.Type.OBJECT); - - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.STRING,String.class); - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.INT,Integer.class); - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.LONG,Long.class); - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.FLOAT,Float.class); - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE,Double.class); - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.BOOL,Boolean.class); - _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.OBJECT,Object.class); - - _JAVA_EAGLE_TYPE_MAPPING.put(String.class,StreamColumn.Type.STRING); - _JAVA_EAGLE_TYPE_MAPPING.put(Integer.class,StreamColumn.Type.INT); - _JAVA_EAGLE_TYPE_MAPPING.put(Long.class,StreamColumn.Type.LONG); - _JAVA_EAGLE_TYPE_MAPPING.put(Float.class,StreamColumn.Type.FLOAT); - _JAVA_EAGLE_TYPE_MAPPING.put(Double.class,StreamColumn.Type.DOUBLE); - _JAVA_EAGLE_TYPE_MAPPING.put(Boolean.class,StreamColumn.Type.BOOL); - _JAVA_EAGLE_TYPE_MAPPING.put(Object.class,StreamColumn.Type.OBJECT); - - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.STRING,StreamColumn.Type.STRING); - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.INT,StreamColumn.Type.INT); - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.LONG,StreamColumn.Type.LONG); - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.FLOAT,StreamColumn.Type.FLOAT); - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.DOUBLE,StreamColumn.Type.DOUBLE); - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.BOOL,StreamColumn.Type.BOOL); - _SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.OBJECT,StreamColumn.Type.OBJECT); - } - - public static StreamDefinition convertFromSiddiDefinition(AbstractDefinition siddhiDefinition){ - StreamDefinition streamDefinition = new StreamDefinition(); - streamDefinition.setStreamId(siddhiDefinition.getId()); - List<StreamColumn> columns = new ArrayList<>(siddhiDefinition.getAttributeNameArray().length); - for(Attribute attribute:siddhiDefinition.getAttributeList()){ - StreamColumn column = new StreamColumn(); - column.setType(convertFromSiddhiAttributeType(attribute.getType())); - column.setName(attribute.getName()); - columns.add(column); - } - streamDefinition.setColumns(columns); - streamDefinition.setTimeseries(true); - streamDefinition.setDescription("Auto-generated stream schema from siddhi for "+siddhiDefinition.getId()); - return streamDefinition; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java deleted file mode 100755 index dfa5612..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.evaluator.impl; - -import java.util.Map; - -import org.apache.eagle.alert.engine.Collector; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException; -import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext; -import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wso2.siddhi.core.ExecutionPlanRuntime; -import org.wso2.siddhi.core.SiddhiManager; -import org.wso2.siddhi.core.event.Event; -import org.wso2.siddhi.core.stream.input.InputHandler; -import org.wso2.siddhi.core.stream.output.StreamCallback; - -public class SiddhiPolicyHandler implements PolicyStreamHandler { - private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyHandler.class); - private ExecutionPlanRuntime executionRuntime; - private SiddhiManager siddhiManager; - private Map<String, StreamDefinition> sds; - private PolicyDefinition policy; - private PolicyHandlerContext context; - - public SiddhiPolicyHandler(Map<String, StreamDefinition> sds){ - this.sds = sds; - } - - private static String generateExecutionPlan(PolicyDefinition policyDefinition, Map<String, StreamDefinition> sds) throws StreamDefinitionNotFoundException { - StringBuilder builder = new StringBuilder(); - for(String inputStream:policyDefinition.getInputStreams()) { - builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(sds.get(inputStream))); - builder.append("\n"); - } - builder.append(policyDefinition.getDefinition().value); - if(LOG.isDebugEnabled()) LOG.debug("Generated siddhi execution plan: {} from policy: {}", builder.toString(),policyDefinition); - return builder.toString(); - } - - private static class AlertStreamCallback extends StreamCallback{ - private final String outputStream; - private final Collector<AlertStreamEvent> collector; - private final PolicyHandlerContext context; - private final StreamDefinition definition; - - public AlertStreamCallback(String outputStream, StreamDefinition streamDefinition, Collector<AlertStreamEvent> collector, PolicyHandlerContext context){ - this.outputStream = outputStream; - this.collector = collector; - this.context = context; - this.definition = streamDefinition; - } - - /** - * Possibly more than one event will be triggered for alerting - * @param events - */ - @Override - public void receive(Event[] events) { - LOG.info("Generated {} alerts from policy '{}' in {}", events.length,context.getPolicyDefinition().getName(), context.getPolicyEvaluatorId()); - for(Event e : events) { - AlertStreamEvent event = new AlertStreamEvent(); - event.setTimestamp(e.getTimestamp()); - event.setData(e.getData()); - event.setStreamId(outputStream); - event.setPolicy(context.getPolicyDefinition()); - if (this.context.getParentEvaluator() != null) { - event.setCreatedBy(context.getParentEvaluator().getName()); - } - event.setCreatedTime(System.currentTimeMillis()); - event.setSchema(definition); - if(LOG.isDebugEnabled()) - LOG.debug("Generate new alert event: {}", event); - collector.emit(event); - } - context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"alert_count")).incrBy(events.length); - } - } - - @Override - public void prepare(final Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception { - LOG.info("Initializing handler for policy {}: {}",context.getPolicyDefinition(),this); - this.policy = context.getPolicyDefinition(); - this.siddhiManager = new SiddhiManager(); - String plan = generateExecutionPlan(policy, sds); - try { - this.executionRuntime = siddhiManager.createExecutionPlanRuntime(plan); - LOG.info("Created siddhi runtime {}",executionRuntime.getName()); - }catch (Exception parserException){ - LOG.error("Failed to create siddhi runtime for policy: {}, siddhi plan: \n\n{}\n",context.getPolicyDefinition().getName(),plan,parserException); - throw parserException; - } - for(final String outputStream:policy.getOutputStreams()){ - if(executionRuntime.getStreamDefinitionMap().containsKey(outputStream)) { - this.executionRuntime.addCallback(outputStream, - new AlertStreamCallback( - outputStream, SiddhiDefinitionAdapter.convertFromSiddiDefinition(executionRuntime.getStreamDefinitionMap().get(outputStream)) - ,collector, context)); - } else { - throw new IllegalStateException("Undefined output stream "+outputStream); - } - } - this.executionRuntime.start(); - this.context = context; - LOG.info("Initialized policy handler for policy: {}",policy.getName()); - } - - public void send(StreamEvent event) throws Exception { - context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"receive_count")).incr(); - String streamId = event.getStreamId(); - InputHandler inputHandler = executionRuntime.getInputHandler(streamId); - if(inputHandler != null){ - context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"eval_count")).incr(); - inputHandler.send(event.getTimestamp(),event.getData()); - }else{ - context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"drop_count")).incr(); - LOG.warn("No input handler found for stream {}",streamId); - } - } - - public void close() { - LOG.info("Closing handler for policy {}",this.policy.getName()); - this.executionRuntime.shutdown(); - LOG.info("Shutdown siddhi runtime {}",this.executionRuntime.getName()); - this.siddhiManager.shutdown(); - LOG.info("Shutdown siddhi manager {}",this.siddhiManager); - LOG.info("Closed handler for policy {}",this.policy.getName()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java deleted file mode 100644 index b894cc0..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.eagle.alert.engine.publisher; - -import org.apache.eagle.alert.engine.model.AlertStreamEvent; - -/** - * Dedup Eagle entities. - */ -public interface AlertDeduplicator { - - AlertStreamEvent dedup(AlertStreamEvent event); - - void setDedupIntervalMin(String intervalMin); - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java deleted file mode 100644 index 8f1c248..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.apache.eagle.alert.engine.publisher; - -import java.util.List; - -import org.apache.eagle.alert.engine.coordinator.Publishment; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -public interface AlertPublishListener { - void onPublishChange(List<Publishment> added, - List<Publishment> removed, - List<Publishment> afterModified, - List<Publishment> beforeModified); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java deleted file mode 100644 index 644fe2b..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.eagle.alert.engine.publisher; - -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.impl.PublishStatus; - -import com.typesafe.config.Config; - -import java.io.Closeable; -import java.util.Map; - -/** - * Created on 2/10/16. - * Notification Plug-in interface which provide abstraction layer to notify to different system - */ -public interface AlertPublishPlugin extends Closeable { - /** - * for initialization - * @throws Exception - */ - void init(Config config, Publishment publishment) throws Exception; - - void update(String dedupIntervalMin, Map<String, String> pluginProperties); - - void close(); - - void onAlert(AlertStreamEvent event) throws Exception; - - AlertStreamEvent dedup(AlertStreamEvent event); - - PublishStatus getStatus(); - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java deleted file mode 100644 index fc1fc28..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.apache.eagle.alert.engine.publisher; - -import java.util.Map; - -import org.apache.eagle.alert.coordination.model.PublishSpec; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -public interface AlertPublishSpecListener { - void onAlertPublishSpecChange(PublishSpec spec, Map<String, StreamDefinition> sds); -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java deleted file mode 100644 index 7a44009..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.apache.eagle.alert.engine.publisher; - - -import java.io.Serializable; - -import org.apache.eagle.alert.engine.model.AlertStreamEvent; - -import com.typesafe.config.Config; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -public interface AlertPublisher extends AlertPublishListener, Serializable { - void init(Config config); - String getName(); - void nextEvent(AlertStreamEvent event); - void close(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java deleted file mode 100644 index 91c9296..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.apache.eagle.alert.engine.publisher; - -import java.io.Serializable; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -public interface AlertSink extends Serializable { - /** - * - * @throws Exception - */ - void open() throws Exception; - - /** - * - * @throws Exception - */ - void close() throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java deleted file mode 100644 index 54a9afa..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.publisher; - -public class PublishConstants { - public static final String NOTIFICATION_TYPE = "type"; - public static final String EMAIL_NOTIFICATION = "email"; - public static final String KAFKA_STORE = "kafka"; - public static final String EAGLE_STORE = "eagleStore"; - - // email specific constants - public static final String SUBJECT = "subject"; - public static final String SENDER = "sender"; - public static final String RECIPIENTS = "recipients"; - public static final String TEMPLATE = "template"; - - // kafka specific constants - public static final String TOPIC = "topic"; - public static final String BROKER_LIST = "kafka_broker"; - - public static final String ALERT_EMAIL_TIME_PROPERTY = "timestamp"; - public static final String ALERT_EMAIL_COUNT_PROPERTY = "count"; - public static final String ALERT_EMAIL_ALERTLIST_PROPERTY = "alertList"; - public static final String ALERT_EMAIL_ORIGIN_PROPERTY = "alertEmailOrigin"; - - public static final String ALERT_EMAIL_MESSAGE = "alertMessage"; - public static final String ALERT_EMAIL_STREAM = "streamId"; - public static final String ALERT_EMAIL_TIMESTAMP = "alertTime"; - public static final String ALERT_EMAIL_POLICY = "policyId"; - public static final String ALERT_EMAIL_CREATOR = "creator"; - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java deleted file mode 100644 index 7d31bb5..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.publisher.email; - -public class AlertEmailConstants { - - public static final String MAIL_AUTH = "mail.smtp.auth"; - public static final String MAIL_HOST = "mail.smtp.host"; - public static final String MAIL_PORT = "mail.smtp.port"; - - public static final String CONN_PLAINTEXT = "plaintext"; - public static final String CONN_TLS = "tls"; - public static final String CONN_SSL = "ssl"; - - public static final String CONF_MAIL_SERVER = "smtp.server"; - public static final String CONF_MAIL_PORT = "smtp.port"; - public static final String CONF_MAIL_CONN = "connection"; - public static final String CONF_MAIL_DEBUG = "mailDebug"; - public static final String CONF_MAIL_AUTH = "smtp.auth.enable"; - public static final String CONF_AUTH_USER = "auth.username"; - public static final String CONF_AUTH_PASSWORD = "auth.password"; - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java deleted file mode 100644 index 8723283..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.eagle.alert.engine.publisher.email; - -import java.util.Map; - -/** - * alert email bean - * one email consists of a list of email component - */ -public class AlertEmailContext { - private Map<String, String> alertContext; - private String sender; - private String subject; - private String recipients; - private String velocityTplFile; - private String cc; - - public Map<String, String> getAlertContext() { - return alertContext; - } - - public void setAlertContext(Map<String, String> alertContext) { - this.alertContext = alertContext; - } - public String getVelocityTplFile() { - return velocityTplFile; - } - public void setVelocityTplFile(String velocityTplFile) { - this.velocityTplFile = velocityTplFile; - } - public String getRecipients() { - return recipients; - } - public void setRecipients(String recipients) { - this.recipients = recipients; - } - public String getSender() { - return sender; - } - public void setSender(String sender) { - this.sender = sender; - } - public String getSubject() { - return subject; - } - public void setSubject(String subject) { - this.subject = subject; - } - public String getCc() { - return cc; - } - public void setCc(String cc) { - this.cc = cc; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java deleted file mode 100644 index 0c68629..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -/** - * - */ -package org.apache.eagle.alert.engine.publisher.email; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.PublishConstants; -import org.apache.eagle.alert.utils.DateTimeUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AlertEmailGenerator { - private String tplFile; - private String sender; - private String recipients; - private String subject; - private Map<String, String> properties; - - private ThreadPoolExecutor executorPool; - - private final static Logger LOG = LoggerFactory.getLogger(AlertEmailGenerator.class); - - private final static long MAX_TIMEOUT_MS =60000; - - public boolean sendAlertEmail(AlertStreamEvent entity) { - return sendAlertEmail(entity, recipients, null); - } - - public boolean sendAlertEmail(AlertStreamEvent entity, String recipients) { - return sendAlertEmail(entity, recipients, null); - } - - public boolean sendAlertEmail(AlertStreamEvent event, String recipients, String cc) { - AlertEmailContext email = new AlertEmailContext(); - Map<String, String> alertContext = buildAlertContext(event); - email.setAlertContext(alertContext); - email.setVelocityTplFile(tplFile); - email.setSubject(subject); - email.setSender(sender); - email.setRecipients(recipients); - email.setCc(cc); - - /** asynchronized email sending */ - AlertEmailSender thread = new AlertEmailSender(email, properties); - - if(this.executorPool == null) throw new IllegalStateException("Invoking thread executor pool but it's is not set yet"); - - LOG.info("Sending email in asynchronous to: " + recipients + ", cc: " + cc); - Future<?> future = this.executorPool.submit(thread); - Boolean status; - try { - future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS); - status = true; - //LOG.info(String.format("Successfully send email to %s", recipients)); - } catch (InterruptedException | ExecutionException e) { - status = false; - LOG.error(String.format("Failed to send email to %s, due to:%s", recipients, e),e); - } catch (TimeoutException e) { - status = false; - LOG.error(String.format("Failed to send email to %s due to timeout exception, max timeout: %s ms ", recipients, MAX_TIMEOUT_MS),e); - } - return status; - } - - private Map<String, String> buildAlertContext(AlertStreamEvent event) { - Map<String, String> alertContext = new HashMap<>(); - alertContext.put(PublishConstants.ALERT_EMAIL_MESSAGE, event.toString()); - alertContext.put(PublishConstants.ALERT_EMAIL_POLICY, event.getPolicyId()); - alertContext.put(PublishConstants.ALERT_EMAIL_TIMESTAMP, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime())); - alertContext.put(PublishConstants.ALERT_EMAIL_STREAM, event.getStreamId()); - alertContext.put(PublishConstants.ALERT_EMAIL_CREATOR, event.getCreatedBy()); - return alertContext; - } - - public String getTplFile() { - return tplFile; - } - - public void setTplFile(String tplFile) { - this.tplFile = tplFile; - } - - public String getSender() { - return sender; - } - - public void setSender(String sender) { - this.sender = sender; - } - - public String getRecipients() { - return recipients; - } - - public void setRecipients(String recipients) { - this.recipients = recipients; - } - - public String getSubject() { - return subject; - } - - public void setSubject(String subject) { - this.subject = subject; - } - - public Map<String, String> getProperties() { - return properties; - } - - public void setProperties(Map<String, String> properties) { - this.properties = properties; - } - - public void setExecutorPool(ThreadPoolExecutor executorPool) { - this.executorPool = executorPool; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java deleted file mode 100644 index e9635dd..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.eagle.alert.engine.publisher.email; - -import java.util.Map; -import java.util.concurrent.ThreadPoolExecutor; - -public class AlertEmailGeneratorBuilder { - private AlertEmailGenerator generator; - private AlertEmailGeneratorBuilder(){ - generator = new AlertEmailGenerator(); - } - public static AlertEmailGeneratorBuilder newBuilder(){ - return new AlertEmailGeneratorBuilder(); - } - public AlertEmailGeneratorBuilder withSubject(String subject){ - generator.setSubject(subject); - return this; - } - public AlertEmailGeneratorBuilder withSender(String sender){ - generator.setSender(sender); - return this; - } - public AlertEmailGeneratorBuilder withRecipients(String recipients){ - generator.setRecipients(recipients); - return this; - } - public AlertEmailGeneratorBuilder withTplFile(String tplFile){ - generator.setTplFile(tplFile); - return this; - } - public AlertEmailGeneratorBuilder withMailProps(Map<String, String> mailProps) { - generator.setProperties(mailProps); - return this; - } - public AlertEmailGeneratorBuilder withExecutorPool(ThreadPoolExecutor threadPoolExecutor) { - generator.setExecutorPool(threadPoolExecutor); - return this; - } - - public AlertEmailGenerator build(){ - return this.generator; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java deleted file mode 100644 index d0e5cf6..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.eagle.alert.engine.publisher.email; - -import java.lang.management.ManagementFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.eagle.alert.engine.publisher.PublishConstants; -import org.apache.eagle.alert.utils.DateTimeUtil; -import org.apache.velocity.VelocityContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AlertEmailSender implements Runnable { - - protected final List<Map<String, String>> alertContexts = new ArrayList<Map<String, String>>(); - protected final String configFileName; - protected final String subject; - protected final String sender; - protected final String recipients; - protected final String cc; - protected final String origin; - protected boolean sentSuccessfully = false; - - private final static Logger LOG = LoggerFactory.getLogger(AlertEmailSender.class); - private final static int MAX_RETRY_COUNT = 3; - - - - private Map<String, String> mailProps; - - - private String threadName; - /** - * Derived class may have some additional context properties to add - * @param context velocity context - * @param env environment - */ - protected void additionalContext(VelocityContext context, String env) { - // By default there's no additional context added - } - - public AlertEmailSender(AlertEmailContext alertEmail){ - this.recipients = alertEmail.getRecipients(); - this.configFileName = alertEmail.getVelocityTplFile(); - this.subject = alertEmail.getSubject(); - this.sender = alertEmail.getSender(); - this.cc = alertEmail.getCc(); - - this.alertContexts.add(alertEmail.getAlertContext()); - String tmp = ManagementFactory.getRuntimeMXBean().getName(); - this.origin = tmp.split("@")[1] + "(pid:" + tmp.split("@")[0] + ")"; - threadName = Thread.currentThread().getName(); - LOG.info("Initialized "+threadName+": origin is : " + this.origin+", recipient of the email: " + this.recipients +", velocity TPL file: " + this.configFileName); - } - - public AlertEmailSender(AlertEmailContext alertEmail, Map<String, String> mailProps){ - this(alertEmail); - this.mailProps = mailProps; - } - - private Properties parseMailClientConfig(Map<String, String> mailProps) { - if (mailProps == null) return null; - Properties props = new Properties(); - String mailHost = mailProps.get(AlertEmailConstants.CONF_MAIL_SERVER); - String mailPort = mailProps.get(AlertEmailConstants.CONF_MAIL_PORT); - if (mailHost == null || mailPort == null || mailHost.isEmpty()) { - LOG.warn("SMTP server is unset, will exit"); - return null; - } - props.put(AlertEmailConstants.MAIL_HOST, mailHost); - props.put(AlertEmailConstants.MAIL_PORT, mailPort); - - String smtpAuth = mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_AUTH, "false"); - props.put(AlertEmailConstants.MAIL_AUTH, smtpAuth); - if (Boolean.parseBoolean(smtpAuth)) { - props.put(AlertEmailConstants.CONF_AUTH_USER, mailProps.get(AlertEmailConstants.CONF_AUTH_USER)); - props.put(AlertEmailConstants.CONF_AUTH_PASSWORD, mailProps.get(AlertEmailConstants.CONF_AUTH_PASSWORD)); - } - - String smtpConn = mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_CONN, AlertEmailConstants.CONN_PLAINTEXT); - if (smtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_TLS)) { - props.put("mail.smtp.starttls.enable", "true"); - } - if (smtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_SSL)) { - props.put("mail.smtp.socketFactory.port", "465"); - props.put("mail.smtp.socketFactory.class", - "javax.net.ssl.SSLSocketFactory"); - } - props.put(AlertEmailConstants.CONF_MAIL_DEBUG, mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_DEBUG, "false")); - return props; - } - - @Override - public void run() { - int count = 0; - boolean success = false; - while(count++ < MAX_RETRY_COUNT && !success){ - LOG.info("Sending email, tried: " + count+", max: " + MAX_RETRY_COUNT); - try { - final EagleMailClient client; - if (mailProps != null) { - Properties props = parseMailClientConfig(mailProps); - client = new EagleMailClient(props); - } - else { - client = new EagleMailClient(); - } - - final VelocityContext context = new VelocityContext(); - generateCommonContext(context); - LOG.info("After calling generateCommonContext..."); - - if (recipients == null || recipients.equals("")) { - LOG.error("Recipients is null, skip sending emails "); - return; - } - String title = subject; - - success = client.send(sender, recipients, cc, title, configFileName, context, null); - LOG.info("Success of sending email: " + success); - if(!success && count < MAX_RETRY_COUNT) { - LOG.info("Sleep for a while before retrying"); - Thread.sleep(10 * 1000); - } - } - catch (Exception e){ - LOG.warn("Sending mail exception", e); - } - } - if (success) { - sentSuccessfully = true; - LOG.info(String.format("Successfully send email, thread: %s", threadName)); - } else{ - LOG.warn(String.format("Fail sending email after tries %s times, thread: %s", MAX_RETRY_COUNT, threadName)); - } - } - - private void generateCommonContext(VelocityContext context) { - context.put(PublishConstants.ALERT_EMAIL_TIME_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds(System.currentTimeMillis())); - context.put(PublishConstants.ALERT_EMAIL_COUNT_PROPERTY, alertContexts.size()); - context.put(PublishConstants.ALERT_EMAIL_ALERTLIST_PROPERTY, alertContexts); - context.put(PublishConstants.ALERT_EMAIL_ORIGIN_PROPERTY, origin); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java deleted file mode 100755 index 61194a2..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.eagle.alert.engine.publisher.email; - -import java.io.File; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import javax.activation.DataHandler; -import javax.activation.DataSource; -import javax.activation.FileDataSource; -import javax.mail.Authenticator; -import javax.mail.Message; -import javax.mail.MessagingException; -import javax.mail.Multipart; -import javax.mail.PasswordAuthentication; -import javax.mail.Session; -import javax.mail.Transport; -import javax.mail.internet.AddressException; -import javax.mail.internet.InternetAddress; -import javax.mail.internet.MimeBodyPart; -import javax.mail.internet.MimeMessage; -import javax.mail.internet.MimeMultipart; - -import org.apache.velocity.Template; -import org.apache.velocity.VelocityContext; -import org.apache.velocity.app.VelocityEngine; -import org.apache.velocity.exception.ResourceNotFoundException; -import org.apache.velocity.runtime.RuntimeConstants; -import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class EagleMailClient { -// private static final String CONFIG_FILE = "config.properties"; - private static final String BASE_PATH = "templates/"; - - private VelocityEngine velocityEngine; - private Session session; - private static final Logger LOG = LoggerFactory.getLogger(EagleMailClient.class); - - public EagleMailClient() { - this(new Properties()); - } - - public EagleMailClient(final Properties config) { - try { - velocityEngine = new VelocityEngine(); - velocityEngine.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath"); - velocityEngine.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName()); - velocityEngine.init(); - - config.put("mail.transport.protocol", "smtp"); - if(Boolean.parseBoolean(config.getProperty(AlertEmailConstants.CONF_MAIL_AUTH))){ - session = Session.getDefaultInstance(config, new Authenticator() { - protected PasswordAuthentication getPasswordAuthentication() { - return new PasswordAuthentication(config.getProperty(AlertEmailConstants.CONF_AUTH_USER), config.getProperty(AlertEmailConstants.CONF_AUTH_PASSWORD)); - } - }); - } - else session = Session.getDefaultInstance(config, new Authenticator() {}); - final String debugMode = config.getProperty(AlertEmailConstants.CONF_MAIL_DEBUG, "false"); - final boolean debug = Boolean.parseBoolean(debugMode); - session.setDebug(debug); - } catch (Exception e) { - LOG.error("Failed connect to smtp server",e); - } - } - - private boolean _send(String from, String to, String cc, String title, - String content) { - Message msg = new MimeMessage(session); - try { - msg.setFrom(new InternetAddress(from)); - msg.setSubject(title); - if (to != null) { - msg.setRecipients(Message.RecipientType.TO, - InternetAddress.parse(to)); - } - if (cc != null) { - msg.setRecipients(Message.RecipientType.CC, - InternetAddress.parse(cc)); - } - //msg.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS)); - msg.setContent(content, "text/html;charset=utf-8"); - LOG.info(String.format("Going to send mail: from[%s], to[%s], cc[%s], title[%s]", from, to, cc, title)); - Transport.send(msg); - return true; - } catch (AddressException e) { - LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e); - return false; - } catch (MessagingException e) { - LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e); - return false; - } - } - - private boolean _send(String from,String to,String cc,String title,String content,List<MimeBodyPart> attachments){ - MimeMessage mail = new MimeMessage(session); - try { - mail.setFrom(new InternetAddress(from)); - mail.setSubject(title); - if (to != null) { - mail.setRecipients(Message.RecipientType.TO, - InternetAddress.parse(to)); - } - if (cc != null) { - mail.setRecipients(Message.RecipientType.CC, - InternetAddress.parse(cc)); - } - - //mail.setRecipients(Message.RecipientType.BCC, InternetAddress.parse(DEFAULT_BCC_ADDRESS)); - - MimeBodyPart mimeBodyPart = new MimeBodyPart(); - mimeBodyPart.setContent(content,"text/html;charset=utf-8"); - - Multipart multipart = new MimeMultipart(); - multipart.addBodyPart(mimeBodyPart); - - for(MimeBodyPart attachment:attachments){ - multipart.addBodyPart(attachment); - } - - mail.setContent(multipart); -// mail.setContent(content, "text/html;charset=utf-8"); - LOG.info(String.format("Going to send mail: from[%s], to[%s], cc[%s], title[%s]", from, to, cc, title)); - Transport.send(mail); - return true; - } catch (AddressException e) { - LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e); - return false; - } catch (MessagingException e) { - LOG.info("Send mail failed, got an AddressException: " + e.getMessage(), e); - return false; - } - } - - public boolean send(String from, String to, String cc, String title, - String content) { - return this._send(from, to, cc, title, content); - } - - public boolean send(String from, String to, String cc, String title, - String templatePath, VelocityContext context) { - Template t = null; - try { - t = velocityEngine.getTemplate(BASE_PATH + templatePath); - } catch (ResourceNotFoundException ex) { - } - if (t == null) { - try { - t = velocityEngine.getTemplate(templatePath); - } catch (ResourceNotFoundException e) { - t = velocityEngine.getTemplate("/" + templatePath); - } - } - final StringWriter writer = new StringWriter(); - t.merge(context, writer); - if(LOG.isDebugEnabled()) LOG.debug(writer.toString()); - return this.send(from, to, cc, title, writer.toString()); - } - - public boolean send(String from, String to, String cc, String title, - String templatePath, VelocityContext context, Map<String,File> attachments) { - if (attachments == null || attachments.isEmpty()) { - return send(from, to, cc, title, templatePath, context); - } - Template t = null; - - List<MimeBodyPart> mimeBodyParts = new ArrayList<MimeBodyPart>(); - Map<String,String> cid = new HashMap<String,String>(); - - for (Map.Entry<String,File> entry : attachments.entrySet()) { - final String attachment = entry.getKey(); - final File attachmentFile = entry.getValue(); - final MimeBodyPart mimeBodyPart = new MimeBodyPart(); - if(attachmentFile !=null && attachmentFile.exists()){ - DataSource source = new FileDataSource(attachmentFile); - try { - mimeBodyPart.setDataHandler(new DataHandler(source)); - mimeBodyPart.setFileName(attachment); - mimeBodyPart.setDisposition(MimeBodyPart.ATTACHMENT); - mimeBodyPart.setContentID(attachment); - cid.put(attachment,mimeBodyPart.getContentID()); - mimeBodyParts.add(mimeBodyPart); - } catch (MessagingException e) { - LOG.error("Generate mail failed, got exception while attaching files: " + e.getMessage(), e); - } - }else{ - LOG.error("Attachment: " + attachment + " is null or not exists"); - } - } - //TODO remove cid, because not used at all - if(LOG.isDebugEnabled()) LOG.debug("Cid maps: "+cid); - context.put("cid", cid); - - try { - t = velocityEngine.getTemplate(BASE_PATH + templatePath); - } catch (ResourceNotFoundException ex) { -// LOGGER.error("Template not found:"+BASE_PATH + templatePath, ex); - } - - if (t == null) { - try { - t = velocityEngine.getTemplate(templatePath); - } catch (ResourceNotFoundException e) { - try { - t = velocityEngine.getTemplate("/" + templatePath); - } - catch (Exception ex) { - LOG.error("Template not found:"+ "/" + templatePath, ex); - } - } - } - - final StringWriter writer = new StringWriter(); - t.merge(context, writer); - if(LOG.isDebugEnabled()) LOG.debug(writer.toString()); - return this._send(from, to, cc, title, writer.toString(), mimeBodyParts); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java deleted file mode 100644 index 2a4e332..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.publisher.impl; - -import java.util.List; - -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.apache.eagle.alert.service.IMetadataServiceClient; -import org.apache.eagle.alert.service.MetadataServiceClientImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.typesafe.config.Config; - -/** - * Alert API entity Persistor - */ -public class AlertEagleStorePersister { - private static Logger LOG = LoggerFactory.getLogger(AlertEagleStorePersister.class); - private Config config; - - public AlertEagleStorePersister(Config config) { - this.config = config; - } - - /** - * Persist passes list of Entities - * @param list - * @return - */ - public boolean doPersist(List<? extends StreamEvent> list) { - if (list.isEmpty()) return false; - LOG.info("Going to persist entities, type: " + " " + list.get(0).getClass().getSimpleName() + ", list size: " + list.size()); - try { - IMetadataServiceClient client = new MetadataServiceClientImpl(config); - // TODO: metadata service support - } - catch (Exception ex) { - LOG.error("Got an exception in persisting entities", ex); - return false; - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java deleted file mode 100644 index 807aacc..0000000 --- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.publisher.impl; - -import java.util.Arrays; -import java.util.Map; - -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.AlertDeduplicator; -import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.typesafe.config.Config; - -/** - * Plugin to persist alerts to Eagle Storage - */ -public class AlertEagleStorePublisher implements AlertPublishPlugin { - - private static final Logger LOG = LoggerFactory.getLogger(AlertEagleStorePublisher.class); - private PublishStatus status; - private AlertEagleStorePersister persist; - private AlertDeduplicator deduplicator; - - @Override - public void init(Config config, Publishment publishment) throws Exception { - this.persist = new AlertEagleStorePersister(config); - deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin()); - LOG.info("initialized plugin for EagleStorePlugin"); - } - - @Override - public void update(String dedupIntervalMin, Map<String, String> pluginProperties) { - deduplicator.setDedupIntervalMin(dedupIntervalMin); - } - - @Override - public PublishStatus getStatus() { - return this.status; - } - - @Override - public AlertStreamEvent dedup(AlertStreamEvent event) { - return deduplicator.dedup(event); - } - - /** - * Persist AlertEntity to alert_details table - * @param event - */ - @Override - public void onAlert(AlertStreamEvent event) { - LOG.info("write alert to eagle storage " + event); - event = dedup(event); - if(event == null) { - return; - } - PublishStatus status = new PublishStatus(); - try{ - boolean result = persist.doPersist(Arrays.asList(event)); - if(result) { - status.successful = true; - status.errorMessage = ""; - }else{ - status.successful = false; - status.errorMessage = ""; - } - }catch (Exception ex ){ - status.successful = false; - status.errorMessage = ex.getMessage(); - LOG.error("Fail writing alert entity to Eagle Store", ex); - } - this.status = status; - } - - @Override - public void close() { - - } - - @Override - public int hashCode(){ - return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode(); - } - - @Override - public boolean equals(Object o){ - if(o == this) - return true; - if(!(o instanceof AlertEagleStorePublisher)) - return false; - return true; - } -} \ No newline at end of file