EAGLE-594: Remove raw alert from kafka publisher We are leveraging configured deduplicator to dedup the duplicated alerts before publish to kafka, email, slack, etc. However, sometimes we may still want to keep the raw alerts in kafka. Here we have defined rawAlertNamespaceLabel and rawAlertNamespaceValue custom fields to emit the raw alerts into kafka. However, these configured namespace concept is ebay/sherlock specific, we should remove it from eagle and use it ebay/sherlock extended kafka publisher.
Author: Li, Garrett Reviewer: ralphsu This closes #478 Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/a3df0743 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/a3df0743 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/a3df0743 Branch: refs/heads/master Commit: a3df074379b3952c366e1d9d876d0223ca24ffed Parents: 1d0f9f5 Author: Xiancheng Li <xiancheng...@ebay.com> Authored: Sun Oct 9 10:37:06 2016 +0800 Committer: Ralph, Su <suliang...@gmail.com> Committed: Thu Oct 13 16:51:23 2016 +0800 ---------------------------------------------------------------------- .../engine/publisher/PublishConstants.java | 3 - .../publisher/impl/AlertKafkaPublisher.java | 90 ++++++++++---------- 2 files changed, 44 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a3df0743/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java index 7408779..0ba2313 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java @@ -51,7 +51,4 @@ public class PublishConstants { public static final String ALERT_EMAIL_POLICY = "policyId"; public static final String ALERT_EMAIL_CREATOR = "creator"; - public static final String RAW_ALERT_NAMESPACE_LABEL = "rawAlertNamespaceLabel"; - public static final String RAW_ALERT_NAMESPACE_VALUE = "rawAlertNamespaceValue"; - } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/a3df0743/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java index 5464ded..5df34e3 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java @@ -18,7 +18,6 @@ package org.apache.eagle.alert.engine.publisher.impl; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -45,8 +44,6 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin { private KafkaProducer producer; private String brokerList; private String topic; - private String namespaceLabel; - private String namespaceValue; @Override @SuppressWarnings("rawtypes") @@ -58,42 +55,64 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin { brokerList = kafkaConfig.get(PublishConstants.BROKER_LIST).trim(); producer = KafkaProducerManager.INSTANCE.getProducer(brokerList, kafkaConfig); topic = kafkaConfig.get(PublishConstants.TOPIC).trim(); - namespaceLabel = kafkaConfig.getOrDefault(PublishConstants.RAW_ALERT_NAMESPACE_LABEL, "namespace"); - namespaceValue = kafkaConfig.getOrDefault(PublishConstants.RAW_ALERT_NAMESPACE_VALUE, "network"); } } - @SuppressWarnings( {"unchecked", "rawtypes"}) @Override public void onAlert(AlertStreamEvent event) throws Exception { if (producer == null) { LOG.warn("KafkaProducer is null due to the incorrect configurations"); return; } - List<AlertStreamEvent> outputEvents = new ArrayList<AlertStreamEvent>(); - int namespaceColumnIndex = event.getSchema().getColumnIndex(namespaceLabel); - if (namespaceColumnIndex < 0 || namespaceColumnIndex >= event.getData().length) { - LOG.warn("Namespace column {} is not found, the found index {} is invalid", - namespaceLabel, namespaceColumnIndex); - } else { - // copy raw event to be duped - AlertStreamEvent newEvent = new AlertStreamEvent(event); - newEvent.getData()[namespaceColumnIndex] = namespaceValue; - outputEvents.add(newEvent); + this.emit(this.topic, this.dedup(event)); + } + + @SuppressWarnings("rawtypes") + @Override + public void update(String dedupIntervalMin, Map<String, String> pluginProperties) { + deduplicator.setDedupIntervalMin(dedupIntervalMin); + String newBrokerList = pluginProperties.get(PublishConstants.BROKER_LIST).trim(); + String newTopic = pluginProperties.get(PublishConstants.TOPIC).trim(); + if (!newBrokerList.equals(this.brokerList)) { + producer.close(); + brokerList = newBrokerList; + KafkaProducer newProducer = null; + try { + newProducer = KafkaProducerManager.INSTANCE.getProducer(brokerList, pluginProperties); + } catch (Exception e) { + LOG.error("Create KafkaProducer failed with configurations: {}", pluginProperties); + } + producer = newProducer; } + topic = newTopic; + } - List<AlertStreamEvent> dedupResults = dedup(event); - if (dedupResults != null) { - outputEvents.addAll(dedupResults); + @Override + public void close() { + producer.close(); + } + + @SuppressWarnings( {"rawtypes", "unchecked"}) + protected PublishStatus emit(String topic, List<AlertStreamEvent> outputEvents) { + // we need to check producer here since the producer is invisable to extended kafka publisher + if (producer == null) { + LOG.warn("KafkaProducer is null due to the incorrect configurations"); + return null; } - PublishStatus status = new PublishStatus(); + if (outputEvents == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Alert stream events list in publishment is empty"); + } + return null; + } + this.status = new PublishStatus(); try { for (AlertStreamEvent outputEvent : outputEvents) { ProducerRecord record = createRecord(outputEvent, topic); if (record == null) { - LOG.error(" Alert serialize return null, ignored message! "); - return; + LOG.error("Alert serialize return null, ignored message! "); + return null; } Future<?> future = producer.send(record); future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS); @@ -112,32 +131,11 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin { status.successful = false; status.errorMessage = ex.getMessage(); } - this.status = status; - } - - @SuppressWarnings("rawtypes") - @Override - public void update(String dedupIntervalMin, Map<String, String> pluginProperties) { - deduplicator.setDedupIntervalMin(dedupIntervalMin); - String newBrokerList = pluginProperties.get(PublishConstants.BROKER_LIST).trim(); - String newTopic = pluginProperties.get(PublishConstants.TOPIC).trim(); - if (!newBrokerList.equals(this.brokerList)) { - producer.close(); - brokerList = newBrokerList; - KafkaProducer newProducer = null; - try { - newProducer = KafkaProducerManager.INSTANCE.getProducer(brokerList, pluginProperties); - } catch (Exception e) { - LOG.error("Create KafkaProducer failed with configurations: {}", pluginProperties); - } - producer = newProducer; - } - topic = newTopic; + return status; } - @Override - public void close() { - producer.close(); + protected String getTopic() { + return this.topic; } private ProducerRecord<String, Object> createRecord(AlertStreamEvent event, String topic) throws Exception {