http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java new file mode 100644 index 0000000..40f16e9 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.eagle.alert.coordinator.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
import org.apache.eagle.alert.coordination.model.PublishSpec;
import org.apache.eagle.alert.coordination.model.RouterSpec;
import org.apache.eagle.alert.coordination.model.ScheduleState;
import org.apache.eagle.alert.coordination.model.SpoutSpec;
import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
import org.apache.eagle.alert.coordination.model.StreamRepartitionStrategy;
import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
import org.apache.eagle.alert.coordination.model.internal.Topology;
import org.apache.eagle.alert.coordinator.IScheduleContext;
import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
import org.apache.eagle.alert.coordinator.model.TopologyUsage;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.Publishment;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Given the current policy placement held by an {@link IScheduleContext},
 * derives the monitor metadata (spout, router, alert-bolt and publish specs)
 * and packages it into a {@link ScheduleState}.
 *
 * <p>Lookups by id into the context maps are treated as fallible: an entry
 * whose referenced policy, data source, stream schema or work queue cannot be
 * found is logged and skipped instead of failing the whole generation.
 *
 * FIXME: the per-topology "get spec or create it" logic is still repeated in
 * several generate* methods.
 *
 * @since Apr 26, 2016
 */
public class MonitorMetadataGenerator {

    private static final Logger LOG = LoggerFactory.getLogger(MonitorMetadataGenerator.class);

    private final IScheduleContext context;

    /**
     * @param context the schedule context every generate* method reads from
     */
    public MonitorMetadataGenerator(IScheduleContext context) {
        this.context = context;
    }

    /**
     * Builds a new schedule state from the context.
     *
     * @param expandworkSets not read by this method; all input comes from the context
     * @return a schedule state carrying a freshly generated version string, the four
     *         per-topology spec maps, and the context's assignments, monitored streams,
     *         policies and stream schemas
     */
    public ScheduleState generate(List<WorkItem> expandworkSets) {
        // topologyId -> SpoutSpec
        Map<String, SpoutSpec> topoSpoutSpecsMap = generateSpoutMonitorMetadata();

        // grp-by meta spec(sort & grp)
        Map<String, RouterSpec> groupSpecsMap = generateGroupbyMonitorMetadata();

        // alert bolt spec
        Map<String, AlertBoltSpec> alertSpecsMap = generateAlertMonitorMetadata();

        Map<String, PublishSpec> publishSpecsMap = generatePublishMetadata();

        String uniqueVersion = generateVersion();
        return new ScheduleState(uniqueVersion,
                topoSpoutSpecsMap,
                groupSpecsMap,
                alertSpecsMap,
                publishSpecsMap,
                context.getPolicyAssignments().values(),
                context.getMonitoredStreams().values(),
                context.getPolicies().values(),
                context.getStreamSchemas().values());
    }

    /**
     * Builds one {@link PublishSpec} per topology usage, holding the publishments
     * of every policy placed on that topology. Policies without a definition in
     * the context are skipped.
     */
    private Map<String, PublishSpec> generatePublishMetadata() {
        Map<String, PublishSpec> pubSpecs = new HashMap<String, PublishSpec>();

        // prebuild policy to publishment map
        Map<String, List<Publishment>> policyToPub = new HashMap<String, List<Publishment>>();
        for (Publishment pub : context.getPublishments().values()) {
            for (String policyId : pub.getPolicyIds()) {
                policyToPub.computeIfAbsent(policyId, k -> new ArrayList<>()).add(pub);
            }
        }

        // build per topology
        for (TopologyUsage u : context.getTopologyUsages().values()) {
            PublishSpec pubSpec = pubSpecs.get(u.getTopoName());
            if (pubSpec == null) {
                pubSpec = new PublishSpec(u.getTopoName(), context.getTopologies().get(u.getTopoName()).getPubBoltId());
                pubSpecs.put(u.getTopoName(), pubSpec);
            }

            for (String p : u.getPolicies()) {
                PolicyDefinition definition = context.getPolicies().get(p);
                if (definition == null) {
                    continue;
                }
                List<Publishment> publishments = policyToPub.get(p);
                if (publishments != null) {
                    for (Publishment pub : publishments) {
                        pubSpec.addPublishment(pub);
                    }
                }
            }
        }
        return pubSpecs;
    }

    /**
     * FIXME: add auto-increment version number?
     */
    private String generateVersion() {
        return "spec_version_" + System.currentTimeMillis();
    }

    /**
     * Builds one {@link AlertBoltSpec} per topology usage, registering each
     * policy on the alert bolt it is placed on.
     */
    private Map<String, AlertBoltSpec> generateAlertMonitorMetadata() {
        Map<String, AlertBoltSpec> alertSpecs = new HashMap<String, AlertBoltSpec>();
        for (TopologyUsage u : context.getTopologyUsages().values()) {
            AlertBoltSpec alertSpec = alertSpecs.get(u.getTopoName());
            if (alertSpec == null) {
                alertSpec = new AlertBoltSpec(u.getTopoName());
                alertSpecs.put(u.getTopoName(), alertSpec);
            }
            for (AlertBoltUsage boltUsage : u.getAlertUsages().values()) {
                for (String policyName : boltUsage.getPolicies()) {
                    PolicyDefinition definition = context.getPolicies().get(policyName);
                    if (definition == null) {
                        // Same treatment as generatePublishMetadata: a usage entry may name a
                        // policy that is no longer in the context; skip it rather than throw.
                        LOG.error("can not find definition for policy {} on bolt {}, skipped",
                                policyName, boltUsage.getBoltId());
                        continue;
                    }
                    alertSpec.addBoltPolicy(boltUsage.getBoltId(), definition.getName());
                }
            }
        }
        return alertSpecs;
    }

    /**
     * Builds one {@link RouterSpec} per topology usage. Every stream partition of
     * every monitored stream gets a {@link StreamRouterSpec} that lists one
     * {@link PolicyWorkerQueue} per work-slot queue of that monitored stream.
     */
    private Map<String, RouterSpec> generateGroupbyMonitorMetadata() {
        Map<String, RouterSpec> groupSpecsMap = new HashMap<String, RouterSpec>();
        for (TopologyUsage u : context.getTopologyUsages().values()) {
            RouterSpec spec = groupSpecsMap.get(u.getTopoName());
            if (spec == null) {
                spec = new RouterSpec(u.getTopoName());
                groupSpecsMap.put(u.getTopoName(), spec);
            }

            for (MonitoredStream ms : u.getMonitoredStream()) {
                // multiple streams on the same policy group: the correlation group case
                for (StreamPartition partition : ms.getStreamGroup().getStreamPartitions()) {
                    StreamRouterSpec routeSpec = new StreamRouterSpec();
                    routeSpec.setPartition(partition);
                    routeSpec.setStreamId(partition.getStreamId());

                    for (StreamWorkSlotQueue sq : ms.getQueues()) {
                        PolicyWorkerQueue queue = new PolicyWorkerQueue();
                        queue.setWorkers(sq.getWorkingSlots());
                        queue.setPartition(partition);
                        routeSpec.addQueue(queue);
                    }

                    spec.addRouterSpec(routeSpec);
                }
            }
        }

        return groupSpecsMap;
    }

