http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
new file mode 100644
index 0000000..40f16e9
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
+import org.apache.eagle.alert.coordination.model.PublishSpec;
+import org.apache.eagle.alert.coordination.model.RouterSpec;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.SpoutSpec;
+import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
+import org.apache.eagle.alert.coordination.model.StreamRepartitionStrategy;
+import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
+import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordinator.IScheduleContext;
+import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @since Apr 26, 2016
+ * Given the current policy placement, figure out the monitor metadata.
+ * 
+ * TODO: refactor to eliminate the duplicated if-notInMap-then-create logic.
+ * FIXME: too much duplicated code logic: check null; add list to map; add to list.
+ */
+public class MonitorMetadataGenerator {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MonitorMetadataGenerator.class);
+
+    private IScheduleContext context;
+
+    /**
+     * Creates a generator bound to the given schedule context.
+     *
+     * @param context the schedule context this generator reads policies, topologies,
+     *                usages and streams from
+     */
+    public MonitorMetadataGenerator(IScheduleContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Builds a schedule state from the current content of the schedule context.
+     *
+     * @param expandworkSets not read by this method
+     * @return a schedule state carrying the generated spout, router, alert-bolt and publish
+     *         specs together with the policy assignments, monitored streams, policies and
+     *         stream schemas taken from the context
+     */
+    public ScheduleState generate(List<WorkItem> expandworkSets) {
+        // topologyId -> SpoutSpec
+        Map<String, SpoutSpec> spoutSpecs = generateSpoutMonitorMetadata();
+        // group-by meta spec (sort & group)
+        Map<String, RouterSpec> routerSpecs = generateGroupbyMonitorMetadata();
+        // alert bolt spec
+        Map<String, AlertBoltSpec> boltSpecs = generateAlertMonitorMetadata();
+        // publish spec
+        Map<String, PublishSpec> publishSpecs = generatePublishMetadata();
+
+        // the version is produced after all specs have been built
+        String version = generateVersion();
+        return new ScheduleState(
+                version,
+                spoutSpecs,
+                routerSpecs,
+                boltSpecs,
+                publishSpecs,
+                context.getPolicyAssignments().values(),
+                context.getMonitoredStreams().values(),
+                context.getPolicies().values(),
+                context.getStreamSchemas().values());
+    }
+
+    /**
+     * Builds one publish spec per topology usage, keyed by topology name. A spec receives
+     * every publishment that refers to a policy placed on that topology; policies without a
+     * definition in the context are ignored.
+     *
+     * @return topology name -> publish spec
+     */
+    private Map<String, PublishSpec> generatePublishMetadata() {
+        // index the publishments by the policy ids they refer to
+        Map<String, List<Publishment>> pubsByPolicy = new HashMap<>();
+        for (Publishment publishment : context.getPublishments().values()) {
+            for (String policyId : publishment.getPolicyIds()) {
+                pubsByPolicy.computeIfAbsent(policyId, id -> new ArrayList<>()).add(publishment);
+            }
+        }
+
+        // assemble the spec of each topology
+        Map<String, PublishSpec> specsByTopology = new HashMap<>();
+        for (TopologyUsage usage : context.getTopologyUsages().values()) {
+            String topoName = usage.getTopoName();
+            PublishSpec spec = specsByTopology.get(topoName);
+            if (spec == null) {
+                spec = new PublishSpec(topoName, context.getTopologies().get(topoName).getPubBoltId());
+                specsByTopology.put(topoName, spec);
+            }
+
+            for (String policyName : usage.getPolicies()) {
+                if (context.getPolicies().get(policyName) == null) {
+                    // the policy has no definition in the context, nothing to publish for it
+                    continue;
+                }
+                List<Publishment> policyPubs = pubsByPolicy.get(policyName);
+                if (policyPubs == null) {
+                    continue;
+                }
+                for (Publishment publishment : policyPubs) {
+                    spec.addPublishment(publishment);
+                }
+            }
+        }
+        return specsByTopology;
+    }
+
+    /**
+     * Produces the version tag for a generated schedule state: the fixed prefix
+     * "spec_version_" followed by the current time in milliseconds.
+     *
+     * FIXME: add auto-increment version number?
+     */
+    private String generateVersion() {
+        long now = System.currentTimeMillis();
+        return "spec_version_" + now;
+    }
+
+    /**
+     * Builds one alert bolt spec per topology usage, keyed by topology name. For every alert
+     * bolt of the topology, each policy recorded on the bolt usage is added to the spec under
+     * that bolt's id.
+     *
+     * A policy name that has no definition in the context is skipped with a warning instead of
+     * failing the whole generation, matching how generatePublishMetadata treats such policies.
+     *
+     * @return topology name -> alert bolt spec
+     */
+    private Map<String, AlertBoltSpec> generateAlertMonitorMetadata() {
+        Map<String, AlertBoltSpec> alertSpecs = new HashMap<String, AlertBoltSpec>();
+        for (TopologyUsage u : context.getTopologyUsages().values()) {
+            AlertBoltSpec alertSpec = alertSpecs.get(u.getTopoName());
+            if (alertSpec == null) {
+                alertSpec = new AlertBoltSpec(u.getTopoName());
+                alertSpecs.put(u.getTopoName(), alertSpec);
+            }
+            for (AlertBoltUsage boltUsage : u.getAlertUsages().values()) {
+                for (String policyName : boltUsage.getPolicies()) {
+                    PolicyDefinition definition = context.getPolicies().get(policyName);
+                    if (definition == null) {
+                        // the usage still refers to a policy the context no longer defines
+                        LOG.warn("can not find definition for policy {} on bolt {}, skip it",
+                                policyName, boltUsage.getBoltId());
+                        continue;
+                    }
+                    alertSpec.addBoltPolicy(boltUsage.getBoltId(), definition.getName());
+                }
+            }
+        }
+        return alertSpecs;
+    }
+
+    /**
+     * Builds one router spec per topology usage, keyed by topology name. Every stream partition
+     * of every monitored stream on the topology yields a stream router spec that lists one
+     * policy worker queue per work slot queue of that monitored stream.
+     *
+     * @return topology name -> router spec
+     */
+    private Map<String, RouterSpec> generateGroupbyMonitorMetadata() {
+        Map<String, RouterSpec> routerSpecs = new HashMap<>();
+        for (TopologyUsage usage : context.getTopologyUsages().values()) {
+            String topoName = usage.getTopoName();
+            RouterSpec routerSpec = routerSpecs.get(topoName);
+            if (routerSpec == null) {
+                routerSpec = new RouterSpec(topoName);
+                routerSpecs.put(topoName, routerSpec);
+            }
+
+            for (MonitoredStream monitored : usage.getMonitoredStream()) {
+                // multiple streams on the same policy group: the correlation group case
+                for (StreamPartition partition : monitored.getStreamGroup().getStreamPartitions()) {
+                    StreamRouterSpec streamRouter = new StreamRouterSpec();
+                    streamRouter.setPartition(partition);
+                    streamRouter.setStreamId(partition.getStreamId());
+
+                    for (StreamWorkSlotQueue slotQueue : monitored.getQueues()) {
+                        PolicyWorkerQueue workerQueue = new PolicyWorkerQueue();
+                        workerQueue.setWorkers(slotQueue.getWorkingSlots());
+                        workerQueue.setPartition(partition);
+                        streamRouter.addQueue(workerQueue);
+                    }
+
+                    routerSpec.addRouterSpec(streamRouter);
+                }
+            }
+        }
+        return routerSpecs;
+    }
+
+    /**
+     * Builds one spout spec per topology usage, keyed by topology name. Each spec carries the
+     * topic -> data source map, the topic -> tuple-to-stream codec map, and the topic -> stream
+     * repartition metadata derived from the partition specs of the policies on the topology.
+     *
+     * Inconsistent references are logged and skipped rather than dereferenced, in the same way
+     * a missing policy assignment is handled: an unknown data source id, a policy without a
+     * definition, an assignment whose queue is unknown, a stream without a schema, or a schema
+     * whose data source is unknown.
+     *
+     * @return topology name -> spout spec
+     */
+    private Map<String, SpoutSpec> generateSpoutMonitorMetadata() {
+        Map<String, StreamWorkSlotQueue> queueMap = buildQueueMap();
+
+        Map<String, SpoutSpec> topoSpoutSpecsMap = new HashMap<String, SpoutSpec>();
+        // streamName -> StreamDefinition
+        Map<String, StreamDefinition> streamSchemaMap = context.getStreamSchemas();
+        Map<String, Kafka2TupleMetadata> datasourcesMap = context.getDataSourceMetadata();
+        for (TopologyUsage usage : context.getTopologyUsages().values()) {
+            Topology topo = context.getTopologies().get(usage.getTopoName());
+
+            // based on data source schemas
+            // generate topic -> Kafka2TupleMetadata
+            // generate topic -> Tuple2StreamMetadata (actually the schema selector)
+            Map<String, Kafka2TupleMetadata> dss = new HashMap<String, Kafka2TupleMetadata>();
+            Map<String, Tuple2StreamMetadata> tss = new HashMap<String, Tuple2StreamMetadata>();
+            for (String dataSourceId : usage.getDataSources()) {
+                Kafka2TupleMetadata ds = datasourcesMap.get(dataSourceId);
+                if (ds == null) {
+                    LOG.error(" can not find data source {} used by topology {} ! ", dataSourceId,
+                            usage.getTopoName());
+                    continue;
+                }
+                dss.put(ds.getTopic(), ds);
+                tss.put(ds.getTopic(), ds.getCodec());
+            }
+
+            // generate topicId -> StreamRepartitionMetadata
+            Map<String, List<StreamRepartitionMetadata>> streamsMap =
+                    new HashMap<String, List<StreamRepartitionMetadata>>();
+            for (String policyName : usage.getPolicies()) {
+                PolicyDefinition def = context.getPolicies().get(policyName);
+                if (def == null) {
+                    LOG.error(" can not find definition for policy {} ! ", policyName);
+                    continue;
+                }
+
+                PolicyAssignment assignment = context.getPolicyAssignments().get(policyName);
+                if (assignment == null) {
+                    LOG.error(" can not find assignment for policy {} ! ", policyName);
+                    continue;
+                }
+
+                // the queue is the same for every partition of the policy, look it up once
+                StreamWorkSlotQueue queue = queueMap.get(assignment.getQueueId());
+                if (queue == null) {
+                    LOG.error(" can not find queue {} assigned to policy {} ! ", assignment.getQueueId(),
+                            policyName);
+                    continue;
+                }
+
+                for (StreamPartition policyStreamPartition : def.getPartitionSpec()) {
+                    String stream = policyStreamPartition.getStreamId();
+                    StreamDefinition schema = streamSchemaMap.get(stream);
+                    if (schema == null) {
+                        LOG.error(" can not find schema for stream {} of policy {} ! ", stream, policyName);
+                        continue;
+                    }
+                    Kafka2TupleMetadata streamSource = datasourcesMap.get(schema.getDataSource());
+                    if (streamSource == null) {
+                        LOG.error(" can not find data source {} of stream {} ! ", schema.getDataSource(),
+                                stream);
+                        continue;
+                    }
+                    String topic = streamSource.getTopic();
+
+                    // add stream name to tuple metadata
+                    if (tss.containsKey(topic)) {
+                        Tuple2StreamMetadata tupleMetadata = tss.get(topic);
+                        tupleMetadata.getActiveStreamNames().add(stream);
+                    }
+
+                    // grouping strategy
+                    StreamRepartitionStrategy gs = new StreamRepartitionStrategy();
+                    gs.partition = policyStreamPartition;
+                    gs.numTotalParticipatingRouterBolts = queue.getNumberOfGroupBolts();
+                    gs.startSequence = queue.getTopologyGroupStartIndex(topo.getName());
+                    gs.totalTargetBoltIds = new ArrayList<String>(topo.getGroupNodeIds());
+
+                    // add to map
+                    addGroupingStrategy(streamsMap, stream, schema, topic, schema.getDataSource(), gs);
+                }
+            }
+
+            SpoutSpec spoutSpec = new SpoutSpec(topo.getName(), streamsMap, tss, dss);
+            topoSpoutSpecsMap.put(topo.getName(), spoutSpec);
+        }
+        return topoSpoutSpecsMap;
+    }
+
+    /**
+     * Indexes the work slot queues of every monitored stream in the context by queue id.
+     * A work queue is not a root level object, so this map is built from the monitored
+     * streams for later quick lookup.
+     *
+     * @return queue id -> work slot queue
+     */
+    private Map<String, StreamWorkSlotQueue> buildQueueMap() {
+        Map<String, StreamWorkSlotQueue> queuesById = new HashMap<>();
+        context.getMonitoredStreams().values().forEach(monitored -> {
+            for (StreamWorkSlotQueue slotQueue : monitored.getQueues()) {
+                queuesById.put(slotQueue.getQueueId(), slotQueue);
+            }
+        });
+        return queuesById;
+    }
+
+    /**
+     * Registers a repartition strategy for a stream under its topic.
+     *
+     * The list for the topic is created on first use. Inside it the entry for the stream is
+     * located by case-insensitive stream id and created from the data source name and the
+     * schema's stream id when absent. The strategy is added only if the entry does not already
+     * contain it.
+     *
+     * @param streamsMap     topic -> repartition metadata list, updated in place
+     * @param stream         id of the stream the strategy belongs to
+     * @param schema         definition of that stream
+     * @param topicName      topic used as the key into streamsMap
+     * @param datasourceName data source name used when a new metadata entry is created
+     * @param gs             the strategy to register
+     */
+    private void addGroupingStrategy(Map<String, List<StreamRepartitionMetadata>> streamsMap, String stream,
+            StreamDefinition schema, String topicName, String datasourceName, StreamRepartitionStrategy gs) {
+        if (!streamsMap.containsKey(topicName)) {
+            streamsMap.put(topicName, new ArrayList<StreamRepartitionMetadata>());
+        }
+        List<StreamRepartitionMetadata> topicStreams = streamsMap.get(topicName);
+
+        StreamRepartitionMetadata match = topicStreams.stream()
+                .filter(candidate -> stream.equalsIgnoreCase(candidate.getStreamId()))
+                .findFirst()
+                .orElse(null);
+        if (match == null) {
+            match = new StreamRepartitionMetadata(datasourceName, schema.getStreamId());
+            topicStreams.add(match);
+        }
+
+        if (match.groupingStrategies.contains(gs)) {
+            return;
+        }
+        match.addGroupStrategy(gs);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
new file mode 100644
index 0000000..ea96d79
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.impl;
+
+import java.util.List;
+
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+
+/**
+ * Schedule result for one policy
+ * 
+ * 
+ * @since Apr 26, 2016
+ *
+ */
+public class ScheduleResult {
+    // NOTE(review): the meaning of individual code values is not visible here — confirm with the scheduler
+    int code;
+    String message;
+    String policyName;
+    StreamPartition partition;
+    int index;
+    List<PolicyAssignment> topoliciesScheduled;
+
+    /**
+     * Renders the policy name, the result code and the message of this result.
+     * The message used to be passed to the formatter without a placeholder and was
+     * therefore dropped from the output.
+     */
+    @Override
+    public String toString() {
+        return String.format("policy: %s, result code: %d, message: %s", policyName, code, message);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java
new file mode 100644
index 0000000..baa489d
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.impl;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+
+public class WorkItem {
+    public final PolicyDefinition def;
+    public final int requestParallelism;
+
+    public WorkItem(PolicyDefinition def, int workNum) {
+        this.def = def;
+        this.requestParallelism = workNum;
+    }
+
+    public String toString() {
+        return "policy name: " + def.getName() + "(" + requestParallelism + 
")";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
new file mode 100644
index 0000000..a32b8fb
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
+import org.apache.eagle.alert.coordinator.IScheduleContext;
+import org.apache.eagle.alert.coordinator.TopologyMgmtService;
+import org.apache.eagle.alert.coordinator.impl.strategies.IWorkSlotStrategy;
+import 
org.apache.eagle.alert.coordinator.impl.strategies.SameTopologySlotStrategy;
+import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @since Apr 27, 2016
+ *
+ */
+public class WorkQueueBuilder {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(WorkQueueBuilder.class);
+
+    private final IScheduleContext context;
+    private final TopologyMgmtService mgmtService;
+
+    /**
+     * Creates a builder that works on the given schedule context.
+     *
+     * @param context     the schedule context whose topologies and usages are read and updated
+     * @param mgmtService the topology management service handed to the slot strategy
+     */
+    public WorkQueueBuilder(IScheduleContext context, TopologyMgmtService mgmtService) {
+        this.context = context;
+        this.mgmtService = mgmtService;
+    }
+
+    /**
+     * Reserves work slots for the given monitored stream and wraps them in a new work slot
+     * queue, which is then registered on the stream and on the usages of the involved bolts.
+     *
+     * @param stream      the monitored stream the queue is created for
+     * @param isDedicated passed through to the slot strategy and the created queue
+     * @param size        number of work slots the queue needs
+     * @param properties  passed through to the slot strategy and the created queue
+     * @return the created queue, or {@code null} when fewer than {@code size} slots could be reserved
+     */
+    public StreamWorkSlotQueue createQueue(MonitoredStream stream, boolean isDedicated, int size,
+            Map<String, Object> properties) {
+        // FIXME: make extensible and configurable
+        IWorkSlotStrategy strategy = new SameTopologySlotStrategy(context, stream.getStreamGroup(), mgmtService);
+        List<WorkSlot> slots = strategy.reserveWorkSlots(size, isDedicated, properties);
+        if (slots.size() < size) {
+            // report both numbers; the previous message announced a size but never printed one
+            LOG.error("allocate stream work queue failed, required size {}, reserved size {}", size,
+                    slots.size());
+            return null;
+        }
+        StreamWorkSlotQueue queue = new StreamWorkSlotQueue(stream.getStreamGroup(), isDedicated, properties,
+                slots);
+        calculateGroupIndexAndCount(queue);
+        assignQueueSlots(stream, queue);// build reverse reference
+        stream.addQueues(queue);
+
+        return queue;
+    }
+
+    /**
+     * Builds the reverse references for a new queue: for every working slot of the queue, the
+     * queue is added to the usage of the slot's alert bolt and the monitored stream is added to
+     * the usage of the slot's topology.
+     *
+     * @param stream the monitored stream that owns the queue
+     * @param queue  the queue whose working slots are registered
+     */
+    private void assignQueueSlots(MonitoredStream stream, StreamWorkSlotQueue queue) {
+        for (WorkSlot workSlot : queue.getWorkingSlots()) {
+            TopologyUsage topologyUsage = context.getTopologyUsages().get(workSlot.getTopologyName());
+            topologyUsage.getAlertBoltUsage(workSlot.getBoltId()).addQueue(stream.getStreamGroup(), queue);
+            topologyUsage.addMonitoredStream(stream);
+        }
+    }
+
+    /**
+     * Computes, for each distinct topology among the queue's working slots, the running total
+     * of group bolts of the topologies seen before it, and stores that map plus the overall
+     * total on the queue.
+     *
+     * @param queue the queue to update
+     */
+    private void calculateGroupIndexAndCount(StreamWorkSlotQueue queue) {
+        Map<String, Integer> startIndexByTopology = new HashMap<>();
+        int groupBoltCount = 0;
+        for (WorkSlot workSlot : queue.getWorkingSlots()) {
+            String topologyName = workSlot.getTopologyName();
+            if (!startIndexByTopology.containsKey(topologyName)) {
+                // first slot of this topology: its group bolts start at the current total
+                startIndexByTopology.put(topologyName, groupBoltCount);
+                groupBoltCount += context.getTopologies().get(topologyName).getNumOfGroupBolt();
+            }
+        }
+
+        queue.setNumberOfGroupBolts(groupBoltCount);
+        queue.setTopoGroupStartIndex(startIndexByTopology);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
new file mode 100644
index 0000000..28df3c4
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.impl.strategies;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+
+/**
+ * Strategy that reserves the work slots backing a stream work slot queue.
+ *
+ * @since Apr 27, 2016
+ */
+public interface IWorkSlotStrategy {
+
+    /**
+     * Reserves work slots for a queue.
+     *
+     * @param size        number of work slots requested
+     * @param isDedicated NOTE(review): SameTopologySlotStrategy documents this flag as not used yet
+     * @param properties  NOTE(review): presumably extra queue properties; no implementation in
+     *                    view reads them — confirm
+     * @return the reserved slots; WorkQueueBuilder treats a list shorter than {@code size} as a failure
+     */
+    List<WorkSlot> reserveWorkSlots(int size, boolean isDedicated, Map<String, Object> properties);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
new file mode 100644
index 0000000..e755237
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.impl.strategies;
+
+import static 
org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordinator.CoordinatorConstants;
+import org.apache.eagle.alert.coordinator.IScheduleContext;
+import org.apache.eagle.alert.coordinator.TopologyMgmtService;
+import org.apache.eagle.alert.coordinator.TopologyMgmtService.TopologyMeta;
+import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * A simple strategy that only finds bolts within a single topology for the
+ * required work slots.
+ * 
+ * Invariant:<br/>
+ * One slot queue resides on only one topology.<br/>
+ * One topology doesn't contain two slot queues of the same partition.
+ * 
+ * @since Apr 27, 2016
+ *
+ */
+public class SameTopologySlotStrategy implements IWorkSlotStrategy {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SameTopologySlotStrategy.class);
+
+    private final IScheduleContext context;
+    private final StreamGroup partitionGroup;
+    private final TopologyMgmtService mgmtService;
+
+//    private final int numOfPoliciesBoundPerBolt;
+    private final double topoLoadUpbound;
+
+    /**
+     * Creates the strategy and reads the topology load upper bound from the coordinator
+     * section of the loaded configuration.
+     *
+     * @param context              the schedule context holding topologies and their usages
+     * @param streamPartitionGroup the stream group the slots are reserved for
+     * @param mgmtService          the service used to create a topology when no existing one fits
+     */
+    public SameTopologySlotStrategy(IScheduleContext context, StreamGroup streamPartitionGroup,
+            TopologyMgmtService mgmtService) {
+        this.context = context;
+        this.partitionGroup = streamPartitionGroup;
+        this.mgmtService = mgmtService;
+
+        Config coordinatorConfig = ConfigFactory.load().getConfig(CoordinatorConstants.CONFIG_ITEM_COORDINATOR);
+//        numOfPoliciesBoundPerBolt = coordinatorConfig.getInt(CoordinatorConstants.POLICIES_PER_BOLT);
+        this.topoLoadUpbound = coordinatorConfig.getDouble(CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND);
+    }
+
+    /**
+     * @param isDedicated
+     *            - not used yet!
+     */
+    @Override
+    public List<WorkSlot> reserveWorkSlots(int size, boolean isDedicated, 
Map<String, Object> properties) {
+        Iterator<Topology> it = 
context.getTopologies().values().stream().filter((t) -> t.getNumOfAlertBolt() 
>= size)
+                .iterator();
+        // priority strategy first???
+        List<WorkSlot> slots = new ArrayList<WorkSlot>();
+        while (it.hasNext()) {
+            Topology t = it.next();
+            if (getQueueOnTopology(size, slots, t)) {
+                break;
+            }
+        }
+
+        if (slots.size() == 0) {
+            int supportedSize = mgmtService.getNumberOfAlertBoltsInTopology();
+            if (size > supportedSize) {
+                LOG.error("can not find available slots for queue, required 
size {}, supported size {} !", size, supportedSize);
+                return Collections.emptyList();
+            }
+            TopologyMeta topoMeta = mgmtService.creatTopology();
+            if (topoMeta == null) {
+                LOG.error("can not create topology for given queue 
requirement, required size {}, requried partition group {} !", size, 
partitionGroup);
+                return Collections.emptyList();
+            }
+
+            context.getTopologies().put(topoMeta.topologyId, 
topoMeta.topology);
+            context.getTopologyUsages().put(topoMeta.topologyId, 
topoMeta.usage);
+            boolean placed = getQueueOnTopology(size, slots, 
topoMeta.topology);
+            if (!placed) {
+                LOG.error("can not find available slots from new created 
topology, required size {}. This indicates an error !", size);
+            }
+        }
+        return slots;
+    }
+
+    /**
+     * Tries to take {@code size} available alert bolts from the given topology.
+     *
+     * @param size  number of bolts needed
+     * @param slots receives one work slot per selected bolt; left untouched on failure
+     * @param t     the topology to take the bolts from
+     * @return {@code true} when exactly {@code size} bolts were selected and added to {@code slots}
+     */
+    private boolean getQueueOnTopology(int size, List<WorkSlot> slots, Topology t) {
+        TopologyUsage usage = context.getTopologyUsages().get(t.getName());
+        if (!isTopologyAvailable(usage)) {
+            return false;
+        }
+
+        // collect available bolts, stopping as soon as enough have been found
+        List<String> freeBoltIds = new ArrayList<String>();
+        for (AlertBoltUsage boltUsage : usage.getAlertUsages().values()) {
+            if (isBoltAvailable(boltUsage)) {
+                freeBoltIds.add(boltUsage.getBoltId());
+            }
+            if (freeBoltIds.size() == size) {
+                break;
+            }
+        }
+
+        if (freeBoltIds.size() != size) {
+            return false;
+        }
+        for (String freeBoltId : freeBoltIds) {
+            slots.add(new WorkSlot(t.getName(), freeBoltId));
+        }
+        return true;
+    }
+
+    /**
+     * Tells whether a topology may take another queue: its usage must be known and its load
+     * must not be above the configured upper bound.
+     *
+     * @param u the usage of the topology, may be {@code null}
+     * @return {@code false} for a {@code null} usage or a load above the bound, {@code true} otherwise
+     */
+    private boolean isTopologyAvailable(TopologyUsage u) {
+//        for (MonitoredStream stream : u.getMonitoredStream()) {
+//            if (partition.equals(stream.getStreamParitition())) {
+//                return false;
+//            }
+//        }
+        return u != null && !(u.getLoad() > topoLoadUpbound);
+    }
+
+    /**
+     * Tells whether an alert bolt can take a queue; a bolt is available only while its queue
+     * size is not above zero.
+     *
+     * @param alertUsage the usage of the bolt to check
+     * @return {@code true} when the bolt's queue size is not above zero
+     */
+    private boolean isBoltAvailable(AlertBoltUsage alertUsage) {
+        // FIXME : more detail to compare on queue exclusion check
+        // an available bolt actually has a queue size of 0
+        return !(alertUsage.getQueueSize() > 0);
+//        return alertUsage.getPolicies().size() < numOfPoliciesBoundPerBolt;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
new file mode 100644
index 0000000..e9148f5
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.model;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
+import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+
+/**
+ * Bookkeeping of how a single alert bolt is used: the policies placed on it,
+ * the stream groups it serves and the work slot queues that refer to it.
+ *
+ * @since Mar 28, 2016
+ */
+public class AlertBoltUsage {
+
+    private String boltId;
+    private double load;
+    private final List<String> policies = new ArrayList<>();
+    // the stream partition groups that are scheduled for this alert bolt
+    private final List<StreamGroup> partitions = new ArrayList<>();
+    // the slot queues that are scheduled for this alert bolt
+    private final List<StreamWorkSlotQueue> referQueues = new ArrayList<>();
+
+    /**
+     * @param anid id of the alert bolt described by this usage
+     */
+    public AlertBoltUsage(String anid) {
+        this.boltId = anid;
+    }
+
+    public String getBoltId() {
+        return this.boltId;
+    }
+
+    public void setBoltId(String boltId) {
+        this.boltId = boltId;
+    }
+
+    /**
+     * @return the live (mutable) list of policy names placed on this bolt
+     */
+    public List<String> getPolicies() {
+        return this.policies;
+    }
+
+    /**
+     * Records the name of the given policy on this bolt.
+     *
+     * @param pd policy definition whose name is recorded
+     */
+    public void addPolicies(PolicyDefinition pd) {
+        // Only the policy name is tracked here; partitions are not derived
+        // from the policy, they are recorded through addQueue.
+        this.policies.add(pd.getName());
+    }
+
+    public double getLoad() {
+        return this.load;
+    }
+
+    public void setLoad(double load) {
+        this.load = load;
+    }
+
+    /**
+     * @return the live (mutable) list of stream groups scheduled on this bolt
+     */
+    public List<StreamGroup> getPartitions() {
+        return this.partitions;
+    }
+
+    /**
+     * @return the live (mutable) list of queues referring to this bolt
+     */
+    public List<StreamWorkSlotQueue> getReferQueues() {
+        return this.referQueues;
+    }
+
+    /**
+     * @return number of queues currently referring to this bolt
+     */
+    public int getQueueSize() {
+        return this.referQueues.size();
+    }
+
+    /**
+     * Registers a queue together with the stream group it serves.
+     *
+     * @param streamPartition stream group handled through the queue
+     * @param queue           queue that refers to this bolt
+     */
+    public void addQueue(StreamGroup streamPartition, StreamWorkSlotQueue queue) {
+        this.referQueues.add(queue);
+        this.partitions.add(streamPartition);
+    }
+
+    /**
+     * Unregisters a queue. The recorded stream groups are left untouched.
+     *
+     * @param queue queue to remove from the referring queues
+     */
+    public void removeQueue(StreamWorkSlotQueue queue) {
+        this.referQueues.remove(queue);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
new file mode 100644
index 0000000..86238d1
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.model;
+
+
+/**
+ * Usage information of a single group bolt: its id and its current load.
+ *
+ * @since Mar 28, 2016
+ */
+public class GroupBoltUsage {
+
+    // NOTE: per-bolt streams, stream filters and group-by metadata were once
+    // sketched as additional state here, but are not tracked.
+    private double load;
+    private String boltId;
+
+    /**
+     * @param boltId id of the group bolt described by this usage
+     */
+    public GroupBoltUsage(String boltId) {
+        this.boltId = boltId;
+    }
+
+    public String getBoltId() {
+        return this.boltId;
+    }
+
+    public void setBoltId(String boltId) {
+        this.boltId = boltId;
+    }
+
+    public double getLoad() {
+        return this.load;
+    }
+
+    public void setLoad(double load) {
+        this.load = load;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
new file mode 100644
index 0000000..6eb6195
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.model;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+
+/**
+ * Usage information of one topology: its data sources, the policies placed
+ * on it, the usage of each of its alert and group bolts, the monitored
+ * streams it serves and its load.
+ *
+ * @since Mar 27, 2016
+ */
+public class TopologyUsage {
+    // topo info
+    private String topoName;
+    private final Set<String> datasources = new HashSet<>();
+    // usage information
+    private final Set<String> policies = new HashSet<>();
+    // keyed by bolt id
+    private final Map<String, AlertBoltUsage> alertUsages = new HashMap<>();
+    private final Map<String, GroupBoltUsage> groupUsages = new HashMap<>();
+    private final List<MonitoredStream> monitoredStream = new ArrayList<>();
+
+    private double load;
+
+    // Original note kept from the source: this is to be the existing/previous
+    // meta-data; only one group meta-data for all of the group bolts in this
+    // topology.
+
+    public TopologyUsage() {
+    }
+
+    /**
+     * @param name name of the topology described by this usage
+     */
+    public TopologyUsage(String name) {
+        this.topoName = name;
+    }
+
+    public String getTopoName() {
+        return this.topoName;
+    }
+
+    public void setTopoName(String topoId) {
+        this.topoName = topoId;
+    }
+
+    /**
+     * @return the live (mutable) set of data source names
+     */
+    public Set<String> getDataSources() {
+        return this.datasources;
+    }
+
+    /**
+     * @return the live (mutable) set of policy names
+     */
+    public Set<String> getPolicies() {
+        return this.policies;
+    }
+
+    /**
+     * @return the live (mutable) map of alert bolt usages
+     */
+    public Map<String, AlertBoltUsage> getAlertUsages() {
+        return this.alertUsages;
+    }
+
+    /**
+     * @param boltId key to look up in the alert usages
+     * @return the usage stored under that key, or null when there is none
+     */
+    public AlertBoltUsage getAlertBoltUsage(String boltId) {
+        return this.alertUsages.get(boltId);
+    }
+
+    /**
+     * @return the live (mutable) map of group bolt usages
+     */
+    public Map<String, GroupBoltUsage> getGroupUsages() {
+        return this.groupUsages;
+    }
+
+    /**
+     * @return the live (mutable) list of monitored streams
+     */
+    public List<MonitoredStream> getMonitoredStream() {
+        return this.monitoredStream;
+    }
+
+    /**
+     * Adds a monitored stream unless an equal one is already recorded.
+     *
+     * @param par monitored stream to record
+     */
+    public void addMonitoredStream(MonitoredStream par) {
+        if (this.monitoredStream.contains(par)) {
+            return;
+        }
+        this.monitoredStream.add(par);
+    }
+
+    public double getLoad() {
+        return this.load;
+    }
+
+    public void setLoad(double load) {
+        this.load = load;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
new file mode 100644
index 0000000..84a4061
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.provider;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordinator.IScheduleContext;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+/**
+ * Schedule context that keeps all scheduling metadata in plain in-memory
+ * maps.
+ *
+ * @since Mar 28, 2016
+ */
+public class InMemScheduleConext implements IScheduleContext {
+
+    private Map<String, Topology> topologies = new HashMap<>();
+    private Map<String, TopologyUsage> usages = new HashMap<>();
+    private Map<String, PolicyDefinition> policies = new HashMap<>();
+    private Map<String, Kafka2TupleMetadata> datasources = new HashMap<>();
+    private Map<String, PolicyAssignment> policyAssignments = new HashMap<>();
+    private Map<String, StreamDefinition> schemas = new HashMap<>();
+    private Map<StreamGroup, MonitoredStream> monitoredStreams = new HashMap<>();
+    private Map<String, Publishment> publishments = new HashMap<>();
+
+    /**
+     * Creates an empty context.
+     */
+    public InMemScheduleConext() {
+    }
+
+    /**
+     * Creates a context from another one. Every map is copied into a new
+     * {@link HashMap}; the contained values themselves are shared.
+     *
+     * @param context context to copy the maps from
+     */
+    public InMemScheduleConext(IScheduleContext context) {
+        this.topologies = new HashMap<>(context.getTopologies());
+        this.usages = new HashMap<>(context.getTopologyUsages());
+        this.policies = new HashMap<>(context.getPolicies());
+        this.datasources = new HashMap<>(context.getDataSourceMetadata());
+        this.policyAssignments = new HashMap<>(context.getPolicyAssignments());
+        this.schemas = new HashMap<>(context.getStreamSchemas());
+        this.monitoredStreams = new HashMap<>(context.getMonitoredStreams());
+        this.publishments = new HashMap<>(context.getPublishments());
+    }
+
+    /**
+     * Creates a context backed directly by the given maps. No copies are
+     * made, so later changes to the maps are visible through this context.
+     *
+     * @param topologies2        topologies
+     * @param assignments        policy assignments
+     * @param kafkaSources       data source metadata
+     * @param policies2          policy definitions
+     * @param publishments2      publishments
+     * @param streamDefinitions  stream schemas
+     * @param monitoredStreamMap monitored streams keyed by stream group
+     * @param usages2            topology usages
+     */
+    public InMemScheduleConext(Map<String, Topology> topologies2, Map<String, PolicyAssignment> assignments,
+            Map<String, Kafka2TupleMetadata> kafkaSources, Map<String, PolicyDefinition> policies2,
+            Map<String, Publishment> publishments2, Map<String, StreamDefinition> streamDefinitions,
+            Map<StreamGroup, MonitoredStream> monitoredStreamMap, Map<String, TopologyUsage> usages2) {
+        this.topologies = topologies2;
+        this.policyAssignments = assignments;
+        this.datasources = kafkaSources;
+        this.policies = policies2;
+        this.publishments = publishments2;
+        this.schemas = streamDefinitions;
+        this.monitoredStreams = monitoredStreamMap;
+        this.usages = usages2;
+    }
+
+    @Override
+    public Map<String, Topology> getTopologies() {
+        return topologies;
+    }
+
+    /**
+     * Registers a topology under its name, replacing any previous entry.
+     */
+    public void addTopology(Topology topo) {
+        topologies.put(topo.getName(), topo);
+    }
+
+    @Override
+    public Map<String, TopologyUsage> getTopologyUsages() {
+        return usages;
+    }
+
+    /**
+     * Registers a usage under its topology name, replacing any previous
+     * entry.
+     */
+    public void addTopologyUsages(TopologyUsage usage) {
+        usages.put(usage.getTopoName(), usage);
+    }
+
+    @Override
+    public Map<String, PolicyDefinition> getPolicies() {
+        return policies;
+    }
+
+    /**
+     * Registers a policy under its name, replacing any previous entry.
+     */
+    public void addPoilcy(PolicyDefinition pd) {
+        this.policies.put(pd.getName(), pd);
+    }
+
+    /**
+     * Same map as {@link #getDataSourceMetadata()}.
+     */
+    public Map<String, Kafka2TupleMetadata> getDatasources() {
+        return datasources;
+    }
+
+    public void setDatasources(Map<String, Kafka2TupleMetadata> datasources) {
+        this.datasources = datasources;
+    }
+
+    /**
+     * Registers a data source under its name, replacing any previous entry.
+     */
+    public void addDataSource(Kafka2TupleMetadata dataSource) {
+        this.datasources.put(dataSource.getName(), dataSource);
+    }
+
+    @Override
+    public Map<String, Kafka2TupleMetadata> getDataSourceMetadata() {
+        return datasources;
+    }
+
+    /**
+     * Replaces the policy assignment map of this context.
+     */
+    public void setPolicyOrderedTopologies(Map<String, PolicyAssignment> policyAssignments) {
+        this.policyAssignments = policyAssignments;
+    }
+
+    @Override
+    public Map<String, PolicyAssignment> getPolicyAssignments() {
+        return this.policyAssignments;
+    }
+
+    @Override
+    public Map<String, StreamDefinition> getStreamSchemas() {
+        return schemas;
+    }
+
+    /**
+     * Registers a stream schema under its stream id, replacing any previous
+     * entry.
+     */
+    public void addSchema(StreamDefinition schema) {
+        this.schemas.put(schema.getStreamId(), schema);
+    }
+
+    public void setStreamSchemas(Map<String, StreamDefinition> schemas) {
+        this.schemas = schemas;
+    }
+
+    @Override
+    public Map<StreamGroup, MonitoredStream> getMonitoredStreams() {
+        return monitoredStreams;
+    }
+
+    @Override
+    public Map<String, Publishment> getPublishments() {
+        return publishments;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
new file mode 100644
index 0000000..d4d6c0c
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.provider;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
+import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordinator.IScheduleContext;
+import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
+import org.apache.eagle.alert.coordinator.model.GroupBoltUsage;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+/**
+ * FIXME: this class focuses on correctness, not on efficiency, for now. There
+ * might be problems when the metadata size grows too big.
+ * 
+ * @since May 3, 2016
+ *
+ */
+public class ScheduleContextBuilder {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ScheduleContextBuilder.class);
+    private static final String UNIQUE_BOLT_ID = "%s-%s";// toponame-boltname
+
+    private IMetadataServiceClient client;
+
+    private Map<String, Topology> topologies;
+    private Map<String, PolicyAssignment> assignments;
+    private Map<String, Kafka2TupleMetadata> kafkaSources;
+    private Map<String, PolicyDefinition> policies;
+    private Map<String, Publishment> publishments;
+    private Map<String, StreamDefinition> streamDefinitions;
+    private Map<StreamGroup, MonitoredStream> monitoredStreamMap;
+    private Map<String, TopologyUsage> usages;
+
+    /**
+     * Creates a builder that talks to the metadata service through a
+     * {@link MetadataServiceClientImpl} created from the given configuration.
+     *
+     * @param config configuration handed to the metadata service client
+     */
+    public ScheduleContextBuilder(Config config) {
+        this(new MetadataServiceClientImpl(config));
+    }
+
+    /**
+     * Creates a builder on top of an existing metadata service client.
+     *
+     * @param client client used to load all metadata
+     */
+    public ScheduleContextBuilder(IMetadataServiceClient client) {
+        this.client = client;
+    }
+
+    /**
+     * Builds a schedule context from what the metadata service client
+     * currently reports.
+     * <p>
+     * Topologies, data sources, policies, publishments and stream definitions
+     * are loaded first. The assignments and monitored streams of the versioned
+     * schedule state (if there is one) are then cleaned of deprecated entries,
+     * and the topology usages are finally derived from the result.
+     *
+     * @return a new in-memory schedule context backed by the loaded maps
+     */
+    public IScheduleContext buildContext() {
+        topologies = listToMap(client.listTopologies());
+        kafkaSources = listToMap(client.listDataSources());
+        policies = listToMap(client.listPolicies());
+        publishments = listToMap(client.listPublishment());
+        streamDefinitions = listToMap(client.listStreams());
+
+        // TODO: See ScheduleState comments on how to improve the storage
+        ScheduleState state = client.getVersionedSpec();
+
+        // Assignments have to be in place before the monitored streams are
+        // cleaned, because that cleanup reads (and prunes) the assignments.
+        List<PolicyAssignment> liveAssignments;
+        if (state == null) {
+            liveAssignments = new ArrayList<PolicyAssignment>();
+        } else {
+            liveAssignments = cleanupDeprecatedAssignments(state.getAssignments());
+        }
+        assignments = listToMap(liveAssignments);
+
+        List<MonitoredStream> liveStreams;
+        if (state == null) {
+            liveStreams = new ArrayList<MonitoredStream>();
+        } else {
+            liveStreams = cleanupDeprecatedStreamsAndAssignment(state.getMonitoredStreams());
+        }
+        monitoredStreamMap = listToMap(liveStreams);
+
+        // build based on existing data
+        usages = buildTopologyUsage();
+
+        // hand everything over to the schedule context now
+        return new InMemScheduleConext(topologies, assignments, kafkaSources, policies, publishments,
+                streamDefinitions, monitoredStreamMap, usages);
+    }
+
+    /**
+     * Drops monitored streams and policy assignments that are no longer valid.
+     * <ol>
+     * <li>Queues with a work slot on a missing topology or alert bolt are
+     * removed, together with monitored streams left without any queue.</li>
+     * <li>An assignment is removed (and logged) when its queue is unknown, or
+     * when the stream partitions of the queue's stream group differ from the
+     * policy's partition spec, which indicates that the policy's partition
+     * spec has changed.</li>
+     * <li>Monitored streams that no remaining assignment refers to are
+     * removed (and logged).</li>
+     * </ol>
+     * Filtering is done on a copy, so the passed-in list keeps its elements.
+     * The queues of the contained monitored streams are pruned through
+     * {@code getQueues()}, presumably in place. As a side effect, invalid
+     * entries are removed from the {@code assignments} field.
+     *
+     * @param monitoredStreams monitored streams loaded from the schedule state
+     * @return a new list holding only the monitored streams still in use
+     */
+    private List<MonitoredStream> cleanupDeprecatedStreamsAndAssignment(List<MonitoredStream> monitoredStreams) {
+        List<MonitoredStream> result = new ArrayList<MonitoredStream>(monitoredStreams);
+
+        // Clear deprecated streams on the working copy: the caller's list is
+        // never mutated (it may even be unmodifiable), and streams without
+        // queues do not linger in the result until the reference check below.
+        clearMonitoredStreams(result);
+
+        // build queueId-> streamGroup
+        Map<String, StreamGroup> queue2StreamGroup = new HashMap<String, StreamGroup>();
+        for (MonitoredStream ms : result) {
+            for (StreamWorkSlotQueue q : ms.getQueues()) {
+                queue2StreamGroup.put(q.getQueueId(), ms.getStreamGroup());
+            }
+        }
+
+        // decide the assignment delete set
+        Set<StreamGroup> usedGroups = new HashSet<StreamGroup>();
+        Set<String> toRemove = new HashSet<String>();
+        // check if queue is still referenced by policy assignments
+        for (PolicyAssignment assignment : assignments.values()) {
+            PolicyDefinition def = policies.get(assignment.getPolicyName());
+            StreamGroup group = queue2StreamGroup.get(assignment.getQueueId());
+            if (group == null || !Objects.equals(group.getStreamPartitions(), def.getPartitionSpec())) {
+                LOG.warn(" policy assgiment {} 's policy group {} is different to the monitored stream's partition group {}, "
+                                + "this indicates a policy stream partition spec change, the assignment would be removed! ",
+                        assignment, def.getPartitionSpec(), group == null ? "'not found'" : group.getStreamPartitions());
+                toRemove.add(assignment.getPolicyName());
+            } else {
+                usedGroups.add(group);
+            }
+        }
+
+        // remove useless assignments (they are keyed by policy name)
+        assignments.keySet().removeAll(toRemove);
+        // remove non-referenced monitored streams
+        result.removeIf((t) -> {
+            boolean used = usedGroups.contains(t.getStreamGroup());
+            if (!used) {
+                LOG.warn("monitor stream with stream group {} is not referenced, "
+                        + "this monitored stream and its worker queu will be removed", t.getStreamGroup());
+            }
+            return !used;
+        });
+
+        return result;
+    }
+
+    /**
+     * Removes stale queues from the given monitored streams, and removes the
+     * monitored streams that end up without any queue from the list.
+     * <p>
+     * A queue is stale as soon as one of its work slots names a topology that
+     * is not in {@code topologies}, or an alert bolt the topology does not have.
+     *
+     * @param monitoredStreams list to clean; it is modified in place
+     */
+    private void clearMonitoredStreams(List<MonitoredStream> monitoredStreams) {
+        for (Iterator<MonitoredStream> streamIt = monitoredStreams.iterator(); streamIt.hasNext();) {
+            MonitoredStream stream = streamIt.next();
+
+            // clean queues whose underlying topology has changed (removed/down)
+            for (Iterator<StreamWorkSlotQueue> queueIt = stream.getQueues().iterator(); queueIt.hasNext();) {
+                boolean stale = false;
+                for (WorkSlot slot : queueIt.next().getWorkingSlots()) {
+                    boolean topologyKnown = topologies.containsKey(slot.topologyName);
+                    // the slot is only valid when both its topology and its bolt still exist
+                    if (!(topologyKnown && topologies.get(slot.topologyName).getAlertBoltIds().contains(slot.boltId))) {
+                        stale = true;
+                        break;
+                    }
+                }
+                if (stale) {
+                    queueIt.remove();
+                }
+            }
+
+            if (stream.getQueues().isEmpty()) {
+                streamIt.remove();
+            }
+        }
+    }
+
+    /**
+     * Filters out the assignments whose policy is not in {@code policies}.
+     * Every dropped assignment is logged; the passed-in list is not modified.
+     *
+     * @param list assignments loaded from the schedule state
+     * @return a new list holding only the assignments of known policies
+     */
+    private List<PolicyAssignment> cleanupDeprecatedAssignments(List<PolicyAssignment> list) {
+        List<PolicyAssignment> result = new ArrayList<PolicyAssignment>(list);
+        result.removeIf(assignment -> {
+            boolean orphaned = !policies.containsKey(assignment.getPolicyName());
+            if (orphaned) {
+                LOG.info("Policy assignment {} 's policy not found, this assignment will be removed!", assignment);
+            }
+            return orphaned;
+        });
+        return result;
+    }
+
+    /**
+     * Indexes the given elements by the key that {@code getKey} yields for
+     * each of them. A later element replaces an earlier one with the same key.
+     *
+     * @param collections elements to index
+     * @return a new map from key to element
+     */
+    private <T, K> Map<K, T> listToMap(List<T> collections) {
+        Map<K, T> keyed = new HashMap<K, T>(collections.size());
+        collections.forEach(item -> keyed.put(getKey(item), item));
+        return keyed;
+    }
+
+    /*
+     * One drawback, once we add class, this code need to be changed!
+     */
+    @SuppressWarnings("unchecked")
+    private <T, K> K getKey(T t) {
+        if (t instanceof Topology) {
+            return (K) ((Topology) t).getName();
+        } else if (t instanceof PolicyAssignment) {
+            return (K) ((PolicyAssignment) t).getPolicyName();
+        } else if (t instanceof Kafka2TupleMetadata) {
+            return (K) ((Kafka2TupleMetadata) t).getName();
+        } else if (t instanceof PolicyDefinition) {
+            return (K) ((PolicyDefinition) t).getName();
+        } else if (t instanceof Publishment) {
+            return (K) ((Publishment) t).getName();
+        } else if (t instanceof StreamDefinition) {
+            return (K) ((StreamDefinition) t).getStreamId();
+        } else if (t instanceof MonitoredStream) {
+            return (K) ((MonitoredStream) t).getStreamGroup();
+        }
+        throw new RuntimeException("unexpected key class " + t.getClass());
+    }
+
+    /**
+     * Derives the usage of every known topology from the current assignments
+     * and monitored streams.
+     * <p>
+     * Each topology gets a usage entry with one group bolt usage per group
+     * node, one alert bolt usage per alert bolt (filled with its policies,
+     * stream groups and queues), the policies placed on the topology, the
+     * data sources of those policies and the monitored streams it serves.
+     *
+     * @return a new map from topology name to its usage
+     */
+    private Map<String, TopologyUsage> buildTopologyUsage() {
+        // Lookup tables filled once up front. The bolt-keyed tables are looked
+        // up by the "toponame-boltname" id, simply assuming that this id is
+        // unique among all bolts.
+        Map<String, Set<MonitoredStream>> streamsByTopo = new HashMap<String, Set<MonitoredStream>>();
+        Map<String, Set<String>> policiesByTopo = new HashMap<String, Set<String>>();
+        Map<String, Set<String>> policiesByBolt = new HashMap<String, Set<String>>();
+        Map<String, Set<StreamGroup>> groupsByBolt = new HashMap<String, Set<StreamGroup>>();
+        Map<String, Set<String>> queueIdsByBolt = new HashMap<String, Set<String>>();
+        Map<String, StreamWorkSlotQueue> queuesById = new HashMap<String, StreamWorkSlotQueue>();
+        preBuildQueue2TopoMap(streamsByTopo, policiesByTopo, policiesByBolt, groupsByBolt, queueIdsByBolt,
+                queuesById);
+
+        Map<String, TopologyUsage> usageByTopo = new HashMap<String, TopologyUsage>();
+        for (Topology topo : topologies.values()) {
+            TopologyUsage usage = new TopologyUsage(topo.getName());
+            String topoName = usage.getTopoName();
+
+            // group bolt usages
+            for (String groupBoltId : topo.getGroupNodeIds()) {
+                usage.getGroupUsages().put(groupBoltId, new GroupBoltUsage(groupBoltId));
+            }
+
+            // alert bolt usages, completed from the lookup tables
+            for (String alertBoltId : topo.getAlertBoltIds()) {
+                AlertBoltUsage boltUsage = new AlertBoltUsage(alertBoltId);
+                usage.getAlertUsages().put(alertBoltId, boltUsage);
+                String uniqueBoltId = String.format(UNIQUE_BOLT_ID, topo.getName(), alertBoltId);
+                addBoltUsageInfo(policiesByBolt, groupsByBolt, queueIdsByBolt, uniqueBoltId, boltUsage, queuesById);
+            }
+
+            // policies -- from the policy assignments
+            if (policiesByTopo.containsKey(topoName)) {
+                usage.getPolicies().addAll(policiesByTopo.get(topoName));
+            }
+
+            // data sources -- depends on the policies collected just above
+            buildTopologyDataSource(usage);
+
+            // monitored streams -- from the slots of the monitored streams' queues
+            if (streamsByTopo.containsKey(topoName)) {
+                usage.getMonitoredStream().addAll(streamsByTopo.get(topoName));
+            }
+
+            usageByTopo.put(topoName, usage);
+        }
+
+        return usageByTopo;
+    }
+
+    /**
+     * Completes an alert bolt usage with the policies, stream groups and
+     * queues recorded for the bolt in the given lookup tables.
+     *
+     * @param bolt2Policies   policy names per unique bolt id
+     * @param bolt2Partition  stream groups per unique bolt id
+     * @param bolt2QueueIds   queue ids per unique bolt id
+     * @param uniqueAlertBolt unique id of the bolt used as the lookup key
+     * @param alertUsage      usage to complete
+     * @param queueMap        queues by queue id; a missing id is logged as an error
+     */
+    private void addBoltUsageInfo(Map<String, Set<String>> bolt2Policies,
+            Map<String, Set<StreamGroup>> bolt2Partition, Map<String, Set<String>> bolt2QueueIds,
+            String uniqueAlertBolt, AlertBoltUsage alertUsage, Map<String, StreamWorkSlotQueue> queueMap) {
+        // policies placed on the bolt
+        if (bolt2Policies.containsKey(uniqueAlertBolt)) {
+            alertUsage.getPolicies().addAll(bolt2Policies.get(uniqueAlertBolt));
+        }
+
+        // stream groups served by the bolt
+        if (bolt2Partition.containsKey(uniqueAlertBolt)) {
+            alertUsage.getPartitions().addAll(bolt2Partition.get(uniqueAlertBolt));
+        }
+
+        // queues referring to the bolt
+        if (!bolt2QueueIds.containsKey(uniqueAlertBolt)) {
+            return;
+        }
+        for (String queueId : bolt2QueueIds.get(uniqueAlertBolt)) {
+            if (!queueMap.containsKey(queueId)) {
+                LOG.error(" queue id {} not found in queue map !", queueId);
+                continue;
+            }
+            alertUsage.getReferQueues().add(queueMap.get(queueId));
+        }
+    }
+
+    /**
+     * Adds the data sources of every policy placed on the topology to the
+     * topology usage. A policy name without a definition is logged as an
+     * error and skipped.
+     *
+     * @param u topology usage whose policies are inspected and whose data
+     *          sources are extended
+     */
+    private void buildTopologyDataSource(TopologyUsage u) {
+        for (String policyName : u.getPolicies()) {
+            PolicyDefinition definition = policies.get(policyName);
+            if (definition == null) {
+                LOG.error(" policy not find {}, but reference in topology usage {} !", policyName, u.getTopoName());
+                continue;
+            }
+            u.getDataSources().addAll(findDatasource(definition));
+        }
+    }
+
+    /**
+     * Collects the data source of every input stream of a policy. An input
+     * stream without a stream definition is logged as an error and skipped.
+     *
+     * @param def policy whose input streams are resolved
+     * @return data source names in input stream order, duplicates included
+     */
+    private List<String> findDatasource(PolicyDefinition def) {
+        List<String> dataSources = new ArrayList<String>();
+        for (String streamId : def.getInputStreams()) {
+            StreamDefinition schema = this.streamDefinitions.get(streamId);
+            if (schema != null) {
+                dataSources.add(schema.getDataSource());
+                continue;
+            }
+            LOG.error("policy {} referenced stream definition {} not found in definiton !", def.getName(), streamId);
+        }
+        return dataSources;
+    }
+
+    private void preBuildQueue2TopoMap(
+            Map<String, Set<MonitoredStream>> topo2MonitorStream,
+            Map<String, Set<String>> topo2Policies, 
+            Map<String, Set<String>> bolt2Policies, 
+            Map<String, Set<StreamGroup>> bolt2Partition, 
+            Map<String, Set<String>> bolt2QueueIds,
+            Map<String, StreamWorkSlotQueue> queueMap) {
+        // pre-build structure: policy names grouped by the queue id of their assignment
+        // open question from the author: why not reuse queue.getPolicies instead?
+        Map<String, Set<String>> queue2Policies= new HashMap<String, 
Set<String>>();
+        for (PolicyAssignment pa : assignments.values()) {
+            if (!queue2Policies.containsKey(pa.getQueueId())) {
+                queue2Policies.put(pa.getQueueId(), new HashSet<String>());
+            }
+            queue2Policies.get(pa.getQueueId()).add(pa.getPolicyName());
+        }
+
+        for (MonitoredStream stream : monitoredStreamMap.values()) {
+            for (StreamWorkSlotQueue q : stream.getQueues()) {
+                queueMap.put(q.getQueueId(), q);
+                Set<String> policiesOnQ = 
queue2Policies.containsKey(q.getQueueId()) ? queue2Policies.get(q.getQueueId()) 
: new HashSet<String>();
+                
+                for (WorkSlot slot : q.getWorkingSlots()) {
+                    // topo2MonitorStream: topology name -> monitored streams having a working slot on it
+                    if 
(!topo2MonitorStream.containsKey(slot.getTopologyName())) {
+                        topo2MonitorStream.put(slot.getTopologyName(), new 
HashSet<MonitoredStream>());
+                    }
+                    topo2MonitorStream.get(slot.getTopologyName()).add(stream);
+                    
+                    // topo2Policies: topology name -> policy names assigned to queues with a slot on it
+                    if (!topo2Policies.containsKey(slot.getTopologyName())) {
+                        topo2Policies.put(slot.getTopologyName(), new 
HashSet<String>());
+                    }
+                    
topo2Policies.get(slot.getTopologyName()).addAll(policiesOnQ);
+                    
+                    // bolt2Policies: unique bolt id -> policy names assigned to queues using the bolt
+                    if (!bolt2Policies.containsKey(getUniqueBoltId(slot))) {
+                        bolt2Policies.put(getUniqueBoltId(slot), new 
HashSet<String>());
+                    }
+                    
bolt2Policies.get(getUniqueBoltId(slot)).addAll(policiesOnQ);
+                    
+                    // bolt2QueueIds: unique bolt id -> ids of the queues using the bolt
+                    if (!bolt2QueueIds.containsKey(getUniqueBoltId(slot))) {
+                        bolt2QueueIds.put(getUniqueBoltId(slot), new 
HashSet<String>());
+                    }
+                    
bolt2QueueIds.get(getUniqueBoltId(slot)).add(q.getQueueId());
+                    
+                    // bolt2Partition: unique bolt id -> stream groups of the monitored streams using the bolt
+                    if (!bolt2Partition.containsKey(getUniqueBoltId(slot))) {
+                        bolt2Partition.put(getUniqueBoltId(slot), new 
HashSet<StreamGroup>());
+                    }
+                    
bolt2Partition.get(getUniqueBoltId(slot)).add(stream.getStreamGroup());
+                }
+            }
+        }
+    }
+
+    private String getUniqueBoltId(WorkSlot slot) { // key for a bolt: UNIQUE_BOLT_ID formatted with the slot's topology name and bolt id
+        return String.format(UNIQUE_BOLT_ID, slot.getTopologyName(), 
slot.getBoltId());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
new file mode 100644
index 0000000..6b8495e
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.resource;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordinator.Coordinator;
+import org.apache.eagle.alert.coordinator.ScheduleOption;
+import org.apache.eagle.alert.utils.JsonUtils;
+
+/**
+ * Exposes the coordinator through a REST API so it can be reached even without ZK as the intermediate access.
+ * FIXME : more elegant status code
+ * 
+ * @since Mar 24, 2016 <br/>
+ */
+@Path("/coordinator")
+@Produces({ "application/json" })
+public class CoordinatorResource {
+
+    // sprint config here?
+    private Coordinator alertCoordinator = new Coordinator();
+
+    @GET
+    @Path("/assignments")
+    public String getAssignments() throws Exception { // returns the coordinator's current ScheduleState, serialized by JsonUtils
+        ScheduleState state = alertCoordinator.getState();
+        return JsonUtils.writeValueAsString(state);
+    }
+
+    @POST
+    @Path("/build")
+    public String build() throws Exception { // runs a schedule with a default ScheduleOption and returns the resulting state
+        ScheduleOption option = new ScheduleOption();
+        ScheduleState state = alertCoordinator.schedule(option);
+        return JsonUtils.writeValueAsString(state);
+    }
+
+    /**
+     * Manually update the topology usages, for administration
+     * 
+     * @return currently always an empty string; the refresh itself is not implemented yet
+     */
+    @POST
+    @Path("/refreshUsages")
+    public String refreshUsages() {
+        // TODO
+        return "";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
new file mode 100644
index 0000000..15333da
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.trigger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Polls the metadata service client for policy changes and notifies the registered listeners.
+ */
+public class DynamicPolicyLoader implements Runnable{
+    private static Logger LOG = 
LoggerFactory.getLogger(DynamicPolicyLoader.class);
+
+    private IMetadataServiceClient client;
+    // policies seen when listeners were last notified, keyed by policy name; initially empty
+    private Map<String, PolicyDefinition> cachedPolicies = new HashMap<>();
+    private List<PolicyChangeListener> listeners = new ArrayList<>();
+
+    public DynamicPolicyLoader(IMetadataServiceClient client){
+        this.client = client;
+    }
+
+    public synchronized void addPolicyChangeListener(PolicyChangeListener 
listener){
+        listeners.add(listener);
+    }
+
+    /**
+     * When it is run for the first time, cachedPolicies is still empty, so 
all existing policies are expected
+     * to be reported as addedPolicies.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void run() {
+        // we should catch every exception to avoid a zombie thread
+        try {
+            List<PolicyDefinition> current = client.listPolicies();
+            Map<String, PolicyDefinition> currPolicies = new HashMap<>();
+            current.forEach(pe -> currPolicies.put(pe.getName(), pe));
+
+            Collection<String> addedPolicies = 
CollectionUtils.subtract(currPolicies.keySet(), cachedPolicies.keySet());
+            Collection<String> removedPolicies = 
CollectionUtils.subtract(cachedPolicies.keySet(), currPolicies.keySet());
+            Collection<String> potentiallyModifiedPolicies = 
CollectionUtils.intersection(currPolicies.keySet(), cachedPolicies.keySet());
+
+            List<String> reallyModifiedPolicies = new ArrayList<>();
+            for (String updatedPolicy : potentiallyModifiedPolicies) {
+                if 
(!currPolicies.get(updatedPolicy).equals(cachedPolicies.get(updatedPolicy))) {
+                    reallyModifiedPolicies.add(updatedPolicy);
+                }
+            }
+
+            boolean policyChanged = false;
+            if (addedPolicies.size() != 0 ||
+                    removedPolicies.size() != 0 ||
+                    reallyModifiedPolicies.size() != 0) {
+                policyChanged = true;
+            }
+
+            if (!policyChanged) {
+                LOG.info("policy is not changed since last run");
+                return;
+            }
+            synchronized (this) {
+                for (PolicyChangeListener listener : listeners) {
+                    listener.onPolicyChange(current, addedPolicies, 
removedPolicies, reallyModifiedPolicies);
+                }
+            }
+
+            // reset cached policies; not reached if a listener threw, so the same change is detected again next run
+            cachedPolicies = currPolicies;
+        } catch (Throwable t) {
+            LOG.error("error loading policy, but continue to run", t);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
new file mode 100644
index 0000000..cd5265d
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.trigger;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+
+public interface PolicyChangeListener { // callback for policy changes: all current policies plus the names added, removed and modified
+    void onPolicyChange(List<PolicyDefinition> allPolicies, Collection<String> 
addedPolicies, Collection<String> removedPolicies, Collection<String> 
modifiedPolicies);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/resources/application.conf
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/resources/application.conf
new file mode 100644
index 0000000..23ee161
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/resources/application.conf
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+{
+       "coordinator" : {
+               "policiesPerBolt" : 5,
+               "boltParallelism" : 5,
+               "policyDefaultParallelism" : 5,
+               "boltLoadUpbound": 0.8,
+               "topologyLoadUpbound" : 0.8,
+               "numOfAlertBoltsPerTopology" : 5,
+               "zkConfig" : {
+                       "zkQuorum" : "localhost:2181",
+                       "zkRoot" : "/alert",
+                       "zkSessionTimeoutMs" : 10000,
+                       "connectionTimeoutMs" : 10000,
+                       "zkRetryTimes" : 3,
+                       "zkRetryInterval" : 3000
+               },
+               "metadataService" : {
+                       "host" : "localhost",
+                       "port" : 8080,
+                       "context" : "/api"
+               },
+               "metadataDynamicCheck" : {
+                       "initDelayMillis" : 1000,
+                       "delayMillis" : 30000
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/resources/log4j.properties
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/resources/log4j.properties
new file mode 100644
index 0000000..d4bc126
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: 
%m%n

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml
new file mode 100644
index 0000000..1aa925e
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<web-app xmlns="http://java.sun.com/xml/ns/javaee"
+           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+           xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
+                 http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
+           version="3.0">
+    <welcome-file-list>
+        <welcome-file>index.html</welcome-file>
+    </welcome-file-list>
+    <servlet>
+        <servlet-name>Jersey Web Application</servlet-name>
+        
<servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
+        <init-param>
+            <param-name>com.sun.jersey.config.property.packages</param-name>
+            
<param-value>io.swagger.jaxrs.json,io.swagger.jaxrs.listing,org.apache.eagle.alert.coordinator.resource,org.codehaus.jackson.jaxrs</param-value>
+        </init-param>
+        <init-param>
+            
<param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name>
+            
<param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter;com.sun.jersey.api.container.filter.PostReplaceFilter</param-value>
+        </init-param>
+        <init-param>
+            
<param-name>com.sun.jersey.spi.container.ContainerResponseFilters</param-name>
+            
<param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value>
+        </init-param>
+        <load-on-startup>1</load-on-startup>
+    </servlet>
+    <!-- Servlet for swagger initialization only, no URL mapping. -->
+       <servlet>
+               <servlet-name>swaggerConfig</servlet-name>
+               
<servlet-class>io.swagger.jaxrs.config.DefaultJaxrsConfig</servlet-class>
+               <init-param>
+                       <param-name>api.version</param-name>
+                       <param-value>1.0.0</param-value>
+               </init-param>
+               <init-param>
+                       <param-name>swagger.api.basepath</param-name>
+                       <param-value>/api</param-value>
+               </init-param>
+               <load-on-startup>2</load-on-startup>
+       </servlet>
+
+    <servlet-mapping>
+        <servlet-name>Jersey Web Application</servlet-name>
+        <url-pattern>/api/*</url-pattern>
+    </servlet-mapping>
+    <filter>
+        <filter-name>CorsFilter</filter-name>
+        <!-- this should be replaced by tomcat ones, see also metadata 
resource -->
+        
<filter-class>org.apache.eagle.alert.resource.SimpleCORSFiler</filter-class>
+        <init-param>
+            <param-name>cors.allowed.origins</param-name>
+            <param-value>*</param-value>
+        </init-param>
+        <init-param>
+            <param-name>cors.allowed.headers</param-name>
+            <param-value>Authorization,Origin, No-Cache, X-Requested-With, 
If-Modified-Since, Pragma, Last-Modified, Cache-Control, Expires, Content-Type, 
X-E4M-With, Accept</param-value>
+        </init-param>
+        <init-param>
+            <param-name>cors.allowed.methods</param-name>
+            <param-value>GET,POST,HEAD,OPTIONS,PUT,DELETE</param-value>
+        </init-param>
+        <init-param>
+            <param-name>cors.support.credentials</param-name>
+            <param-value>true</param-value>
+        </init-param>
+    </filter>
+    <filter-mapping>
+        <filter-name>CorsFilter</filter-name>
+        <url-pattern>/*</url-pattern>
+    </filter-mapping>
+</web-app>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/webapp/index.html
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/webapp/index.html
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/webapp/index.html
new file mode 100644
index 0000000..1c4ea76
--- /dev/null
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/webapp/index.html
@@ -0,0 +1,18 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+UMP Coordinator service!


Reply via email to