http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java deleted file mode 100644 index 84a4061..0000000 --- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator.provider; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.internal.MonitoredStream; -import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; -import org.apache.eagle.alert.coordination.model.internal.StreamGroup; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.coordinator.IScheduleContext; -import org.apache.eagle.alert.coordinator.model.TopologyUsage; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; - -/** - * @since Mar 28, 2016 - * - */ -public class InMemScheduleConext implements IScheduleContext { - - private Map<String, Topology> topologies = new HashMap<String, Topology>(); - private Map<String, TopologyUsage> usages = new HashMap<String, TopologyUsage>(); - private Map<String, PolicyDefinition> policies = new HashMap<String, PolicyDefinition>(); - private Map<String, Kafka2TupleMetadata> datasources = new HashMap<String, Kafka2TupleMetadata>(); - private Map<String, PolicyAssignment> policyAssignments = new HashMap<String, PolicyAssignment>(); - private Map<String, StreamDefinition> schemas = new HashMap<String, StreamDefinition>(); - private Map<StreamGroup, MonitoredStream> monitoredStreams = new HashMap<StreamGroup, MonitoredStream>(); - private Map<String, Publishment> publishments = new HashMap<String, Publishment>(); - - public InMemScheduleConext() { - } - - public InMemScheduleConext(IScheduleContext context) { - this.topologies = new HashMap<String, Topology>(context.getTopologies()); - this.usages = new HashMap<String, TopologyUsage>(context.getTopologyUsages()); - this.policies = new HashMap<String, PolicyDefinition>(context.getPolicies()); - this.datasources = new HashMap<String, Kafka2TupleMetadata>(context.getDataSourceMetadata()); - this.policyAssignments = new HashMap<String, PolicyAssignment>(context.getPolicyAssignments()); - this.schemas = new HashMap<String, StreamDefinition>(context.getStreamSchemas()); - this.monitoredStreams = new HashMap<StreamGroup, MonitoredStream>(context.getMonitoredStreams()); - this.publishments = new HashMap<String, Publishment>(context.getPublishments()); - } - - public InMemScheduleConext(Map<String, Topology> topologies2, Map<String, PolicyAssignment> assignments, - Map<String, Kafka2TupleMetadata> kafkaSources, Map<String, PolicyDefinition> policies2, - Map<String, Publishment> publishments2, Map<String, StreamDefinition> streamDefinitions, - Map<StreamGroup, MonitoredStream> monitoredStreamMap, Map<String, TopologyUsage> usages2) { - this.topologies = topologies2; - this.policyAssignments = assignments; - this.datasources = kafkaSources; - this.policies = policies2; - this.publishments = publishments2; - this.schemas = streamDefinitions; - this.monitoredStreams = monitoredStreamMap; - this.usages = usages2; - } - - public Map<String, Topology> getTopologies() { - return topologies; - } - - public void addTopology(Topology topo) { - topologies.put(topo.getName(), topo); - } - - public Map<String, TopologyUsage> getTopologyUsages() { - return usages; - } - - public void addTopologyUsages(TopologyUsage usage) { - usages.put(usage.getTopoName(), usage); - } - - public Map<String, PolicyDefinition> getPolicies() { - return policies; - } - - public void addPoilcy(PolicyDefinition pd) { - this.policies.put(pd.getName(), pd); - } - - public Map<String, Kafka2TupleMetadata> getDatasources() { - return datasources; - } - - public void setDatasources(Map<String, Kafka2TupleMetadata> datasources) { - this.datasources = datasources; - } - - public void addDataSource(Kafka2TupleMetadata dataSource) { - this.datasources.put(dataSource.getName(), dataSource); - } - - @Override - public Map<String, Kafka2TupleMetadata> getDataSourceMetadata() { - return datasources; - } - - public void setPolicyOrderedTopologies(Map<String, PolicyAssignment> policyAssignments) { - this.policyAssignments = policyAssignments; - } - - public Map<String, PolicyAssignment> getPolicyAssignments() { - return this.policyAssignments; - } - - @Override - public Map<String, StreamDefinition> getStreamSchemas() { - return schemas; - } - - public void addSchema(StreamDefinition schema) { - this.schemas.put(schema.getStreamId(), schema); - } - - public void setStreamSchemas(Map<String, StreamDefinition> schemas) { - this.schemas = schemas; - } - - @Override - public Map<StreamGroup, MonitoredStream> getMonitoredStreams() { - return monitoredStreams; - } - - @Override - public Map<String, Publishment> getPublishments() { - return publishments; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java deleted file mode 100644 index d4d6c0c..0000000 --- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java +++ /dev/null @@ -1,400 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator.provider; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; - -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordination.model.WorkSlot; -import org.apache.eagle.alert.coordination.model.internal.MonitoredStream; -import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; -import org.apache.eagle.alert.coordination.model.internal.StreamGroup; -import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.coordinator.IScheduleContext; -import org.apache.eagle.alert.coordinator.model.AlertBoltUsage; -import org.apache.eagle.alert.coordinator.model.GroupBoltUsage; -import org.apache.eagle.alert.coordinator.model.TopologyUsage; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.service.IMetadataServiceClient; -import org.apache.eagle.alert.service.MetadataServiceClientImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.typesafe.config.Config; - -/** - * FIXME: this class focus on correctness, not the efficiency now. There might - * be problem when metadata size grows too big. - * - * @since May 3, 2016 - * - */ -public class ScheduleContextBuilder { - - private static final Logger LOG = LoggerFactory.getLogger(ScheduleContextBuilder.class); - private static final String UNIQUE_BOLT_ID = "%s-%s";// toponame-boltname - - private IMetadataServiceClient client; - - private Map<String, Topology> topologies; - private Map<String, PolicyAssignment> assignments; - private Map<String, Kafka2TupleMetadata> kafkaSources; - private Map<String, PolicyDefinition> policies; - private Map<String, Publishment> publishments; - private Map<String, StreamDefinition> streamDefinitions; - private Map<StreamGroup, MonitoredStream> monitoredStreamMap; - private Map<String, TopologyUsage> usages; - - public ScheduleContextBuilder(Config config) { - client = new MetadataServiceClientImpl(config); - } - - public ScheduleContextBuilder(IMetadataServiceClient client) { - this.client = client; - } - - /** - * Built a shcedule context for metadata client service. - * - * @return - */ - public IScheduleContext buildContext() { - topologies = listToMap(client.listTopologies()); - kafkaSources = listToMap(client.listDataSources()); - policies = listToMap(client.listPolicies()); - publishments = listToMap(client.listPublishment()); - streamDefinitions = listToMap(client.listStreams()); - - // TODO: See ScheduleState comments on how to improve the storage - ScheduleState state = client.getVersionedSpec(); - assignments = listToMap(state == null ? new ArrayList<PolicyAssignment>() : cleanupDeprecatedAssignments(state.getAssignments())); - - monitoredStreamMap = listToMap(state == null ? new ArrayList<MonitoredStream>() : cleanupDeprecatedStreamsAndAssignment(state.getMonitoredStreams())); - - // build based on existing data - usages = buildTopologyUsage(); - - // copy to shedule context now - return new InMemScheduleConext(topologies, assignments, kafkaSources, policies, publishments, - streamDefinitions, monitoredStreamMap, usages); - } - - /** - * 1. - * <pre> - * Check for deprecated policy stream group with its assigned monitored stream groups. - * If this is unmatched, we think the policy' stream group has been changed, remove the policy assignments - * If finally, no assignment refer to a given monitored stream, this monitored stream could be removed. - * Log when every time a remove happens. - * </pre> - * 2. - * <pre> - * if monitored stream's queue's is on non-existing topology, remove it. - * </pre> - * @param monitoredStreams - * @return - */ - private List<MonitoredStream> cleanupDeprecatedStreamsAndAssignment(List<MonitoredStream> monitoredStreams) { - List<MonitoredStream> result = new ArrayList<MonitoredStream>(monitoredStreams); - - // clear deprecated streams - clearMonitoredStreams(monitoredStreams); - - // build queueId-> streamGroup - Map<String, StreamGroup> queue2StreamGroup = new HashMap<String, StreamGroup>(); - for (MonitoredStream ms : result) { - for (StreamWorkSlotQueue q : ms.getQueues()) { - queue2StreamGroup.put(q.getQueueId(), ms.getStreamGroup()); - } - } - - // decide the assignment delete set - Set<StreamGroup> usedGroups = new HashSet<StreamGroup>(); - Set<String> toRemove = new HashSet<String>(); - // check if queue is still referenced by policy assignments - for (PolicyAssignment assignment : assignments.values()) { - PolicyDefinition def = policies.get(assignment.getPolicyName()); - StreamGroup group = queue2StreamGroup.get(assignment.getQueueId()); - if (group == null || !Objects.equals(group.getStreamPartitions(), def.getPartitionSpec())) { - LOG.warn(" policy assgiment {} 's policy group {} is different to the monitored stream's partition group {}, " - + "this indicates a policy stream partition spec change, the assignment would be removed! ", - assignment, def.getPartitionSpec(), group == null ? "'not found'" :group.getStreamPartitions()); - toRemove.add(assignment.getPolicyName()); - } else { - usedGroups.add(group); - } - } - - // remove useless - assignments.keySet().removeAll(toRemove); - // remove non-referenced monitored streams - result.removeIf((t) -> { - boolean used = usedGroups.contains(t.getStreamGroup()); - if (!used) { - LOG.warn("monitor stream with stream group {} is not referenced, " - + "this monitored stream and its worker queu will be removed", t.getStreamGroup()); - } - return !used; - }); - - return result; - } - - private void clearMonitoredStreams(List<MonitoredStream> monitoredStreams) { - Iterator<MonitoredStream> it = monitoredStreams.iterator(); - while (it.hasNext()) { - MonitoredStream ms = it.next(); - Iterator<StreamWorkSlotQueue> queueIt = ms.getQueues().iterator(); - // clean queue that underly topology is changed(removed/down) - while (queueIt.hasNext()) { - StreamWorkSlotQueue queue = queueIt.next(); - boolean deprecated = false; - for (WorkSlot ws : queue.getWorkingSlots()) { - // check if topology available or bolt available - if (!topologies.containsKey(ws.topologyName) - || !topologies.get(ws.topologyName).getAlertBoltIds().contains(ws.boltId)) { - deprecated = true; - break; - } - } - if (deprecated) { - queueIt.remove(); - } - } - - if (ms.getQueues().isEmpty()) { - it.remove(); - } - } - } - - private List<PolicyAssignment> cleanupDeprecatedAssignments(List<PolicyAssignment> list) { - List<PolicyAssignment> result = new ArrayList<PolicyAssignment>(list); - Iterator<PolicyAssignment> paIt = result.iterator(); - while (paIt.hasNext()) { - PolicyAssignment assignment = paIt.next(); - if (!policies.containsKey(assignment.getPolicyName())) { - LOG.info("Policy assignment {} 's policy not found, this assignment will be removed!", assignment); - paIt.remove(); - } - } - return result; - } - - private <T, K> Map<K, T> listToMap(List<T> collections) { - Map<K, T> maps = new HashMap<K, T>(collections.size()); - for (T t : collections) { - maps.put(getKey(t), t); - } - return maps; - } - - /* - * One drawback, once we add class, this code need to be changed! - */ - @SuppressWarnings("unchecked") - private <T, K> K getKey(T t) { - if (t instanceof Topology) { - return (K) ((Topology) t).getName(); - } else if (t instanceof PolicyAssignment) { - return (K) ((PolicyAssignment) t).getPolicyName(); - } else if (t instanceof Kafka2TupleMetadata) { - return (K) ((Kafka2TupleMetadata) t).getName(); - } else if (t instanceof PolicyDefinition) { - return (K) ((PolicyDefinition) t).getName(); - } else if (t instanceof Publishment) { - return (K) ((Publishment) t).getName(); - } else if (t instanceof StreamDefinition) { - return (K) ((StreamDefinition) t).getStreamId(); - } else if (t instanceof MonitoredStream) { - return (K) ((MonitoredStream) t).getStreamGroup(); - } - throw new RuntimeException("unexpected key class " + t.getClass()); - } - - private Map<String, TopologyUsage> buildTopologyUsage() { - Map<String, TopologyUsage> usages = new HashMap<String, TopologyUsage>(); - - // pre-build data structure for help - Map<String, Set<MonitoredStream>> topo2MonitorStream = new HashMap<String, Set<MonitoredStream>>(); - Map<String, Set<String>> topo2Policies = new HashMap<String, Set<String>>(); - // simply assume no bolt with the same id - Map<String, Set<String>> bolt2Policies = new HashMap<String, Set<String>>(); - // simply assume no bolt with the same id - Map<String, Set<StreamGroup>> bolt2Partition = new HashMap<String, Set<StreamGroup>>(); - // simply assume no bolt with the same id - Map<String, Set<String>> bolt2QueueIds = new HashMap<String, Set<String>>(); - Map<String, StreamWorkSlotQueue> queueMap = new HashMap<String, StreamWorkSlotQueue>(); - - preBuildQueue2TopoMap(topo2MonitorStream, topo2Policies, bolt2Policies, bolt2Partition, bolt2QueueIds, queueMap); - - for (Topology t : topologies.values()) { - TopologyUsage u = new TopologyUsage(t.getName()); - // add group/bolt usages - for (String grpBolt : t.getGroupNodeIds()) { - GroupBoltUsage grpUsage = new GroupBoltUsage(grpBolt); - u.getGroupUsages().put(grpBolt, grpUsage); - } - for (String alertBolt : t.getAlertBoltIds()) { - String uniqueBoltId = String.format(UNIQUE_BOLT_ID, t.getName(), alertBolt); - - AlertBoltUsage alertUsage = new AlertBoltUsage(alertBolt); - u.getAlertUsages().put(alertBolt, alertUsage); - // complete usage - addBoltUsageInfo(bolt2Policies, bolt2Partition, bolt2QueueIds, uniqueBoltId, alertUsage, queueMap); - } - - // policy -- policy assignment - if (topo2Policies.containsKey(u.getTopoName())) { - u.getPolicies().addAll(topo2Policies.get(u.getTopoName())); - } - - // data source - buildTopologyDataSource(u); - - // topology usage monitored stream -- from monitored steams' queue slot item info - if (topo2MonitorStream.containsKey(u.getTopoName())) { - u.getMonitoredStream().addAll(topo2MonitorStream.get(u.getTopoName())); - } - - usages.put(u.getTopoName(), u); - } - - return usages; - } - - private void addBoltUsageInfo(Map<String, Set<String>> bolt2Policies, - Map<String, Set<StreamGroup>> bolt2Partition, Map<String, Set<String>> bolt2QueueIds, String uniqueAlertBolt, - AlertBoltUsage alertUsage, Map<String, StreamWorkSlotQueue> queueMap) { - // - if (bolt2Policies.containsKey(uniqueAlertBolt)) { - alertUsage.getPolicies().addAll(bolt2Policies.get(uniqueAlertBolt)); - } - // - if (bolt2Partition.containsKey(uniqueAlertBolt)) { - alertUsage.getPartitions().addAll(bolt2Partition.get(uniqueAlertBolt)); - } - // - if (bolt2QueueIds.containsKey(uniqueAlertBolt)) { - for (String qId : bolt2QueueIds.get(uniqueAlertBolt)) { - if (queueMap.containsKey(qId)) { - alertUsage.getReferQueues().add(queueMap.get(qId)); - } else { - LOG.error(" queue id {} not found in queue map !", qId); - } - } - } - } - - private void buildTopologyDataSource(TopologyUsage u) { - for (String policyName : u.getPolicies()) { - PolicyDefinition def = policies.get(policyName); - if (def != null) { - u.getDataSources().addAll(findDatasource(def)); - } else { - LOG.error(" policy not find {}, but reference in topology usage {} !", policyName, u.getTopoName()); - } - } - } - - private List<String> findDatasource(PolicyDefinition def) { - List<String> result = new ArrayList<String>(); - List<String> inputStreams = def.getInputStreams(); - for (String is : inputStreams) { - StreamDefinition ss = this.streamDefinitions.get(is); - if (ss == null) { - LOG.error("policy {} referenced stream definition {} not found in definiton !", def.getName(), is); - } else { - result.add(ss.getDataSource()); - } - } - return result; - } - - private void preBuildQueue2TopoMap( - Map<String, Set<MonitoredStream>> topo2MonitorStream, - Map<String, Set<String>> topo2Policies, - Map<String, Set<String>> bolt2Policies, - Map<String, Set<StreamGroup>> bolt2Partition, - Map<String, Set<String>> bolt2QueueIds, - Map<String, StreamWorkSlotQueue> queueMap) { - // pre-build structure - // why don't reuse the queue.getPolicies - Map<String, Set<String>> queue2Policies= new HashMap<String, Set<String>>(); - for (PolicyAssignment pa : assignments.values()) { - if (!queue2Policies.containsKey(pa.getQueueId())) { - queue2Policies.put(pa.getQueueId(), new HashSet<String>()); - } - queue2Policies.get(pa.getQueueId()).add(pa.getPolicyName()); - } - - for (MonitoredStream stream : monitoredStreamMap.values()) { - for (StreamWorkSlotQueue q : stream.getQueues()) { - queueMap.put(q.getQueueId(), q); - Set<String> policiesOnQ = queue2Policies.containsKey(q.getQueueId()) ? queue2Policies.get(q.getQueueId()) : new HashSet<String>(); - - for (WorkSlot slot : q.getWorkingSlots()) { - // topo2monitoredstream - if (!topo2MonitorStream.containsKey(slot.getTopologyName())) { - topo2MonitorStream.put(slot.getTopologyName(), new HashSet<MonitoredStream>()); - } - topo2MonitorStream.get(slot.getTopologyName()).add(stream); - - // topo2policy - if (!topo2Policies.containsKey(slot.getTopologyName())) { - topo2Policies.put(slot.getTopologyName(), new HashSet<String>()); - } - topo2Policies.get(slot.getTopologyName()).addAll(policiesOnQ); - - // bolt2Policy - if (!bolt2Policies.containsKey(getUniqueBoltId(slot))) { - bolt2Policies.put(getUniqueBoltId(slot), new HashSet<String>()); - } - bolt2Policies.get(getUniqueBoltId(slot)).addAll(policiesOnQ); - - // bolt2Queue - if (!bolt2QueueIds.containsKey(getUniqueBoltId(slot))) { - bolt2QueueIds.put(getUniqueBoltId(slot), new HashSet<String>()); - } - bolt2QueueIds.get(getUniqueBoltId(slot)).add(q.getQueueId()); - - // bolt2Partition - if (!bolt2Partition.containsKey(getUniqueBoltId(slot))) { - bolt2Partition.put(getUniqueBoltId(slot), new HashSet<StreamGroup>()); - } - bolt2Partition.get(getUniqueBoltId(slot)).add(stream.getStreamGroup()); - } - } - } - } - - private String getUniqueBoltId(WorkSlot slot) { - return String.format(UNIQUE_BOLT_ID, slot.getTopologyName(), slot.getBoltId()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java deleted file mode 100644 index 6b8495e..0000000 --- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator.resource; - -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; - -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordinator.Coordinator; -import org.apache.eagle.alert.coordinator.ScheduleOption; -import org.apache.eagle.alert.utils.JsonUtils; - -/** - * This is to provide API access even we don't have ZK as intermediate access. - * FIXME : more elogant status code - * - * @since Mar 24, 2016 <br/> - */ -@Path("/coordinator") -@Produces({ "application/json" }) -public class CoordinatorResource { - - // sprint config here? - private Coordinator alertCoordinator = new Coordinator(); - - @GET - @Path("/assignments") - public String getAssignments() throws Exception { - ScheduleState state = alertCoordinator.getState(); - return JsonUtils.writeValueAsString(state); - } - - @POST - @Path("/build") - public String build() throws Exception { - ScheduleOption option = new ScheduleOption(); - ScheduleState state = alertCoordinator.schedule(option); - return JsonUtils.writeValueAsString(state); - } - - /** - * Manually update the topology usages, for administration - * - * @return - */ - @POST - @Path("/refreshUsages") - public String refreshUsages() { - // TODO - return ""; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java deleted file mode 100644 index 15333da..0000000 --- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.coordinator.trigger; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.service.IMetadataServiceClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Poll policy change and notify listeners - */ -public class DynamicPolicyLoader implements Runnable{ - private static Logger LOG = LoggerFactory.getLogger(DynamicPolicyLoader.class); - - private IMetadataServiceClient client; - // initial cachedPolicies should be empty - private Map<String, PolicyDefinition> cachedPolicies = new HashMap<>(); - private List<PolicyChangeListener> listeners = new ArrayList<>(); - - public DynamicPolicyLoader(IMetadataServiceClient client){ - this.client = client; - } - - public synchronized void addPolicyChangeListener(PolicyChangeListener listener){ - listeners.add(listener); - } - - /** - * When it is run at the first time, due to cachedPolicies being empty, all existing policies are expected - * to be addedPolicies - */ - @SuppressWarnings("unchecked") - @Override - public void run() { - // we should catch every exception to avoid zombile thread - try { - List<PolicyDefinition> current = client.listPolicies(); - Map<String, PolicyDefinition> currPolicies = new HashMap<>(); - current.forEach(pe -> currPolicies.put(pe.getName(), pe)); - - Collection<String> addedPolicies = CollectionUtils.subtract(currPolicies.keySet(), cachedPolicies.keySet()); - Collection<String> removedPolicies = CollectionUtils.subtract(cachedPolicies.keySet(), currPolicies.keySet()); - Collection<String> potentiallyModifiedPolicies = CollectionUtils.intersection(currPolicies.keySet(), cachedPolicies.keySet()); - - List<String> reallyModifiedPolicies = new ArrayList<>(); - for (String updatedPolicy : potentiallyModifiedPolicies) { - if (!currPolicies.get(updatedPolicy).equals(cachedPolicies.get(updatedPolicy))) { - reallyModifiedPolicies.add(updatedPolicy); - } - } - - boolean policyChanged = false; - if (addedPolicies.size() != 0 || - removedPolicies.size() != 0 || - reallyModifiedPolicies.size() != 0) { - policyChanged = true; - } - - if (!policyChanged) { - LOG.info("policy is not changed since last run"); - return; - } - synchronized (this) { - for (PolicyChangeListener listener : listeners) { - listener.onPolicyChange(current, addedPolicies, removedPolicies, reallyModifiedPolicies); - } - } - - // reset cached policies - cachedPolicies = currPolicies; - } catch (Throwable t) { - LOG.error("error loading policy, but continue to run", t); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java deleted file mode 100644 index cd5265d..0000000 --- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limit - */ -package org.apache.eagle.alert.coordinator.trigger; - -import java.util.Collection; -import java.util.List; - -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; - -public interface PolicyChangeListener { - void onPolicyChange(List<PolicyDefinition> allPolicies, Collection<String> addedPolicies, Collection<String> removedPolicies, Collection<String> modifiedPolicies); -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/application.conf b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/application.conf deleted file mode 100644 index 23ee161..0000000 --- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/application.conf +++ /dev/null @@ -1,41 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -{ - "coordinator" : { - "policiesPerBolt" : 5, - "boltParallelism" : 5, - "policyDefaultParallelism" : 5, - "boltLoadUpbound": 0.8, - "topologyLoadUpbound" : 0.8, - "numOfAlertBoltsPerTopology" : 5, - "zkConfig" : { - "zkQuorum" : "localhost:2181", - "zkRoot" : "/alert", - "zkSessionTimeoutMs" : 10000, - "connectionTimeoutMs" : 10000, - "zkRetryTimes" : 3, - "zkRetryInterval" : 3000 - }, - "metadataService" : { - "host" : "localhost", - "port" : 8080, - "context" : "/api" - }, - "metadataDynamicCheck" : { - "initDelayMillis" : 1000, - "delayMillis" : 30000 - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/log4j.properties b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/log4j.properties deleted file mode 100644 index d4bc126..0000000 --- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/log4j.properties +++ /dev/null @@ -1,21 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -log4j.rootLogger=INFO, stdout - -# standard output -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml deleted file mode 100644 index 1aa925e..0000000 --- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml +++ /dev/null @@ -1,87 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> -<web-app xmlns="http://java.sun.com/xml/ns/javaee" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://java.sun.com/xml/ns/javaee - http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" - version="3.0"> - <welcome-file-list> - <welcome-file>index.html</welcome-file> - </welcome-file-list> - <servlet> - <servlet-name>Jersey Web Application</servlet-name> - <servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class> - <init-param> - <param-name>com.sun.jersey.config.property.packages</param-name> - <param-value>io.swagger.jaxrs.json,io.swagger.jaxrs.listing,org.apache.eagle.alert.coordinator.resource,org.codehaus.jackson.jaxrs</param-value> - </init-param> - <init-param> - <param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name> - <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter;com.sun.jersey.api.container.filter.PostReplaceFilter</param-value> - </init-param> - <init-param> - <param-name>com.sun.jersey.spi.container.ContainerResponseFilters</param-name> - <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value> - </init-param> - <load-on-startup>1</load-on-startup> - </servlet> - <!-- Servlet for swagger initialization only, no URL mapping. --> - <servlet> - <servlet-name>swaggerConfig</servlet-name> - <servlet-class>io.swagger.jaxrs.config.DefaultJaxrsConfig</servlet-class> - <init-param> - <param-name>api.version</param-name> - <param-value>1.0.0</param-value> - </init-param> - <init-param> - <param-name>swagger.api.basepath</param-name> - <param-value>/api</param-value> - </init-param> - <load-on-startup>2</load-on-startup> - </servlet> - - <servlet-mapping> - <servlet-name>Jersey Web Application</servlet-name> - <url-pattern>/api/*</url-pattern> - </servlet-mapping> - <filter> - <filter-name>CorsFilter</filter-name> - <!-- this should be replaced by tomcat ones, see also metadata resource --> - <filter-class>org.apache.eagle.alert.resource.SimpleCORSFiler</filter-class> - <init-param> - <param-name>cors.allowed.origins</param-name> - <param-value>*</param-value> - </init-param> - <init-param> - <param-name>cors.allowed.headers</param-name> - <param-value>Authorization,Origin, No-Cache, X-Requested-With, If-Modified-Since, Pragma, Last-Modified, Cache-Control, Expires, Content-Type, X-E4M-With, Accept</param-value> - </init-param> - <init-param> - <param-name>cors.allowed.methods</param-name> - <param-value>GET,POST,HEAD,OPTIONS,PUT,DELETE</param-value> - </init-param> - <init-param> - <param-name>cors.support.credentials</param-name> - <param-value>true</param-value> - </init-param> - </filter> - <filter-mapping> - <filter-name>CorsFilter</filter-name> - <url-pattern>/*</url-pattern> - </filter-mapping> -</web-app> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/index.html ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/index.html b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/index.html deleted file mode 100644 index 1c4ea76..0000000 --- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/index.html +++ /dev/null @@ -1,18 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> -UMP Coordinator service! http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java deleted file mode 100644 index 78b72b3..0000000 --- a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.alert.coordinator; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.eagle.alert.config.ConfigBusConsumer; -import org.apache.eagle.alert.config.ConfigBusProducer; -import org.apache.eagle.alert.config.ConfigChangeCallback; -import org.apache.eagle.alert.config.ConfigValue; -import org.apache.eagle.alert.config.ZKConfig; -import org.apache.eagle.alert.config.ZKConfigBuilder; -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordinator.Coordinator; -import org.apache.eagle.alert.coordinator.ScheduleOption; -import org.apache.eagle.alert.service.IMetadataServiceClient; -import org.apache.eagle.alert.service.MetadataServiceClientImpl; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -/** - * @since May 5, 2016 - * - */ -public class CoordinatorTest { - - @SuppressWarnings({ "resource", "unused" }) - @Ignore - @Test - public void test() throws Exception { - before(); - Config config = ConfigFactory.load().getConfig("coordinator"); - ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config); - ConfigBusProducer producer = new ConfigBusProducer(zkConfig); - IMetadataServiceClient client = new MetadataServiceClientImpl(config); - - Coordinator coordinator = new Coordinator(config, producer, client); - ScheduleOption option = new ScheduleOption(); - ScheduleState state = coordinator.schedule(option); - String v = state.getVersion(); - - AtomicBoolean validated = new AtomicBoolean(false); - ConfigBusConsumer consumer = new ConfigBusConsumer(zkConfig, "topo1/spout", new ConfigChangeCallback() { - @Override - public void onNewConfig(ConfigValue value) { - String vId = value.getValue().toString(); - Assert.assertEquals(v, vId); - validated.set(true); - } - }); - - Thread.sleep(1000); - Assert.assertTrue(validated.get()); - } - - @SuppressWarnings({ "resource", "unused" }) - @Test - public void test_01() throws Exception { - before(); - Config config = ConfigFactory.load().getConfig("coordinator"); - ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config); - ConfigBusProducer producer = new ConfigBusProducer(zkConfig); - IMetadataServiceClient client = ScheduleContextBuilderTest.getSampleMetadataService(); - - Coordinator coordinator = new Coordinator(config, producer, client); - ScheduleOption option = new ScheduleOption(); - ScheduleState state = coordinator.schedule(option); - String v = state.getVersion(); - - // TODO : assert version - - CountDownLatch latch = new CountDownLatch(1); - AtomicBoolean validated = new AtomicBoolean(false); - ConfigBusConsumer consumer = new ConfigBusConsumer(zkConfig, "topo1/spout", new ConfigChangeCallback() { - @Override - public void onNewConfig(ConfigValue value) { - String vId = value.getValue().toString(); - Assert.assertEquals(v, vId); - validated.set(true); - latch.countDown(); - } - }); - - latch.await(3, TimeUnit.SECONDS); - Assert.assertTrue(validated.get()); - } - - @Ignore - @Test - public void test_main() throws Exception { - before(); - - Coordinator.main(null); - } - - @Before - public void before() { - System.setProperty("config.resource", "/test-application.conf"); - ConfigFactory.invalidateCaches(); - ConfigFactory.load().getConfig("coordinator"); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java deleted file mode 100644 index 155a9e5..0000000 --- a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.alert.coordinator; - -/** - * Since 4/28/16. - */ -public class DynamicPolicyLoaderTest { -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java deleted file mode 100644 index e2ea031..0000000 --- a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.alert.coordinator; - -import org.apache.eagle.alert.config.ConfigBusProducer; -import org.apache.eagle.alert.config.ZKConfig; -import org.apache.eagle.alert.config.ZKConfigBuilder; -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordinator.Coordinator; -import org.apache.eagle.alert.service.MetadataServiceClientImpl; -import org.junit.Ignore; -import org.junit.Test; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -/** - * @since May 9, 2016 - * - */ -public class MetadataServiceClientImplTest { - - @Ignore - @Test - public void addScheduleState() throws Exception { - ConfigFactory.invalidateCaches(); - System.setProperty("config.resource", "/test-application.conf"); - Config config = ConfigFactory.load("test-application.conf").getConfig("coordinator"); - MetadataServiceClientImpl client = new MetadataServiceClientImpl(config); - - ScheduleState ss = new ScheduleState(); - ss.setVersion("spec_version_1463764252582"); - - client.addScheduleState(ss); - - client.close(); - - ss.setVersion("spec_version_1464764252582"); - ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config); - ConfigBusProducer producer = new ConfigBusProducer(zkConfig); - Coordinator.postSchedule(client, ss, producer); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java deleted file mode 100644 index f2e67de..0000000 --- a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java +++ /dev/null @@ -1,281 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.alert.coordinator; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.alert.coordinator.mock.InMemMetadataServiceClient; -import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; -import org.apache.eagle.alert.coordination.model.ScheduleState; -import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata; -import org.apache.eagle.alert.coordination.model.WorkSlot; -import org.apache.eagle.alert.coordination.model.internal.MonitoredStream; -import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment; -import org.apache.eagle.alert.coordination.model.internal.StreamGroup; -import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue; -import org.apache.eagle.alert.coordination.model.internal.Topology; -import org.apache.eagle.alert.coordinator.IScheduleContext; -import org.apache.eagle.alert.coordinator.model.AlertBoltUsage; -import org.apache.eagle.alert.coordinator.model.TopologyUsage; -import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamColumn.Type; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; -import org.junit.Assert; -import org.junit.Test; - -/** - * @since May 5, 2016 - * - */ -public class ScheduleContextBuilderTest { - - @Test - public void test() { - InMemMetadataServiceClient client = getSampleMetadataService(); - - ScheduleContextBuilder builder = new ScheduleContextBuilder(client); - - IScheduleContext context = builder.buildContext(); - - // assert topology usage - Map<String, TopologyUsage> usages = context.getTopologyUsages(); - Assert.assertEquals(1, usages.get(TOPO1).getMonitoredStream().size()); - Assert.assertTrue(usages.get(TOPO1).getPolicies().contains(TEST_POLICY_1)); - - String alertBolt0 = TOPO1 + "-alert-" + "0"; - String alertBolt1 = TOPO1 + "-alert-" + "1"; - String alertBolt2 = TOPO1 + "-alert-" + "2"; - for (AlertBoltUsage u : usages.get(TOPO1).getAlertUsages().values()) { - if (u.getBoltId().equals(alertBolt0) || u.getBoltId().equals(alertBolt1) - || u.getBoltId().equals(alertBolt2)) { - Assert.assertEquals(1, u.getPolicies().size()); - Assert.assertTrue(u.getPolicies().contains(TEST_POLICY_1)); - Assert.assertEquals(1, u.getPartitions().size()); - Assert.assertEquals(1, u.getReferQueues().size()); - } - } - } - - @Test - public void test_remove_policy() { - InMemMetadataServiceClient client = getSampleMetadataService(); - ScheduleContextBuilder builder = new ScheduleContextBuilder(client); - - PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0); - - IScheduleContext context = builder.buildContext(); - Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1)); - StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight(); - - client.listPolicies().remove(0); - context = builder.buildContext(); - Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1)); - - WorkSlot slot = queue.getWorkingSlots().get(0); - Set<String> topoPolicies = context.getTopologyUsages().get(slot.topologyName).getPolicies(); - Assert.assertFalse(topoPolicies.contains(TEST_DATASOURCE_1)); - Assert.assertEquals(0, topoPolicies.size()); - } - - @Test - public void test_changed_policy() { - InMemMetadataServiceClient client = getSampleMetadataService(); - ScheduleContextBuilder builder = new ScheduleContextBuilder(client); - PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0); - - IScheduleContext context = builder.buildContext(); - Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1)); - - StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight(); - - PolicyDefinition pd1 = client.listPolicies().get(0); - // add a new group by column : need to replace the partiton spec, to - // avoid reference same object in - // on jvm (no serialization and deserialization) - StreamPartition par = new StreamPartition(pd1.getPartitionSpec().get(0)); - par.getColumns().add("s1"); - pd1.getPartitionSpec().clear(); - pd1.getPartitionSpec().add(par); - - context = builder.buildContext(); - - // assert the policy assignment is removed - Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1)); - // assert the monitored stream is removed as no policy on it now. - Assert.assertEquals(0, context.getMonitoredStreams().size()); - // assert the topology usage doesn't contain policy - WorkSlot slot = queue.getWorkingSlots().get(0); - TopologyUsage topologyUsage = context.getTopologyUsages().get(slot.topologyName); - Set<String> topoPolicies = topologyUsage.getPolicies(); - Assert.assertFalse(topoPolicies.contains(TEST_DATASOURCE_1)); - Assert.assertEquals(0, topoPolicies.size()); - // assert the topology usage doesn't contain the monitored stream - Assert.assertEquals(0, topologyUsage.getMonitoredStream().size()); - // assert the alert bolt usage doesn't have the queue reference - Assert.assertEquals(0, topologyUsage.getAlertBoltUsage(slot.getBoltId()).getReferQueues().size()); - } - - @Test - public void test_renamed_topologies() { - InMemMetadataServiceClient client = getSampleMetadataService(); - ScheduleContextBuilder builder = new ScheduleContextBuilder(client); - - IScheduleContext context = builder.buildContext(); - Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1)); - - Topology t = client.listTopologies().get(0); - t.setName("newName"); - - context = builder.buildContext(); - Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1)); - } - - private static final String TOPO1 = "topo1"; - private static final String V1 = "v1"; - private static final String COL1 = "col1"; - private static final String OUT_STREAM1 = "out-stream1"; - private static final String TEST_POLICY_1 = "test-policy-1"; - private static final String TEST_STREAM_DEF_1 = "testStreamDef"; - private static final String TEST_DATASOURCE_1 = "test-datasource-1"; - private static StreamPartition par; - private static String queueId; - private static StreamGroup streamGroup; - - public static InMemMetadataServiceClient getSampleMetadataService() { - InMemMetadataServiceClient client = new InMemMetadataServiceClient(); - client.listTopologies().add(createSampleTopology()); - client.listDataSources().add(createKafka2TupleMetadata()); - // client.listSpoutMetadata().add(createS) - client.listPolicies().add(createPolicy()); - client.listPublishment().add(createPublishment()); - client.listStreams().add(createStreamDefinition()); - client.addScheduleState(createScheduleState()); - return client; - } - - private static ScheduleState createScheduleState() { - ScheduleState ss = new ScheduleState(); - ss.setVersion(V1); - - ss.getMonitoredStreams().add(createMonitoredStream()); - ss.getAssignments().add(createAssignment()); - - return ss; - } - - private static MonitoredStream createMonitoredStream() { - MonitoredStream ms = new MonitoredStream(streamGroup); - ms.setVersion(V1); - - List<WorkSlot> slots = new ArrayList<WorkSlot>(); - WorkSlot slot0 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 0); - WorkSlot slot1 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 1); - WorkSlot slot2 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 2); - slots.add(slot0); - slots.add(slot1); - slots.add(slot2); - - StreamWorkSlotQueue q = new StreamWorkSlotQueue(streamGroup, false, new HashMap<>(), slots); - ms.addQueues(q); - queueId = q.getQueueId(); - return ms; - } - - private static PolicyAssignment createAssignment() { - PolicyAssignment pa = new PolicyAssignment(TEST_POLICY_1, queueId); - return pa; - } - - private static PolicyDefinition createPolicy() { - PolicyDefinition def = new PolicyDefinition(); - def.setName(TEST_POLICY_1); - def.setInputStreams(Arrays.asList(TEST_STREAM_DEF_1)); - def.setOutputStreams(Arrays.asList(OUT_STREAM1)); - def.setParallelismHint(5); - - streamGroup = new StreamGroup(); - par = new StreamPartition(); - par.setStreamId(TEST_STREAM_DEF_1); - par.getColumns().add(COL1); - StreamSortSpec sortSpec = new StreamSortSpec(); -// sortSpec.setColumn("col1"); - sortSpec.setWindowMargin(3); - sortSpec.setWindowPeriod("PT1M"); - - par.setSortSpec(sortSpec); - streamGroup.addStreamPartition(par); - - List<StreamPartition> lists = new ArrayList<StreamPartition>(); - lists.add(par); - def.setPartitionSpec(lists); - return def; - } - - private static StreamDefinition createStreamDefinition() { - StreamDefinition def = new StreamDefinition(); - def.setStreamId(TEST_STREAM_DEF_1); - def.setDataSource(TEST_DATASOURCE_1); - - StreamColumn col = new StreamColumn(); - col.setName(COL1); - col.setRequired(true); - col.setType(Type.STRING); - def.getColumns().add(col); - - return def; - } - - private static Publishment createPublishment() { - Publishment pub = new Publishment(); - pub.setType("KAFKA"); - pub.setName("test-stream-output"); - pub.setPolicyIds(Arrays.asList(TEST_POLICY_1)); - return pub; - } - - private static Kafka2TupleMetadata createKafka2TupleMetadata() { - Kafka2TupleMetadata ktm = new Kafka2TupleMetadata(); - ktm.setName(TEST_DATASOURCE_1); - ktm.setSchemeCls("SchemeClass"); - ktm.setTopic("tupleTopic"); - ktm.setType("KAFKA"); - ktm.setCodec(new Tuple2StreamMetadata()); - return ktm; - } - - private static Topology createSampleTopology() { - Topology t = new Topology(TOPO1, 3, 10); - for (int i = 0; i < t.getNumOfGroupBolt(); i++) { - t.getGroupNodeIds().add(t.getName() + "-grp-" + i); - } - for (int i = 0; i < t.getNumOfAlertBolt(); i++) { - t.getAlertBoltIds().add(t.getName() + "-alert-" + i); - } - return t; - } - -}