http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java deleted file mode 100644 index 25e5cff..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.dedup; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.eagle.datastream.Collector; -import org.apache.eagle.policy.common.Constants; -import org.apache.eagle.alert.config.DeduplicatorConfig; -import org.apache.eagle.policy.dao.PolicyDefinitionDAO; -import org.apache.eagle.alert.entity.AlertAPIEntity; -import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity; -import org.apache.eagle.policy.DynamicPolicyLoader; -import org.apache.eagle.policy.PolicyLifecycleMethods; -import org.apache.eagle.common.config.EagleConfigConstants; -import org.apache.eagle.dataproc.core.JsonSerDeserUtils; -import org.apache.eagle.datastream.JavaStormStreamExecutor2; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.sun.jersey.client.impl.CopyOnWriteHashMap; -import com.typesafe.config.Config; -import scala.Tuple2; - -public abstract class AlertDeduplicationExecutorBase extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods<AlertDefinitionAPIEntity> { - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(AlertDeduplicationExecutorBase.class); - protected Config config; - protected DEDUP_TYPE dedupType; - - private List<String> alertExecutorIdList; - private volatile CopyOnWriteHashMap<String, DefaultDeduplicator> alertDedups; - private PolicyDefinitionDAO<AlertDefinitionAPIEntity> dao; - - public enum DEDUP_TYPE { - ENTITY, - EMAIL - } - - public AlertDeduplicationExecutorBase(List<String> alertExecutorIdList, DEDUP_TYPE dedupType, PolicyDefinitionDAO<AlertDefinitionAPIEntity> dao){ - this.alertExecutorIdList = alertExecutorIdList; - this.dedupType = dedupType; - this.dao = dao; - } - - @Override - public void prepareConfig(Config config) { - this.config = config; - } - - public DefaultDeduplicator createAlertDedup(AlertDefinitionAPIEntity alertDef) { - DeduplicatorConfig dedupConfig = null; - try { - dedupConfig = JsonSerDeserUtils.deserialize(alertDef.getDedupeDef(), DeduplicatorConfig.class); - } - catch (Exception ex) { - LOG.warn("Initial dedup Config error, " + ex.getMessage()); - } - - if (dedupConfig != null) { - return new DefaultDeduplicator(dedupConfig.getAlertDedupIntervalMin(), dedupConfig.getFields()); - } - - return null; - } - - @Override - public void init() { - String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE); - String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.APPLICATION); - Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs; - try { - initialAlertDefs = dao.findActivePoliciesGroupbyExecutorId(site, dataSource); - } - catch (Exception ex) { - LOG.error("fail to initialize initialAlertDefs: ", ex); - throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex); - } - Map<String, DefaultDeduplicator> tmpDeduplicators = new HashMap<>(); - if(initialAlertDefs == null || initialAlertDefs.isEmpty()){ - LOG.warn("No alert definitions was found for site: "+site+", dataSource: "+dataSource); - } else { - for (String alertExecutorId: alertExecutorIdList) { - if(initialAlertDefs.containsKey(alertExecutorId)){ - for(AlertDefinitionAPIEntity alertDef : initialAlertDefs.get(alertExecutorId).values()){ - try { - DefaultDeduplicator deduplicator = createAlertDedup(alertDef); - if (deduplicator != null) - tmpDeduplicators.put(alertDef.getTags().get(Constants.POLICY_ID), deduplicator); - else LOG.warn("The dedup interval is not set, alertDef: " + alertDef); - } - catch (Throwable t) { - LOG.error("Got an exception when initial dedup config, probably dedup config is not set: " + t.getMessage() + "," + alertDef); - } - } - } else { - LOG.info(String.format("No alert definitions found for site: %s, dataSource: %s, alertExecutorId: %s",site,dataSource,alertExecutorId)); - } - } - } - - alertDedups = new CopyOnWriteHashMap<>(); - alertDedups.putAll(tmpDeduplicators); - DynamicPolicyLoader<AlertDefinitionAPIEntity> policyLoader = DynamicPolicyLoader.getInstanceOf(AlertDefinitionAPIEntity.class); - policyLoader.init(initialAlertDefs, dao, config); - for (String alertExecutorId : alertExecutorIdList) { - policyLoader.addPolicyChangeListener(alertExecutorId, this); - } - } - - @Override - public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, AlertAPIEntity>> outputCollector){ - String policyId = (String) input.get(0); - AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1); - DefaultDeduplicator dedup; - synchronized(alertDedups) { - dedup = alertDedups.get(policyId); - } - - List<AlertAPIEntity> ret = Arrays.asList(alertEntity); - if (dedup == null) { - LOG.warn("Dedup config for policyId " + policyId + " is not set or is not a valid config"); - } else { - if (dedup.getDedupIntervalMin() == -1) { - LOG.warn("the dedup interval is set as -1, which mean all alerts should be deduped(skipped)"); - return; - } - ret = dedup.dedup(ret); - } - for (AlertAPIEntity entity : ret) { - outputCollector.collect(new Tuple2(policyId, entity)); - } - } - - public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) { - if(LOG.isDebugEnabled()) LOG.debug("Alert dedup config to be added : " + added); - for(AlertDefinitionAPIEntity alertDef : added.values()){ - LOG.info("Alert dedup config really added " + alertDef); - DefaultDeduplicator dedup = createAlertDedup(alertDef); - if (dedup != null) { - synchronized(alertDedups) { - alertDedups.put(alertDef.getTags().get(Constants.POLICY_ID), dedup); - } - } - } - } - - public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) { - LOG.info("Alert dedup config changed : " + changed); - for(AlertDefinitionAPIEntity alertDef : changed.values()){ - LOG.info("Alert dedup config really changed " + alertDef); - DefaultDeduplicator dedup = createAlertDedup(alertDef); - if (dedup != null) { - synchronized(alertDedups) { - alertDedups.put(alertDef.getTags().get(Constants.POLICY_ID), dedup); - } - } - } - } - - public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) { - LOG.info("alert dedup config deleted : " + deleted); - for(AlertDefinitionAPIEntity alertDef : deleted.values()){ - LOG.info("alert dedup config deleted " + alertDef); - // no cleanup to do, just remove it - synchronized(alertDedups) { - alertDedups.remove(alertDef.getTags().get(Constants.POLICY_ID)); - } - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java deleted file mode 100644 index 8947d2c..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.dedup; - -import org.apache.eagle.policy.dao.PolicyDefinitionDAO; - -import java.util.List; - -public class AlertEmailDeduplicationExecutor extends AlertDeduplicationExecutorBase { - - private static final long serialVersionUID = 1L; - - public AlertEmailDeduplicationExecutor(List<String> alertExecutorIdList, PolicyDefinitionDAO dao){ - super(alertExecutorIdList, DEDUP_TYPE.EMAIL, dao); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java deleted file mode 100644 index b30dbda..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.dedup; - -import java.util.List; - -import org.apache.eagle.policy.dao.PolicyDefinitionDAO; -import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity; - -public class AlertEntityDeduplicationExecutor extends AlertDeduplicationExecutorBase { - - private static final long serialVersionUID = 1L; - - public AlertEntityDeduplicationExecutor(List<String> alertExecutorIdList, PolicyDefinitionDAO<AlertDefinitionAPIEntity> dao){ - super(alertExecutorIdList, DEDUP_TYPE.ENTITY, dao); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java deleted file mode 100644 index 1d79f9f..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.dedup; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.commons.lang.time.DateUtils; -import org.apache.eagle.alert.entity.AlertAPIEntity; -import org.apache.eagle.common.metric.AlertContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DefaultDeduplicator implements EntityDeduplicator { - protected long dedupIntervalMin; - protected List<String> fields; - protected Map<EntityDedupKey, Long> entites = new HashMap<>(); - public static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class); - - public static enum AlertDeduplicationStatus{ - NEW, - DUPLICATED, - IGNORED - } - - public DefaultDeduplicator() { - this.dedupIntervalMin = 0; - fields = null; - } - - public DefaultDeduplicator(long intervalMin, List<String> fields) { - this.dedupIntervalMin = intervalMin; - this.fields = fields; - } - - public void clearOldCache() { - List<EntityDedupKey> removedkeys = new ArrayList<>(); - for (Entry<EntityDedupKey, Long> entry : entites.entrySet()) { - EntityDedupKey entity = entry.getKey(); - if (System.currentTimeMillis() - 7 * DateUtils.MILLIS_PER_DAY > entity.createdTime) { - removedkeys.add(entry.getKey()); - } - } - for (EntityDedupKey alertKey : removedkeys) { - entites.remove(alertKey); - } - } - - public AlertDeduplicationStatus checkDedup(EntityDedupKey key){ - long current = key.timestamp; - if(!entites.containsKey(key)){ - entites.put(key, current); - return AlertDeduplicationStatus.NEW; - } - - long last = entites.get(key); - if(current - last >= dedupIntervalMin * DateUtils.MILLIS_PER_MINUTE){ - entites.put(key, current); - return AlertDeduplicationStatus.DUPLICATED; - } - - return AlertDeduplicationStatus.IGNORED; - } - - private List<String> getKeyList(AlertAPIEntity entity) { - List<String> keys = new ArrayList<>(entity.getTags().values()); - if(fields != null && !fields.isEmpty()) { - for (String field: fields) { - AlertContext context = entity.getWrappedAlertContext(); - keys.add(context.getProperty(field)); - } - } - return keys; - } - - public List<AlertAPIEntity> dedup(List<AlertAPIEntity> list) { - clearOldCache(); - List<AlertAPIEntity> dedupList = new ArrayList<>(); - int totalCount = list.size(); - int dedupedCount = 0; - for(AlertAPIEntity entity: list) { - if (entity.getTags() == null) { - if(LOG.isDebugEnabled()) LOG.debug("Tags is null, don't know how to deduplicate, do nothing"); - } else { - AlertDeduplicationStatus status = checkDedup(new EntityDedupKey(getKeyList(entity), entity.getTimestamp())); - if (!status.equals(AlertDeduplicationStatus.IGNORED)) { - dedupList.add(entity); - } else { - dedupedCount++; - if (LOG.isDebugEnabled()) - LOG.debug(String.format("Entity is skipped because it's duplicated: " + entity.toString())); - } - } - } - - if(dedupedCount>0){ - LOG.info(String.format("Skipped %s of %s alerts because they are duplicated", dedupedCount, totalCount)); - }else if(LOG.isDebugEnabled()){ - LOG.debug(String.format("Skipped %s of %s duplicated alerts",dedupedCount,totalCount)); - } - - return dedupList; - } - - public EntityDeduplicator setDedupIntervalMin(long dedupIntervalMin) { - this.dedupIntervalMin = dedupIntervalMin; - return this; - } - - public long getDedupIntervalMin() { - return dedupIntervalMin; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDedupKey.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDedupKey.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDedupKey.java deleted file mode 100644 index 36b83e1..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDedupKey.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.dedup; - -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - -public class EntityDedupKey { - public List<String> values; - public Long timestamp; // entity's timestamp - public long createdTime; // entityTagsUniq's created time, for cache removal; - - private static final Logger LOG = LoggerFactory.getLogger(EntityDedupKey.class); - - public EntityDedupKey(List<String> values, long timestamp) { - this.values = new ArrayList<>(values); - this.timestamp = timestamp; - this.createdTime = System.currentTimeMillis(); - } - - public boolean equals(Object obj) { - if (obj instanceof EntityDedupKey) { - EntityDedupKey key = (EntityDedupKey) obj; - if (key == null || key.values.size() != values.size()) { - return false; - } - return values.equals(key.values); - } - return false; - } - - public int hashCode() { - HashCodeBuilder builder = new HashCodeBuilder(); - for (String value : values) { - builder.append(value); - } - return builder.build(); - } -} - - http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java deleted file mode 100644 index 85dd19a..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.dedup; - -import java.util.List; - -import org.apache.eagle.alert.entity.AlertAPIEntity; -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; - -/** - * Dedup Eagle entities. - */ -public interface EntityDeduplicator { - - EntityDeduplicator setDedupIntervalMin(long intervalMin); - - long getDedupIntervalMin(); - - List dedup(List<AlertAPIEntity> list); - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java deleted file mode 100644 index 81c8ba6..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * - */ -package org.apache.eagle.alert.dedup; - -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @since Mar 19, 2015 - */ -public class EntityTagsUniq { - public Map<String, String> tags; - public Long timestamp; // entity's timestamp - public long createdTime; // entityTagsUniq's created time, for cache removal; - - private static final Logger LOG = LoggerFactory.getLogger(EntityTagsUniq.class); - - public EntityTagsUniq(Map<String, String> tags, long timestamp) { - this.tags = new HashMap<String, String>(tags); - this.timestamp = timestamp; - this.createdTime = System.currentTimeMillis(); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof EntityTagsUniq) { - EntityTagsUniq au = (EntityTagsUniq) obj; - if (tags.size() != au.tags.size()) return false; - for (Entry<String, String> keyValue : au.tags.entrySet()) { - boolean keyExist = tags.containsKey(keyValue.getKey()); - // sanity check - if (tags.get(keyValue.getKey()) == null || keyValue.getValue() == null) { - return true; - } - if ( !keyExist || !tags.get(keyValue.getKey()).equals(keyValue.getValue())) { - return false; - } - } - return true; - } - return false; - } - - @Override - public int hashCode() { - int hashCode = 0; - for (Map.Entry<String,String> entry : tags.entrySet()) { - if(entry.getValue() == null) { - LOG.warn("Tag value for key ["+entry.getKey()+"] is null, skipped for hash code"); - }else { - try { - hashCode ^= entry.getValue().hashCode(); - } catch (Throwable t) { - LOG.info("Got exception because of entry: " + entry, t); - } - } - } - return hashCode; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java deleted file mode 100644 index 2eee6c5..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.executor; - -import org.apache.eagle.policy.ResultRender; -import org.apache.eagle.policy.dao.PolicyDefinitionDAO; -import org.apache.eagle.alert.entity.AlertAPIEntity; -import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity; -import org.apache.eagle.policy.PolicyPartitioner; -import org.apache.eagle.policy.executor.PolicyProcessExecutor; -import org.apache.eagle.alert.siddhi.SiddhiAlertAPIEntityRender; - -public class AlertExecutor extends PolicyProcessExecutor<AlertDefinitionAPIEntity, AlertAPIEntity> { - - private final SiddhiAlertAPIEntityRender resultRender = new SiddhiAlertAPIEntityRender(); - - public AlertExecutor(String alertExecutorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq, - PolicyDefinitionDAO<AlertDefinitionAPIEntity> alertDefinitionDao, String[] sourceStreams) { - super(alertExecutorId, partitioner, numPartitions, partitionSeq, alertDefinitionDao, sourceStreams, - AlertDefinitionAPIEntity.class); - } - - @Override - public ResultRender<AlertDefinitionAPIEntity, AlertAPIEntity> getResultRender() { - return resultRender; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java deleted file mode 100644 index 8ab290e..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.executor; - -import java.util.List; -import java.util.Map; - -import org.apache.eagle.policy.dao.PolicyDefinitionDAO; -import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity; -import org.apache.eagle.policy.DefaultPolicyPartitioner; -import org.apache.eagle.policy.PolicyPartitioner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigValue; - -/** - * Create alert executors and provide callback for programmer to link alert executor to immediate parent executors - * - * <br/><br/> - * Explanations for programId, alertExecutorId and policy<br/><br/> - * - programId - distributed or single-process program for example one storm topology<br/> - * - alertExecutorId - one process/thread which executes multiple policies<br/> - * - policy - some rules to be evaluated<br/> - * - * <br/> - * - * Normally the mapping is like following: - * <pre> - * programId (1:N) alertExecutorId - * alertExecutorId (1:N) policy - * </pre> - */ -public class AlertExecutorCreationUtils { - private final static Logger LOG = LoggerFactory.getLogger(AlertExecutorCreationUtils.class); - - - /** - * Build DAG Tasks based on persisted alert definition and schemas from eagle store. - * - * <h3>Require configuration:</h3> - * - * <ul> - * <li>eagleProps.site: program site id.</li> - * <li>eagleProps.dataSource: program data source.</li> - * <li>alertExecutorConfigs: only configured executor will be built into execution tasks.</li> - * </ul> - * - * <h3>Steps:</h3> - * - * <ol> - * <li>(upstreamTasks) => Map[streamName:String,upstreamTask:Task]</li> - * <li>(dataSource) => Map[alertExecutorId:String,streamName:List[String]]</li> - * <li>(site,dataSource) => Map[alertExecutorId,Map[policyId,alertDefinition]]</li> - * <li>(config["alertExecutorConfigs"]) => AlertExecutor(alertExecutorID, partitioner, numPartitions, partitionSeq, alertDefs, alertDefDAO, sourceStreams)[]</li> - * </ol> - */ - public static AlertExecutor[] createAlertExecutors(Config config, PolicyDefinitionDAO<AlertDefinitionAPIEntity> alertDefDAO, - List<String> streamNames, String alertExecutorId) throws Exception{ - // Read `alertExecutorConfigs` from configuration and get config for this alertExecutorId - int numPartitions =1; - String partitionerCls = DefaultPolicyPartitioner.class.getCanonicalName(); - String alertExecutorConfigsKey = "alertExecutorConfigs"; - if(config.hasPath(alertExecutorConfigsKey)) { - Map<String, ConfigValue> alertExecutorConfigs = config.getObject(alertExecutorConfigsKey); - if(alertExecutorConfigs !=null && alertExecutorConfigs.containsKey(alertExecutorId)) { - Map<String, Object> alertExecutorConfig = (Map<String, Object>) alertExecutorConfigs.get(alertExecutorId).unwrapped(); - int parts = 0; - if(alertExecutorConfig.containsKey("parallelism")) parts = (int) (alertExecutorConfig.get("parallelism")); - numPartitions = parts == 0 ? 1 : parts; - if(alertExecutorConfig.containsKey("partitioner")) partitionerCls = (String) alertExecutorConfig.get("partitioner"); - } - } - - return createAlertExecutors(alertDefDAO, streamNames, alertExecutorId, numPartitions, partitionerCls); - } - - /** - * Build alert executors and assign alert definitions between these executors by partitioner (alertExecutorConfigs["${alertExecutorId}"]["partitioner"]) - */ - public static AlertExecutor[] createAlertExecutors(PolicyDefinitionDAO alertDefDAO, List<String> sourceStreams, - String alertExecutorID, int numPartitions, String partitionerCls) throws Exception{ - LOG.info("Creating alert executors with alertExecutorID: " + alertExecutorID + ", numPartitions: " + numPartitions + ", Partition class is: "+ partitionerCls); - - // TODO: Create sourceStreams with alertExecutorID into AlertExecutorService - - PolicyPartitioner partitioner = (PolicyPartitioner)Class.forName(partitionerCls).newInstance(); - AlertExecutor[] alertExecutors = new AlertExecutor[numPartitions]; - String[] _sourceStreams = sourceStreams.toArray(new String[0]); - - for(int i = 0; i < numPartitions; i++){ - alertExecutors[i] = new AlertExecutor(alertExecutorID, partitioner, numPartitions, i, alertDefDAO,_sourceStreams); - } - return alertExecutors; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java deleted file mode 100644 index af42dd3..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * - */ -package org.apache.eagle.alert.notification; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.*; - -import org.apache.eagle.common.metric.AlertContext; -import org.apache.eagle.policy.common.Constants; -import org.apache.eagle.alert.common.AlertEmailSender; -import org.apache.eagle.alert.email.AlertEmailComponent; -import org.apache.eagle.alert.email.AlertEmailContext; -import org.apache.eagle.alert.entity.AlertAPIEntity; -import com.typesafe.config.ConfigObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Deprecated -public class AlertEmailGenerator{ - private String tplFile; - private String sender; - private String recipients; - private String subject; - private ConfigObject eagleProps; - - private ThreadPoolExecutor executorPool; - - private final static Logger LOG = LoggerFactory.getLogger(AlertEmailGenerator.class); - - private final static long MAX_TIMEOUT_MS =60000; - - public void sendAlertEmail(AlertAPIEntity entity) { - sendAlertEmail(entity, recipients, null); - } - - public void sendAlertEmail(AlertAPIEntity entity, String recipients) { - sendAlertEmail(entity, recipients, null); - } - - public void sendAlertEmail(AlertAPIEntity entity, String recipients, String cc) { - AlertEmailContext email = new AlertEmailContext(); - - AlertEmailComponent component = new AlertEmailComponent(); - component.setAlertContext(AlertContext.fromJsonString(entity.getAlertContext())); - List<AlertEmailComponent> components = new ArrayList<AlertEmailComponent>(); - components.add(component); - email.setComponents(components); - if (AlertContext.fromJsonString(entity.getAlertContext()).getProperty(Constants.SUBJECT) != null) { - email.setSubject(AlertContext.fromJsonString(entity.getAlertContext()).getProperty(Constants.SUBJECT)); - } - else email.setSubject(subject); - email.setVelocityTplFile(tplFile); - email.setRecipients(recipients); - email.setCc(cc); - email.setSender(sender); - - /** asynchronized email sending */ - @SuppressWarnings("rawtypes") - AlertEmailSender thread = new AlertEmailSender(email, eagleProps); - - if(this.executorPool == null) throw new IllegalStateException("Invoking thread executor pool but it's is not set yet"); - - LOG.info("Sending email in asynchronous to: "+recipients+", cc: "+cc); - Future future = this.executorPool.submit(thread); - try { - future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS); - LOG.info(String.format("Successfully send email to %s", recipients)); - } catch (InterruptedException | ExecutionException e) { - LOG.error(String.format("Failed to send email to %s, due to:%s",recipients,e),e); - } catch (TimeoutException e) { - LOG.error(String.format("Failed to send email to %s due to timeout exception, max timeout: %s ms ",recipients, MAX_TIMEOUT_MS),e); - } - } - - public String getTplFile() { - return tplFile; - } - - public void setTplFile(String tplFile) { - this.tplFile = tplFile; - } - - public String getSender() { - return sender; - } - - public void setSender(String sender) { - this.sender = sender; - } - - public String getRecipients() { - return recipients; - } - - public void setRecipients(String recipients) { - this.recipients = recipients; - } - - public String getSubject() { - return subject; - } - - public void setSubject(String subject) { - this.subject = subject; - } - - public ConfigObject getEagleProps() { - return eagleProps; - } - - public void setEagleProps(ConfigObject eagleProps) { - this.eagleProps = eagleProps; - } - - public void setExecutorPool(ThreadPoolExecutor executorPool) { - this.executorPool = executorPool; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java deleted file mode 100644 index b024c39..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.notification; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import org.apache.eagle.notification.plugin.NotificationPluginManagerImpl; -import org.apache.eagle.policy.dao.PolicyDefinitionDAO; -import org.apache.eagle.alert.entity.AlertAPIEntity; -import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity; -import org.apache.eagle.policy.DynamicPolicyLoader; -import org.apache.eagle.policy.PolicyLifecycleMethods; -import org.apache.eagle.datastream.Collector; -import org.apache.eagle.datastream.JavaStormStreamExecutor1; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.typesafe.config.Config; -import scala.Tuple1; - -/** - * notify alert by email, kafka message, storage or other means - */ -public class AlertNotificationExecutor extends JavaStormStreamExecutor1<String> implements PolicyLifecycleMethods<AlertDefinitionAPIEntity> { - private static final long serialVersionUID = 1690354365435407034L; - private static final Logger LOG = LoggerFactory.getLogger(AlertNotificationExecutor.class); - private Config config; - /** Notification Manager - Responsible for forward and invoke configured Notification Plugin **/ - private NotificationPluginManagerImpl notificationManager; - - private List<String> alertExecutorIdList; - private PolicyDefinitionDAO dao; - - - public AlertNotificationExecutor(List<String> alertExecutorIdList, PolicyDefinitionDAO dao){ - this.alertExecutorIdList = alertExecutorIdList; - this.dao = dao; - } - - @Override - public void init() { - String site = config.getString("eagleProps.site"); - String application = config.getString("eagleProps.application"); - Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs; - try { - initialAlertDefs = dao.findActivePoliciesGroupbyExecutorId( site, application ); - } - catch (Exception ex) { - LOG.error("fail to initialize initialAlertDefs: ", ex); - throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex); - } - - if(initialAlertDefs == null || initialAlertDefs.isEmpty()){ - LOG.warn("No alert definitions found for site: "+site+", application: "+ application); - } - try{ - notificationManager = new NotificationPluginManagerImpl(config); - }catch (Exception ex ){ - LOG.error("Fail to initialize NotificationManager: ", ex); - throw new IllegalStateException("Fail to initialize NotificationManager: ", ex); - } - - DynamicPolicyLoader<AlertDefinitionAPIEntity> policyLoader = DynamicPolicyLoader.getInstanceOf(AlertDefinitionAPIEntity.class); - policyLoader.init(initialAlertDefs, dao, config); - for (String alertExecutorId : alertExecutorIdList) { - policyLoader.addPolicyChangeListener(alertExecutorId, this); - } - } - - @Override - public void prepareConfig(Config config) { - this.config = config; - } - - @Override - public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){ - AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1); - processAlerts(Arrays.asList(alertEntity)); - } - - private void processAlerts(List<AlertAPIEntity> list) { - for (AlertAPIEntity entity : list) { - notificationManager.notifyAlert(entity); - } - } - - @Override - public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) { - if(LOG.isDebugEnabled()) LOG.debug(" alert notification config changed : " + added); - for(AlertDefinitionAPIEntity alertDef : added.values()){ - LOG.info("alert notification config really changed " + alertDef); - notificationManager.updateNotificationPlugins( alertDef , false ); - } - } - - @Override - public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) { - if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be added : " + changed); - for(AlertDefinitionAPIEntity alertDef : changed.values()){ - LOG.info("alert notification config really added " + alertDef); - notificationManager.updateNotificationPlugins( alertDef , false ); - } - } - - @Override - public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) { - if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be deleted : " + deleted); - for(AlertDefinitionAPIEntity alertDef : deleted.values()){ - LOG.info("alert notification config really deleted " + alertDef); - notificationManager.updateNotificationPlugins( alertDef , true ); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java deleted file mode 100644 index 61bb7dc..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.persist; - -import org.apache.eagle.alert.entity.AlertAPIEntity; -import com.typesafe.config.Config; -import org.apache.eagle.common.config.EagleConfigConstants; -import org.apache.eagle.datastream.Collector; -import org.apache.eagle.datastream.JavaStormStreamExecutor1; -import scala.Tuple1; - -import java.util.Arrays; - -public class AlertPersistExecutor extends JavaStormStreamExecutor1<String> { - - private static final long serialVersionUID = 1L; - private Config config; - private EaglePersist persist; - - public AlertPersistExecutor(){ - } - @Override - public void prepareConfig(Config config) { - this.config = config; - } - - @Override - public void init() { - String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST); - int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT); - String username = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) - ? config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : null; - String password = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) - ? config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : null; - this.persist = new EaglePersist(host, port, username, password); - } - - @Override - public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){ - persist.doPersist(Arrays.asList((AlertAPIEntity)(input.get(1)))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java deleted file mode 100644 index ebba518..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * - */ -package org.apache.eagle.alert.persist; - -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; -import org.apache.eagle.service.client.IEagleServiceClient; -import org.apache.eagle.service.client.impl.EagleServiceClientImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -public class EaglePersist { - - private static Logger LOG = LoggerFactory.getLogger(EaglePersist.class); - private String eagleServiceHost; - private int eagleServicePort; - private String username; - private String password; - - public EaglePersist(String eagleServiceHost, int eagleServicePort) { - this(eagleServiceHost, eagleServicePort, null, null); - } - - public EaglePersist(String eagleServiceHost, int eagleServicePort, String username, String password) { - this.eagleServiceHost = eagleServiceHost; - this.eagleServicePort = eagleServicePort; - this.username = username; - this.password = password; - } - - public boolean doPersist(List<? extends TaggedLogAPIEntity> list) { - if (list.isEmpty()) return false; - LOG.info("Going to persist entities, type: " + " " + list.get(0).getClass().getSimpleName() + ", list size: " + list.size()); - try { - IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password); - GenericServiceAPIResponseEntity<String> response = client.create(list); - client.close(); - if (response.isSuccess()) { - LOG.info("Successfully create entities " + list.toString()); - return true; - } - else { - LOG.error("Fail to create entities"); - return false; - } - } - catch (Exception ex) { - LOG.error("Got an exception in persisting entities" + ex.getMessage(), ex); - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java deleted file mode 100644 index c7ff74c..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.siddhi; - -import com.typesafe.config.Config; -import org.apache.eagle.alert.entity.AlertAPIEntity; -import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity; -import org.apache.eagle.alert.entity.AlertStreamSchemaEntity; -import org.apache.eagle.common.DateTimeUtil; -import org.apache.eagle.common.config.EagleConfigConstants; -import org.apache.eagle.common.metric.AlertContext; -import org.apache.eagle.policy.PolicyEvaluationContext; -import org.apache.eagle.policy.ResultRender; -import org.apache.eagle.policy.common.Constants; -import org.apache.eagle.policy.common.UrlBuilder; -import org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluator; -import org.apache.eagle.policy.siddhi.SiddhiQueryCallbackImpl; -import org.apache.eagle.policy.siddhi.StreamMetadataManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.lang.management.ManagementFactory; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -public class SiddhiAlertAPIEntityRender implements ResultRender<AlertDefinitionAPIEntity, AlertAPIEntity>, Serializable { - - public static final Logger LOG = LoggerFactory.getLogger(SiddhiAlertAPIEntityRender.class); - public static final String source = ManagementFactory.getRuntimeMXBean().getName(); - - @Override - @SuppressWarnings("unchecked") - public AlertAPIEntity render(Config config, List<Object> results, PolicyEvaluationContext<AlertDefinitionAPIEntity, AlertAPIEntity> siddhiAlertContext, long timestamp) { - List<String> rets = SiddhiQueryCallbackImpl.convertToString(results); - SiddhiPolicyEvaluator<AlertDefinitionAPIEntity, AlertAPIEntity> evaluator = (SiddhiPolicyEvaluator<AlertDefinitionAPIEntity, AlertAPIEntity>) siddhiAlertContext.evaluator; - String alertExecutorId = siddhiAlertContext.alertExecutor.getExecutorId(); - AlertAPIEntity entity = new AlertAPIEntity(); - AlertContext context = new AlertContext(); - String sourceStreams = evaluator.getAdditionalContext().get(Constants.SOURCE_STREAMS); - String[] sourceStreamsArr = sourceStreams.split(","); - List<String> attrRenameList = evaluator.getOutputStreamAttrNameList(); - Map<String, String> tags = new HashMap<String, String>(); - for (String sourceStream : sourceStreamsArr) { - List<AlertStreamSchemaEntity> list = StreamMetadataManager.getInstance().getMetadataEntitiesForStream(sourceStream.trim()); - for (AlertStreamSchemaEntity alertStream : list) { - if (alertStream.getUsedAsTag() != null && alertStream.getUsedAsTag() == true) { - String attrName = alertStream.getTags().get(Constants.ATTR_NAME); - tags.put(attrName, rets.get(attrRenameList.indexOf(attrName))); - } - } - } - - for (int index = 0; index < rets.size(); index++) { - //attrRenameList.get(0) -> "eagleAlertContext". We need to skip "eagleAlertContext", index is from 1 for attRenameList. - context.addProperty(attrRenameList.get(index), rets.get(index)); - } - - StringBuilder sb = new StringBuilder(); - for (Entry<String, String> entry : context.getProperties().entrySet()) { - String key = entry.getKey(); - String value = entry.getValue(); - sb.append(key + "=\"" + value + "\" "); - } - context.addAll(evaluator.getAdditionalContext()); - String policyId = context.getProperty(Constants.POLICY_ID); - String alertMessage = "The Policy \"" + policyId + "\" has been detected with the below information: " + sb.toString() ; - String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE); - String application = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.APPLICATION); - String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST); - Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT); - - context.addProperty(Constants.ALERT_EVENT, sb.toString()); - context.addProperty(Constants.ALERT_MESSAGE, alertMessage); - context.addProperty(Constants.ALERT_TIMESTAMP_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds(System.currentTimeMillis())); - context.addProperty(EagleConfigConstants.APPLICATION, application); - context.addProperty(EagleConfigConstants.SITE, site); - entity.setTimestamp(timestamp); - /** If we need to add severity tag, we should add severity filed in AbstractpolicyDefinition, and pass it down **/ - tags.put(EagleConfigConstants.SITE, site); - tags.put(EagleConfigConstants.APPLICATION, application); - tags.put(Constants.SOURCE_STREAMS, context.getProperty(Constants.SOURCE_STREAMS)); - tags.put(Constants.POLICY_ID, context.getProperty(Constants.POLICY_ID)); - tags.put(Constants.ALERT_SOURCE, source); - tags.put(Constants.ALERT_EXECUTOR_ID, alertExecutorId); - entity.setTags(tags); - - context.addProperty(Constants.POLICY_DETAIL_URL, UrlBuilder.buiildPolicyDetailUrl(host, port, tags)); - context.addProperty(Constants.ALERT_DETAIL_URL, UrlBuilder.buildAlertDetailUrl(host, port, entity)); - entity.setAlertContext(context.toJsonString()); - return entity; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider deleted file mode 100644 index eac2bfd..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluatorServiceProviderImpl \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/table-create.script ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/table-create.script b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/table-create.script deleted file mode 100644 index d4d3795..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/table-create.script +++ /dev/null @@ -1,24 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -##### create alert related tables -create 'eagle_metric', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', COMPRESSION => 'SNAPPY'} -create 'alertdetail', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', COMPRESSION => 'SNAPPY'} - -create 'alertDataSource', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', COMPRESSION => 'SNAPPY'} -create 'alertStream', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', COMPRESSION => 'SNAPPY'} -create 'alertExecutor', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', COMPRESSION => 'SNAPPY'} -create 'alertStreamSchema', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', COMPRESSION => 'SNAPPY'} -create 'alertdef', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', COMPRESSION => 'SNAPPY'} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/META-INF/MANIFEST.MF ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/META-INF/MANIFEST.MF b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/META-INF/MANIFEST.MF deleted file mode 100644 index edfb15f..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/META-INF/MANIFEST.MF +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -Manifest-Version: 1.0 -Class-Path: - http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java deleted file mode 100644 index 20afc95..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.cep; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity; -import org.apache.eagle.alert.entity.AlertAPIEntity; -import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity; -import org.apache.eagle.alert.entity.AlertStreamSchemaEntity; -import org.apache.eagle.alert.executor.AlertExecutor; -import org.apache.eagle.alert.siddhi.SiddhiAlertAPIEntityRender; -import org.apache.eagle.dataproc.core.ValuesArray; -import org.apache.eagle.datastream.Collector; -import org.apache.eagle.policy.PolicyEvaluationContext; -import org.apache.eagle.policy.common.Constants; -import org.apache.eagle.policy.dao.*; -import org.apache.eagle.policy.siddhi.SiddhiPolicyDefinition; -import org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluator; -import org.apache.eagle.policy.siddhi.StreamMetadataManager; -import org.apache.eagle.service.client.EagleServiceConnector; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import scala.Tuple2; - -import java.util.*; -import java.util.concurrent.Semaphore; - -public class TestSiddhiEvaluator { - private AlertStreamSchemaEntity createStreamMetaEntity(String attrName, String type) { - AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity(); - Map<String, String> tags = new HashMap<String, String>(); - tags.put("application", "hdfsAuditLog"); - tags.put("streamName", "hdfsAuditLogEventStream"); - tags.put("attrName", attrName); - entity.setTags(tags); - entity.setAttrType(type); - return entity; - } - - private int alertCount; - private Semaphore semaphore; - @Before - public void before(){ - alertCount = 0; - semaphore = new Semaphore(0); - } - - @Test - public void test() throws Exception{ - Config config = ConfigFactory.load("unittest.conf"); - AlertStreamSchemaDAO streamDao = new AlertStreamSchemaDAOImpl(null, null) { - @Override - public List<AlertStreamSchemaEntity> findAlertStreamSchemaByApplication(String dataSource) throws Exception { - List<AlertStreamSchemaEntity> list = new ArrayList<AlertStreamSchemaEntity>(); - list.add(createStreamMetaEntity("cmd", "string")); - list.add(createStreamMetaEntity("dst", "string")); - list.add(createStreamMetaEntity("src", "string")); - list.add(createStreamMetaEntity("host", "string")); - list.add(createStreamMetaEntity("user", "string")); - list.add(createStreamMetaEntity("timestamp", "long")); - list.add(createStreamMetaEntity("securityZone", "string")); - list.add(createStreamMetaEntity("sensitivityType", "string")); - list.add(createStreamMetaEntity("allowed", "string")); - return list; - } - }; - StreamMetadataManager.getInstance().reset(); - StreamMetadataManager.getInstance().init(config, streamDao); - - Map<String, Object> data1 = new TreeMap<String, Object>(){{ - put("cmd", "open"); - put("dst", ""); - put("src", ""); - put("host", ""); - put("user", ""); - put("timestamp", String.valueOf(System.currentTimeMillis())); - put("securityZone", ""); - put("sensitivityType", ""); - put("allowed", "true"); - }}; - final SiddhiPolicyDefinition policyDef = new SiddhiPolicyDefinition(); - policyDef.setType("siddhiCEPEngine"); - String expression = "from hdfsAuditLogEventStream[cmd=='open'] " + - "select * " + - "insert into outputStream ;"; - policyDef.setExpression(expression); - - PolicyDefinitionDAO alertDao = new PolicyDefinitionEntityDAOImpl(new EagleServiceConnector(null, null), - Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME) { - @Override - public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(String site, String dataSource) throws Exception { - return null; - } - - @Override - public void updatePolicyDetails(AbstractPolicyDefinitionEntity entity) { /* do nothing */ } - }; - - AlertExecutor alertExecutor = new AlertExecutor("alertExecutorId", null, 3, 1, alertDao, new String[]{"hdfsAuditLogEventStream"}) { - @Override - protected Map<String, String> getDimensions(String policyId) { - return new HashMap<>(); - } - }; - alertExecutor.prepareConfig(config); - alertExecutor.init(); - - PolicyEvaluationContext<AlertDefinitionAPIEntity, AlertAPIEntity> context = new PolicyEvaluationContext<>(); - context.alertExecutor = alertExecutor; - context.policyId = "testPolicy"; - context.resultRender = new SiddhiAlertAPIEntityRender(); - context.outputCollector = (Collector<Tuple2<String, AlertAPIEntity>>) (stringAlertAPIEntityTuple2) -> { - alertCount++; - semaphore.release(); - }; - SiddhiPolicyEvaluator<AlertDefinitionAPIEntity, AlertAPIEntity> evaluator = - new SiddhiPolicyEvaluator<>(config, context, policyDef, new String[]{"hdfsAuditLogEventStream"}, false); - evaluator.evaluate(new ValuesArray(context.outputCollector, "hdfsAuditLogEventStream", data1)); - Thread.sleep(2 * 1000); - semaphore.acquire(); - Assert.assertEquals(alertCount, 1); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java deleted file mode 100644 index f6d6a63..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.config; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.eagle.dataproc.core.JsonSerDeserUtils; - -public class TestAlertDedup { - - @Test - public void test() throws Exception{ - String alertDef = "{\"alertDedupIntervalMin\":\"10\",\"fields\":[\"key1\",\"key2\",\"key3\"]}"; - DeduplicatorConfig dedupConfig = JsonSerDeserUtils.deserialize(alertDef, DeduplicatorConfig.class); - Assert.assertEquals(dedupConfig.getAlertDedupIntervalMin(), 10); - Assert.assertEquals(dedupConfig.getFields().size(), 3); - - alertDef = "null"; - dedupConfig = JsonSerDeserUtils.deserialize(alertDef, DeduplicatorConfig.class); - Assert.assertEquals(dedupConfig, null); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java deleted file mode 100644 index f7dcdde..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.dao; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity; -import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity; -import org.apache.eagle.common.config.EagleConfigConstants; -import org.apache.eagle.policy.common.Constants; -import org.apache.eagle.policy.dao.PolicyDefinitionDAO; -import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl; -import org.apache.eagle.service.client.EagleServiceConnector; -import org.junit.Assert; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class TestAlertDefinitionDAOImpl { - - public AlertDefinitionAPIEntity buildTestAlertDefEntity(String site, String programId, String alertExecutorId, String policyId, String policyType) { - AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity(); - entity.setEnabled(true); - Map<String, String> tags = new HashMap<String, String>(); - tags.put("site", site); - tags.put("programId", programId); - tags.put("alertExecutorId", alertExecutorId); - tags.put("policyId", policyId); - tags.put("policyType", policyType); - entity.setTags(tags); - return entity; - } - - @Test - public void test() throws Exception{ - Config config = ConfigFactory.load(); - String eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST); - int eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT); - - String site = "sandbox"; - String dataSource = "UnitTest"; - PolicyDefinitionDAO dao = new PolicyDefinitionEntityDAOImpl(new EagleServiceConnector(eagleServiceHost, eagleServicePort), - Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME) { - @Override - public List<AlertDefinitionAPIEntity> findActivePolicies(String site, String dataSource) throws Exception { - List<AlertDefinitionAPIEntity> list = new ArrayList<AlertDefinitionAPIEntity>(); - list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor1", "TestPolicyIDA", "TestPolicyTypeA")); - list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor1", "TestPolicyIDB", "TestPolicyTypeB")); - list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor2", "TestPolicyIDC", "TestPolicyTypeC")); - list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor2", "TestPolicyIDD", "TestPolicyTypeD")); - return list; - } - - @Override - public void updatePolicyDetails(AbstractPolicyDefinitionEntity entity) { /* do nothing */ } - }; - - Map<String, Map<String, AlertDefinitionAPIEntity>> retMap = dao.findActivePoliciesGroupbyExecutorId(site, dataSource); - - Assert.assertEquals(2, retMap.size()); - Assert.assertEquals(2, retMap.get("TestExecutor1").size()); - Assert.assertEquals(2, retMap.get("TestExecutor2").size()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java deleted file mode 100644 index d703214..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.dao; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.entity.AlertStreamSchemaEntity; -import org.apache.eagle.policy.dao.AlertStreamSchemaDAO; -import org.apache.eagle.policy.siddhi.SiddhiStreamMetadataUtils; -import org.apache.eagle.policy.siddhi.StreamMetadataManager; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; - -public class TestSiddhiStreamMetadataUtils { - @Test - public void test() throws Exception { - Config config = ConfigFactory.load(); - StreamMetadataManager.getInstance().reset(); - StreamMetadataManager.getInstance().init(config, new AlertStreamSchemaDAO() { - @Override - public List<AlertStreamSchemaEntity> findAlertStreamSchemaByApplication( - String application) { - return Arrays.asList(generateStreamMetadataAPIEntity("attrName1", "STRING"), - generateStreamMetadataAPIEntity("attrName2", "LONG") - ); - } - }); - String siddhiStreamDef = SiddhiStreamMetadataUtils.convertToStreamDef("testStreamName"); - Assert.assertEquals("define stream " + "testStreamName" + "(attrName1 string,attrName2 long);", siddhiStreamDef); - } - - private AlertStreamSchemaEntity generateStreamMetadataAPIEntity(final String attrName, String attrType){ - AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity(); - entity.setTags(new HashMap<String, String>(){{ - put("programId", "testProgramId"); - put("streamName", "testStreamName"); - put("attrName", attrName); - }}); - entity.setAttrType(attrType); - return entity; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java deleted file mode 100644 index 0bbfc4a..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.dao; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.policy.dao.AlertStreamSchemaDAO; -import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl; -import org.apache.eagle.alert.entity.AlertStreamSchemaEntity; -import org.apache.eagle.policy.siddhi.StreamMetadataManager; -import org.junit.Assert; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class TestStreamDefinitionDAOImpl { - - public AlertStreamSchemaEntity buildTestStreamDefEntity(String programId, String streamName, String attrName) { - AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity(); - entity.setAttrType("String"); - entity.setAttrValueResolver("DefaultAttrValueResolver"); - entity.setCategory("SimpleType"); - Map<String, String> tags = new HashMap<String, String>(); - tags.put("programId", programId); - tags.put("streamName", streamName); - tags.put("attrName", attrName); - entity.setTags(tags); - return entity; - } - - @Test - public void test() throws Exception{ - Config config = ConfigFactory.load(); - AlertStreamSchemaDAO dao = new AlertStreamSchemaDAOImpl(null, null) { - public List<AlertStreamSchemaEntity> findAlertStreamSchemaByApplication(String application) throws Exception { - List<AlertStreamSchemaEntity> list = new ArrayList<AlertStreamSchemaEntity>(); - String programId = "UnitTest"; - list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr1")); - list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr2")); - list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr3")); - list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr4")); - return list; - } - }; - StreamMetadataManager.getInstance().reset(); - StreamMetadataManager.getInstance().init(config, dao); - Map<String, List<AlertStreamSchemaEntity>> retMap = StreamMetadataManager.getInstance().getMetadataEntitiesForAllStreams(); - Assert.assertTrue(retMap.get("TestStream").size() == 4); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java deleted file mode 100644 index bb94908..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.executor; - -import org.apache.eagle.policy.common.Constants; -import org.apache.eagle.policy.config.AbstractPolicyDefinition; -import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl; -import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity; -import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity; -import org.apache.eagle.policy.PolicyEvaluator; -import org.apache.eagle.policy.PolicyManager; -import org.apache.eagle.policy.siddhi.SiddhiPolicyDefinition; -import org.apache.eagle.policy.siddhi.StreamMetadataManager; -import org.apache.eagle.dataproc.core.JsonSerDeserUtils; -import org.junit.Ignore; -import org.junit.Test; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -import org.junit.Assert; - -/** - * @since Dec 18, 2015 - * - */ -public class TestPolicyExecutor { - - public static class T2 extends AbstractPolicyDefinitionEntity { - @Override - public String getPolicyDef() { - return null; - } - @Override - public boolean isEnabled() { - return false; - } - } - - // not feasible to Unit test, it requires the local service. - @Ignore - @Test - public void testReflectCreatePolicyEvaluator() throws Exception { - System.setProperty("config.resource", "/unittest.conf"); - String policyType = Constants.policyType.siddhiCEPEngine.name(); - Class<? extends PolicyEvaluator> evalCls = PolicyManager.getInstance().getPolicyEvaluator(policyType); - Config config = ConfigFactory.load(); - - String def = "{\"expression\":\"from hdfsAuditLogEventStream select * insert into outputStream;\",\"type\":\"siddhiCEPEngine\"}"; - // test1 : test json deserialization - AbstractPolicyDefinition policyDef = null; - policyDef = JsonSerDeserUtils.deserialize(def, AbstractPolicyDefinition.class, - PolicyManager.getInstance().getPolicyModules(policyType)); - // Assert conversion succeed - Assert.assertEquals(SiddhiPolicyDefinition.class, policyDef.getClass()); - - // make sure meta data manager initialized - StreamMetadataManager.getInstance().init(config, new AlertStreamSchemaDAOImpl(config)); - - String[] sourceStreams = new String[] { "hdfsAuditLogEventStream" }; - // test2 : test evaluator - PolicyEvaluator pe = evalCls.getConstructor(Config.class, String.class, AbstractPolicyDefinition.class, - String[].class, boolean.class).newInstance(config, "policy-id", policyDef, sourceStreams, false); - - PolicyEvaluator<AlertDefinitionAPIEntity> e1 = (PolicyEvaluator<AlertDefinitionAPIEntity>) pe; - - PolicyEvaluator<T2> e2 = (PolicyEvaluator<T2>) pe; - - } - -}