    /**
     * Builds one {@link SpoutSpec} per topology usage: the topic-keyed data source
     * and tuple-to-stream metadata of the usage's data sources, plus the
     * repartition metadata for every stream partition of every placed policy.
     */
    private Map<String, SpoutSpec> generateSpoutMonitorMetadata() {
        Map<String, StreamWorkSlotQueue> queueMap = buildQueueMap();

        Map<String, SpoutSpec> topoSpoutSpecsMap = new HashMap<String, SpoutSpec>();
        // streamName -> StreamDefinition
        Map<String, StreamDefinition> streamSchemaMap = context.getStreamSchemas();
        Map<String, Kafka2TupleMetadata> datasourcesMap = context.getDataSourceMetadata();
        for (TopologyUsage usage : context.getTopologyUsages().values()) {
            Topology topo = context.getTopologies().get(usage.getTopoName());

            // based on data source schemas
            // generate topic -> Kafka2TupleMetadata
            // generate topic -> Tuple2StreamMetadata (actually the schema selector)
            Map<String, Kafka2TupleMetadata> dss = new HashMap<String, Kafka2TupleMetadata>();
            Map<String, Tuple2StreamMetadata> tss = new HashMap<String, Tuple2StreamMetadata>();
            for (String dataSourceId : usage.getDataSources()) {
                Kafka2TupleMetadata ds = datasourcesMap.get(dataSourceId);
                if (ds == null) {
                    LOG.error("can not find data source {} used by topology {}, skipped",
                            dataSourceId, usage.getTopoName());
                    continue;
                }
                dss.put(ds.getTopic(), ds);
                tss.put(ds.getTopic(), ds.getCodec());
            }

            // generate topicId -> StreamRepartitionMetadata
            Map<String, List<StreamRepartitionMetadata>> streamsMap = new HashMap<String, List<StreamRepartitionMetadata>>();
            for (String policyName : usage.getPolicies()) {
                PolicyAssignment assignment = context.getPolicyAssignments().get(policyName);
                if (assignment == null) {
                    LOG.error(" can not find assignment for policy {} ! ", policyName);
                    continue;
                }
                PolicyDefinition def = context.getPolicies().get(policyName);
                if (def == null) {
                    LOG.error("can not find definition for policy {}, skipped", policyName);
                    continue;
                }
                StreamWorkSlotQueue queue = queueMap.get(assignment.getQueueId());
                if (queue == null) {
                    LOG.error("can not find work queue {} assigned to policy {}, skipped",
                            assignment.getQueueId(), policyName);
                    continue;
                }

                for (StreamPartition policyStreamPartition : def.getPartitionSpec()) {
                    String stream = policyStreamPartition.getStreamId();
                    StreamDefinition schema = streamSchemaMap.get(stream);
                    Kafka2TupleMetadata source = schema == null ? null : datasourcesMap.get(schema.getDataSource());
                    if (source == null) {
                        LOG.error("can not resolve schema or data source of stream {} for policy {}, skipped",
                                stream, policyName);
                        continue;
                    }
                    String topic = source.getTopic();

                    // add stream name to tuple metadata
                    Tuple2StreamMetadata tupleMetadata = tss.get(topic);
                    if (tupleMetadata != null) {
                        tupleMetadata.getActiveStreamNames().add(stream);
                    }

                    // grouping strategy
                    StreamRepartitionStrategy gs = new StreamRepartitionStrategy();
                    gs.partition = policyStreamPartition;
                    gs.numTotalParticipatingRouterBolts = queue.getNumberOfGroupBolts();
                    gs.startSequence = queue.getTopologyGroupStartIndex(topo.getName());
                    gs.totalTargetBoltIds = new ArrayList<String>(topo.getGroupNodeIds());

                    // add to map
                    addGroupingStrategy(streamsMap, stream, schema, topic, schema.getDataSource(), gs);
                }
            }

            SpoutSpec spoutSpec = new SpoutSpec(topo.getName(), streamsMap, tss, dss);
            topoSpoutSpecsMap.put(topo.getName(), spoutSpec);
        }
        return topoSpoutSpecsMap;
    }

    /**
     * Work queue not a root level object, thus we need to build a map from
     * MonitoredStream for later quick lookup.
     *
     * @return queue id -> queue, over every monitored stream in the context
     */
    private Map<String, StreamWorkSlotQueue> buildQueueMap() {
        Map<String, StreamWorkSlotQueue> queueMap = new HashMap<String, StreamWorkSlotQueue>();
        for (MonitoredStream ms : context.getMonitoredStreams().values()) {
            for (StreamWorkSlotQueue queue : ms.getQueues()) {
                queueMap.put(queue.getQueueId(), queue);
            }
        }
        return queueMap;
    }

