http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java deleted file mode 100755 index 144f9aa..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java +++ /dev/null @@ -1,245 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.runner; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutorService; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.eagle.alert.coordination.model.AlertBoltSpec; -import org.apache.eagle.alert.coordination.model.WorkSlot; -import org.apache.eagle.alert.engine.StreamContextImpl; -import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService; -import org.apache.eagle.alert.engine.coordinator.MetadataType; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.PublishPartition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator; -import org.apache.eagle.alert.engine.evaluator.impl.AlertBoltOutputCollectorWrapper; -import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.router.AlertBoltSpecListener; -import org.apache.eagle.alert.engine.router.impl.StormOutputCollector; -import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider; -import org.apache.eagle.alert.engine.utils.SingletonExecutor; -import org.apache.eagle.alert.service.IMetadataServiceClient; -import org.apache.eagle.alert.service.MetadataServiceClientImpl; -import org.apache.eagle.alert.utils.AlertConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.typesafe.config.Config; - -import backtype.storm.metric.api.MultiCountMetric; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; - -/** - * Since 5/1/16. - * This is container for hosting all policies belonging to the same monitoredStream - * MonitoredStream refers to tuple of {dataSource, streamId, groupby} - * The container is also called {@link WorkSlot} - */ -public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListener, SerializationMetadataProvider { - private static final Logger LOG = LoggerFactory.getLogger(AlertBolt.class); - private static final long serialVersionUID = -4132297691448945672L; - private PolicyGroupEvaluator policyGroupEvaluator; - private AlertBoltOutputCollectorWrapper alertOutputCollector; - private String boltId; - private boolean logEventEnabled; - private volatile Object outputLock; - // mapping from policy name to PolicyDefinition - private volatile Map<String, PolicyDefinition> cachedPolicies = new HashMap<>(); // for one streamGroup, there are multiple policies - - private volatile Set<PublishPartition> cachedPublishPartitions = new HashSet<>(); - - private AlertBoltSpec spec; - - public AlertBolt(String boltId, Config config, IMetadataChangeNotifyService changeNotifyService) { - super(boltId, changeNotifyService, config); - this.boltId = boltId; - this.policyGroupEvaluator = new PolicyGroupEvaluatorImpl(boltId + "-evaluator_stage1"); // use bolt id as evaluatorId. - // TODO next stage evaluator - - if (config.hasPath("topology.logEventEnabled")) { - logEventEnabled = config.getBoolean("topology.logEventEnabled"); - } - } - - @Override - public void execute(Tuple input) { - this.streamContext.counter().incr("execute_count"); - try { - PartitionedEvent pe = deserialize(input.getValueByField(AlertConstants.FIELD_0)); - if (logEventEnabled) { - LOG.info("Alert bolt {} received event: {}", boltId, pe.getEvent()); - } - String streamEventVersion = pe.getEvent().getMetaVersion(); - - if (streamEventVersion == null) { - // if stream event version is null, need to initialize it - pe.getEvent().setMetaVersion(specVersion); - } else if (streamEventVersion != null && !streamEventVersion.equals(specVersion)) { - if (specVersion != null && streamEventVersion != null - && specVersion.contains("spec_version_") && streamEventVersion.contains("spec_version_")) { - // check if specVersion is older than stream_event_version - // Long timestamp_of_specVersion = Long.valueOf(specVersion.split("spec_version_")[1]); - // Long timestamp_of_streamEventVersion = Long.valueOf(stream_event_version.split("spec_version_")[1]); - long timestampOfSpecVersion = Long.valueOf(specVersion.substring(13)); - long timestampOfStreamEventVersion = Long.valueOf(streamEventVersion.substring(13)); - specVersionOutofdate = timestampOfSpecVersion < timestampOfStreamEventVersion; - if (!specVersionOutofdate) { - pe.getEvent().setMetaVersion(specVersion); - } - } - - String message = String.format("Spec Version [%s] of AlertBolt is %s Stream Event Version [%s]!", specVersion, specVersionOutofdate ? "older than" : "newer than", streamEventVersion); - LOG.warn(message); - - // send out metrics for meta conflict - this.streamContext.counter().incr("meta_conflict"); - - ExecutorService executors = SingletonExecutor.getExecutorService(); - executors.submit(() -> { - // if spec version is out-of-date, need to refresh it - if (specVersionOutofdate) { - try { - IMetadataServiceClient client = new MetadataServiceClientImpl(this.getConfig()); - String topologyId = spec.getTopologyName(); - AlertBoltSpec latestSpec = client.getVersionedSpec().getAlertSpecs().get(topologyId); - if (latestSpec != null) { - spec = latestSpec; - } - } catch (Exception e) { - LOG.error(e.toString()); - } - - } - }); - - } - - policyGroupEvaluator.nextEvent(pe.withAnchor(input)); - synchronized (outputLock) { - this.collector.ack(input); - } - this.streamContext.counter().incr("ack_count"); - } catch (Exception ex) { - LOG.error(ex.getMessage(), ex); - synchronized (outputLock) { - this.streamContext.counter().incr("fail_count"); - this.collector.fail(input); - } - } finally { - alertOutputCollector.flush(); - } - } - - @Override - public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService metadataChangeNotifyService, Config config, TopologyContext context) { - // instantiate output lock object - outputLock = new Object(); - streamContext = new StreamContextImpl(config, context.registerMetric("eagle.evaluator", new MultiCountMetric(), 60), context); - alertOutputCollector = new AlertBoltOutputCollectorWrapper(new StormOutputCollector(collector), outputLock, streamContext); - policyGroupEvaluator.init(streamContext, alertOutputCollector); - metadataChangeNotifyService.registerListener(this); - metadataChangeNotifyService.init(config, MetadataType.ALERT_BOLT); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields(AlertConstants.FIELD_0, AlertConstants.FIELD_1)); - } - - @Override - public void cleanup() { - policyGroupEvaluator.close(); - alertOutputCollector.flush(); - alertOutputCollector.close(); - super.cleanup(); - } - - @SuppressWarnings("unchecked") - @Override - public synchronized void onAlertBoltSpecChange(AlertBoltSpec spec, Map<String, StreamDefinition> sds) { - List<PolicyDefinition> newPolicies = spec.getBoltPoliciesMap().get(boltId); - if (newPolicies == null) { - LOG.info("no new policy with AlertBoltSpec {} for this bolt {}", spec, boltId); - return; - } - - Map<String, PolicyDefinition> newPoliciesMap = new HashMap<>(); - newPolicies.forEach(p -> newPoliciesMap.put(p.getName(), p)); - MapComparator<String, PolicyDefinition> comparator = new MapComparator<>(newPoliciesMap, cachedPolicies); - comparator.compare(); - - MapComparator<String, StreamDefinition> streamComparator = new MapComparator<>(sds, sdf); - streamComparator.compare(); - - List<StreamDefinition> addOrUpdatedStreams = streamComparator.getAdded(); - addOrUpdatedStreams.addAll(streamComparator.getModified()); - List<PolicyDefinition> cachedPoliciesTemp = new ArrayList<>(cachedPolicies.values()); - addOrUpdatedStreams.forEach(s -> { - cachedPoliciesTemp.stream().filter(p -> p.getInputStreams().contains(s.getStreamId()) - || p.getOutputStreams().contains(s.getStreamId())).forEach(p -> { - if (comparator.getModified().stream().filter(x -> x.getName().equals(p.getName())).count() <= 0 - && comparator.getAdded().stream().filter(x -> x.getName().equals(p.getName())).count() <= 0) { - comparator.getModified().add(p); - } - }); - ; - }); - - policyGroupEvaluator.onPolicyChange(spec.getVersion(), comparator.getAdded(), comparator.getRemoved(), comparator.getModified(), sds); - - // update alert output collector - Set<PublishPartition> newPublishPartitions = new HashSet<>(); - spec.getPublishPartitions().forEach(p -> { - if (newPolicies.stream().filter(o -> o.getName().equals(p.getPolicyId())).count() > 0) { - newPublishPartitions.add(p); - } - }); - - Collection<PublishPartition> addedPublishPartitions = CollectionUtils.subtract(newPublishPartitions, cachedPublishPartitions); - Collection<PublishPartition> removedPublishPartitions = CollectionUtils.subtract(cachedPublishPartitions, newPublishPartitions); - Collection<PublishPartition> modifiedPublishPartitions = CollectionUtils.intersection(newPublishPartitions, cachedPublishPartitions); - - LOG.debug("added PublishPartition " + addedPublishPartitions); - LOG.debug("removed PublishPartition " + removedPublishPartitions); - LOG.debug("modified PublishPartition " + modifiedPublishPartitions); - - alertOutputCollector.onAlertBoltSpecChange(addedPublishPartitions, removedPublishPartitions, modifiedPublishPartitions); - - // switch - cachedPolicies = newPoliciesMap; - cachedPublishPartitions = newPublishPartitions; - sdf = sds; - specVersion = spec.getVersion(); - this.spec = spec; - } - -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java deleted file mode 100644 index 44a5fe9..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java +++ /dev/null @@ -1,218 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.runner; - -import backtype.storm.metric.api.MultiCountMetric; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import com.typesafe.config.Config; -import org.apache.eagle.alert.coordination.model.PublishSpec; -import org.apache.eagle.alert.engine.StreamContextImpl; -import org.apache.eagle.alert.engine.coordinator.*; -import org.apache.eagle.alert.engine.model.AlertPublishEvent; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener; -import org.apache.eagle.alert.engine.publisher.AlertPublisher; -import org.apache.eagle.alert.engine.publisher.AlertStreamFilter; -import org.apache.eagle.alert.engine.publisher.PipeStreamFilter; -import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl; -import org.apache.eagle.alert.engine.publisher.template.AlertTemplateEngine; -import org.apache.eagle.alert.engine.publisher.template.AlertTemplateProvider; -import org.apache.eagle.alert.utils.AlertConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPublishSpecListener { - private static final Logger LOG = LoggerFactory.getLogger(AlertPublisherBolt.class); - private final AlertPublisher alertPublisher; - private volatile Map<String, Publishment> cachedPublishments = new HashMap<>(); - private volatile Map<String, PolicyDefinition> policyDefinitionMap; - private volatile Map<String, StreamDefinition> streamDefinitionMap; - private AlertTemplateEngine alertTemplateEngine; - - private boolean logEventEnabled; - private TopologyContext context; - private AlertStreamFilter alertFilter; - - public AlertPublisherBolt(String alertPublisherName, Config config, IMetadataChangeNotifyService coordinatorService) { - super(alertPublisherName, coordinatorService, config); - this.alertPublisher = new AlertPublisherImpl(alertPublisherName); - - if (config != null && config.hasPath("topology.logEventEnabled")) { - logEventEnabled = config.getBoolean("topology.logEventEnabled"); - } - } - - @Override - public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService coordinatorService, Config config, TopologyContext context) { - coordinatorService.registerListener(this); - coordinatorService.init(config, MetadataType.ALERT_PUBLISH_BOLT); - this.alertPublisher.init(config, stormConf); - streamContext = new StreamContextImpl(config, context.registerMetric("eagle.publisher", new MultiCountMetric(), 60), context); - this.context = context; - this.alertTemplateEngine = AlertTemplateProvider.createAlertTemplateEngine(); - this.alertTemplateEngine.init(config); - this.alertFilter = new PipeStreamFilter(new AlertContextEnrichFilter(this), new AlertTemplateFilter(alertTemplateEngine)); - } - - @Override - public void execute(Tuple input) { - try { - streamContext.counter().incr("receive_count"); - PublishPartition partition = (PublishPartition) input.getValueByField(AlertConstants.FIELD_0); - AlertStreamEvent event = (AlertStreamEvent) input.getValueByField(AlertConstants.FIELD_1); - if (logEventEnabled) { - LOG.info("Alert publish bolt {}/{} with partition {} received event: {}", this.getBoltId(), this.context.getThisTaskId(), partition, event); - } - AlertStreamEvent filteredEvent = alertFilter.filter(event); - if (filteredEvent != null) { - alertPublisher.nextEvent(partition, filteredEvent); - } - this.collector.ack(input); - streamContext.counter().incr("ack_count"); - } catch (Throwable ex) { - streamContext.counter().incr("fail_count"); - LOG.error(ex.getMessage(), ex); - collector.reportError(ex); - } - } - - @Override - public void cleanup() { - alertPublisher.close(); - super.cleanup(); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields()); - } - - @Override - public synchronized void onAlertPublishSpecChange(PublishSpec pubSpec, Map<String, StreamDefinition> sds) { - if (pubSpec == null) { - return; - } - this.streamDefinitionMap = sds; - - List<Publishment> newPublishments = pubSpec.getPublishments(); - if (newPublishments == null) { - LOG.info("no publishments with PublishSpec {} for this topology", pubSpec); - return; - } - - Map<String, Publishment> newPublishmentsMap = new HashMap<>(); - newPublishments.forEach(p -> newPublishmentsMap.put(p.getName(), p)); - MapComparator<String, Publishment> comparator = new MapComparator<>(newPublishmentsMap, cachedPublishments); - comparator.compare(); - - List<Publishment> beforeModified = new ArrayList<>(); - comparator.getModified().forEach(p -> beforeModified.add(cachedPublishments.get(p.getName()))); - alertPublisher.onPublishChange(comparator.getAdded(), comparator.getRemoved(), comparator.getModified(), beforeModified); - - // switch - cachedPublishments = newPublishmentsMap; - specVersion = pubSpec.getVersion(); - } - - @Override - public void onAlertPolicyChange(Map<String, PolicyDefinition> pds, Map<String, StreamDefinition> sds) { - List<String> policyToRemove = new ArrayList<>(); - if (this.policyDefinitionMap != null) { - policyToRemove.addAll(this.policyDefinitionMap.keySet().stream().filter(policyId -> !pds.containsKey(policyId)).collect(Collectors.toList())); - } - - this.policyDefinitionMap = pds; - this.streamDefinitionMap = sds; - - for (Map.Entry<String, PolicyDefinition> entry : pds.entrySet()) { - try { - this.alertTemplateEngine.register(entry.getValue()); - } catch (Throwable throwable) { - LOG.error("Failed to register policy {} in template engine", entry.getKey(), throwable); - } - } - - for (String policyId : policyToRemove) { - try { - this.alertTemplateEngine.unregister(policyId); - } catch (Throwable throwable) { - LOG.error("Failed to unregister policy {} from template engine", policyId, throwable); - } - } - } - - private class AlertContextEnrichFilter implements AlertStreamFilter { - private final AlertPublisherBolt alertPublisherBolt; - - private AlertContextEnrichFilter(AlertPublisherBolt alertPublisherBolt) { - this.alertPublisherBolt = alertPublisherBolt; - } - - /** - * TODO: Refactor wrapAlertPublishEvent into alertTemplateEngine and remove extraData from AlertStreamEvent. - */ - @Override - public AlertStreamEvent filter(AlertStreamEvent event) { - event.ensureAlertId(); - Map<String, Object> extraData = new HashMap<>(); - List<String> appIds = new ArrayList<>(); - if (alertPublisherBolt.policyDefinitionMap == null || alertPublisherBolt.streamDefinitionMap == null) { - LOG.warn("policyDefinitions or streamDefinitions in publisher bolt have not been initialized"); - } else { - PolicyDefinition policyDefinition = alertPublisherBolt.policyDefinitionMap.get(event.getPolicyId()); - if (alertPublisherBolt.policyDefinitionMap != null && policyDefinition != null) { - for (String inputStreamId : policyDefinition.getInputStreams()) { - StreamDefinition sd = alertPublisherBolt.streamDefinitionMap.get(inputStreamId); - if (sd != null) { - extraData.put(AlertPublishEvent.SITE_ID_KEY, sd.getSiteId()); - appIds.add(sd.getStreamSource()); - } - } - extraData.put(AlertPublishEvent.APP_IDS_KEY, appIds); - extraData.put(AlertPublishEvent.POLICY_VALUE_KEY, policyDefinition.getDefinition().getValue()); - event.setSeverity(policyDefinition.getAlertSeverity()); - event.setCategory(policyDefinition.getAlertCategory()); - } - event.setContext(extraData); - } - return event; - } - } - - private class AlertTemplateFilter implements AlertStreamFilter { - private final AlertTemplateEngine alertTemplateEngine; - - private AlertTemplateFilter(AlertTemplateEngine alertTemplateEngine) { - this.alertTemplateEngine = alertTemplateEngine; - } - - @Override - public AlertStreamEvent filter(AlertStreamEvent event) { - return this.alertTemplateEngine.filter(event); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java deleted file mode 100644 index 5c65d91..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/MapComparator.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.runner; - -import org.apache.commons.collections.CollectionUtils; - -import java.util.*; - -/** - * Since 5/2/16. - */ -public class MapComparator<K, V> { - private Map<K, V> map1; - private Map<K, V> map2; - private List<V> added = new ArrayList<>(); - private List<V> removed = new ArrayList<>(); - private List<V> modified = new ArrayList<>(); - - public MapComparator(Map<K, V> map1, Map<K, V> map2) { - this.map1 = map1; - this.map2 = map2; - } - - @SuppressWarnings("unchecked") - public void compare() { - Set<K> keys1 = map1.keySet(); - Set<K> keys2 = map2.keySet(); - Collection<K> addedKeys = CollectionUtils.subtract(keys1, keys2); - Collection<K> removedKeys = CollectionUtils.subtract(keys2, keys1); - Collection<K> modifiedKeys = CollectionUtils.intersection(keys1, keys2); - - addedKeys.forEach(k -> added.add(map1.get(k))); - removedKeys.forEach(k -> removed.add(map2.get(k))); - modifiedKeys.forEach(k -> { - if (!map1.get(k).equals(map2.get(k))) { - modified.add(map1.get(k)); - } - }); - } - - public List<V> getAdded() { - return added; - } - - public List<V> getRemoved() { - return removed; - } - - public List<V> getModified() { - return modified; - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java deleted file mode 100644 index 771a667..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricConsumer.java +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.runner; - -import org.apache.eagle.alert.metric.IMetricSystem; -import org.apache.eagle.alert.metric.MetricSystem; -import backtype.storm.metric.api.IMetricsConsumer; -import backtype.storm.task.IErrorReporter; -import backtype.storm.task.TopologyContext; -import com.codahale.metrics.Gauge; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -/** - * Share same metric system. - */ -public class StormMetricConsumer implements IMetricsConsumer { - public static final Logger LOG = LoggerFactory.getLogger(StormMetricConsumer.class); - private String topologyName; - private IMetricSystem metricSystem; - private String topologyId; - - @SuppressWarnings( {"serial", "rawtypes"}) - @Override - public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { - Config config = ConfigFactory.parseString((String) registrationArgument, ConfigParseOptions.defaults()); - topologyName = config.getString("appId"); - topologyId = context.getStormId(); - metricSystem = MetricSystem.load(config); - metricSystem.tags(new HashMap<String, Object>() { - { - put("appId", topologyName); - put("stormId", topologyId); - } - }); - metricSystem.start(); - } - - @SuppressWarnings( {"unchecked", "rawtypes"}) - @Override - public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) { - synchronized (metricSystem) { - List<String> metricList = new LinkedList<>(); - for (DataPoint dataPoint : dataPoints) { - if (dataPoint.value instanceof Map) { - Map<String, Object> values = (Map<String, Object>) dataPoint.value; - for (Map.Entry<String, Object> entry : values.entrySet()) { - String metricName = buildMetricName(taskInfo, dataPoint.name, entry.getKey()); - metricList.add(metricName); - Gauge gauge = metricSystem.registry().getGauges().get(metricName); - if (gauge == null) { - LOG.info("Register metric {}", metricName); - gauge = new DataPointGauge(entry.getValue()); - metricSystem.registry().register(metricName, gauge); - } else { - ((DataPointGauge) gauge).setValue(entry.getValue()); - } - } - } else { - String metricName = buildMetricName(taskInfo, dataPoint.name); - metricList.add(metricName); - LOG.info("Register metric {}", metricName); - Gauge gauge = metricSystem.registry().getGauges().get(metricName); - if (gauge == null) { - LOG.info("Register metric {}", metricName); - gauge = new DataPointGauge(dataPoint.value); - metricSystem.registry().register(metricName, gauge); - } else { - ((DataPointGauge) gauge).setValue(dataPoint.value); - } - } - } - metricSystem.registry().removeMatching((name, metric) -> metricList.indexOf(name) < 0); - metricSystem.report(); - metricSystem.registry().getGauges().values().forEach((gauge -> { - if (gauge instanceof DataPointGauge) { - ((DataPointGauge) gauge).reset(); - } - })); - LOG.info("Reported {} metric data points from {} [{}]", dataPoints.size(), taskInfo.srcComponentId, taskInfo.srcTaskId); - } - } - - private class DataPointGauge implements Gauge<Object> { - private Object value; - - public DataPointGauge(Object initialValue) { - this.value = initialValue; - } - - @Override - public Object getValue() { - return value; - } - - public void setValue(Object value) { - this.value = value; - } - - public void reset() { - this.value = 0; - } - } - - private String buildMetricName(TaskInfo taskInfo, String... name) { - return String.join(".", StringUtils.join(name, ".").replace("/", "."), taskInfo.srcComponentId, taskInfo.srcTaskId + ""); - } - - @Override - public void cleanup() { - metricSystem.stop(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java deleted file mode 100644 index 3c13ff7..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StormMetricTaggedConsumer.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.runner; - -import org.apache.eagle.alert.metric.IMetricSystem; -import org.apache.eagle.alert.metric.MetricSystem; -import backtype.storm.metric.api.IMetricsConsumer; -import backtype.storm.task.IErrorReporter; -import backtype.storm.task.TopologyContext; -import com.codahale.metrics.Gauge; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; -import org.apache.commons.lang3.StringUtils; -import org.apache.eagle.alert.metric.MetricConfigs; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -/** - * Per MetricSystem instance per task. - */ -public class StormMetricTaggedConsumer implements IMetricsConsumer { - public static final Logger LOG = LoggerFactory.getLogger(StormMetricTaggedConsumer.class); - private final Map<String, MetricSystem> metricSystems = new HashMap<>(); - private Config config; - private String metricNamePrefix; - private Map<String, Object> baseTags = new HashMap<>(); - - @SuppressWarnings("rawtypes") - @Override - public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { - this.config = ConfigFactory.parseString((String) registrationArgument, ConfigParseOptions.defaults()); - - if (config.hasPath("appId")) { - baseTags.put("appId", config.getString("appId")); - } - - if (config.hasPath("siteId")) { - baseTags.put("siteId", config.getString("siteId")); - } - - baseTags.put("appExecId", context.getStormId()); - - if (config.hasPath(MetricConfigs.METRIC_PREFIX_CONF)) { - metricNamePrefix = config.getString(MetricConfigs.METRIC_PREFIX_CONF); - } - } - - @SuppressWarnings("serial") - @Override - public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) { - synchronized (metricSystems) { - String uniqueTaskKey = buildUniqueTaskKey(taskInfo); - MetricSystem metricSystem = metricSystems.get(uniqueTaskKey); - if (metricSystem == null) { - metricSystem = MetricSystem.load(config); - metricSystems.put(uniqueTaskKey, metricSystem); - metricSystem.tags(baseTags); - metricSystem.tags(new HashMap<String, Object>() { - { - put("componentId", taskInfo.srcComponentId); - put("taskId", taskInfo.srcTaskId); - } - }); - metricSystem.start(); - LOG.info("Initialized metric reporter for {}", uniqueTaskKey); - } - report(metricSystem, taskInfo, dataPoints); - if (LOG.isDebugEnabled()) { - LOG.debug("Reported {} metric points from {}", dataPoints.size(), uniqueTaskKey); - } - } - } - - @SuppressWarnings( {"unchecked", "rawtypes"}) - private void report(MetricSystem metricSystem, TaskInfo taskInfo, Collection<DataPoint> dataPoints) { - List<String> metricList = new LinkedList<>(); - for (DataPoint dataPoint : dataPoints) { - if (dataPoint.value instanceof Map) { - Map<String, Object> values = (Map<String, Object>) dataPoint.value; - for (Map.Entry<String, Object> entry : values.entrySet()) { - String metricName = buildSimpleMetricName(metricNamePrefix, taskInfo, dataPoint.name, entry.getKey()); - metricList.add(metricName); - Gauge gauge = metricSystem.registry().getGauges().get(metricName); - if (gauge == null) { - gauge = new DataPointGauge(entry.getValue()); - metricSystem.registry().register(metricName, gauge); - LOG.debug("Register metric {}", metricName); - } else { - ((DataPointGauge) gauge).setValue(entry.getValue()); - } - } - } else { - String metricName = buildSimpleMetricName(metricNamePrefix, taskInfo, dataPoint.name); - metricList.add(metricName); - Gauge gauge = metricSystem.registry().getGauges().get(metricName); - if (gauge == null) { - gauge = new DataPointGauge(dataPoint.value); - metricSystem.registry().register(metricName, gauge); - LOG.debug("Register metric {}", metricName); - } else { - ((DataPointGauge) gauge).setValue(dataPoint.value); - } - } - } - metricSystem.registry().removeMatching((name, metric) -> metricList.indexOf(name) < 0); - metricSystem.report(); - metricSystem.registry().getGauges().values().forEach((gauge -> { - if (gauge instanceof DataPointGauge) { - ((DataPointGauge) gauge).reset(); - } - })); - } - - private static class DataPointGauge implements Gauge<Object> { - private Object value; - - public DataPointGauge(Object initialValue) { - this.setValue(initialValue); - } - - @Override - public Object getValue() { - return value; - } - - public void setValue(Object value) { - this.value = value; - } - - public void reset() { - this.value = 0; - } - } - - private static String buildUniqueTaskKey(TaskInfo taskInfo) { - return String.format("%s[%s]", taskInfo.srcComponentId, taskInfo.srcTaskId); - } - - private static String buildSimpleMetricName(String prefix, TaskInfo taskInfo, String... name) { - String metricName = String.join(".", StringUtils.join(name, ".").replace("/", ".")).replace("__", ""); - if (prefix == null) { - return metricName; - } else { - return String.format("%s%s", prefix, metricName); - } - } - - @Override - public void cleanup() { - metricSystems.values().forEach(IMetricSystem::stop); - metricSystems.clear(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java deleted file mode 100644 index e37b680..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.runner; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue; -import org.apache.eagle.alert.coordination.model.RouterSpec; -import org.apache.eagle.alert.coordination.model.StreamRouterSpec; -import org.apache.eagle.alert.engine.StreamContextImpl; -import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService; -import org.apache.eagle.alert.engine.coordinator.MetadataType; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; -import org.apache.eagle.alert.engine.router.StreamRouter; -import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener; -import org.apache.eagle.alert.engine.router.impl.StormOutputCollector; -import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector; -import org.apache.eagle.alert.engine.router.impl.StreamRouterImpl; -import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider; -import org.apache.eagle.alert.utils.AlertConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.typesafe.config.Config; - -import backtype.storm.metric.api.MultiCountMetric; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.Tuple; - -public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouterBoltSpecListener, SerializationMetadataProvider { - private static final Logger LOG = LoggerFactory.getLogger(StreamRouterBolt.class); - private static final long serialVersionUID = -7611470889316430372L; - private StreamRouter router; - private StreamRouterBoltOutputCollector routeCollector; - // mapping from StreamPartition to StreamSortSpec - private volatile Map<StreamPartition, StreamSortSpec> cachedSSS = new HashMap<>(); - // mapping from StreamPartition(streamId, groupbyspec) to StreamRouterSpec - private volatile Map<StreamPartition, List<StreamRouterSpec>> cachedSRS = new HashMap<>(); - - public StreamRouterBolt(String boltId, Config config, IMetadataChangeNotifyService changeNotifyService) { - super(boltId, changeNotifyService, config); - this.router = new StreamRouterImpl(boltId + "-router"); - } - - - @Override - public void internalPrepare(OutputCollector collector, IMetadataChangeNotifyService changeNotifyService, Config config, TopologyContext context) { - streamContext = new StreamContextImpl(config, context.registerMetric("eagle.router", new MultiCountMetric(), 60), context); - routeCollector = new StreamRouterBoltOutputCollector(getBoltId(), new StormOutputCollector(collector, serializer), this.getOutputStreamIds(), streamContext); - router.prepare(streamContext, routeCollector); - changeNotifyService.registerListener(this); - changeNotifyService.init(config, MetadataType.STREAM_ROUTER_BOLT); - } - - @Override - public void execute(Tuple input) { - try { - this.streamContext.counter().incr("execute_count"); - this.router.nextEvent(deserialize(input.getValueByField(AlertConstants.FIELD_0)).withAnchor(input)); - } catch (Exception ex) { - this.streamContext.counter().incr("fail_count"); - LOG.error(ex.getMessage(), ex); - this.collector.fail(input); - } - } - - @Override - public void cleanup() { - this.router.close(); - super.cleanup(); - } - - /** - * Compare with metadata snapshot cache to generate diff like added, removed and modified between different versions. - * - * @param spec - */ - @SuppressWarnings("unchecked") - @Override - public synchronized void onStreamRouteBoltSpecChange(RouterSpec spec, Map<String, StreamDefinition> sds) { - sanityCheck(spec); - - // figure out added, removed, modified StreamSortSpec - Map<StreamPartition, StreamSortSpec> newSSS = spec.makeSSS(); - - Set<StreamPartition> newStreamIds = newSSS.keySet(); - Set<StreamPartition> cachedStreamIds = cachedSSS.keySet(); - Collection<StreamPartition> addedStreamIds = CollectionUtils.subtract(newStreamIds, cachedStreamIds); - Collection<StreamPartition> removedStreamIds = CollectionUtils.subtract(cachedStreamIds, newStreamIds); - Collection<StreamPartition> modifiedStreamIds = CollectionUtils.intersection(newStreamIds, cachedStreamIds); - - Map<StreamPartition, StreamSortSpec> added = new HashMap<>(); - Map<StreamPartition, StreamSortSpec> removed = new HashMap<>(); - Map<StreamPartition, StreamSortSpec> modified = new HashMap<>(); - addedStreamIds.forEach(s -> added.put(s, newSSS.get(s))); - removedStreamIds.forEach(s -> removed.put(s, cachedSSS.get(s))); - modifiedStreamIds.forEach(s -> { - if (!newSSS.get(s).equals(cachedSSS.get(s))) { // this means StreamSortSpec is changed for one specific streamId - modified.put(s, newSSS.get(s)); - } - }); - if (LOG.isDebugEnabled()) { - LOG.debug("added StreamSortSpec " + added); - LOG.debug("removed StreamSortSpec " + removed); - LOG.debug("modified StreamSortSpec " + modified); - } - router.onStreamSortSpecChange(added, removed, modified); - // switch cache - cachedSSS = newSSS; - - // figure out added, removed, modified StreamRouterSpec - Map<StreamPartition, List<StreamRouterSpec>> newSRS = spec.makeSRS(); - - Set<StreamPartition> newStreamPartitions = newSRS.keySet(); - Set<StreamPartition> cachedStreamPartitions = cachedSRS.keySet(); - - Collection<StreamPartition> addedStreamPartitions = CollectionUtils.subtract(newStreamPartitions, cachedStreamPartitions); - Collection<StreamPartition> removedStreamPartitions = CollectionUtils.subtract(cachedStreamPartitions, newStreamPartitions); - Collection<StreamPartition> modifiedStreamPartitions = CollectionUtils.intersection(newStreamPartitions, cachedStreamPartitions); - - Collection<StreamRouterSpec> addedRouterSpecs = new ArrayList<>(); - Collection<StreamRouterSpec> removedRouterSpecs = new ArrayList<>(); - Collection<StreamRouterSpec> modifiedRouterSpecs = new ArrayList<>(); - addedStreamPartitions.forEach(s -> addedRouterSpecs.addAll(newSRS.get(s))); - removedStreamPartitions.forEach(s -> removedRouterSpecs.addAll(cachedSRS.get(s))); - modifiedStreamPartitions.forEach(s -> { - if (!CollectionUtils.isEqualCollection(newSRS.get(s), cachedSRS.get(s))) { // this means StreamRouterSpec is changed for one specific StreamPartition - modifiedRouterSpecs.addAll(newSRS.get(s)); - } - }); - - if (LOG.isDebugEnabled()) { - LOG.debug("added StreamRouterSpec " + addedRouterSpecs); - LOG.debug("removed StreamRouterSpec " + removedRouterSpecs); - LOG.debug("modified StreamRouterSpec " + modifiedRouterSpecs); - } - - routeCollector.onStreamRouterSpecChange(addedRouterSpecs, removedRouterSpecs, modifiedRouterSpecs, sds); - // switch cache - cachedSRS = newSRS; - sdf = sds; - specVersion = spec.getVersion(); - } - - /** - * in correlation cases, multiple streams will go to the same queue for correlation policy. - * - * @param spec - */ - private void sanityCheck(RouterSpec spec) { - Set<String> totalRequestedSlots = new HashSet<>(); - for (StreamRouterSpec s : spec.getRouterSpecs()) { - for (PolicyWorkerQueue q : s.getTargetQueue()) { - List<String> workers = new ArrayList<>(); - q.getWorkers().forEach(w -> workers.add(w.getBoltId())); - totalRequestedSlots.addAll(workers); - } - } - if (totalRequestedSlots.size() > getOutputStreamIds().size()) { - String error = String.format("Requested slots are not consistent with provided slots, %s, %s", totalRequestedSlots, getOutputStreamIds()); - LOG.error(error); - throw new IllegalStateException(error); - } - } - - public StreamRouter getStreamRouter() { - return router; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java deleted file mode 100755 index 3f06f66..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/UnitTopologyRunner.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.runner; - -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService; -import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService; -import org.apache.eagle.alert.engine.spout.CorrelationSpout; -import org.apache.eagle.alert.utils.AlertConstants; -import org.apache.eagle.alert.utils.StreamIdConversion; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.StormTopology; -import backtype.storm.topology.BoltDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; -import backtype.storm.utils.Utils; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; - -/** - * By default - * 1. one spout with multiple tasks - * 2. multiple router bolts with each bolt having exactly one task - * 3. multiple alert bolts with each bolt having exactly one task - * 4. one publish bolt with multiple tasks - */ -public class UnitTopologyRunner { - private static final Logger LOG = LoggerFactory.getLogger(UnitTopologyRunner.class); - public static final String spoutName = "alertEngineSpout"; - private static final String streamRouterBoltNamePrefix = "streamRouterBolt"; - private static final String alertBoltNamePrefix = "alertBolt"; - public static final String alertPublishBoltName = "alertPublishBolt"; - - public static final String TOTAL_WORKER_NUM = "topology.numOfTotalWorkers"; - public static final String SPOUT_TASK_NUM = "topology.numOfSpoutTasks"; - public static final String ROUTER_TASK_NUM = "topology.numOfRouterBolts"; - public static final String ALERT_TASK_NUM = "topology.numOfAlertBolts"; - public static final String PUBLISH_TASK_NUM = "topology.numOfPublishTasks"; - public static final String PUBLISH_EXECUTOR_NUM = "topology.numOfPublishExecutors"; - public static final String LOCAL_MODE = "topology.localMode"; - public static final String MESSAGE_TIMEOUT_SECS = "topology.messageTimeoutSecs"; - public static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 3600; - - private final IMetadataChangeNotifyService metadataChangeNotifyService; - private backtype.storm.Config givenStormConfig = null; - - public UnitTopologyRunner(IMetadataChangeNotifyService metadataChangeNotifyService) { - this.metadataChangeNotifyService = metadataChangeNotifyService; - } - - public UnitTopologyRunner(ZKMetadataChangeNotifyService changeNotifyService, backtype.storm.Config stormConfig) { - this(changeNotifyService); - this.givenStormConfig = stormConfig; - } - - // ----------------------------- - // Storm Topology Submit Helper - // ----------------------------- - - private void run(String topologyId, - int numOfTotalWorkers, - int numOfSpoutTasks, - int numOfRouterBolts, - int numOfAlertBolts, - int numOfPublishExecutors, - int numOfPublishTasks, - Config config, - boolean localMode) { - - backtype.storm.Config stormConfig = givenStormConfig == null ? new backtype.storm.Config() : givenStormConfig; - // TODO: Configurable metric consumer instance number - - int messageTimeoutSecs = config.hasPath(MESSAGE_TIMEOUT_SECS) ? config.getInt(MESSAGE_TIMEOUT_SECS) : DEFAULT_MESSAGE_TIMEOUT_SECS; - LOG.info("Set topology.message.timeout.secs as {}", messageTimeoutSecs); - stormConfig.setMessageTimeoutSecs(messageTimeoutSecs); - - if (config.hasPath("metric")) { - stormConfig.registerMetricsConsumer(StormMetricTaggedConsumer.class, config.root().render(ConfigRenderOptions.concise()), 1); - } - - stormConfig.setNumWorkers(numOfTotalWorkers); - StormTopology topology = buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config).createTopology(); - - if (localMode) { - LOG.info("Submitting as local mode"); - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology(topologyId, stormConfig, topology); - Utils.sleep(Long.MAX_VALUE); - } else { - LOG.info("Submitting as cluster mode"); - try { - StormSubmitter.submitTopologyWithProgressBar(topologyId, stormConfig, topology); - } catch (Exception ex) { - LOG.error("fail submitting topology {}", topology, ex); - throw new IllegalStateException(ex); - } - } - } - - public void run(String topologyId, Config config) { - int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); - int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM); - int numOfAlertBolts = config.getInt(ALERT_TASK_NUM); - int numOfPublishExecutors = config.getInt(PUBLISH_EXECUTOR_NUM); - int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM); - boolean localMode = config.getBoolean(LOCAL_MODE); - int numOfTotalWorkers = config.getInt(TOTAL_WORKER_NUM); - run(topologyId, numOfTotalWorkers, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config, localMode); - } - - public IMetadataChangeNotifyService getMetadataChangeNotifyService() { - return metadataChangeNotifyService; - } - - // --------------------------- - // Build Storm Topology - // --------------------------- - - public TopologyBuilder buildTopology(String topologyId, - int numOfSpoutTasks, - int numOfRouterBolts, - int numOfAlertBolts, - int numOfPublishExecutors, - int numOfPublishTasks, - Config config) { - StreamRouterBolt[] routerBolts = new StreamRouterBolt[numOfRouterBolts]; - AlertBolt[] alertBolts = new AlertBolt[numOfAlertBolts]; - - TopologyBuilder builder = new TopologyBuilder(); - - // construct Spout object - CorrelationSpout spout = new CorrelationSpout(config, topologyId, getMetadataChangeNotifyService(), numOfRouterBolts, spoutName, streamRouterBoltNamePrefix); - builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks); - - // construct StreamRouterBolt objects - for (int i = 0; i < numOfRouterBolts; i++) { - routerBolts[i] = new StreamRouterBolt(streamRouterBoltNamePrefix + i, config, getMetadataChangeNotifyService()); - } - - // construct AlertBolt objects - for (int i = 0; i < numOfAlertBolts; i++) { - alertBolts[i] = new AlertBolt(alertBoltNamePrefix + i, config, getMetadataChangeNotifyService()); - } - - // construct AlertPublishBolt object - AlertPublisherBolt publisherBolt = new AlertPublisherBolt(alertPublishBoltName, config, getMetadataChangeNotifyService()); - - // connect spout and router bolt, also define output streams for downstreaming alert bolt - for (int i = 0; i < numOfRouterBolts; i++) { - String boltName = streamRouterBoltNamePrefix + i; - - // define output streams, which are based on - String streamId = StreamIdConversion.generateStreamIdBetween(spoutName, boltName); - List<String> outputStreamIds = new ArrayList<>(numOfAlertBolts); - for (int j = 0; j < numOfAlertBolts; j++) { - String sid = StreamIdConversion.generateStreamIdBetween(boltName, alertBoltNamePrefix + j); - outputStreamIds.add(sid); - } - routerBolts[i].declareOutputStreams(outputStreamIds); - - /** - * TODO potentially one route bolt may have multiple tasks, so that is field grouping by groupby fields - * that means we need a separate field to become groupby field - */ - builder.setBolt(boltName, routerBolts[i]).fieldsGrouping(spoutName, streamId, new Fields()).setNumTasks(1); - } - - // connect router bolt and alert bolt, also define output streams for downstreaming alert publish bolt - for (int i = 0; i < numOfAlertBolts; i++) { - String boltName = alertBoltNamePrefix + i; - BoltDeclarer boltDeclarer = builder.setBolt(boltName, alertBolts[i]).setNumTasks(1); - for (int j = 0; j < numOfRouterBolts; j++) { - String streamId = StreamIdConversion.generateStreamIdBetween(streamRouterBoltNamePrefix + j, boltName); - boltDeclarer.fieldsGrouping(streamRouterBoltNamePrefix + j, streamId, new Fields()); - } - } - - // connect alert bolt and alert publish bolt, this is the last bolt in the pipeline - BoltDeclarer boltDeclarer = builder.setBolt(alertPublishBoltName, publisherBolt, numOfPublishExecutors).setNumTasks(numOfPublishTasks); - for (int i = 0; i < numOfAlertBolts; i++) { - boltDeclarer.fieldsGrouping(alertBoltNamePrefix + i, new Fields(AlertConstants.FIELD_0)); - } - - return builder; - } - - public TopologyBuilder buildTopology(String topologyId, Config config) { - int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); - int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM); - int numOfAlertBolts = config.getInt(ALERT_TASK_NUM); - int numOfPublishExecutors = config.getInt(PUBLISH_EXECUTOR_NUM); - int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM); - - return buildTopology(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config); - } - - // --------------------------- - // Build Topology Metadata - // --------------------------- - - public static Topology buildTopologyMetadata(String topologyId, Config config) { - int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); - int numOfRouterBolts = config.getInt(ROUTER_TASK_NUM); - int numOfAlertBolts = config.getInt(ALERT_TASK_NUM); - int numOfPublishExecutors = config.getInt(PUBLISH_EXECUTOR_NUM); - int numOfPublishTasks = config.getInt(PUBLISH_TASK_NUM); - - return buildTopologyMetadata(topologyId, numOfSpoutTasks, numOfRouterBolts, numOfAlertBolts, numOfPublishExecutors, numOfPublishTasks, config); - } - - public static Topology buildTopologyMetadata(String topologyId, - int numOfSpoutTasks, - int numOfRouterBolts, - int numOfAlertBolts, - int numOfPublishExecutors, - int numOfPublishTasks, - Config config) { - Topology topology = new Topology(); - topology.setName(topologyId); - topology.setNumOfSpout(numOfSpoutTasks); - topology.setNumOfAlertBolt(numOfAlertBolts); - topology.setNumOfGroupBolt(numOfRouterBolts); - topology.setNumOfPublishBolt(numOfPublishTasks); - - // Set Spout ID - topology.setSpoutId(spoutName); - - // Set Router (Group) ID - Set<String> streamRouterBoltNames = new TreeSet<>(); - for (int i = 0; i < numOfRouterBolts; i++) { - streamRouterBoltNames.add(streamRouterBoltNamePrefix + i); - } - topology.setGroupNodeIds(streamRouterBoltNames); - - // Set Alert Bolt ID - Set<String> alertBoltIds = new TreeSet<>(); - for (int i = 0; i < numOfAlertBolts; i++) { - alertBoltIds.add(alertBoltNamePrefix + i); - } - topology.setAlertBoltIds(alertBoltIds); - - // Set Publisher ID - topology.setPubBoltId(alertPublishBoltName); - - // TODO: Load bolts' parallelism from configuration, currently keep 1 by default. - - topology.setSpoutParallelism(1); - topology.setGroupParallelism(1); - topology.setAlertParallelism(1); - - return topology; - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java deleted file mode 100644 index db461d8..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonScheme.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.scheme; - -import backtype.storm.spout.Scheme; -import backtype.storm.tuple.Fields; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.slf4j.Logger; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -/** - * Expects flat Json scheme. - */ -public class JsonScheme implements Scheme { - private static final long serialVersionUID = -8352896475656975577L; - private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(JsonScheme.class); - private static final ObjectMapper mapper = new ObjectMapper(); - - private String topic; - - @SuppressWarnings("rawtypes") - public JsonScheme(String topic, Map conf) { - this.topic = topic; - } - - @Override - public Fields getOutputFields() { - return new Fields("f1"); - } - - @Override - @SuppressWarnings("rawtypes") - public List<Object> deserialize(byte[] ser) { - try { - if (ser != null) { - Map map = mapper.readValue(ser, Map.class); - return Arrays.asList(topic, map); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Content is null, ignore"); - } - } - } catch (IOException e) { - try { - LOG.error("Failed to deserialize as JSON: {}", new String(ser, "UTF-8"), e); - } catch (Exception ex) { - LOG.error(ex.getMessage(), ex); - } - } - return null; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java deleted file mode 100644 index 4e02edb..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/JsonStringStreamNameSelector.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.scheme; - -import org.apache.eagle.alert.coordination.model.StreamNameSelector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.Properties; - -/** - * A strategy to get stream name from message tuple. - * @since 5/5/16. - */ -public class JsonStringStreamNameSelector implements StreamNameSelector { - private static final Logger LOG = LoggerFactory.getLogger(JsonStringStreamNameSelector.class); - public static final String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName"; - public static final String FIELD_NAMES_TO_INFER_STREAM_NAME_PROPERTY = "fieldNamesToInferStreamName"; - public static final String STREAM_NAME_FORMAT = "streamNameFormat"; - - private String userProvidedStreamName; - private String[] fieldNamesToInferStreamName; - private String streamNameFormat; - - public JsonStringStreamNameSelector(Properties prop) { - userProvidedStreamName = prop.getProperty(USER_PROVIDED_STREAM_NAME_PROPERTY); - String fields = prop.getProperty(FIELD_NAMES_TO_INFER_STREAM_NAME_PROPERTY); - if (fields != null) { - fieldNamesToInferStreamName = fields.split(","); - } - streamNameFormat = prop.getProperty(STREAM_NAME_FORMAT); - if (streamNameFormat == null) { - LOG.warn("no stream name format found, this might cause default stream name be used which is dis-encouraged. Possibly this is a mis-configuration."); - } - } - - @Override - public String getStreamName(Map<String, Object> tuple) { - if (userProvidedStreamName != null) { - return userProvidedStreamName; - } else if (fieldNamesToInferStreamName != null && streamNameFormat != null) { - Object[] args = new Object[fieldNamesToInferStreamName.length]; - for (int i = 0; i < fieldNamesToInferStreamName.length; i++) { - Object colValue = tuple.get(fieldNamesToInferStreamName[i]); - args[i] = colValue; - } - return String.format(streamNameFormat, args); - } - if (LOG.isDebugEnabled()) { - LOG.debug("can not find the stream name for data source. Use the default stream, possibly this means mis-configuration of datasource!"); - } - return "defaultStringStream"; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java deleted file mode 100644 index 57c8897..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringScheme.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.scheme; - -import backtype.storm.spout.Scheme; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * used for parsing plain string. - */ -public class PlainStringScheme implements Scheme { - private static final long serialVersionUID = 5969724968671646310L; - @SuppressWarnings("unused") - private static final Logger LOG = LoggerFactory.getLogger(PlainStringScheme.class); - private String topic; - - @SuppressWarnings("rawtypes") - public PlainStringScheme(String topic, Map conf) { - this.topic = topic; - } - - private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8; - public static final String STRING_SCHEME_KEY = "str"; - - public static String deserializeString(byte[] buff) { - return new String(buff, UTF8_CHARSET); - } - - public Fields getOutputFields() { - return new Fields(STRING_SCHEME_KEY); - } - - @SuppressWarnings( {"unchecked", "rawtypes"}) - @Override - public List<Object> deserialize(byte[] ser) { - Map m = new HashMap<>(); - m.put("value", deserializeString(ser)); - m.put("timestamp", System.currentTimeMillis()); - return new Values(topic, m); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java deleted file mode 100644 index 0b88483..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/scheme/PlainStringStreamNameSelector.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.scheme; - -import org.apache.eagle.alert.coordination.model.StreamNameSelector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.Properties; - -/** - * Since 5/3/16. - */ -public class PlainStringStreamNameSelector implements StreamNameSelector { - @SuppressWarnings("unused") - private static final Logger LOG = LoggerFactory.getLogger(PlainStringStreamNameSelector.class); - private static final String USER_PROVIDED_STREAM_NAME_PROPERTY = "userProvidedStreamName"; - private static final String DEFAULT_STRING_STREAM_NAME = "defaultStringStream"; - - private String streamName; - - public PlainStringStreamNameSelector(Properties prop) { - streamName = prop.getProperty(USER_PROVIDED_STREAM_NAME_PROPERTY); - if (streamName == null) { - streamName = DEFAULT_STRING_STREAM_NAME; - } - } - - @Override - public String getStreamName(Map<String, Object> tuple) { - return streamName; - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java deleted file mode 100644 index 1e8f440..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventDigestSerializer.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.serialization; - -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.apache.eagle.alert.engine.serialization.impl.StreamEventSerializer; -import org.apache.eagle.alert.engine.serialization.impl.StreamPartitionDigestSerializer; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -/** - * TODO: Seams the complexity dosen't bring enough performance improve. - * - * @see PartitionedEvent - */ -@Deprecated -public class PartitionedEventDigestSerializer implements Serializer<PartitionedEvent> { - private final Serializer<StreamEvent> streamEventSerializer; - private final Serializer<StreamPartition> streamPartitionSerializer; - - public PartitionedEventDigestSerializer(SerializationMetadataProvider serializationMetadataProvider) { - this.streamEventSerializer = new StreamEventSerializer(serializationMetadataProvider); - this.streamPartitionSerializer = StreamPartitionDigestSerializer.INSTANCE; - } - - @Override - public void serialize(PartitionedEvent entity, DataOutput dataOutput) throws IOException { - dataOutput.writeLong(entity.getPartitionKey()); - streamEventSerializer.serialize(entity.getEvent(), dataOutput); - streamPartitionSerializer.serialize(entity.getPartition(), dataOutput); - } - - @Override - public PartitionedEvent deserialize(DataInput dataInput) throws IOException { - PartitionedEvent event = new PartitionedEvent(); - event.setPartitionKey(dataInput.readLong()); - StreamEvent streamEvent = streamEventSerializer.deserialize(dataInput); - event.setEvent(streamEvent); - StreamPartition partition = streamPartitionSerializer.deserialize(dataInput); - partition.setStreamId(streamEvent.getStreamId()); - event.setPartition(partition); - return event; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java deleted file mode 100644 index 428ad34..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializer.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.serialization; - -import org.apache.eagle.alert.engine.model.PartitionedEvent; - -import java.io.IOException; - -public interface PartitionedEventSerializer { - - byte[] serialize(PartitionedEvent entity) throws IOException; - - PartitionedEvent deserialize(byte[] bytes) throws IOException; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java deleted file mode 100644 index ef190b4..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/SerializationMetadataProvider.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.serialization; - -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException; - -/** - * Integration interface to provide stream definition for serializer. - */ -public interface SerializationMetadataProvider { - /** - * @param streamId - * @return StreamDefinition or null if not exist. - */ - StreamDefinition getStreamDefinition(String streamId) throws StreamNotDefinedException; - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java deleted file mode 100644 index 599152e..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/serialization/Serializer.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.serialization; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -public interface Serializer<V> { - void serialize(V value, DataOutput dataOutput) throws IOException; - - V deserialize(DataInput dataInput) throws IOException; -} \ No newline at end of file
