http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
deleted file mode 100644
index 84a4061..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.provider;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-/**
- * @since Mar 28, 2016
- *
- */
-public class InMemScheduleConext implements IScheduleContext {
-
-    private Map<String, Topology> topologies = new HashMap<String, Topology>();
-    private Map<String, TopologyUsage> usages = new HashMap<String, TopologyUsage>();
-    private Map<String, PolicyDefinition> policies = new HashMap<String, PolicyDefinition>();
-    private Map<String, Kafka2TupleMetadata> datasources = new HashMap<String, Kafka2TupleMetadata>();
-    private Map<String, PolicyAssignment> policyAssignments = new HashMap<String, PolicyAssignment>();
-    private Map<String, StreamDefinition> schemas = new HashMap<String, StreamDefinition>();
-    private Map<StreamGroup, MonitoredStream> monitoredStreams = new HashMap<StreamGroup, MonitoredStream>();
-    private Map<String, Publishment> publishments = new HashMap<String, Publishment>();
-
-    public InMemScheduleConext() {
-    }
-
-    public InMemScheduleConext(IScheduleContext context) {
-        this.topologies = new HashMap<String, Topology>(context.getTopologies());
-        this.usages = new HashMap<String, TopologyUsage>(context.getTopologyUsages());
-        this.policies = new HashMap<String, PolicyDefinition>(context.getPolicies());
-        this.datasources = new HashMap<String, Kafka2TupleMetadata>(context.getDataSourceMetadata());
-        this.policyAssignments = new HashMap<String, PolicyAssignment>(context.getPolicyAssignments());
-        this.schemas = new HashMap<String, StreamDefinition>(context.getStreamSchemas());
-        this.monitoredStreams = new HashMap<StreamGroup, MonitoredStream>(context.getMonitoredStreams());
-        this.publishments = new HashMap<String, Publishment>(context.getPublishments());
-    }
-
-    public InMemScheduleConext(Map<String, Topology> topologies2, Map<String, PolicyAssignment> assignments,
-            Map<String, Kafka2TupleMetadata> kafkaSources, Map<String, PolicyDefinition> policies2,
-            Map<String, Publishment> publishments2, Map<String, StreamDefinition> streamDefinitions,
-            Map<StreamGroup, MonitoredStream> monitoredStreamMap, Map<String, TopologyUsage> usages2) {
-        this.topologies = topologies2;
-        this.policyAssignments = assignments;
-        this.datasources = kafkaSources;
-        this.policies = policies2;
-        this.publishments = publishments2;
-        this.schemas = streamDefinitions;
-        this.monitoredStreams = monitoredStreamMap;
-        this.usages = usages2;
-    }
-
-    public Map<String, Topology> getTopologies() {
-        return topologies;
-    }
-
-    public void addTopology(Topology topo) {
-        topologies.put(topo.getName(), topo);
-    }
-
-    public Map<String, TopologyUsage> getTopologyUsages() {
-        return usages;
-    }
-
-    public void addTopologyUsages(TopologyUsage usage) {
-        usages.put(usage.getTopoName(), usage);
-    }
-
-    public Map<String, PolicyDefinition> getPolicies() {
-        return policies;
-    }
-
-    public void addPoilcy(PolicyDefinition pd) {
-        this.policies.put(pd.getName(), pd);
-    }
-
-    public Map<String, Kafka2TupleMetadata> getDatasources() {
-        return datasources;
-    }
-
-    public void setDatasources(Map<String, Kafka2TupleMetadata> datasources) {
-        this.datasources = datasources;
-    }
-
-    public void addDataSource(Kafka2TupleMetadata dataSource) {
-        this.datasources.put(dataSource.getName(), dataSource);
-    }
-
-    @Override
-    public Map<String, Kafka2TupleMetadata> getDataSourceMetadata() {
-        return datasources;
-    }
-
-    public void setPolicyOrderedTopologies(Map<String, PolicyAssignment> policyAssignments) {
-        this.policyAssignments = policyAssignments;
-    }
-
-    public Map<String, PolicyAssignment> getPolicyAssignments() {
-        return this.policyAssignments;
-    }
-
-    @Override
-    public Map<String, StreamDefinition> getStreamSchemas() {
-        return schemas;
-    }
-
-    public void addSchema(StreamDefinition schema) {
-        this.schemas.put(schema.getStreamId(), schema);
-    }
-
-    public void setStreamSchemas(Map<String, StreamDefinition> schemas) {
-        this.schemas = schemas;
-    }
-
-    @Override
-    public Map<StreamGroup, MonitoredStream> getMonitoredStreams() {
-        return monitoredStreams;
-    }
-
-    @Override
-    public Map<String, Publishment> getPublishments() {
-        return publishments;
-    }
-
-}

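A minimal usage sketch (not part of this commit) of the copy-constructor pattern above: the constructor wraps every map in a fresh HashMap, so the returned snapshot can be mutated without touching the live context. Only the constructors and accessors shown in the deleted class are assumed; the example class and method names are hypothetical.

    import org.apache.eagle.alert.coordinator.IScheduleContext;
    import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext;
    import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;

    public class ScheduleContextSnapshotExample {
        public static InMemScheduleConext snapshotWith(IScheduleContext liveContext, PolicyDefinition newPolicy) {
            // the copy constructor clones each map of the live context
            InMemScheduleConext snapshot = new InMemScheduleConext(liveContext);
            // mutate the snapshot only; the live context stays untouched
            snapshot.addPoilcy(newPolicy); // method name as spelled in the deleted class
            return snapshot;
        }
    }
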
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
deleted file mode 100644
index d4d6c0c..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
+++ /dev/null
@@ -1,400 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.provider;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
-import org.apache.eagle.alert.coordinator.model.GroupBoltUsage;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-/**
- * FIXME: this class focuses on correctness, not efficiency, for now. There might
- * be problems when the metadata size grows too big.
- * 
- * @since May 3, 2016
- *
- */
-public class ScheduleContextBuilder {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ScheduleContextBuilder.class);
-    private static final String UNIQUE_BOLT_ID = "%s-%s";// toponame-boltname
-
-    private IMetadataServiceClient client;
-
-    private Map<String, Topology> topologies;
-    private Map<String, PolicyAssignment> assignments;
-    private Map<String, Kafka2TupleMetadata> kafkaSources;
-    private Map<String, PolicyDefinition> policies;
-    private Map<String, Publishment> publishments;
-    private Map<String, StreamDefinition> streamDefinitions;
-    private Map<StreamGroup, MonitoredStream> monitoredStreamMap;
-    private Map<String, TopologyUsage> usages;
-
-    public ScheduleContextBuilder(Config config) {
-        client = new MetadataServiceClientImpl(config);
-    }
-
-    public ScheduleContextBuilder(IMetadataServiceClient client) {
-        this.client = client;
-    }
-
-    /**
-     * Build a schedule context from the metadata service client.
-     * 
-     * @return
-     */
-    public IScheduleContext buildContext() {
-        topologies = listToMap(client.listTopologies());
-        kafkaSources = listToMap(client.listDataSources());
-        policies = listToMap(client.listPolicies());
-        publishments = listToMap(client.listPublishment());
-        streamDefinitions = listToMap(client.listStreams());
-
-        // TODO: See ScheduleState comments on how to improve the storage
-        ScheduleState state = client.getVersionedSpec();
-        assignments = listToMap(state == null ? new ArrayList<PolicyAssignment>() : cleanupDeprecatedAssignments(state.getAssignments()));
-
-        monitoredStreamMap = listToMap(state == null ? new ArrayList<MonitoredStream>() : cleanupDeprecatedStreamsAndAssignment(state.getMonitoredStreams()));
-
-        // build based on existing data
-        usages = buildTopologyUsage();
-
-        // copy to schedule context now
-        return new InMemScheduleConext(topologies, assignments, kafkaSources, policies, publishments,
-                streamDefinitions, monitoredStreamMap, usages);
-    }
-
-    /**
-     * 1.
-     * <pre>
-     * Check each policy's stream group against its assigned monitored stream group.
-     * If they no longer match, the policy's stream group has changed, so remove the policy assignment.
-     * If, in the end, no assignment refers to a given monitored stream, that monitored stream can be removed.
-     * Log every time a removal happens.
-     * </pre>
-     * 2.
-     * <pre>
-     * If a monitored stream's queue is on a non-existing topology, remove it.
-     * </pre>
-     * @param monitoredStreams
-     * @return
-     */
-    private List<MonitoredStream> cleanupDeprecatedStreamsAndAssignment(List<MonitoredStream> monitoredStreams) {
-        List<MonitoredStream> result = new ArrayList<MonitoredStream>(monitoredStreams);
-        
-        // clear deprecated streams
-        clearMonitoredStreams(monitoredStreams);
-
-        // build queueId-> streamGroup
-        Map<String, StreamGroup> queue2StreamGroup = new HashMap<String, StreamGroup>();
-        for (MonitoredStream ms : result) {
-            for (StreamWorkSlotQueue q : ms.getQueues()) {
-                queue2StreamGroup.put(q.getQueueId(), ms.getStreamGroup());
-            }
-        }
-
-        // decide the assignment delete set
-        Set<StreamGroup> usedGroups = new HashSet<StreamGroup>();
-        Set<String> toRemove = new HashSet<String>();
-        // check if queue is still referenced by policy assignments
-        for (PolicyAssignment assignment : assignments.values()) {
-            PolicyDefinition def = policies.get(assignment.getPolicyName());
-            StreamGroup group = queue2StreamGroup.get(assignment.getQueueId());
-            if (group == null || !Objects.equals(group.getStreamPartitions(), def.getPartitionSpec())) {
-                LOG.warn(" policy assignment {} 's policy group {} is different from the monitored stream's partition group {}, "
-                                + "this indicates a policy stream partition spec change, the assignment would be removed! ",
-                        assignment, def.getPartitionSpec(), group == null ? "'not found'" : group.getStreamPartitions());
-                toRemove.add(assignment.getPolicyName());
-            } else {
-                usedGroups.add(group);
-            }
-        }
-
-        // remove useless
-        assignments.keySet().removeAll(toRemove);
-        // remove non-referenced monitored streams
-        result.removeIf((t) -> {
-            boolean used = usedGroups.contains(t.getStreamGroup());
-            if (!used) {
-                LOG.warn("monitored stream with stream group {} is not referenced, "
-                        + "this monitored stream and its worker queue will be removed", t.getStreamGroup());
-            }
-            return !used; 
-        });
-
-        return result;
-    }
-
-    private void clearMonitoredStreams(List<MonitoredStream> monitoredStreams) {
-        Iterator<MonitoredStream> it = monitoredStreams.iterator();
-        while (it.hasNext()) {
-            MonitoredStream ms = it.next();
-            Iterator<StreamWorkSlotQueue> queueIt = ms.getQueues().iterator();
-            // clean queues whose underlying topology has changed (removed/down)
-            while (queueIt.hasNext()) {
-                StreamWorkSlotQueue queue = queueIt.next();
-                boolean deprecated = false;
-                for (WorkSlot ws : queue.getWorkingSlots()) {
-                    // check if topology available or bolt available
-                    if (!topologies.containsKey(ws.topologyName)
-                            || !topologies.get(ws.topologyName).getAlertBoltIds().contains(ws.boltId)) {
-                        deprecated = true;
-                        break;
-                    }
-                }
-                if (deprecated) {
-                    queueIt.remove();
-                }
-            }
-
-            if (ms.getQueues().isEmpty()) {
-                it.remove();
-            }
-        }
-    }
-
-    private List<PolicyAssignment> cleanupDeprecatedAssignments(List<PolicyAssignment> list) {
-        List<PolicyAssignment> result = new ArrayList<PolicyAssignment>(list);
-        Iterator<PolicyAssignment> paIt = result.iterator();
-        while (paIt.hasNext()) {
-            PolicyAssignment assignment = paIt.next();
-            if (!policies.containsKey(assignment.getPolicyName())) {
-                LOG.info("Policy assignment {} 's policy not found, this assignment will be removed!", assignment);
-                paIt.remove();
-            }
-        }
-        return result;
-    }
-
-    private <T, K> Map<K, T> listToMap(List<T> collections) {
-        Map<K, T> maps = new HashMap<K, T>(collections.size());
-        for (T t : collections) {
-            maps.put(getKey(t), t);
-        }
-        return maps;
-    }
-
-    /*
-     * One drawback: once we add a new class, this code needs to be changed!
-     */
-    @SuppressWarnings("unchecked")
-    private <T, K> K getKey(T t) {
-        if (t instanceof Topology) {
-            return (K) ((Topology) t).getName();
-        } else if (t instanceof PolicyAssignment) {
-            return (K) ((PolicyAssignment) t).getPolicyName();
-        } else if (t instanceof Kafka2TupleMetadata) {
-            return (K) ((Kafka2TupleMetadata) t).getName();
-        } else if (t instanceof PolicyDefinition) {
-            return (K) ((PolicyDefinition) t).getName();
-        } else if (t instanceof Publishment) {
-            return (K) ((Publishment) t).getName();
-        } else if (t instanceof StreamDefinition) {
-            return (K) ((StreamDefinition) t).getStreamId();
-        } else if (t instanceof MonitoredStream) {
-            return (K) ((MonitoredStream) t).getStreamGroup();
-        }
-        throw new RuntimeException("unexpected key class " + t.getClass());
-    }
-
-    private Map<String, TopologyUsage> buildTopologyUsage() {
-        Map<String, TopologyUsage> usages = new HashMap<String, TopologyUsage>();
-
-        // pre-build data structure for help
-        Map<String, Set<MonitoredStream>> topo2MonitorStream = new HashMap<String, Set<MonitoredStream>>();
-        Map<String, Set<String>> topo2Policies = new HashMap<String, Set<String>>();
-        // simply assume no bolt with the same id
-        Map<String, Set<String>> bolt2Policies = new HashMap<String, Set<String>>();
-        // simply assume no bolt with the same id
-        Map<String, Set<StreamGroup>> bolt2Partition = new HashMap<String, Set<StreamGroup>>();
-        // simply assume no bolt with the same id
-        Map<String, Set<String>> bolt2QueueIds = new HashMap<String, Set<String>>();
-        Map<String, StreamWorkSlotQueue> queueMap = new HashMap<String, StreamWorkSlotQueue>();
-
-        preBuildQueue2TopoMap(topo2MonitorStream, topo2Policies, bolt2Policies, bolt2Partition, bolt2QueueIds, queueMap);
-        
-        for (Topology t : topologies.values()) {
-            TopologyUsage u = new TopologyUsage(t.getName());
-            // add group/bolt usages
-            for (String grpBolt : t.getGroupNodeIds()) {
-                GroupBoltUsage grpUsage = new GroupBoltUsage(grpBolt);
-                u.getGroupUsages().put(grpBolt, grpUsage);
-            }
-            for (String alertBolt : t.getAlertBoltIds()) {
-                String uniqueBoltId = String.format(UNIQUE_BOLT_ID, t.getName(), alertBolt);
-
-                AlertBoltUsage alertUsage = new AlertBoltUsage(alertBolt);
-                u.getAlertUsages().put(alertBolt, alertUsage);
-                // complete usage
-                addBoltUsageInfo(bolt2Policies, bolt2Partition, bolt2QueueIds, uniqueBoltId, alertUsage, queueMap);
-            }
-
-            // policy -- policy assignment
-            if (topo2Policies.containsKey(u.getTopoName())) {
-                u.getPolicies().addAll(topo2Policies.get(u.getTopoName()));
-            }
-
-            // data source
-            buildTopologyDataSource(u);
-
-            // topology usage monitored stream -- from monitored streams' queue slot item info
-            if (topo2MonitorStream.containsKey(u.getTopoName())) {
-                u.getMonitoredStream().addAll(topo2MonitorStream.get(u.getTopoName()));
-            }
-
-            usages.put(u.getTopoName(), u);
-        }
-
-        return usages;
-    }
-
-    private void addBoltUsageInfo(Map<String, Set<String>> bolt2Policies,
-            Map<String, Set<StreamGroup>> bolt2Partition, Map<String, Set<String>> bolt2QueueIds, String uniqueAlertBolt,
-            AlertBoltUsage alertUsage, Map<String, StreamWorkSlotQueue> queueMap) {
-        //
-        if (bolt2Policies.containsKey(uniqueAlertBolt)) {
-            alertUsage.getPolicies().addAll(bolt2Policies.get(uniqueAlertBolt));
-        }
-        //
-        if (bolt2Partition.containsKey(uniqueAlertBolt)) {
-            alertUsage.getPartitions().addAll(bolt2Partition.get(uniqueAlertBolt));
-        }
-        //
-        if (bolt2QueueIds.containsKey(uniqueAlertBolt)) {
-            for (String qId : bolt2QueueIds.get(uniqueAlertBolt)) {
-                if (queueMap.containsKey(qId)) {
-                    alertUsage.getReferQueues().add(queueMap.get(qId));
-                } else {
-                    LOG.error(" queue id {} not found in queue map !", qId);
-                }
-            }
-        }
-    }
-
-    private void buildTopologyDataSource(TopologyUsage u) {
-        for (String policyName : u.getPolicies()) {
-            PolicyDefinition def = policies.get(policyName);
-            if (def != null) {
-                u.getDataSources().addAll(findDatasource(def));
-            } else {
-                LOG.error(" policy {} not found, but referenced in topology usage {} !", policyName, u.getTopoName());
-            }
-        }
-    }
-
-    private List<String> findDatasource(PolicyDefinition def) {
-        List<String> result = new ArrayList<String>();
-        List<String> inputStreams = def.getInputStreams();
-        for (String is : inputStreams) {
-            StreamDefinition ss = this.streamDefinitions.get(is);
-            if (ss == null) {
-                LOG.error("policy {} referenced stream definition {} not found in definitions !", def.getName(), is);
-            } else {
-                result.add(ss.getDataSource());
-            }
-        }
-        return result;
-    }
-
-    private void preBuildQueue2TopoMap(
-            Map<String, Set<MonitoredStream>> topo2MonitorStream,
-            Map<String, Set<String>> topo2Policies, 
-            Map<String, Set<String>> bolt2Policies, 
-            Map<String, Set<StreamGroup>> bolt2Partition, 
-            Map<String, Set<String>> bolt2QueueIds,
-            Map<String, StreamWorkSlotQueue> queueMap) {
-        // pre-build structure
-        // why don't we reuse queue.getPolicies?
-        Map<String, Set<String>> queue2Policies = new HashMap<String, Set<String>>();
-        for (PolicyAssignment pa : assignments.values()) {
-            if (!queue2Policies.containsKey(pa.getQueueId())) {
-                queue2Policies.put(pa.getQueueId(), new HashSet<String>());
-            }
-            queue2Policies.get(pa.getQueueId()).add(pa.getPolicyName());
-        }
-
-        for (MonitoredStream stream : monitoredStreamMap.values()) {
-            for (StreamWorkSlotQueue q : stream.getQueues()) {
-                queueMap.put(q.getQueueId(), q);
-                Set<String> policiesOnQ = queue2Policies.containsKey(q.getQueueId()) ? queue2Policies.get(q.getQueueId()) : new HashSet<String>();
-                
-                for (WorkSlot slot : q.getWorkingSlots()) {
-                    // topo2monitoredstream
-                    if (!topo2MonitorStream.containsKey(slot.getTopologyName())) {
-                        topo2MonitorStream.put(slot.getTopologyName(), new HashSet<MonitoredStream>());
-                    }
-                    topo2MonitorStream.get(slot.getTopologyName()).add(stream);
-
-                    // topo2policy
-                    if (!topo2Policies.containsKey(slot.getTopologyName())) {
-                        topo2Policies.put(slot.getTopologyName(), new HashSet<String>());
-                    }
-                    topo2Policies.get(slot.getTopologyName()).addAll(policiesOnQ);
-
-                    // bolt2Policy
-                    if (!bolt2Policies.containsKey(getUniqueBoltId(slot))) {
-                        bolt2Policies.put(getUniqueBoltId(slot), new HashSet<String>());
-                    }
-                    bolt2Policies.get(getUniqueBoltId(slot)).addAll(policiesOnQ);
-
-                    // bolt2Queue
-                    if (!bolt2QueueIds.containsKey(getUniqueBoltId(slot))) {
-                        bolt2QueueIds.put(getUniqueBoltId(slot), new HashSet<String>());
-                    }
-                    bolt2QueueIds.get(getUniqueBoltId(slot)).add(q.getQueueId());
-
-                    // bolt2Partition
-                    if (!bolt2Partition.containsKey(getUniqueBoltId(slot))) {
-                        bolt2Partition.put(getUniqueBoltId(slot), new HashSet<StreamGroup>());
-                    }
-                    bolt2Partition.get(getUniqueBoltId(slot)).add(stream.getStreamGroup());
-                }
-            }
-        }
-    }
-
-    private String getUniqueBoltId(WorkSlot slot) {
-        return String.format(UNIQUE_BOLT_ID, slot.getTopologyName(), slot.getBoltId());
-    }
-
-}

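The getKey method above relies on an instanceof ladder, and its inline comment notes that it has to be extended whenever a new metadata class appears. A hedged sketch (not part of this commit) of the same list-to-map idiom written with an explicit key extractor, shown only as an illustration; the class and helper names are hypothetical.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;

    public class ListToMapExample {
        // generic variant: the caller supplies the key, so no per-class branch is needed
        static <K, T> Map<K, T> listToMap(List<T> items, Function<T, K> keyOf) {
            Map<K, T> map = new HashMap<>(items.size());
            for (T item : items) {
                map.put(keyOf.apply(item), item);
            }
            return map;
        }
    }

With that shape, the policy branch of getKey would collapse to a call such as listToMap(client.listPolicies(), PolicyDefinition::getName).
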
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
deleted file mode 100644
index 6b8495e..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.resource;
-
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordinator.Coordinator;
-import org.apache.eagle.alert.coordinator.ScheduleOption;
-import org.apache.eagle.alert.utils.JsonUtils;
-
-/**
- * This is to provide API access even when we don't have ZK as an intermediary.
- * FIXME : more elegant status codes
- * 
- * @since Mar 24, 2016 <br/>
- */
-@Path("/coordinator")
-@Produces({ "application/json" })
-public class CoordinatorResource {
-
-    // spring config here?
-    private Coordinator alertCoordinator = new Coordinator();
-
-    @GET
-    @Path("/assignments")
-    public String getAssignments() throws Exception {
-        ScheduleState state = alertCoordinator.getState();
-        return JsonUtils.writeValueAsString(state);
-    }
-
-    @POST
-    @Path("/build")
-    public String build() throws Exception {
-        ScheduleOption option = new ScheduleOption();
-        ScheduleState state = alertCoordinator.schedule(option);
-        return JsonUtils.writeValueAsString(state);
-    }
-
-    /**
-     * Manually update the topology usages, for administration
-     * 
-     * @return
-     */
-    @POST
-    @Path("/refreshUsages")
-    public String refreshUsages() {
-        // TODO
-        return "";
-    }
-
-}

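For reference, a hedged sketch (not part of this commit) of calling the assignments endpoint above from Java. It assumes the coordinator webapp is reachable at a base URL such as http://localhost:8080/api, per the /api/* servlet mapping in this module's web.xml; the class name is hypothetical, and InputStream.readAllBytes requires Java 9+.

    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class CoordinatorResourceClientExample {
        public static String fetchAssignments(String baseUrl) throws Exception {
            // GET <base>/coordinator/assignments returns the current ScheduleState as JSON
            URL url = new URL(baseUrl + "/coordinator/assignments");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            try (InputStream in = conn.getInputStream()) {
                return new String(in.readAllBytes(), StandardCharsets.UTF_8);
            } finally {
                conn.disconnect();
            }
        }
    }
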
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
deleted file mode 100644
index 15333da..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.trigger;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Polls for policy changes and notifies listeners.
- */
-public class DynamicPolicyLoader implements Runnable {
-    private static Logger LOG = LoggerFactory.getLogger(DynamicPolicyLoader.class);
-
-    private IMetadataServiceClient client;
-    // initial cachedPolicies should be empty
-    private Map<String, PolicyDefinition> cachedPolicies = new HashMap<>();
-    private List<PolicyChangeListener> listeners = new ArrayList<>();
-
-    public DynamicPolicyLoader(IMetadataServiceClient client){
-        this.client = client;
-    }
-
-    public synchronized void addPolicyChangeListener(PolicyChangeListener listener) {
-        listeners.add(listener);
-    }
-
-    /**
-     * When it is run for the first time, cachedPolicies is empty, so all existing policies are expected
-     * to show up as addedPolicies.
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    public void run() {
-        // we should catch every exception to avoid a zombie thread
-        try {
-            List<PolicyDefinition> current = client.listPolicies();
-            Map<String, PolicyDefinition> currPolicies = new HashMap<>();
-            current.forEach(pe -> currPolicies.put(pe.getName(), pe));
-
-            Collection<String> addedPolicies = CollectionUtils.subtract(currPolicies.keySet(), cachedPolicies.keySet());
-            Collection<String> removedPolicies = CollectionUtils.subtract(cachedPolicies.keySet(), currPolicies.keySet());
-            Collection<String> potentiallyModifiedPolicies = CollectionUtils.intersection(currPolicies.keySet(), cachedPolicies.keySet());
-
-            List<String> reallyModifiedPolicies = new ArrayList<>();
-            for (String updatedPolicy : potentiallyModifiedPolicies) {
-                if (!currPolicies.get(updatedPolicy).equals(cachedPolicies.get(updatedPolicy))) {
-                    reallyModifiedPolicies.add(updatedPolicy);
-                }
-            }
-
-            boolean policyChanged = false;
-            if (addedPolicies.size() != 0 ||
-                    removedPolicies.size() != 0 ||
-                    reallyModifiedPolicies.size() != 0) {
-                policyChanged = true;
-            }
-
-            if (!policyChanged) {
-                LOG.info("policy is not changed since last run");
-                return;
-            }
-            synchronized (this) {
-                for (PolicyChangeListener listener : listeners) {
-                    listener.onPolicyChange(current, addedPolicies, removedPolicies, reallyModifiedPolicies);
-                }
-            }
-
-            // reset cached policies
-            cachedPolicies = currPolicies;
-        } catch (Throwable t) {
-            LOG.error("error loading policy, but continue to run", t);
-        }
-    }
-}

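DynamicPolicyLoader is a plain Runnable, and this diff does not show how it is scheduled. A hedged sketch (not part of this commit) of one plausible wiring, using a ScheduledExecutorService driven by delays like the metadataDynamicCheck values in the application.conf below; the class name and the println listener are hypothetical.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    import org.apache.eagle.alert.coordinator.trigger.DynamicPolicyLoader;
    import org.apache.eagle.alert.service.IMetadataServiceClient;

    public class PolicyLoaderSchedulingExample {
        public static ScheduledExecutorService start(IMetadataServiceClient client, long initDelayMs, long delayMs) {
            DynamicPolicyLoader loader = new DynamicPolicyLoader(client);
            // PolicyChangeListener has a single method, so a lambda works here
            loader.addPolicyChangeListener((allPolicies, added, removed, modified) ->
                    System.out.println("added=" + added + " removed=" + removed + " modified=" + modified));
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            // run() catches its own exceptions, so the periodic task keeps running
            scheduler.scheduleWithFixedDelay(loader, initDelayMs, delayMs, TimeUnit.MILLISECONDS);
            return scheduler;
        }
    }
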
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
deleted file mode 100644
index cd5265d..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limit
- */
-package org.apache.eagle.alert.coordinator.trigger;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-
-public interface PolicyChangeListener {
-    void onPolicyChange(List<PolicyDefinition> allPolicies, Collection<String> addedPolicies, Collection<String> removedPolicies, Collection<String> modifiedPolicies);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/application.conf b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/application.conf
deleted file mode 100644
index 23ee161..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/application.conf
+++ /dev/null
@@ -1,41 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-{
-       "coordinator" : {
-               "policiesPerBolt" : 5,
-               "boltParallelism" : 5,
-               "policyDefaultParallelism" : 5,
-               "boltLoadUpbound": 0.8,
-               "topologyLoadUpbound" : 0.8,
-               "numOfAlertBoltsPerTopology" : 5,
-               "zkConfig" : {
-                       "zkQuorum" : "localhost:2181",
-                       "zkRoot" : "/alert",
-                       "zkSessionTimeoutMs" : 10000,
-                       "connectionTimeoutMs" : 10000,
-                       "zkRetryTimes" : 3,
-                       "zkRetryInterval" : 3000
-               },
-               "metadataService" : {
-                       "host" : "localhost",
-                       "port" : 8080,
-                       "context" : "/api"
-               },
-               "metadataDynamicCheck" : {
-                       "initDelayMillis" : 1000,
-                       "delayMillis" : 30000
-               }
-       }
-}
\ No newline at end of file

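A minimal sketch (not part of this commit) of reading the coordinator block above with Typesafe Config, matching how the tests later in this commit call ConfigFactory.load().getConfig("coordinator"); the class name is hypothetical.

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class CoordinatorConfigExample {
        public static void main(String[] args) {
            // loads application.conf from the classpath and selects the "coordinator" block
            Config coordinator = ConfigFactory.load().getConfig("coordinator");
            int policiesPerBolt = coordinator.getInt("policiesPerBolt");
            String zkQuorum = coordinator.getString("zkConfig.zkQuorum");
            long delayMillis = coordinator.getLong("metadataDynamicCheck.delayMillis");
            System.out.println(policiesPerBolt + " " + zkQuorum + " " + delayMillis);
        }
    }
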
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/log4j.properties b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/log4j.properties
deleted file mode 100644
index d4bc126..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, stdout
-
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml
deleted file mode 100644
index 1aa925e..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml
+++ /dev/null
@@ -1,87 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<web-app xmlns="http://java.sun.com/xml/ns/javaee"
-           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-           xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
-                 http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
-           version="3.0">
-    <welcome-file-list>
-        <welcome-file>index.html</welcome-file>
-    </welcome-file-list>
-    <servlet>
-        <servlet-name>Jersey Web Application</servlet-name>
-        <servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
-        <init-param>
-            <param-name>com.sun.jersey.config.property.packages</param-name>
-            <param-value>io.swagger.jaxrs.json,io.swagger.jaxrs.listing,org.apache.eagle.alert.coordinator.resource,org.codehaus.jackson.jaxrs</param-value>
-        </init-param>
-        <init-param>
-            <param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name>
-            <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter;com.sun.jersey.api.container.filter.PostReplaceFilter</param-value>
-        </init-param>
-        <init-param>
-            <param-name>com.sun.jersey.spi.container.ContainerResponseFilters</param-name>
-            <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value>
-        </init-param>
-        <load-on-startup>1</load-on-startup>
-    </servlet>
-    <!-- Servlet for swagger initialization only, no URL mapping. -->
-       <servlet>
-               <servlet-name>swaggerConfig</servlet-name>
-               <servlet-class>io.swagger.jaxrs.config.DefaultJaxrsConfig</servlet-class>
-               <init-param>
-                       <param-name>api.version</param-name>
-                       <param-value>1.0.0</param-value>
-               </init-param>
-               <init-param>
-                       <param-name>swagger.api.basepath</param-name>
-                       <param-value>/api</param-value>
-               </init-param>
-               <load-on-startup>2</load-on-startup>
-       </servlet>
-
-    <servlet-mapping>
-        <servlet-name>Jersey Web Application</servlet-name>
-        <url-pattern>/api/*</url-pattern>
-    </servlet-mapping>
-    <filter>
-        <filter-name>CorsFilter</filter-name>
-        <!-- this should be replaced by the Tomcat one, see also the metadata resource -->
-        <filter-class>org.apache.eagle.alert.resource.SimpleCORSFiler</filter-class>
-        <init-param>
-            <param-name>cors.allowed.origins</param-name>
-            <param-value>*</param-value>
-        </init-param>
-        <init-param>
-            <param-name>cors.allowed.headers</param-name>
-            <param-value>Authorization,Origin, No-Cache, X-Requested-With, If-Modified-Since, Pragma, Last-Modified, Cache-Control, Expires, Content-Type, X-E4M-With, Accept</param-value>
-        </init-param>
-        <init-param>
-            <param-name>cors.allowed.methods</param-name>
-            <param-value>GET,POST,HEAD,OPTIONS,PUT,DELETE</param-value>
-        </init-param>
-        <init-param>
-            <param-name>cors.support.credentials</param-name>
-            <param-value>true</param-value>
-        </init-param>
-    </filter>
-    <filter-mapping>
-        <filter-name>CorsFilter</filter-name>
-        <url-pattern>/*</url-pattern>
-    </filter-mapping>
-</web-app>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/index.html
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/index.html b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/index.html
deleted file mode 100644
index 1c4ea76..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/index.html
+++ /dev/null
@@ -1,18 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-UMP Coordinator service!

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
deleted file mode 100644
index 78b72b3..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.alert.coordinator;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.eagle.alert.config.ConfigBusConsumer;
-import org.apache.eagle.alert.config.ConfigBusProducer;
-import org.apache.eagle.alert.config.ConfigChangeCallback;
-import org.apache.eagle.alert.config.ConfigValue;
-import org.apache.eagle.alert.config.ZKConfig;
-import org.apache.eagle.alert.config.ZKConfigBuilder;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordinator.Coordinator;
-import org.apache.eagle.alert.coordinator.ScheduleOption;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * @since May 5, 2016
- *
- */
-public class CoordinatorTest {
-
-    @SuppressWarnings({ "resource", "unused" })
-    @Ignore
-    @Test
-    public void test() throws Exception {
-        before();
-        Config config = ConfigFactory.load().getConfig("coordinator");
-        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
-        ConfigBusProducer producer = new ConfigBusProducer(zkConfig);
-        IMetadataServiceClient client = new MetadataServiceClientImpl(config);
-
-        Coordinator coordinator = new Coordinator(config, producer, client);
-        ScheduleOption option = new ScheduleOption();
-        ScheduleState state = coordinator.schedule(option);
-        String v = state.getVersion();
-
-        AtomicBoolean validated = new AtomicBoolean(false);
-        ConfigBusConsumer consumer = new ConfigBusConsumer(zkConfig, "topo1/spout", new ConfigChangeCallback() {
-            @Override
-            public void onNewConfig(ConfigValue value) {
-                String vId = value.getValue().toString();
-                Assert.assertEquals(v, vId);
-                validated.set(true);
-            }
-        });
-
-        Thread.sleep(1000);
-        Assert.assertTrue(validated.get());
-    }
-
-    @SuppressWarnings({ "resource", "unused" })
-    @Test
-    public void test_01() throws Exception {
-        before();
-        Config config = ConfigFactory.load().getConfig("coordinator");
-        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
-        ConfigBusProducer producer = new ConfigBusProducer(zkConfig);
-        IMetadataServiceClient client = ScheduleContextBuilderTest.getSampleMetadataService();
-
-        Coordinator coordinator = new Coordinator(config, producer, client);
-        ScheduleOption option = new ScheduleOption();
-        ScheduleState state = coordinator.schedule(option);
-        String v = state.getVersion();
-
-        // TODO : assert version
-
-        CountDownLatch latch = new CountDownLatch(1);
-        AtomicBoolean validated = new AtomicBoolean(false);
-        ConfigBusConsumer consumer = new ConfigBusConsumer(zkConfig, "topo1/spout", new ConfigChangeCallback() {
-            @Override
-            public void onNewConfig(ConfigValue value) {
-                String vId = value.getValue().toString();
-                Assert.assertEquals(v, vId);
-                validated.set(true);
-                latch.countDown();
-            }
-        });
-
-        latch.await(3, TimeUnit.SECONDS);
-        Assert.assertTrue(validated.get());
-    }
-
-    @Ignore
-    @Test
-    public void test_main() throws Exception {
-        before();
-
-        Coordinator.main(null);
-    }
-
-    @Before
-    public void before() {
-        System.setProperty("config.resource", "/test-application.conf");
-        ConfigFactory.invalidateCaches();
-        ConfigFactory.load().getConfig("coordinator");
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
deleted file mode 100644
index 155a9e5..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.alert.coordinator;
-
-/**
- * Since 4/28/16.
- */
-public class DynamicPolicyLoaderTest {
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java
deleted file mode 100644
index e2ea031..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.alert.coordinator;
-
-import org.apache.eagle.alert.config.ConfigBusProducer;
-import org.apache.eagle.alert.config.ZKConfig;
-import org.apache.eagle.alert.config.ZKConfigBuilder;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordinator.Coordinator;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * @since May 9, 2016
- *
- */
-public class MetadataServiceClientImplTest {
-
-    @Ignore
-    @Test
-    public void addScheduleState() throws Exception {
-        ConfigFactory.invalidateCaches();
-        System.setProperty("config.resource", "/test-application.conf");
-        Config config = ConfigFactory.load("test-application.conf").getConfig("coordinator");
-        MetadataServiceClientImpl client = new MetadataServiceClientImpl(config);
-
-        ScheduleState ss = new ScheduleState();
-        ss.setVersion("spec_version_1463764252582");
-
-        client.addScheduleState(ss);
-
-        client.close();
-
-        ss.setVersion("spec_version_1464764252582");
-        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
-        ConfigBusProducer producer = new ConfigBusProducer(zkConfig);
-        Coordinator.postSchedule(client, ss, producer);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
deleted file mode 100644
index f2e67de..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.alert.coordinator;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.alert.coordinator.mock.InMemMetadataServiceClient;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn.Type;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * @since May 5, 2016
- *
- */
-public class ScheduleContextBuilderTest {
-
-    @Test
-    public void test() {
-        InMemMetadataServiceClient client = getSampleMetadataService();
-
-        ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
-
-        IScheduleContext context = builder.buildContext();
-
-        // assert topology usage
-        Map<String, TopologyUsage> usages = context.getTopologyUsages();
-        Assert.assertEquals(1, usages.get(TOPO1).getMonitoredStream().size());
-        Assert.assertTrue(usages.get(TOPO1).getPolicies().contains(TEST_POLICY_1));
-
-        String alertBolt0 = TOPO1 + "-alert-" + "0";
-        String alertBolt1 = TOPO1 + "-alert-" + "1";
-        String alertBolt2 = TOPO1 + "-alert-" + "2";
-        for (AlertBoltUsage u : usages.get(TOPO1).getAlertUsages().values()) {
-            if (u.getBoltId().equals(alertBolt0) || u.getBoltId().equals(alertBolt1)
-                    || u.getBoltId().equals(alertBolt2)) {
-                Assert.assertEquals(1, u.getPolicies().size());
-                Assert.assertTrue(u.getPolicies().contains(TEST_POLICY_1));
-                Assert.assertEquals(1, u.getPartitions().size());
-                Assert.assertEquals(1, u.getReferQueues().size());
-            }
-        }
-    }
-
-    @Test
-    public void test_remove_policy() {
-        InMemMetadataServiceClient client = getSampleMetadataService();
-        ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
-
-        PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
-
-        IScheduleContext context = builder.buildContext();
-        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
-        StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight();
-
-        client.listPolicies().remove(0);
-        context = builder.buildContext();
-        Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
-
-        WorkSlot slot = queue.getWorkingSlots().get(0);
-        Set<String> topoPolicies = context.getTopologyUsages().get(slot.topologyName).getPolicies();
-        Assert.assertFalse(topoPolicies.contains(TEST_DATASOURCE_1));
-        Assert.assertEquals(0, topoPolicies.size());
-    }
-
-    @Test
-    public void test_changed_policy() {
-        InMemMetadataServiceClient client = getSampleMetadataService();
-        ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
-        PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
-
-        IScheduleContext context = builder.buildContext();
-        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
-
-        StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight();
-
-        PolicyDefinition pd1 = client.listPolicies().get(0);
-        // add a new group-by column: need to replace the partition spec to
-        // avoid referencing the same object in one JVM (no serialization
-        // and deserialization)
-        StreamPartition par = new StreamPartition(pd1.getPartitionSpec().get(0));
-        par.getColumns().add("s1");
-        pd1.getPartitionSpec().clear();
-        pd1.getPartitionSpec().add(par);
-
-        context = builder.buildContext();
-
-        // assert the policy assignment is removed
-        Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
-        // assert the monitored stream is removed as no policy on it now.
-        Assert.assertEquals(0, context.getMonitoredStreams().size());
-        // assert the topology usage doesn't contain policy
-        WorkSlot slot = queue.getWorkingSlots().get(0);
-        TopologyUsage topologyUsage = context.getTopologyUsages().get(slot.topologyName);
-        Set<String> topoPolicies = topologyUsage.getPolicies();
-        Assert.assertFalse(topoPolicies.contains(TEST_DATASOURCE_1));
-        Assert.assertEquals(0, topoPolicies.size());
-        // assert the topology usage doesn't contain the monitored stream
-        Assert.assertEquals(0, topologyUsage.getMonitoredStream().size());
-        // assert the alert bolt usage doesn't have the queue reference
-        Assert.assertEquals(0, topologyUsage.getAlertBoltUsage(slot.getBoltId()).getReferQueues().size());
-    }
-
-    @Test
-    public void test_renamed_topologies() {
-        InMemMetadataServiceClient client = getSampleMetadataService();
-        ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
-
-        IScheduleContext context = builder.buildContext();
-        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
-
-        Topology t = client.listTopologies().get(0);
-        t.setName("newName");
-
-        context = builder.buildContext();
-        Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
-    }
-
-    private static final String TOPO1 = "topo1";
-    private static final String V1 = "v1";
-    private static final String COL1 = "col1";
-    private static final String OUT_STREAM1 = "out-stream1";
-    private static final String TEST_POLICY_1 = "test-policy-1";
-    private static final String TEST_STREAM_DEF_1 = "testStreamDef";
-    private static final String TEST_DATASOURCE_1 = "test-datasource-1";
-    private static StreamPartition par;
-    private static String queueId;
-    private static StreamGroup streamGroup;
-
-    public static InMemMetadataServiceClient getSampleMetadataService() {
-        InMemMetadataServiceClient client = new InMemMetadataServiceClient();
-        client.listTopologies().add(createSampleTopology());
-        client.listDataSources().add(createKafka2TupleMetadata());
-        // client.listSpoutMetadata().add(createS)
-        client.listPolicies().add(createPolicy());
-        client.listPublishment().add(createPublishment());
-        client.listStreams().add(createStreamDefinition());
-        client.addScheduleState(createScheduleState());
-        return client;
-    }
-
-    private static ScheduleState createScheduleState() {
-        ScheduleState ss = new ScheduleState();
-        ss.setVersion(V1);
-
-        ss.getMonitoredStreams().add(createMonitoredStream());
-        ss.getAssignments().add(createAssignment());
-
-        return ss;
-    }
-
-    private static MonitoredStream createMonitoredStream() {
-        MonitoredStream ms = new MonitoredStream(streamGroup);
-        ms.setVersion(V1);
-
-        List<WorkSlot> slots = new ArrayList<WorkSlot>();
-        WorkSlot slot0 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 0);
-        WorkSlot slot1 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 1);
-        WorkSlot slot2 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 2);
-        slots.add(slot0);
-        slots.add(slot1);
-        slots.add(slot2);
-
-        StreamWorkSlotQueue q = new StreamWorkSlotQueue(streamGroup, false, new HashMap<>(), slots);
-        ms.addQueues(q);
-        queueId = q.getQueueId();
-        return ms;
-    }
-
-    private static PolicyAssignment createAssignment() {
-        PolicyAssignment pa = new PolicyAssignment(TEST_POLICY_1, queueId);
-        return pa;
-    }
-
-    private static PolicyDefinition createPolicy() {
-        PolicyDefinition def = new PolicyDefinition();
-        def.setName(TEST_POLICY_1);
-        def.setInputStreams(Arrays.asList(TEST_STREAM_DEF_1));
-        def.setOutputStreams(Arrays.asList(OUT_STREAM1));
-        def.setParallelismHint(5);
-
-        streamGroup = new StreamGroup();
-        par = new StreamPartition();
-        par.setStreamId(TEST_STREAM_DEF_1);
-        par.getColumns().add(COL1);
-        StreamSortSpec sortSpec = new StreamSortSpec();
-//        sortSpec.setColumn("col1");
-        sortSpec.setWindowMargin(3);
-        sortSpec.setWindowPeriod("PT1M");
-
-        par.setSortSpec(sortSpec);
-        streamGroup.addStreamPartition(par);
-
-        List<StreamPartition> lists = new ArrayList<StreamPartition>();
-        lists.add(par);
-        def.setPartitionSpec(lists);
-        return def;
-    }
-
-    private static StreamDefinition createStreamDefinition() {
-        StreamDefinition def = new StreamDefinition();
-        def.setStreamId(TEST_STREAM_DEF_1);
-        def.setDataSource(TEST_DATASOURCE_1);
-
-        StreamColumn col = new StreamColumn();
-        col.setName(COL1);
-        col.setRequired(true);
-        col.setType(Type.STRING);
-        def.getColumns().add(col);
-
-        return def;
-    }
-
-    private static Publishment createPublishment() {
-        Publishment pub = new Publishment();
-        pub.setType("KAFKA");
-        pub.setName("test-stream-output");
-        pub.setPolicyIds(Arrays.asList(TEST_POLICY_1));
-        return pub;
-    }
-
-    private static Kafka2TupleMetadata createKafka2TupleMetadata() {
-        Kafka2TupleMetadata ktm = new Kafka2TupleMetadata();
-        ktm.setName(TEST_DATASOURCE_1);
-        ktm.setSchemeCls("SchemeClass");
-        ktm.setTopic("tupleTopic");
-        ktm.setType("KAFKA");
-        ktm.setCodec(new Tuple2StreamMetadata());
-        return ktm;
-    }
-
-    private static Topology createSampleTopology() {
-        Topology t = new Topology(TOPO1, 3, 10);
-        for (int i = 0; i < t.getNumOfGroupBolt(); i++) {
-            t.getGroupNodeIds().add(t.getName() + "-grp-" + i);
-        }
-        for (int i = 0; i < t.getNumOfAlertBolt(); i++) {
-            t.getAlertBoltIds().add(t.getName() + "-alert-" + i);
-        }
-        return t;
-    }
-
-}