    /**
     * Registers {@code gs} under the repartition metadata of {@code stream} in the
     * per-topic list of {@code streamsMap}, creating the list and the metadata
     * entry when absent. A strategy already present on the entry is not added twice.
     */
    private void addGroupingStrategy(Map<String, List<StreamRepartitionMetadata>> streamsMap, String stream,
            StreamDefinition schema, String topicName, String datasourceName, StreamRepartitionStrategy gs) {
        List<StreamRepartitionMetadata> dsStreamMeta =
                streamsMap.computeIfAbsent(topicName, k -> new ArrayList<StreamRepartitionMetadata>());

        StreamRepartitionMetadata targetSm = null;
        for (StreamRepartitionMetadata sm : dsStreamMeta) {
            if (stream.equalsIgnoreCase(sm.getStreamId())) {
                targetSm = sm;
                break;
            }
        }
        if (targetSm == null) {
            targetSm = new StreamRepartitionMetadata(datasourceName, schema.getStreamId());
            dsStreamMeta.add(targetSm);
        }
        if (!targetSm.groupingStrategies.contains(gs)) {
            targetSm.addGroupStrategy(gs);
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java new file mode 100644 index 0000000..ea96d79 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.alert.coordinator.impl; + +import java.util.List; + +import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; +import org.apache.eagle.alert.engine.coordinator.StreamPartition; + +/** + * Schedule result for one policy + * + * + * @since Apr 26, 2016 + * + */ +public class ScheduleResult { + int code; + String message; + String policyName; + StreamPartition partition; + int index; + List<PolicyAssignment> topoliciesScheduled; + + public String toString() { + return String.format("policy: %s, result code: %d ", policyName, code, message); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java new file mode 100644 index 0000000..baa489d --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.coordinator.impl; + +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; + +public class WorkItem { + public final PolicyDefinition def; + public final int requestParallelism; + + public WorkItem(PolicyDefinition def, int workNum) { + this.def = def; + this.requestParallelism = workNum; + } + + public String toString() { + return "policy name: " + def.getName() + "(" + requestParallelism + ")"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java new file mode 100644 index 0000000..a32b8fb --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.eagle.alert.coordinator.impl;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.eagle.alert.coordination.model.WorkSlot;
import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
import org.apache.eagle.alert.coordinator.IScheduleContext;
import org.apache.eagle.alert.coordinator.TopologyMgmtService;
import org.apache.eagle.alert.coordinator.impl.strategies.IWorkSlotStrategy;
import org.apache.eagle.alert.coordinator.impl.strategies.SameTopologySlotStrategy;
import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
import org.apache.eagle.alert.coordinator.model.TopologyUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates {@link StreamWorkSlotQueue}s for a monitored stream: reserves work
 * slots through an {@link IWorkSlotStrategy}, computes the queue's group-bolt
 * numbering, and records the new queue on the affected usages and the stream.
 *
 * @since Apr 27, 2016
 */
public class WorkQueueBuilder {

    private static final Logger LOG = LoggerFactory.getLogger(WorkQueueBuilder.class);

    private final IScheduleContext context;
    private final TopologyMgmtService mgmtService;

    public WorkQueueBuilder(IScheduleContext context, TopologyMgmtService mgmtService) {
        this.context = context;
        this.mgmtService = mgmtService;
    }

    /**
     * Reserves {@code size} work slots for the stream's group and wraps them in a
     * new queue that is attached to {@code stream}.
     *
     * @param stream      the monitored stream the queue is created for
     * @param isDedicated passed through to the slot strategy and the queue
     * @param size        number of work slots required
     * @param properties  passed through to the slot strategy and the queue
     * @return the new queue, or {@code null} when fewer than {@code size} slots
     *         could be reserved (nothing is modified in that case by this method)
     */
    public StreamWorkSlotQueue createQueue(MonitoredStream stream, boolean isDedicated, int size,
            Map<String, Object> properties) {
        // FIXME: make extensible and configurable
        IWorkSlotStrategy strategy = new SameTopologySlotStrategy(context, stream.getStreamGroup(), mgmtService);
        List<WorkSlot> slots = strategy.reserveWorkSlots(size, isDedicated, properties);
        int reserved = slots == null ? 0 : slots.size();
        if (reserved < size) {
            // The previous message ended at "required size" without printing any number.
            LOG.error("allocate stream work queue failed, required size {}, reserved size {}", size, reserved);
            return null;
        }
        StreamWorkSlotQueue queue = new StreamWorkSlotQueue(stream.getStreamGroup(), isDedicated, properties,
                slots);
        calculateGroupIndexAndCount(queue);
        assignQueueSlots(stream, queue); // build reverse reference
        stream.addQueues(queue);

        return queue;
    }

    /**
     * Records the queue on the alert-bolt usage of each of its slots and adds the
     * stream to the owning topology usage.
     */
    private void assignQueueSlots(MonitoredStream stream, StreamWorkSlotQueue queue) {
        for (WorkSlot slot : queue.getWorkingSlots()) {
            TopologyUsage u = context.getTopologyUsages().get(slot.getTopologyName());
            AlertBoltUsage boltUsage = u.getAlertBoltUsage(slot.getBoltId());
            boltUsage.addQueue(stream.getStreamGroup(), queue);
            u.addMonitoredStream(stream);
        }
    }

    /**
     * Gives every distinct topology of the queue's slots a start index equal to
     * the number of group bolts of the topologies seen before it, and stores the
     * running total and the index map on the queue.
     */
    private void calculateGroupIndexAndCount(StreamWorkSlotQueue queue) {
        Map<String, Integer> result = new HashMap<String, Integer>();
        int total = 0;
        for (WorkSlot slot : queue.getWorkingSlots()) {
            if (result.containsKey(slot.getTopologyName())) {
                continue;
            }
            result.put(slot.getTopologyName(), total);
            total += context.getTopologies().get(slot.getTopologyName()).getNumOfGroupBolt();
        }

        queue.setNumberOfGroupBolts(total);
        queue.setTopoGroupStartIndex(result);
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java new file mode 100644 index 0000000..28df3c4 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.eagle.alert.coordinator.impl.strategies;

import java.util.List;
import java.util.Map;

import org.apache.eagle.alert.coordination.model.WorkSlot;

/**
 * Strategy for picking the work slots that back a stream work-slot queue.
 *
 * @since Apr 27, 2016
 */
public interface IWorkSlotStrategy {

    /**
     * Reserves work slots for a queue.
     *
     * @param size        number of slots requested
     * @param isDedicated dedication flag supplied by the caller
     * @param properties  queue properties supplied by the caller
     * @return the reserved slots; callers in this package treat a list shorter
     *         than {@code size} as a failed reservation
     */
    List<WorkSlot> reserveWorkSlots(int size, boolean isDedicated, Map<String, Object> properties);

}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java new file mode 100644 index 0000000..e755237 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.eagle.alert.coordinator.impl.strategies;

import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.eagle.alert.coordination.model.WorkSlot;
import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
import org.apache.eagle.alert.coordination.model.internal.Topology;
import org.apache.eagle.alert.coordinator.CoordinatorConstants;
import org.apache.eagle.alert.coordinator.IScheduleContext;
import org.apache.eagle.alert.coordinator.TopologyMgmtService;
import org.apache.eagle.alert.coordinator.TopologyMgmtService.TopologyMeta;
import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
import org.apache.eagle.alert.coordinator.model.TopologyUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

/**
 * A simple strategy that takes all requested work slots from the bolts of a
 * single topology.
 *
 * Invariant:<br/>
 * One slot queue only on the one topology.<br/>
 * One topology doesn't contains two same partition slot queues.
 *
 * @since Apr 27, 2016
 */
public class SameTopologySlotStrategy implements IWorkSlotStrategy {

    private static final Logger LOG = LoggerFactory.getLogger(SameTopologySlotStrategy.class);

    private final IScheduleContext context;
    private final StreamGroup partitionGroup;
    private final TopologyMgmtService mgmtService;

//    private final int numOfPoliciesBoundPerBolt;
    private final double topoLoadUpbound;

    /**
     * Reads the topology load upper bound from the coordinator section of the
     * default Typesafe configuration.
     */
    public SameTopologySlotStrategy(IScheduleContext context, StreamGroup streamPartitionGroup,
            TopologyMgmtService mgmtService) {
        this.mgmtService = mgmtService;
        this.partitionGroup = streamPartitionGroup;
        this.context = context;

        Config coordinatorConfig = ConfigFactory.load().getConfig(CoordinatorConstants.CONFIG_ITEM_COORDINATOR);
//        numOfPoliciesBoundPerBolt = config.getInt(CoordinatorConstants.POLICIES_PER_BOLT);
        this.topoLoadUpbound = coordinatorConfig.getDouble(CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND);
    }

    /**
     * Tries each known topology that has at least {@code size} alert bolts; when
     * none yields slots, asks the management service for a new topology and
     * tries that one.
     *
     * @param isDedicated
     *            - not used yet!
     * @return the reserved slots, or an empty list when nothing could be reserved
     */
    @Override
    public List<WorkSlot> reserveWorkSlots(int size, boolean isDedicated, Map<String, Object> properties) {
        List<WorkSlot> reserved = new ArrayList<WorkSlot>();

        // priority strategy first???
        for (Topology candidate : context.getTopologies().values()) {
            if (candidate.getNumOfAlertBolt() < size) {
                continue;
            }
            if (getQueueOnTopology(size, reserved, candidate)) {
                break;
            }
        }
        if (reserved.size() != 0) {
            return reserved;
        }

        // Nothing available on the existing topologies: fall back to a new one.
        int supportedSize = mgmtService.getNumberOfAlertBoltsInTopology();
        if (size > supportedSize) {
            LOG.error("can not find available slots for queue, required size {}, supported size {} !", size, supportedSize);
            return Collections.emptyList();
        }

        TopologyMeta created = mgmtService.creatTopology();
        if (created == null) {
            LOG.error("can not create topology for given queue requirement, required size {}, requried partition group {} !", size, partitionGroup);
            return Collections.emptyList();
        }

        context.getTopologies().put(created.topologyId, created.topology);
        context.getTopologyUsages().put(created.topologyId, created.usage);
        if (!getQueueOnTopology(size, reserved, created.topology)) {
            LOG.error("can not find available slots from new created topology, required size {}. This indicates an error !", size);
        }
        return reserved;
    }

    /**
     * Appends {@code size} slots on topology {@code t} to {@code slots} when the
     * topology is available and has exactly that many free bolts to offer.
     *
     * @return {@code true} when slots were appended, {@code false} otherwise
     */
    private boolean getQueueOnTopology(int size, List<WorkSlot> slots, Topology t) {
        TopologyUsage usage = context.getTopologyUsages().get(t.getName());
        if (!isTopologyAvailable(usage)) {
            return false;
        }

        List<String> freeBoltIds = new ArrayList<String>();
        for (AlertBoltUsage boltUsage : usage.getAlertUsages().values()) {
            if (isBoltAvailable(boltUsage)) {
                freeBoltIds.add(boltUsage.getBoltId());
            }

            if (freeBoltIds.size() == size) {
                break;
            }
        }

        if (freeBoltIds.size() != size) {
            return false;
        }
        for (String boltId : freeBoltIds) {
            slots.add(new WorkSlot(t.getName(), boltId));
        }
        return true;
    }

    /**
     * A topology is available when its usage is known and its load does not
     * exceed the configured upper bound.
     */
    private boolean isTopologyAvailable(TopologyUsage u) {
//        for (MonitoredStream stream : u.getMonitoredStream()) {
//            if (partition.equals(stream.getStreamParitition())) {
//                return false;
//            }
//        }
        return u != null && !(u.getLoad() > topoLoadUpbound);
    }

    /**
     * A bolt is available only while no queue refers to it.
     */
    private boolean isBoltAvailable(AlertBoltUsage alertUsage) {
        // FIXME : more detail to compare on queue exclusion check
        // actually it's now 0;
        return alertUsage.getQueueSize() <= 0;
//        return alertUsage.getPolicies().size() < numOfPoliciesBoundPerBolt;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java new file mode 100644 index 0000000..e9148f5 --- /dev/null +++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.eagle.alert.coordinator.model;

import java.util.ArrayList;
import java.util.List;

import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;

/**
 * Usage bookkeeping for one alert bolt: the names of the policies placed on it,
 * the queues that refer to it, and the stream group each of those queues was
 * registered with.
 *
 * @since Mar 28, 2016
 */
public class AlertBoltUsage {

    private String boltId;
    private List<String> policies = new ArrayList<String>();
    // the stream partitions group that scheduled for this given alert bolt;
    // addQueue/removeQueue keep it index-aligned with referQueues
    private List<StreamGroup> partitions = new ArrayList<StreamGroup>();
    // the slot queue that scheduled for this given alert bolt
    private List<StreamWorkSlotQueue> referQueues = new ArrayList<StreamWorkSlotQueue>();
    private double load;

    public AlertBoltUsage(String anid) {
        this.boltId = anid;
    }

    public String getBoltId() {
        return boltId;
    }

    public void setBoltId(String boltId) {
        this.boltId = boltId;
    }

    /**
     * @return the live list of policy names (not a copy)
     */
    public List<String> getPolicies() {
        return policies;
    }

    /**
     * Records the name of {@code pd} as placed on this bolt.
     */
    public void addPolicies(PolicyDefinition pd) {
        policies.add(pd.getName());
        // add first partition
//        for (StreamPartition par : pd.getPartitionSpec()) {
//            partitions.add(par);
//        }
    }

    public double getLoad() {
        return load;
    }

    public void setLoad(double load) {
        this.load = load;
    }

    /**
     * @return the live list of stream groups (not a copy)
     */
    public List<StreamGroup> getPartitions() {
        return partitions;
    }

    /**
     * @return the live list of referring queues (not a copy)
     */
    public List<StreamWorkSlotQueue> getReferQueues() {
        return referQueues;
    }

    public int getQueueSize() {
        return referQueues.size();
    }

    /**
     * Registers a queue together with the stream group it serves.
     */
    public void addQueue(StreamGroup streamPartition, StreamWorkSlotQueue queue) {
        this.referQueues.add(queue);
        this.partitions.add(streamPartition);
    }

    /**
     * Unregisters a queue and the stream group that {@link #addQueue} stored
     * alongside it. Previously only the queue was removed, leaving a stale
     * stream group behind in {@link #getPartitions()}.
     *
     * <p>Does nothing when the queue is not registered.
     */
    public void removeQueue(StreamWorkSlotQueue queue) {
        int position = this.referQueues.indexOf(queue);
        if (position < 0) {
            return;
        }
        this.referQueues.remove(position);
        // The lists are exposed through getters, so guard against them having been
        // modified out of step by a caller.
        if (position < this.partitions.size()) {
            this.partitions.remove(position);
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java new file mode 100644 index 0000000..86238d1 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Bookkeeping record for a single group bolt: its id and a load figure.
 *
 * @since Mar 28, 2016
 */
public class GroupBoltUsage {

    private double load;
    private String boltId;

    /**
     * @param boltId id of the group bolt this usage record describes
     */
    public GroupBoltUsage(String boltId) {
        setBoltId(boltId);
    }

    public String getBoltId() {
        return this.boltId;
    }

    // deliberately final: it is invoked from the constructor
    public final void setBoltId(String boltId) {
        this.boltId = boltId;
    }

    public double getLoad() {
        return this.load;
    }

    public void setLoad(double load) {
        this.load = load;
    }

}
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.coordinator.model; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.eagle.alert.coordination.model.internal.MonitoredStream; + +/** + * @since Mar 27, 2016 + * + */ +public class TopologyUsage { + // topo info + private String topoName; + private final Set<String> datasources = new HashSet<String>(); + // usage information + private final Set<String> policies = new HashSet<String>(); + private final Map<String, AlertBoltUsage> alertUsages = new HashMap<String, AlertBoltUsage>(); + private final Map<String, GroupBoltUsage> groupUsages = new HashMap<String, GroupBoltUsage>(); + private final List<MonitoredStream> monitoredStream = new ArrayList<MonitoredStream>(); + + private double load; + + /** + * This is to be the existing/previous meta-data. <br/> + * Only one group meta-data for all of the group bolts in this topology. 
+ */ + + public TopologyUsage() { + } + + public TopologyUsage(String name) { + this.topoName = name; + } + + public String getTopoName() { + return topoName; + } + + public void setTopoName(String topoId) { + this.topoName = topoId; + } + + public Set<String> getDataSources() { + return datasources; + } + + public Set<String> getPolicies() { + return policies; + } + + public Map<String, AlertBoltUsage> getAlertUsages() { + return alertUsages; + } + + public AlertBoltUsage getAlertBoltUsage(String boltId) { + return alertUsages.get(boltId); + } + + public Map<String, GroupBoltUsage> getGroupUsages() { + return groupUsages; + } + + public List<MonitoredStream> getMonitoredStream() { + return monitoredStream; + } + + public void addMonitoredStream(MonitoredStream par) { + if (!this.monitoredStream.contains(par)) { + this.monitoredStream.add(par); + } + } + + public double getLoad() { + return load; + } + + public void setLoad(double load) { + this.load = load; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java new file mode 100644 index 0000000..84a4061 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.coordinator.provider; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; +import org.apache.eagle.alert.coordination.model.internal.MonitoredStream; +import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; +import org.apache.eagle.alert.coordination.model.internal.StreamGroup; +import org.apache.eagle.alert.coordination.model.internal.Topology; +import org.apache.eagle.alert.coordinator.IScheduleContext; +import org.apache.eagle.alert.coordinator.model.TopologyUsage; +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import org.apache.eagle.alert.engine.coordinator.Publishment; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; + +/** + * @since Mar 28, 2016 + * + */ +public class InMemScheduleConext implements IScheduleContext { + + private Map<String, Topology> topologies = new HashMap<String, Topology>(); + private Map<String, TopologyUsage> usages = new HashMap<String, TopologyUsage>(); + private Map<String, PolicyDefinition> policies = new HashMap<String, PolicyDefinition>(); + private Map<String, Kafka2TupleMetadata> datasources = new HashMap<String, Kafka2TupleMetadata>(); + private Map<String, PolicyAssignment> 
policyAssignments = new HashMap<String, PolicyAssignment>(); + private Map<String, StreamDefinition> schemas = new HashMap<String, StreamDefinition>(); + private Map<StreamGroup, MonitoredStream> monitoredStreams = new HashMap<StreamGroup, MonitoredStream>(); + private Map<String, Publishment> publishments = new HashMap<String, Publishment>(); + + public InMemScheduleConext() { + } + + public InMemScheduleConext(IScheduleContext context) { + this.topologies = new HashMap<String, Topology>(context.getTopologies()); + this.usages = new HashMap<String, TopologyUsage>(context.getTopologyUsages()); + this.policies = new HashMap<String, PolicyDefinition>(context.getPolicies()); + this.datasources = new HashMap<String, Kafka2TupleMetadata>(context.getDataSourceMetadata()); + this.policyAssignments = new HashMap<String, PolicyAssignment>(context.getPolicyAssignments()); + this.schemas = new HashMap<String, StreamDefinition>(context.getStreamSchemas()); + this.monitoredStreams = new HashMap<StreamGroup, MonitoredStream>(context.getMonitoredStreams()); + this.publishments = new HashMap<String, Publishment>(context.getPublishments()); + } + + public InMemScheduleConext(Map<String, Topology> topologies2, Map<String, PolicyAssignment> assignments, + Map<String, Kafka2TupleMetadata> kafkaSources, Map<String, PolicyDefinition> policies2, + Map<String, Publishment> publishments2, Map<String, StreamDefinition> streamDefinitions, + Map<StreamGroup, MonitoredStream> monitoredStreamMap, Map<String, TopologyUsage> usages2) { + this.topologies = topologies2; + this.policyAssignments = assignments; + this.datasources = kafkaSources; + this.policies = policies2; + this.publishments = publishments2; + this.schemas = streamDefinitions; + this.monitoredStreams = monitoredStreamMap; + this.usages = usages2; + } + + public Map<String, Topology> getTopologies() { + return topologies; + } + + public void addTopology(Topology topo) { + topologies.put(topo.getName(), topo); + } + + public 
Map<String, TopologyUsage> getTopologyUsages() { + return usages; + } + + public void addTopologyUsages(TopologyUsage usage) { + usages.put(usage.getTopoName(), usage); + } + + public Map<String, PolicyDefinition> getPolicies() { + return policies; + } + + public void addPoilcy(PolicyDefinition pd) { + this.policies.put(pd.getName(), pd); + } + + public Map<String, Kafka2TupleMetadata> getDatasources() { + return datasources; + } + + public void setDatasources(Map<String, Kafka2TupleMetadata> datasources) { + this.datasources = datasources; + } + + public void addDataSource(Kafka2TupleMetadata dataSource) { + this.datasources.put(dataSource.getName(), dataSource); + } + + @Override + public Map<String, Kafka2TupleMetadata> getDataSourceMetadata() { + return datasources; + } + + public void setPolicyOrderedTopologies(Map<String, PolicyAssignment> policyAssignments) { + this.policyAssignments = policyAssignments; + } + + public Map<String, PolicyAssignment> getPolicyAssignments() { + return this.policyAssignments; + } + + @Override + public Map<String, StreamDefinition> getStreamSchemas() { + return schemas; + } + + public void addSchema(StreamDefinition schema) { + this.schemas.put(schema.getStreamId(), schema); + } + + public void setStreamSchemas(Map<String, StreamDefinition> schemas) { + this.schemas = schemas; + } + + @Override + public Map<StreamGroup, MonitoredStream> getMonitoredStreams() { + return monitoredStreams; + } + + @Override + public Map<String, Publishment> getPublishments() { + return publishments; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java new file mode 100644 index 0000000..d4d6c0c --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.alert.coordinator.provider; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; +import org.apache.eagle.alert.coordination.model.ScheduleState; +import org.apache.eagle.alert.coordination.model.WorkSlot; +import org.apache.eagle.alert.coordination.model.internal.MonitoredStream; +import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; +import org.apache.eagle.alert.coordination.model.internal.StreamGroup; +import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue; +import org.apache.eagle.alert.coordination.model.internal.Topology; +import org.apache.eagle.alert.coordinator.IScheduleContext; +import org.apache.eagle.alert.coordinator.model.AlertBoltUsage; +import org.apache.eagle.alert.coordinator.model.GroupBoltUsage; +import org.apache.eagle.alert.coordinator.model.TopologyUsage; +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import org.apache.eagle.alert.engine.coordinator.Publishment; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import org.apache.eagle.alert.service.IMetadataServiceClient; +import org.apache.eagle.alert.service.MetadataServiceClientImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.typesafe.config.Config; + +/** + * FIXME: this class focus on correctness, not the efficiency now. There might + * be problem when metadata size grows too big. 
+ * + * @since May 3, 2016 + * + */ +public class ScheduleContextBuilder { + + private static final Logger LOG = LoggerFactory.getLogger(ScheduleContextBuilder.class); + private static final String UNIQUE_BOLT_ID = "%s-%s";// toponame-boltname + + private IMetadataServiceClient client; + + private Map<String, Topology> topologies; + private Map<String, PolicyAssignment> assignments; + private Map<String, Kafka2TupleMetadata> kafkaSources; + private Map<String, PolicyDefinition> policies; + private Map<String, Publishment> publishments; + private Map<String, StreamDefinition> streamDefinitions; + private Map<StreamGroup, MonitoredStream> monitoredStreamMap; + private Map<String, TopologyUsage> usages; + + public ScheduleContextBuilder(Config config) { + client = new MetadataServiceClientImpl(config); + } + + public ScheduleContextBuilder(IMetadataServiceClient client) { + this.client = client; + } + + /** + * Built a shcedule context for metadata client service. + * + * @return + */ + public IScheduleContext buildContext() { + topologies = listToMap(client.listTopologies()); + kafkaSources = listToMap(client.listDataSources()); + policies = listToMap(client.listPolicies()); + publishments = listToMap(client.listPublishment()); + streamDefinitions = listToMap(client.listStreams()); + + // TODO: See ScheduleState comments on how to improve the storage + ScheduleState state = client.getVersionedSpec(); + assignments = listToMap(state == null ? new ArrayList<PolicyAssignment>() : cleanupDeprecatedAssignments(state.getAssignments())); + + monitoredStreamMap = listToMap(state == null ? new ArrayList<MonitoredStream>() : cleanupDeprecatedStreamsAndAssignment(state.getMonitoredStreams())); + + // build based on existing data + usages = buildTopologyUsage(); + + // copy to shedule context now + return new InMemScheduleConext(topologies, assignments, kafkaSources, policies, publishments, + streamDefinitions, monitoredStreamMap, usages); + } + + /** + * 1. 
+ * <pre> + * Check for deprecated policy stream group with its assigned monitored stream groups. + * If this is unmatched, we think the policy' stream group has been changed, remove the policy assignments + * If finally, no assignment refer to a given monitored stream, this monitored stream could be removed. + * Log when every time a remove happens. + * </pre> + * 2. + * <pre> + * if monitored stream's queue's is on non-existing topology, remove it. + * </pre> + * @param monitoredStreams + * @return + */ + private List<MonitoredStream> cleanupDeprecatedStreamsAndAssignment(List<MonitoredStream> monitoredStreams) { + List<MonitoredStream> result = new ArrayList<MonitoredStream>(monitoredStreams); + + // clear deprecated streams + clearMonitoredStreams(monitoredStreams); + + // build queueId-> streamGroup + Map<String, StreamGroup> queue2StreamGroup = new HashMap<String, StreamGroup>(); + for (MonitoredStream ms : result) { + for (StreamWorkSlotQueue q : ms.getQueues()) { + queue2StreamGroup.put(q.getQueueId(), ms.getStreamGroup()); + } + } + + // decide the assignment delete set + Set<StreamGroup> usedGroups = new HashSet<StreamGroup>(); + Set<String> toRemove = new HashSet<String>(); + // check if queue is still referenced by policy assignments + for (PolicyAssignment assignment : assignments.values()) { + PolicyDefinition def = policies.get(assignment.getPolicyName()); + StreamGroup group = queue2StreamGroup.get(assignment.getQueueId()); + if (group == null || !Objects.equals(group.getStreamPartitions(), def.getPartitionSpec())) { + LOG.warn(" policy assgiment {} 's policy group {} is different to the monitored stream's partition group {}, " + + "this indicates a policy stream partition spec change, the assignment would be removed! ", + assignment, def.getPartitionSpec(), group == null ? 
"'not found'" :group.getStreamPartitions()); + toRemove.add(assignment.getPolicyName()); + } else { + usedGroups.add(group); + } + } + + // remove useless + assignments.keySet().removeAll(toRemove); + // remove non-referenced monitored streams + result.removeIf((t) -> { + boolean used = usedGroups.contains(t.getStreamGroup()); + if (!used) { + LOG.warn("monitor stream with stream group {} is not referenced, " + + "this monitored stream and its worker queu will be removed", t.getStreamGroup()); + } + return !used; + }); + + return result; + } + + private void clearMonitoredStreams(List<MonitoredStream> monitoredStreams) { + Iterator<MonitoredStream> it = monitoredStreams.iterator(); + while (it.hasNext()) { + MonitoredStream ms = it.next(); + Iterator<StreamWorkSlotQueue> queueIt = ms.getQueues().iterator(); + // clean queue that underly topology is changed(removed/down) + while (queueIt.hasNext()) { + StreamWorkSlotQueue queue = queueIt.next(); + boolean deprecated = false; + for (WorkSlot ws : queue.getWorkingSlots()) { + // check if topology available or bolt available + if (!topologies.containsKey(ws.topologyName) + || !topologies.get(ws.topologyName).getAlertBoltIds().contains(ws.boltId)) { + deprecated = true; + break; + } + } + if (deprecated) { + queueIt.remove(); + } + } + + if (ms.getQueues().isEmpty()) { + it.remove(); + } + } + } + + private List<PolicyAssignment> cleanupDeprecatedAssignments(List<PolicyAssignment> list) { + List<PolicyAssignment> result = new ArrayList<PolicyAssignment>(list); + Iterator<PolicyAssignment> paIt = result.iterator(); + while (paIt.hasNext()) { + PolicyAssignment assignment = paIt.next(); + if (!policies.containsKey(assignment.getPolicyName())) { + LOG.info("Policy assignment {} 's policy not found, this assignment will be removed!", assignment); + paIt.remove(); + } + } + return result; + } + + private <T, K> Map<K, T> listToMap(List<T> collections) { + Map<K, T> maps = new HashMap<K, T>(collections.size()); + for (T t : 
collections) { + maps.put(getKey(t), t); + } + return maps; + } + + /* + * One drawback, once we add class, this code need to be changed! + */ + @SuppressWarnings("unchecked") + private <T, K> K getKey(T t) { + if (t instanceof Topology) { + return (K) ((Topology) t).getName(); + } else if (t instanceof PolicyAssignment) { + return (K) ((PolicyAssignment) t).getPolicyName(); + } else if (t instanceof Kafka2TupleMetadata) { + return (K) ((Kafka2TupleMetadata) t).getName(); + } else if (t instanceof PolicyDefinition) { + return (K) ((PolicyDefinition) t).getName(); + } else if (t instanceof Publishment) { + return (K) ((Publishment) t).getName(); + } else if (t instanceof StreamDefinition) { + return (K) ((StreamDefinition) t).getStreamId(); + } else if (t instanceof MonitoredStream) { + return (K) ((MonitoredStream) t).getStreamGroup(); + } + throw new RuntimeException("unexpected key class " + t.getClass()); + } + + private Map<String, TopologyUsage> buildTopologyUsage() { + Map<String, TopologyUsage> usages = new HashMap<String, TopologyUsage>(); + + // pre-build data structure for help + Map<String, Set<MonitoredStream>> topo2MonitorStream = new HashMap<String, Set<MonitoredStream>>(); + Map<String, Set<String>> topo2Policies = new HashMap<String, Set<String>>(); + // simply assume no bolt with the same id + Map<String, Set<String>> bolt2Policies = new HashMap<String, Set<String>>(); + // simply assume no bolt with the same id + Map<String, Set<StreamGroup>> bolt2Partition = new HashMap<String, Set<StreamGroup>>(); + // simply assume no bolt with the same id + Map<String, Set<String>> bolt2QueueIds = new HashMap<String, Set<String>>(); + Map<String, StreamWorkSlotQueue> queueMap = new HashMap<String, StreamWorkSlotQueue>(); + + preBuildQueue2TopoMap(topo2MonitorStream, topo2Policies, bolt2Policies, bolt2Partition, bolt2QueueIds, queueMap); + + for (Topology t : topologies.values()) { + TopologyUsage u = new TopologyUsage(t.getName()); + // add group/bolt usages + 
for (String grpBolt : t.getGroupNodeIds()) { + GroupBoltUsage grpUsage = new GroupBoltUsage(grpBolt); + u.getGroupUsages().put(grpBolt, grpUsage); + } + for (String alertBolt : t.getAlertBoltIds()) { + String uniqueBoltId = String.format(UNIQUE_BOLT_ID, t.getName(), alertBolt); + + AlertBoltUsage alertUsage = new AlertBoltUsage(alertBolt); + u.getAlertUsages().put(alertBolt, alertUsage); + // complete usage + addBoltUsageInfo(bolt2Policies, bolt2Partition, bolt2QueueIds, uniqueBoltId, alertUsage, queueMap); + } + + // policy -- policy assignment + if (topo2Policies.containsKey(u.getTopoName())) { + u.getPolicies().addAll(topo2Policies.get(u.getTopoName())); + } + + // data source + buildTopologyDataSource(u); + + // topology usage monitored stream -- from monitored steams' queue slot item info + if (topo2MonitorStream.containsKey(u.getTopoName())) { + u.getMonitoredStream().addAll(topo2MonitorStream.get(u.getTopoName())); + } + + usages.put(u.getTopoName(), u); + } + + return usages; + } + + private void addBoltUsageInfo(Map<String, Set<String>> bolt2Policies, + Map<String, Set<StreamGroup>> bolt2Partition, Map<String, Set<String>> bolt2QueueIds, String uniqueAlertBolt, + AlertBoltUsage alertUsage, Map<String, StreamWorkSlotQueue> queueMap) { + // + if (bolt2Policies.containsKey(uniqueAlertBolt)) { + alertUsage.getPolicies().addAll(bolt2Policies.get(uniqueAlertBolt)); + } + // + if (bolt2Partition.containsKey(uniqueAlertBolt)) { + alertUsage.getPartitions().addAll(bolt2Partition.get(uniqueAlertBolt)); + } + // + if (bolt2QueueIds.containsKey(uniqueAlertBolt)) { + for (String qId : bolt2QueueIds.get(uniqueAlertBolt)) { + if (queueMap.containsKey(qId)) { + alertUsage.getReferQueues().add(queueMap.get(qId)); + } else { + LOG.error(" queue id {} not found in queue map !", qId); + } + } + } + } + + private void buildTopologyDataSource(TopologyUsage u) { + for (String policyName : u.getPolicies()) { + PolicyDefinition def = policies.get(policyName); + if (def != null) { 
+ u.getDataSources().addAll(findDatasource(def)); + } else { + LOG.error(" policy not find {}, but reference in topology usage {} !", policyName, u.getTopoName()); + } + } + } + + private List<String> findDatasource(PolicyDefinition def) { + List<String> result = new ArrayList<String>(); + List<String> inputStreams = def.getInputStreams(); + for (String is : inputStreams) { + StreamDefinition ss = this.streamDefinitions.get(is); + if (ss == null) { + LOG.error("policy {} referenced stream definition {} not found in definiton !", def.getName(), is); + } else { + result.add(ss.getDataSource()); + } + } + return result; + } + + private void preBuildQueue2TopoMap( + Map<String, Set<MonitoredStream>> topo2MonitorStream, + Map<String, Set<String>> topo2Policies, + Map<String, Set<String>> bolt2Policies, + Map<String, Set<StreamGroup>> bolt2Partition, + Map<String, Set<String>> bolt2QueueIds, + Map<String, StreamWorkSlotQueue> queueMap) { + // pre-build structure + // why don't reuse the queue.getPolicies + Map<String, Set<String>> queue2Policies= new HashMap<String, Set<String>>(); + for (PolicyAssignment pa : assignments.values()) { + if (!queue2Policies.containsKey(pa.getQueueId())) { + queue2Policies.put(pa.getQueueId(), new HashSet<String>()); + } + queue2Policies.get(pa.getQueueId()).add(pa.getPolicyName()); + } + + for (MonitoredStream stream : monitoredStreamMap.values()) { + for (StreamWorkSlotQueue q : stream.getQueues()) { + queueMap.put(q.getQueueId(), q); + Set<String> policiesOnQ = queue2Policies.containsKey(q.getQueueId()) ? 
queue2Policies.get(q.getQueueId()) : new HashSet<String>(); + + for (WorkSlot slot : q.getWorkingSlots()) { + // topo2monitoredstream + if (!topo2MonitorStream.containsKey(slot.getTopologyName())) { + topo2MonitorStream.put(slot.getTopologyName(), new HashSet<MonitoredStream>()); + } + topo2MonitorStream.get(slot.getTopologyName()).add(stream); + + // topo2policy + if (!topo2Policies.containsKey(slot.getTopologyName())) { + topo2Policies.put(slot.getTopologyName(), new HashSet<String>()); + } + topo2Policies.get(slot.getTopologyName()).addAll(policiesOnQ); + + // bolt2Policy + if (!bolt2Policies.containsKey(getUniqueBoltId(slot))) { + bolt2Policies.put(getUniqueBoltId(slot), new HashSet<String>()); + } + bolt2Policies.get(getUniqueBoltId(slot)).addAll(policiesOnQ); + + // bolt2Queue + if (!bolt2QueueIds.containsKey(getUniqueBoltId(slot))) { + bolt2QueueIds.put(getUniqueBoltId(slot), new HashSet<String>()); + } + bolt2QueueIds.get(getUniqueBoltId(slot)).add(q.getQueueId()); + + // bolt2Partition + if (!bolt2Partition.containsKey(getUniqueBoltId(slot))) { + bolt2Partition.put(getUniqueBoltId(slot), new HashSet<StreamGroup>()); + } + bolt2Partition.get(getUniqueBoltId(slot)).add(stream.getStreamGroup()); + } + } + } + } + + private String getUniqueBoltId(WorkSlot slot) { + return String.format(UNIQUE_BOLT_ID, slot.getTopologyName(), slot.getBoltId()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java new file mode 100644 index 
0000000..6b8495e --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.coordinator.resource; + +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; + +import org.apache.eagle.alert.coordination.model.ScheduleState; +import org.apache.eagle.alert.coordinator.Coordinator; +import org.apache.eagle.alert.coordinator.ScheduleOption; +import org.apache.eagle.alert.utils.JsonUtils; + +/** + * This is to provide API access even we don't have ZK as intermediate access. + * FIXME : more elogant status code + * + * @since Mar 24, 2016 <br/> + */ +@Path("/coordinator") +@Produces({ "application/json" }) +public class CoordinatorResource { + + // sprint config here? 
+ private Coordinator alertCoordinator = new Coordinator(); + + @GET + @Path("/assignments") + public String getAssignments() throws Exception { + ScheduleState state = alertCoordinator.getState(); + return JsonUtils.writeValueAsString(state); + } + + @POST + @Path("/build") + public String build() throws Exception { + ScheduleOption option = new ScheduleOption(); + ScheduleState state = alertCoordinator.schedule(option); + return JsonUtils.writeValueAsString(state); + } + + /** + * Manually update the topology usages, for administration + * + * @return + */ + @POST + @Path("/refreshUsages") + public String refreshUsages() { + // TODO + return ""; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java new file mode 100644 index 0000000..15333da --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.coordinator.trigger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +import org.apache.eagle.alert.service.IMetadataServiceClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Polls the metadata service client for the policy list, diffs it against the previous poll and notifies listeners + */ +public class DynamicPolicyLoader implements Runnable{ + private static Logger LOG = LoggerFactory.getLogger(DynamicPolicyLoader.class); + + private IMetadataServiceClient client; + // snapshot of the previous poll keyed by policy name; it starts out empty + private Map<String, PolicyDefinition> cachedPolicies = new HashMap<>(); + private List<PolicyChangeListener> listeners = new ArrayList<>(); + + public DynamicPolicyLoader(IMetadataServiceClient client){ + this.client = client; + } + /** Registers a listener; synchronized on this loader, the same lock run() holds while notifying. */ + public synchronized void addPolicyChangeListener(PolicyChangeListener listener){ + listeners.add(listener); + } + + /** + * On the first run cachedPolicies is still empty, so every existing policy is + * reported to the listeners as an added policy + */ + @SuppressWarnings("unchecked") + @Override + public void run() { + // we should catch every exception to avoid a zombie thread + try { + List<PolicyDefinition> current = client.listPolicies(); + Map<String, PolicyDefinition> currPolicies = new HashMap<>(); + current.forEach(pe -> currPolicies.put(pe.getName(), pe)); + + Collection<String> addedPolicies = 
CollectionUtils.subtract(currPolicies.keySet(), cachedPolicies.keySet()); + Collection<String> removedPolicies = CollectionUtils.subtract(cachedPolicies.keySet(), currPolicies.keySet()); + Collection<String> potentiallyModifiedPolicies = CollectionUtils.intersection(currPolicies.keySet(), cachedPolicies.keySet()); + // a name present in both snapshots counts as modified only if equals() says the two definitions differ + List<String> reallyModifiedPolicies = new ArrayList<>(); + for (String updatedPolicy : potentiallyModifiedPolicies) { + if (!currPolicies.get(updatedPolicy).equals(cachedPolicies.get(updatedPolicy))) { + reallyModifiedPolicies.add(updatedPolicy); + } + } + + boolean policyChanged = false; + if (addedPolicies.size() != 0 || + removedPolicies.size() != 0 || + reallyModifiedPolicies.size() != 0) { + policyChanged = true; + } + + if (!policyChanged) { + LOG.info("policy is not changed since last run"); + return; + } + synchronized (this) { + for (PolicyChangeListener listener : listeners) { + listener.onPolicyChange(current, addedPolicies, removedPolicies, reallyModifiedPolicies); + } + } + + // reset cached policies; not reached if a listener threw, so that change is detected again on the next run + cachedPolicies = currPolicies; + } catch (Throwable t) { + LOG.error("error loading policy, but continue to run", t); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java new file mode 100644 index 0000000..cd5265d --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java @@ -0,0 +1,26 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.coordinator.trigger; + +import java.util.Collection; +import java.util.List; + +import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; +/** Callback that receives the full policy list plus the added, removed and modified policy names. */ +public interface PolicyChangeListener { + void onPolicyChange(List<PolicyDefinition> allPolicies, Collection<String> addedPolicies, Collection<String> removedPolicies, Collection<String> modifiedPolicies); +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/resources/application.conf new file mode 100644 index 0000000..23ee161 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/resources/application.conf @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +{ + "coordinator" : { + "policiesPerBolt" : 5, + "boltParallelism" : 5, + "policyDefaultParallelism" : 5, + "boltLoadUpbound": 0.8, + "topologyLoadUpbound" : 0.8, + "numOfAlertBoltsPerTopology" : 5, + "zkConfig" : { + "zkQuorum" : "localhost:2181", + "zkRoot" : "/alert", + "zkSessionTimeoutMs" : 10000, + "connectionTimeoutMs" : 10000, + "zkRetryTimes" : 3, + "zkRetryInterval" : 3000 + }, + "metadataService" : { + "host" : "localhost", + "port" : 8080, + "context" : "/api" + }, + "metadataDynamicCheck" : { + "initDelayMillis" : 1000, + "delayMillis" : 30000 + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/resources/log4j.properties new file mode 100644 index 0000000..d4bc126 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/resources/log4j.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, stdout + +# standard output +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml new file mode 100644 index 0000000..1aa925e --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml @@ -0,0 +1,87 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<web-app xmlns="http://java.sun.com/xml/ns/javaee" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://java.sun.com/xml/ns/javaee + http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" + version="3.0"> + <welcome-file-list> + <welcome-file>index.html</welcome-file> + </welcome-file-list> + <servlet> + <servlet-name>Jersey Web Application</servlet-name> + <servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class> + <init-param> + <param-name>com.sun.jersey.config.property.packages</param-name> + <param-value>io.swagger.jaxrs.json,io.swagger.jaxrs.listing,org.apache.eagle.alert.coordinator.resource,org.codehaus.jackson.jaxrs</param-value> + </init-param> + <init-param> + <param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name> + <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter;com.sun.jersey.api.container.filter.PostReplaceFilter</param-value> + </init-param> + <init-param> + <param-name>com.sun.jersey.spi.container.ContainerResponseFilters</param-name> + <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value> + </init-param> + <load-on-startup>1</load-on-startup> + </servlet> + <!-- Servlet for swagger initialization only, no URL mapping. 
--> + <servlet> + <servlet-name>swaggerConfig</servlet-name> + <servlet-class>io.swagger.jaxrs.config.DefaultJaxrsConfig</servlet-class> + <init-param> + <param-name>api.version</param-name> + <param-value>1.0.0</param-value> + </init-param> + <init-param> + <param-name>swagger.api.basepath</param-name> + <param-value>/api</param-value> + </init-param> + <load-on-startup>2</load-on-startup> + </servlet> + + <servlet-mapping> + <servlet-name>Jersey Web Application</servlet-name> + <url-pattern>/api/*</url-pattern> + </servlet-mapping> + <filter> + <filter-name>CorsFilter</filter-name> + <!-- this should be replaced by tomcat ones, see also metadata resource --> + <filter-class>org.apache.eagle.alert.resource.SimpleCORSFiler</filter-class> + <init-param> + <param-name>cors.allowed.origins</param-name> + <param-value>*</param-value> + </init-param> + <init-param> + <param-name>cors.allowed.headers</param-name> + <param-value>Authorization,Origin, No-Cache, X-Requested-With, If-Modified-Since, Pragma, Last-Modified, Cache-Control, Expires, Content-Type, X-E4M-With, Accept</param-value> + </init-param> + <init-param> + <param-name>cors.allowed.methods</param-name> + <param-value>GET,POST,HEAD,OPTIONS,PUT,DELETE</param-value> + </init-param> + <init-param> + <param-name>cors.support.credentials</param-name> + <param-value>true</param-value> + </init-param> + </filter> + <filter-mapping> + <filter-name>CorsFilter</filter-name> + <url-pattern>/*</url-pattern> + </filter-mapping> +</web-app> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/webapp/index.html ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/webapp/index.html b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/webapp/index.html new file mode 100644 index 0000000..1c4ea76 --- /dev/null +++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/webapp/index.html @@ -0,0 +1,18 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +UMP Coordinator service!