http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
deleted file mode 100644
index a9f9f39..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/PartitionedEventTimeOrderingComparator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import java.util.Comparator;
-import java.util.Objects;
-
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-
-/**
- * TODO: Stable sorting algorithm for better performance, to avoid re-sorting events with the same timestamp?
- */
-public class PartitionedEventTimeOrderingComparator implements Comparator<PartitionedEvent> {
-    public static final PartitionedEventTimeOrderingComparator INSTANCE = new PartitionedEventTimeOrderingComparator();
-
-    @Override
-    public int compare(PartitionedEvent o1, PartitionedEvent o2) {
-        if(Objects.equals(o1,o2)){
-            return 0;
-        }else {
-            if(o1 == null && o2 == null){
-                return 0;
-            }else if(o1 != null && o2 == null){
-                return 1;
-            }else if(o1 == null){
-                return -1;
-            }
-            // Unstable Sorting Algorithm
-            if(o1.getTimestamp() <= o2.getTimestamp()){
-                return -1;
-            } else {
-                return 1;
-            }
-        }
-    }
-}
\ No newline at end of file
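
The comparator above deliberately never returns 0 for two distinct events: the on-heap window stores events in a Guava TreeMultiset, which collapses comparator-equal elements into a single entry, so returning 0 for different events with equal timestamps would merge them. The cost is an inconsistent ordering (for equal timestamps, compare(a, b) and compare(b, a) both return -1), which is what the "Unstable Sorting Algorithm" comment refers to. A minimal sketch of a consistent, stable alternative, assuming events could carry an arrival sequence number (the SequencedEvent wrapper below is hypothetical, not part of Eagle):

    import java.util.Comparator;
    import java.util.concurrent.atomic.AtomicLong;

    final class SequencedEvent {
        private static final AtomicLong COUNTER = new AtomicLong();
        final long timestamp;
        final long seq = COUNTER.getAndIncrement(); // arrival order

        SequencedEvent(long timestamp) {
            this.timestamp = timestamp;
        }

        // Order by timestamp, break ties by arrival order: antisymmetric and
        // transitive, yet equal-timestamp events stay distinct and in order.
        static final Comparator<SequencedEvent> TIME_THEN_ARRIVAL =
                Comparator.comparingLong((SequencedEvent e) -> e.timestamp)
                          .thenComparingLong(e -> e.seq);
    }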

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java
deleted file mode 100644
index 7be69e1..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortWindowHandlerImpl.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import java.io.IOException;
-
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.router.StreamSortHandler;
-import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
-import org.apache.eagle.alert.engine.sorter.StreamWindow;
-import org.apache.eagle.alert.engine.sorter.StreamWindowManager;
-import org.apache.eagle.alert.utils.DateTimeUtil;
-import org.joda.time.Period;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamSortWindowHandlerImpl implements StreamSortHandler {
-    private final static Logger LOG = LoggerFactory.getLogger(StreamSortWindowHandlerImpl.class);
-    private StreamWindowManager windowManager;
-    private StreamSortSpec streamSortSpecSpec;
-    private PartitionedEventCollector outputCollector;
-    private String streamId;
-
-    public void prepare(String streamId, StreamSortSpec streamSortSpecSpec, PartitionedEventCollector outputCollector) {
-        this.windowManager = new StreamWindowManagerImpl(
-                Period.parse(streamSortSpecSpec.getWindowPeriod()),
-                streamSortSpecSpec.getWindowMargin(),
-                PartitionedEventTimeOrderingComparator.INSTANCE,
-                outputCollector);
-        this.streamSortSpecSpec = streamSortSpecSpec;
-        this.streamId = streamId;
-        this.outputCollector = outputCollector;
-    }
-
-    /**
-     * Entry point to manage window lifecycle
-     *
-     * @param event StreamEvent
-     */
-    public void nextEvent(PartitionedEvent event) {
-        final long eventTime = event.getEvent().getTimestamp();
-        boolean handled = false;
-
-        synchronized (this.windowManager){
-            for (StreamWindow window : this.windowManager.getWindows()) {
-                if (window.alive() && window.add(event)) {
-                    handled = true;
-                }
-            }
-
-            // No existing window accepted the event, but it is not too late to be rejected
-            if (!handled && !windowManager.reject(eventTime)) {
-                // later than all existing windows, create a new window
-                StreamWindow window = windowManager.addNewWindow(eventTime);
-                if (window.add(event)) {
-                    LOG.info("Created {} of {} at {}", window, this.streamId, 
DateTimeUtil.millisecondsToHumanDateWithMilliseconds(eventTime));
-                    handled = true;
-                }
-            }
-        }
-
-        if(!handled){
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Drop expired event {}", event);
-            }
-            outputCollector.drop(event);
-        }
-    }
-
-    @Override
-    public void onTick(StreamTimeClock clock,long globalSystemTime) {
-        windowManager.onTick(clock, globalSystemTime);
-    }
-
-    @Override
-    public void close() {
-        try {
-            windowManager.close();
-        } catch (IOException e) {
-            LOG.error("Got exception while closing window manager",e);
-        }
-    }
-
-    @Override
-    public String toString() {
-        return super.toString();
-    }
-
-    @Override
-    public int hashCode(){
-        if(streamSortSpecSpec == null){
-            throw new NullPointerException("streamSortSpec is null");
-        }else{
-            return streamSortSpecSpec.hashCode();
-        }
-    }
-}
\ No newline at end of file
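
The window lifecycle in nextEvent() reduces to three outcomes: the event fits an existing window, it is newer than all windows and not yet rejected so a new window is created, or it is expired and dropped. A simplified, self-contained sketch of that decision, assuming fixed one-minute windows with [start, end) semantics (the Window class is a hypothetical stand-in for StreamWindow):

    import java.util.ArrayList;
    import java.util.List;

    public class WindowRoutingSketch {
        static final long WINDOW_MS = 60_000L;

        static class Window {
            final long start, end;
            final List<Long> events = new ArrayList<>();
            Window(long start, long end) { this.start = start; this.end = end; }
            boolean add(long ts) {
                if (ts >= start && ts < end) { events.add(ts); return true; }
                return false;
            }
        }

        final List<Window> windows = new ArrayList<>();
        long rejectTime = 0L; // advanced as windows expire

        /** @return false when the event is too late; the caller then drops it */
        boolean route(long ts) {
            for (Window w : windows) {
                if (w.add(ts)) return true;       // fits an existing window
            }
            if (ts < rejectTime) return false;     // expired
            long start = ts - ts % WINDOW_MS;      // align to the period boundary
            Window w = new Window(start, start + WINDOW_MS);
            windows.add(w);                        // later than all windows: open a new one
            return w.add(ts);
        }
    }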

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java
deleted file mode 100644
index c1c289d..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowInMapDB.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.lang3.time.StopWatch;
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.sorter.BaseStreamWindow;
-import org.mapdb.BTreeMap;
-import org.mapdb.DB;
-import org.mapdb.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * StreamSortedWindow based on MapDB to support off-heap or disk storage.
- *
- * Stable sorting algorithm
- *
- * <br/><br/>
- *
- * See <a href="http://www.mapdb.org";>http://www.mapdb.org</a>
- */
-public class StreamSortedWindowInMapDB extends BaseStreamWindow {
-    private final String mapId;
-    private BTreeMap<Long, PartitionedEvent[]> bTreeMap;
-    private final static Logger LOG = LoggerFactory.getLogger(StreamSortedWindowInMapDB.class);
-    private final AtomicInteger size;
-    private long replaceOpCount = 0;
-    private final static PartitionedEventGroupSerializer STREAM_EVENT_GROUP_SERIALIZER = new PartitionedEventGroupSerializer();
-
-    /**
-     * @param start
-     * @param end
-     * @param margin
-     * @param db
-     * @param mapId physical map id, used to decide whether to reuse or not
-     */
-    @SuppressWarnings("unused")
-    public StreamSortedWindowInMapDB(long start, long end, long margin, DB db, String mapId) {
-        super(start, end, margin);
-        this.mapId = mapId;
-        try {
-            bTreeMap = db.<Long, StreamEvent>treeMap(mapId).
-                    keySerializer(Serializer.LONG).
-                    valueSerializer(STREAM_EVENT_GROUP_SERIALIZER).
-                    createOrOpen();
-            LOG.debug("Created BTree map {}",mapId);
-        } catch (Error error){
-            LOG.info("Failed create BTree {}",mapId,error);
-        }
-        size = new AtomicInteger(0);
-    }
-
-    /**
-     * Assumption: most add operations only need a put; few require replacing.
-     *
-     * <ol>
-     * <li>
-     * First of all, always try to put the event directly
-     * </li>
-     * <li>
-     * If the key already exists, append to the existing group and replace;
-     * the replace operation is more expensive
-     * </li>
-     * </ol>
-     * @param event incoming event
-     * @return whether the event was accepted
-     */
-    @Override
-    public synchronized boolean add(PartitionedEvent event) {
-        long timestamp = event.getEvent().getTimestamp();
-        if(accept(timestamp)) {
-            boolean absent = bTreeMap.putIfAbsentBoolean(timestamp, new PartitionedEvent[]{event});
-            if (!absent) {
-                size.incrementAndGet();
-                return true;
-            } else {
-                if(LOG.isDebugEnabled()) LOG.debug("Duplicated timestamp {}, replacing will reduce performance", timestamp);
-                PartitionedEvent[] oldValue = bTreeMap.get(timestamp);
-                PartitionedEvent[] newValue = oldValue == null ? new PartitionedEvent[1] : Arrays.copyOf(oldValue, oldValue.length + 1);
-                newValue[newValue.length - 1] = event;
-                PartitionedEvent[] removedValue = bTreeMap.replace(timestamp, newValue);
-                replaceOpCount ++;
-                if(replaceOpCount % 1000 == 0){
-                    LOG.warn("Too many events ({}) with overlap timestamp, may 
reduce insertion performance",replaceOpCount);
-                }
-                if(removedValue!=null) {
-                    size.incrementAndGet();
-                } else {
-                    throw new IllegalStateException("Failed to replace key 
"+timestamp+" with "+newValue.length+" entities array to replace old 
"+oldValue.length+" entities array");
-                }
-                return true;
-            }
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    protected synchronized void flush(PartitionedEventCollector collector) {
-        StopWatch stopWatch = new StopWatch();
-        stopWatch.start();
-        bTreeMap.valueIterator().forEachRemaining((events)->{
-            for(PartitionedEvent event:events){
-                collector.emit(event);
-            }
-        });
-        bTreeMap.clear();
-        replaceOpCount = 0;
-        stopWatch.stop();
-        LOG.info("Flushed {} events in {} ms", size, stopWatch.getTime());
-        size.set(0);
-    }
-
-    @Override
-    public synchronized void close(){
-        super.close();
-        bTreeMap.close();
-        LOG.info("Closed {}",this.mapId);
-    }
-
-    @Override
-    public synchronized int size() {
-        return size.get();
-    }
-}
\ No newline at end of file
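
The add() Javadoc above describes a two-phase insert: optimistically put a one-element array, and only on a timestamp collision fall back to copy-append-replace. A minimal sketch of the same pattern against a JDK sorted map standing in for the MapDB BTreeMap (so the MapDB-specific putIfAbsentBoolean call is not used, and payloads are toy strings):

    import java.util.Arrays;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class PutOrAppendSketch {
        public static void main(String[] args) {
            ConcurrentSkipListMap<Long, String[]> map = new ConcurrentSkipListMap<>();
            append(map, 1000L, "event-1"); // fast path: plain put
            append(map, 1000L, "event-2"); // slow path: copy, append, replace
            System.out.println(Arrays.toString(map.get(1000L))); // [event-1, event-2]
        }

        static void append(ConcurrentSkipListMap<Long, String[]> map, long ts, String event) {
            if (map.putIfAbsent(ts, new String[]{event}) != null) {
                // Key already present: the costly path the Javadoc warns about.
                String[] old = map.get(ts);
                String[] grown = Arrays.copyOf(old, old.length + 1);
                grown[grown.length - 1] = event;
                map.replace(ts, grown);
            }
        }
    }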

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java
deleted file mode 100644
index d3b1d7d..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamSortedWindowOnHeap.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import java.util.Comparator;
-
-import org.apache.commons.lang3.time.StopWatch;
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.sorter.BaseStreamWindow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.TreeMultiset;
-
-public class StreamSortedWindowOnHeap extends BaseStreamWindow {
-    private final static Logger LOG = LoggerFactory.getLogger(StreamSortedWindowOnHeap.class);
-    private final TreeMultiset<PartitionedEvent> treeMultisetCache;
-
-    /**
-     * @param start start time
-     * @param end end time
-     * @param margin margin time
-     */
-    public StreamSortedWindowOnHeap(long start, long end, long margin, Comparator<PartitionedEvent> comparator) {
-        super(start,end,margin);
-        treeMultisetCache = TreeMultiset.create(comparator);
-    }
-
-    public StreamSortedWindowOnHeap(long start, long end, long margin){
-        this(start,end,margin,new PartitionedEventTimeOrderingComparator());
-    }
-
-    @Override
-    public boolean add(PartitionedEvent partitionedEvent) {
-        synchronized (treeMultisetCache) {
-            if (accept(partitionedEvent.getEvent().getTimestamp())) {
-                treeMultisetCache.add(partitionedEvent);
-                return true;
-            } else {
-                if(LOG.isDebugEnabled()) LOG.debug("{} is not acceptable, ignored", partitionedEvent);
-                return false;
-            }
-        }
-    }
-
-    @Override
-    protected void flush(PartitionedEventCollector collector) {
-        synchronized (treeMultisetCache) {
-            StopWatch stopWatch = new StopWatch();
-            stopWatch.start();
-            treeMultisetCache.forEach(collector::emit);
-            int size = treeMultisetCache.size();
-            treeMultisetCache.clear();
-            stopWatch.stop();
-            LOG.info("Flushed {} events in {} ms from {}", size, 
stopWatch.getTime(),this.toString());
-        }
-    }
-
-    @Override
-    public int size() {
-        synchronized (treeMultisetCache) {
-            return treeMultisetCache.size();
-        }
-    }
-}
\ No newline at end of file
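
StreamSortedWindowOnHeap leans on TreeMultiset semantics: elements that compare as equal are folded into one entry with a count, which is why the default comparator never returns 0 for distinct events. A small demo of what would happen otherwise (toy string-array payloads, not PartitionedEvents):

    import java.util.Comparator;

    import com.google.common.collect.TreeMultiset;

    public class TreeMultisetMergeDemo {
        public static void main(String[] args) {
            // Comparing by timestamp only: the two distinct payloads below
            // compare as equal, so the multiset stores one element with count 2.
            TreeMultiset<String[]> byTimestampOnly =
                    TreeMultiset.create(Comparator.comparingLong((String[] e) -> Long.parseLong(e[0])));
            byTimestampOnly.add(new String[]{"100", "a"});
            byTimestampOnly.add(new String[]{"100", "b"});
            byTimestampOnly.forEach(e -> System.out.println(e[1])); // prints "a" twice; "b" is lost
        }
    }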

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java
deleted file mode 100644
index 91a4a37..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockInLocalMemory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
-import org.apache.eagle.alert.utils.DateTimeUtil;
-
-
-/**
- * In memory thread-safe time clock service
- *
- * TODO: maybe need to synchronize time clock globally, how to?
- */
-public class StreamTimeClockInLocalMemory implements StreamTimeClock {
-    private final AtomicLong currentTime;
-    private final String streamId;
-
-    public StreamTimeClockInLocalMemory(String streamId,long initialTime){
-        this.streamId = streamId;
-        this.currentTime = new AtomicLong(initialTime);
-    }
-    public StreamTimeClockInLocalMemory(String streamId){
-        this(streamId,0L);
-    }
-
-    @Override
-    public void moveForward(long timestamp){
-        if(timestamp < currentTime.get()){
-            throw new IllegalArgumentException(timestamp + " < " + currentTime.get() + ", should not move time backward");
-        }
-        this.currentTime.set(timestamp);
-    }
-
-    @Override
-    public String getStreamId() {
-        return streamId;
-    }
-
-    @Override
-    public long getTime() {
-        return currentTime.get();
-    }
-
-    @Override
-    public String toString() {
-        return String.format("StreamClock[streamId=%s, now=%s]",streamId, 
DateTimeUtil.millisecondsToHumanDateWithMilliseconds(currentTime.get()));
-    }
-}
\ No newline at end of file
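
A short usage sketch of the clock's monotonic contract (stream id and times are illustrative):

    import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockInLocalMemory;

    public class ClockDemo {
        public static void main(String[] args) {
            StreamTimeClockInLocalMemory clock = new StreamTimeClockInLocalMemory("stream-1");
            clock.moveForward(1_000L);
            clock.moveForward(2_000L);
            System.out.println(clock.getTime());  // 2000
            // clock.moveForward(500L);           // throws IllegalArgumentException
        }
    }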

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java
deleted file mode 100644
index 741d7f0..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamTimeClockManagerImpl.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
-import org.apache.eagle.alert.engine.sorter.StreamTimeClockListener;
-import org.apache.eagle.alert.engine.sorter.StreamTimeClockManager;
-import org.apache.eagle.alert.utils.DateTimeUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class StreamTimeClockManagerImpl implements StreamTimeClockManager {
-    private static final long serialVersionUID = -2770823821511195343L;
-    private final static Logger LOG = LoggerFactory.getLogger(StreamTimeClockManagerImpl.class);
-    private final Map<String,StreamTimeClock> streamIdTimeClockMap;
-    private Timer timer;
-
-    private final Map<StreamTimeClockListener,String> listenerStreamIdMap;
-    private final static AtomicInteger num = new AtomicInteger();
-
-    public StreamTimeClockManagerImpl(){
-        listenerStreamIdMap = new HashMap<>();
-        streamIdTimeClockMap = new HashMap<>();
-        timer = new Timer("StreamScheduler-"+num.getAndIncrement());
-        timer.schedule(new TimerTask() {
-            @Override
-            public void run() {
-                // Make sure the timer tick happens one by one
-                triggerTickOnAll();
-            }
-        }, 1000, 1000);
-    }
-
-    /**
-     *
-     * By default, we keep the current time clock in memory;
-     * eventually we may need to consider global time synchronization across all nodes.
-     *
-     * 1) When to initialize window according to start time
-     * 2) When to close expired window according to current time
-     *
-     * @return StreamTimeClock instance
-     */
-    @Override
-    public StreamTimeClock createStreamTimeClock(String streamId){
-        synchronized (streamIdTimeClockMap) {
-            if (!streamIdTimeClockMap.containsKey(streamId)) {
-                StreamTimeClock instance = new StreamTimeClockInLocalMemory(streamId);
-                LOG.info("Created {}", instance);
-                streamIdTimeClockMap.put(streamId, instance);
-            } else {
-                LOG.warn("TimeClock for stream already existss: 
"+streamIdTimeClockMap.get(streamId));
-            }
-            return streamIdTimeClockMap.get(streamId);
-        }
-    }
-
-    /**
-     * @param streamId
-     * @return
-     */
-    @Override
-    public StreamTimeClock getStreamTimeClock(String streamId) {
-        synchronized (streamIdTimeClockMap) {
-            if (!streamIdTimeClockMap.containsKey(streamId)) {
-                LOG.warn("TimeClock for stream {} is not initialized before 
being called, create now", streamId);
-                return createStreamTimeClock(streamId);
-            }
-            return streamIdTimeClockMap.get(streamId);
-        }
-    }
-
-    /**
-     * @param streamId
-     */
-    @Override
-    public void removeStreamTimeClock(String streamId){
-        synchronized (streamIdTimeClockMap) {
-            if (streamIdTimeClockMap.containsKey(streamId)) {
-                StreamTimeClock removed = streamIdTimeClockMap.remove(streamId);
-                LOG.info("Removed TimeClock for stream {}: {}", streamId, removed);
-            } else {
-                LOG.warn("No TimeClock found for stream {}, nothing to 
remove", streamId);
-            }
-        }
-    }
-
-    @Override
-    public void registerListener(String streamId, StreamTimeClockListener listener) {
-        synchronized (listenerStreamIdMap) {
-            if (listenerStreamIdMap.containsKey(listener))
-                throw new IllegalArgumentException("Duplicated listener: " + listener.toString());
-            LOG.info("Register {} on {}", listener, streamId);
-            listenerStreamIdMap.put(listener, streamId);
-        }
-    }
-
-    @Override
-    public void registerListener(StreamTimeClock streamClock, StreamTimeClockListener listener) {
-        registerListener(streamClock.getStreamId(), listener);
-    }
-
-    @Override
-    public void removeListener(StreamTimeClockListener listener) {
-        listenerStreamIdMap.remove(listener);
-    }
-
-    @Override
-    public synchronized void triggerTickOn(String streamId) {
-        int count = 0;
-        for (Map.Entry<StreamTimeClockListener, String> entry : 
listenerStreamIdMap.entrySet()) {
-            if (entry.getValue().equals(streamId)) {
-                entry.getKey().onTick(streamIdTimeClockMap.get(streamId), 
getCurrentSystemTime());
-                count++;
-            }
-        }
-        if (LOG.isDebugEnabled()) LOG.debug("Triggered {} time-clock listeners 
on stream {}", count, streamId);
-    }
-
-    private static long getCurrentSystemTime(){
-        return System.currentTimeMillis();
-    }
-
-    @Override
-    public void onTimeUpdate(String streamId, long timestamp) {
-        StreamTimeClock timeClock = getStreamTimeClock(streamId);
-        if(timeClock == null)
-            return;
-        // Trigger time clock only when time moves forward
-        if(timestamp >= timeClock.getTime()) {
-            timeClock.moveForward(timestamp);
-            if(LOG.isDebugEnabled()) LOG.debug("Tick on stream {} with latest time {}", streamId, DateTimeUtil.millisecondsToHumanDateWithMilliseconds(timeClock.getTime()));
-            triggerTickOn(streamId);
-        }
-    }
-
-    private void triggerTickOnAll(){
-        synchronized (listenerStreamIdMap) {
-            for (Map.Entry<StreamTimeClockListener, String> entry : listenerStreamIdMap.entrySet()) {
-                triggerTickOn(entry.getValue());
-            }
-        }
-    }
-
-    @Override
-    public void close() {
-        timer.cancel();
-        triggerTickOnAll();
-        LOG.info("Closed StreamTimeClockManager {}",this);
-    }
-}
\ No newline at end of file
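
The "one by one" comment in the constructor holds because java.util.Timer runs all of its tasks on a single background thread, so one tick cannot start before the previous one returns. A standalone illustration of that scheduling pattern:

    import java.util.Timer;
    import java.util.TimerTask;

    public class TickDemo {
        public static void main(String[] args) throws InterruptedException {
            Timer timer = new Timer("StreamScheduler-demo");
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    // Runs on the timer's single thread: ticks are serialized.
                    System.out.println("tick @ " + System.currentTimeMillis());
                }
            }, 1000, 1000); // 1s initial delay, then every 1s
            Thread.sleep(3500);
            timer.cancel();  // like close(): no further ticks fire
        }
    }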

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java
deleted file mode 100644
index 4e1212f..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/sorter/impl/StreamWindowManagerImpl.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.sorter.impl;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
-import java.util.TreeMap;
-
-import org.apache.commons.lang3.time.StopWatch;
-import org.apache.eagle.alert.engine.PartitionedEventCollector;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.sorter.StreamTimeClock;
-import org.apache.eagle.alert.engine.sorter.StreamWindow;
-import org.apache.eagle.alert.engine.sorter.StreamWindowManager;
-import org.apache.eagle.alert.engine.sorter.StreamWindowRepository;
-import org.apache.eagle.alert.utils.DateTimeUtil;
-import org.apache.eagle.alert.utils.TimePeriodUtils;
-import org.joda.time.Period;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamWindowManagerImpl implements StreamWindowManager {
-    private final static Logger LOG = LoggerFactory.getLogger(StreamWindowManagerImpl.class);
-    private final TreeMap<Long,StreamWindow> windowBuckets;
-    private final PartitionedEventCollector collector;
-    private final Period windowPeriod;
-    private final long windowMargin;
-    @SuppressWarnings("unused")
-    private final Comparator<PartitionedEvent> comparator;
-    private long rejectTime;
-
-    public StreamWindowManagerImpl(Period windowPeriod, long windowMargin, Comparator<PartitionedEvent> comparator, PartitionedEventCollector collector) {
-        this.windowBuckets = new TreeMap<>();
-        this.windowPeriod = windowPeriod;
-        this.windowMargin = windowMargin;
-        this.collector = collector;
-        this.comparator = comparator;
-    }
-
-    @Override
-    public StreamWindow addNewWindow(long initialTime) {
-        synchronized (windowBuckets) {
-            if (!reject(initialTime)) {
-                Long windowStartTime = TimePeriodUtils.formatMillisecondsByPeriod(initialTime, windowPeriod);
-                Long windowEndTime = windowStartTime + TimePeriodUtils.getMillisecondsOfPeriod(windowPeriod);
-                StreamWindow window = StreamWindowRepository.getSingletonInstance().createWindow(windowStartTime, windowEndTime, windowMargin);
-                window.register(collector);
-                addWindow(window);
-                return window;
-            } else {
-                throw new IllegalStateException("Failed to create new window, 
as " + DateTimeUtil.millisecondsToHumanDateWithMilliseconds(initialTime) + " is 
too late, only allow timestamp after " + 
DateTimeUtil.millisecondsToHumanDateWithMilliseconds(rejectTime));
-            }
-        }
-    }
-
-    private void addWindow(StreamWindow window){
-        if (!windowBuckets.containsKey(window.startTime())) {
-            windowBuckets.put(window.startTime(), window);
-        } else {
-            throw new IllegalArgumentException("Duplicated " + 
window.toString());
-        }
-    }
-
-    @Override
-    public void removeWindow(StreamWindow window) {
-        synchronized (windowBuckets) {
-            windowBuckets.remove(window.startTime());
-        }
-    }
-
-    @Override
-    public boolean hasWindow(StreamWindow window) {
-        synchronized (windowBuckets) {
-            return windowBuckets.containsKey(window.startTime());
-        }
-    }
-
-    @Override
-    public boolean hasWindowFor(long timestamp) {
-        return getWindowFor(timestamp) != null;
-    }
-
-    @Override
-    public Collection<StreamWindow> getWindows() {
-        synchronized (windowBuckets) {
-            return windowBuckets.values();
-        }
-    }
-
-    @Override
-    public StreamWindow getWindowFor(long timestamp) {
-        synchronized (windowBuckets) {
-            for (StreamWindow windowBucket : windowBuckets.values()) {
-                if (timestamp >= windowBucket.startTime() && timestamp < 
windowBucket.endTime()) {
-                    return windowBucket;
-                }
-            }
-            return null;
-        }
-    }
-
-    @Override
-    public boolean reject(long timestamp) {
-        return timestamp < rejectTime;
-    }
-
-    @Override
-    public void onTick(StreamTimeClock clock,long globalSystemTime) {
-        synchronized (windowBuckets) {
-            List<StreamWindow> toRemoved = new ArrayList<>();
-            List<StreamWindow> aliveWindow = new ArrayList<>();
-
-            for (StreamWindow windowBucket : windowBuckets.values()) {
-                windowBucket.onTick(clock, globalSystemTime);
-                if (windowBucket.rejectTime() > rejectTime) rejectTime = windowBucket.rejectTime();
-            }
-            for (StreamWindow windowBucket : windowBuckets.values()) {
-                if (windowBucket.expired() || windowBucket.endTime() <= rejectTime) {
-                    toRemoved.add(windowBucket);
-                } else {
-                    aliveWindow.add(windowBucket);
-                }
-            }
-            toRemoved.forEach(this::closeAndRemoveWindow);
-            if (toRemoved.size() > 0) LOG.info("Windows: {} alive = {}, {} expired = {}", aliveWindow.size(), aliveWindow, toRemoved.size(), toRemoved);
-        }
-    }
-
-    private void closeAndRemoveWindow(StreamWindow windowBucket) {
-        StopWatch stopWatch = new StopWatch();
-        stopWatch.start();
-        closeWindow(windowBucket);
-        removeWindow(windowBucket);
-        stopWatch.stop();
-        LOG.info("Removed {} in {} ms", windowBucket, stopWatch.getTime());
-    }
-
-    private void closeWindow(StreamWindow windowBucket) {
-        windowBucket.close();
-    }
-
-    public void close() {
-        synchronized (windowBuckets) {
-            LOG.debug("Closing");
-            StopWatch stopWatch = new StopWatch();
-            stopWatch.start();
-            int count = 0;
-            for (StreamWindow windowBucket : getWindows()) {
-                count++;
-                closeWindow(windowBucket);
-            }
-            windowBuckets.clear();
-            stopWatch.stop();
-            LOG.info("Closed {} windows in {} ms", count, stopWatch.getTime());
-        }
-    }
-}
\ No newline at end of file
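
addNewWindow() floors the event time to a period boundary and adds one period length to get the window end. For fixed-length periods the arithmetic reduces to a modulo, sketched here with Joda-Time (this is an assumption about what TimePeriodUtils computes; toStandardDuration() requires a period without variable-length fields such as months):

    import org.joda.time.Period;

    public class WindowBoundsSketch {
        public static void main(String[] args) {
            Period windowPeriod = Period.parse("PT1M"); // one-minute windows
            long periodMillis = windowPeriod.toStandardDuration().getMillis();
            long eventTime = 1_465_000_123_456L;
            long windowStart = eventTime - (eventTime % periodMillis); // floor to boundary
            long windowEnd = windowStart + periodMillis;
            System.out.println(windowStart + " .. " + windowEnd);
        }
    }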

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
deleted file mode 100644
index cd23405..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.spout;
-
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.engine.coordinator.IMetadataChangeNotifyService;
-import org.apache.eagle.alert.engine.coordinator.MetadataType;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.router.SpoutSpecListener;
-import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
-import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
-import org.apache.eagle.alert.engine.serialization.Serializers;
-import org.apache.eagle.alert.utils.AlertConstants;
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import storm.kafka.BrokerHosts;
-import storm.kafka.KafkaSpoutMetric;
-import storm.kafka.KafkaSpoutWrapper;
-import storm.kafka.SpoutConfig;
-import storm.kafka.ZkHosts;
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-
-import com.typesafe.config.Config;
-
-/**
- * Wraps KafkaSpout to provide parallel processing of messages for multiple Kafka topics.
- *
- * 1. onNewConfig() is the interface for the outside to push new metadata. Upon new metadata, this class
- *    calculates whether there are any new, removed or updated topics.
- *
- */
-public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener, SerializationMetadataProvider {
-    private static final long serialVersionUID = -5280723341236671580L;
-    private static final Logger LOG = LoggerFactory.getLogger(CorrelationSpout.class);
-
-    public static final String DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT = "/consumers";
-    public static final String DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH = "/eagle_consumer";
-
-    // topic to KafkaSpoutWrapper
-    private volatile Map<String, KafkaSpoutWrapper> kafkaSpoutList = new 
HashMap<>();
-    private int numOfRouterBolts;
-
-    private SpoutSpec cachedSpoutSpec;
-
-    private transient KafkaSpoutMetric kafkaSpoutMetric;
-
-    @SuppressWarnings("rawtypes")
-    private Map conf;
-    private TopologyContext context;
-    private SpoutOutputCollector collector;
-    private final Config config;
-    private String topologyId;
-    private String spoutName;
-    private String routeBoltName;
-    @SuppressWarnings("unused")
-    private int taskIndex;
-    private IMetadataChangeNotifyService changeNotifyService;
-    private PartitionedEventSerializer serializer;
-    private volatile Map<String, StreamDefinition> sds;
-
-    /**
-     * FIXME: one single changeNotifyService may have issues, as multiple spout tasks will possibly register themselves and initialize the service
-     * @param config
-     * @param topologyId
-     * @param changeNotifyService
-     * @param numOfRouterBolts
-     */
-    public CorrelationSpout(Config config, String topologyId, IMetadataChangeNotifyService changeNotifyService, int numOfRouterBolts) {
-        this(config, topologyId, changeNotifyService, numOfRouterBolts, AlertConstants.DEFAULT_SPOUT_NAME, AlertConstants.DEFAULT_ROUTERBOLT_NAME);
-    }
-    /**
-     *
-     * @param config
-     * @param topologyId used for distinguishing kafka offsets for different topologies
-     * @param numOfRouterBolts used for generating streamId and routing
-     * @param spoutName used for generating streamId between spout and router bolt
-     * @param routerBoltName used for generating streamId between spout and router bolt
-     */
-    public CorrelationSpout(Config config, String topologyId, IMetadataChangeNotifyService changeNotifyService, int numOfRouterBolts, String spoutName, String routerBoltName) {
-        this.config = config;
-        this.topologyId = topologyId;
-        this.changeNotifyService = changeNotifyService;
-        this.numOfRouterBolts = numOfRouterBolts;
-        this.spoutName = spoutName;
-        this.routeBoltName = routerBoltName;
-    }
-
-    public String getSpoutName(){
-        return spoutName;
-    }
-
-    public String getRouteBoltName(){
-        return routeBoltName;
-    }
-
-    /**
-     * the only output field is for StreamEvent
-     * @param declarer
-     */
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for (int i = 0; i < numOfRouterBolts; i++) {
-            String streamId = StreamIdConversion.generateStreamIdBetween(spoutName, routeBoltName + i);
-            declarer.declareStream(streamId, new Fields(AlertConstants.FIELD_0));
-            LOG.info("declare stream between spout and streamRouterBolt " + streamId);
-        }
-    }
-
-    @SuppressWarnings("rawtypes")
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("open method invoked");
-        }
-        this.conf = conf;
-        this.context = context;
-        this.collector = collector;
-        this.taskIndex = context.getThisTaskIndex();
-
-        // initialize an empty SpoutSpec
-        cachedSpoutSpec = new SpoutSpec(topologyId, new HashMap<>(), new HashMap<>(), new HashMap<>());
-
-        changeNotifyService.registerListener(this);
-        changeNotifyService.init(config, MetadataType.SPOUT);
-
-        // register KafkaSpout metric
-        kafkaSpoutMetric = new KafkaSpoutMetric();
-        context.registerMetric("kafkaSpout", kafkaSpoutMetric, 60);
-
-        this.serializer = Serializers.newPartitionedEventSerializer(this);
-    }
-
-    @Override
-    public void onSpoutSpecChange(SpoutSpec spec, Map<String, StreamDefinition> sds) {
-        LOG.info("new metadata is updated " + spec);
-        try{
-            onReload(spec, sds);
-        }catch(Exception ex){
-            LOG.error("error applying new SpoutSpec", ex);
-        }
-    }
-
-    @Override
-    public void nextTuple() {
-        for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) {
-            wrapper.nextTuple();
-        }
-    }
-
-    /**
-     * Find the correct wrapper to ack, which means msgId must be mapped to
-     * its wrapper.
-     *
-     * @param msgId
-     */
-    @Override
-    public void ack(Object msgId) {
-        // decode and get topic
-        KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId;
-        KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic);
-        spout.ack(id.id);
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        // decode and get topic
-        KafkaMessageIdWrapper id = (KafkaMessageIdWrapper) msgId;
-        LOG.error("Failing message {}, with topic {}", msgId, id.topic);
-        KafkaSpoutWrapper spout = kafkaSpoutList.get(id.topic);
-        spout.fail(id.id);
-    }
-
-    @Override
-    public void deactivate() {
-        System.out.println("deactivate");
-        for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) {
-            wrapper.deactivate();
-        }
-    }
-
-    @Override
-    public void close() {
-        System.out.println("close");
-        for (KafkaSpoutWrapper wrapper : kafkaSpoutList.values()) {
-            wrapper.close();
-        }
-    }
-    
-    private List<String> getTopics(SpoutSpec spoutSpec) {
-        List<String> meta = new ArrayList<String>();
-        for (Kafka2TupleMetadata entry : spoutSpec.getKafka2TupleMetadataMap().values()) {
-            meta.add(entry.getTopic());
-        }
-        return meta;
-    }
-
-    @SuppressWarnings("unchecked")
-    public void onReload(final SpoutSpec newMeta, Map<String, StreamDefinition> sds) throws Exception {
-        // calculate topics to create/remove/update
-        List<String> topics = getTopics(newMeta);
-        List<String> cachedTopics = getTopics(cachedSpoutSpec);
-        Collection<String> newTopics = CollectionUtils.subtract(topics, cachedTopics);
-        Collection<String> removeTopics = CollectionUtils.subtract(cachedTopics, topics);
-        Collection<String> updateTopics = CollectionUtils.intersection(topics, cachedTopics);
-
-        LOG.info("Topics added={}, removed={}, modified={}", newTopics, removeTopics, updateTopics);
-
-        // build lookup table for scheme
-        Map<String, String> newSchemaName = new HashMap<String, String>();
-        for (Kafka2TupleMetadata ds : newMeta.getKafka2TupleMetadataMap().values()) {
-            newSchemaName.put(ds.getTopic(), ds.getSchemeCls());
-        }
-
-        // copy and swap
-        Map<String, KafkaSpoutWrapper> newKafkaSpoutList = new 
HashMap<>(this.kafkaSpoutList);
-        // iterate new topics and then create KafkaSpout
-        for(String topic : newTopics){
-            KafkaSpoutWrapper wrapper = newKafkaSpoutList.get(topic);
-            if (wrapper != null) {
-                LOG.warn(MessageFormat.format("try to create new topic {0}, 
but found in the active spout list, this may indicate some inconsistency", 
topic));
-                continue;
-            }
-            KafkaSpoutWrapper newWrapper = createKafkaSpout(conf, context, 
collector, topic, newSchemaName.get(topic), newMeta, sds);
-            newKafkaSpoutList.put(topic, newWrapper);
-        }
-        // iterate remove topics and then close KafkaSpout
-        for(String topic : removeTopics){
-            KafkaSpoutWrapper wrapper = newKafkaSpoutList.get(topic);
-            if (wrapper == null) {
-                LOG.warn(MessageFormat.format("try to remove topic {0}, but 
not found in the active spout list, this may indicate some inconsistency", 
topic));
-                continue;
-            }
-            removeKafkaSpout(wrapper);
-            newKafkaSpoutList.remove(topic);
-        }
-
-        // iterate update topic and then update metadata
-        for(String topic : updateTopics){
-            KafkaSpoutWrapper spoutWrapper = newKafkaSpoutList.get(topic);
-            if (spoutWrapper == null) {
-                LOG.warn(MessageFormat.format("try to update topic {0}, but 
not found in the active spout list, this may indicate some inconsistency", 
topic));
-                continue;
-            }
-            spoutWrapper.update(newMeta, sds);
-        }
-
-        // swap
-        this.cachedSpoutSpec = newMeta;
-        this.kafkaSpoutList = newKafkaSpoutList;
-        this.sds = sds;
-    }
-
-    /**
-     * This method is protected so that unit tests can work well.
-     * Q: Where is consumer state persisted, i.e. what offset has been consumed for each topic and partition?
-     * A: stormKafkaTransactionZkPath + "/" + consumerId + "/" + topic + "/" + topologyId + "/" + partitionId
-     * Note 1: PartitionManager.committedPath composes the zkState path as _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
-     * consumerId by default is EagleConsumer unless specified by "stormKafkaEagleConsumer"
-     * Note 2: topologyId is part of zkState because one topic can by design be consumed by multiple topologies, so each topology needs to know
-     * the offsets it has processed itself
-     *
-     * TODO: Should avoid using Config.get deep in the calling stack; a config bean should be generated as early as possible
-     *
-     * @param conf
-     * @param context
-     * @param collector
-     * @param topic
-     * @param spoutSpec
-     * @return
-     */
-    @SuppressWarnings("rawtypes")
-    protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic,
-                                                 String schemeClsName, SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) throws Exception {
-        String kafkaBrokerZkQuorum = config.getString("spout.kafkaBrokerZkQuorum");
-        BrokerHosts hosts = new ZkHosts(kafkaBrokerZkQuorum);
-        String transactionZkRoot = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT;
-        if(config.hasPath("spout.stormKafkaTransactionZkPath")) {
-            transactionZkRoot = config.getString("spout.stormKafkaTransactionZkPath");
-        }
-        // write partition offset etc. into zkRoot+id, see PartitionManager.committedPath
-        String zkStateTransactionRelPath = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH;
-        if(config.hasPath("spout.stormKafkaEagleConsumer")){
-            zkStateTransactionRelPath = config.getString("spout.stormKafkaEagleConsumer");
-        }
-        SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, transactionZkRoot, zkStateTransactionRelPath + "/" + topic + "/" + topologyId);
-        // transaction zkServers
-        boolean stormKafkaUseSameZkQuorumWithKafkaBroker = config.getBoolean("spout.stormKafkaUseSameZkQuorumWithKafkaBroker");
-        if(stormKafkaUseSameZkQuorumWithKafkaBroker){
-            ZkServerPortUtils utils = new ZkServerPortUtils(kafkaBrokerZkQuorum);
-            spoutConfig.zkServers = utils.getZkHosts();
-            spoutConfig.zkPort = utils.getZkPort();
-        }else{
-            ZkServerPortUtils utils = new ZkServerPortUtils(config.getString("spout.stormKafkaTransactionZkQuorum"));
-            spoutConfig.zkServers = utils.getZkHosts();
-            spoutConfig.zkPort = utils.getZkPort();
-        }
-        // transaction update interval
-        spoutConfig.stateUpdateIntervalMs = config.getLong("spout.stormKafkaStateUpdateIntervalMs");
-        // Kafka fetch size
-        spoutConfig.fetchSizeBytes = config.getInt("spout.stormKafkaFetchSizeBytes");
-        // "startOffsetTime" is for test usage, prod should not use this
-        if (config.hasPath("spout.stormKafkaStartOffsetTime")) {
-            spoutConfig.startOffsetTime = config.getInt("spout.stormKafkaStartOffsetTime");
-        }
-
-        spoutConfig.scheme = new SchemeAsMultiScheme(SchemeBuilder.buildFromClsName(schemeClsName, topic));
-        KafkaSpoutWrapper wrapper = new KafkaSpoutWrapper(spoutConfig, kafkaSpoutMetric);
-        SpoutOutputCollectorWrapper collectorWrapper = new SpoutOutputCollectorWrapper(this, collector, topic, spoutSpec, numOfRouterBolts, sds, this.serializer);
-        wrapper.open(conf, context, collectorWrapper);
-        return wrapper;
-    }
-
-    @Override
-    public StreamDefinition getStreamDefinition(String streamId) {
-        return sds.get(streamId);
-    }
-
-    /**
-     * Utility to get the list of zkServers and the zkPort. (It is assumed that zkPort is the same for all
-     * zkServers, as the storm-kafka library requires this, though it is not efficient.)
-     */
-    private static class ZkServerPortUtils{
-        private List<String> zkHosts = new ArrayList<>();
-        private Integer zkPort;
-        public ZkServerPortUtils(String zkQuorum){
-            String[] zkConnections = zkQuorum.split(",");
-            for (String zkConnection : zkConnections) {
-                zkHosts.add(zkConnection.split(":")[0]);
-            }
-            zkPort = Integer.valueOf(zkConnections[0].split(":")[1]);
-        }
-
-        public List<String> getZkHosts(){
-            return zkHosts;
-        }
-
-        public Integer getZkPort(){
-            return zkPort;
-        }
-    }
-
-    protected void removeKafkaSpout(KafkaSpoutWrapper wrapper){
-        try {
-            wrapper.close();
-        } catch (Exception e) {
-            LOG.error("Close wrapper failed. Ignore and continue!", e);
-        }
-    }
-}
\ No newline at end of file
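
onReload() computes topic churn as three set operations over the cached and incoming specs. The same calculation in isolation, using the commons-collections calls from the imports above (topic names are illustrative):

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;

    import org.apache.commons.collections.CollectionUtils;

    public class TopicDiffSketch {
        @SuppressWarnings("unchecked")
        public static void main(String[] args) {
            List<String> topics = Arrays.asList("a", "b", "c"); // new spec
            List<String> cached = Arrays.asList("b", "c", "d"); // cached spec
            Collection<String> created = CollectionUtils.subtract(topics, cached);     // [a]
            Collection<String> removed = CollectionUtils.subtract(cached, topics);     // [d]
            Collection<String> updated = CollectionUtils.intersection(topics, cached); // [b, c]
            System.out.println(created + " " + removed + " " + updated);
        }
    }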

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
deleted file mode 100644
index a2c9219..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/CreateTopicUtils.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.spout;
-
-import java.util.Properties;
-
-import kafka.admin.AdminUtils;
-import kafka.utils.ZKStringSerializer$;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.slf4j.Logger;
-
-/**
- * Normally this is used in unit tests, for convenience.
- */
-public class CreateTopicUtils {
-    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(CreateTopicUtils.class);
-    private static final int partitions = 2;
-    private static final int replicationFactor = 1;
-    public static void ensureTopicReady(String zkQuorum, String topic){
-        ZkClient zkClient = new ZkClient(zkQuorum, 10000, 10000, ZKStringSerializer$.MODULE$);
-        if(!AdminUtils.topicExists(zkClient, topic)) {
-            LOG.info("create topic " + topic + " with partitions " + partitions + ", and replicationFactor " + replicationFactor);
-            AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties());
-        }
-    }
-}
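
CreateTopicUtils is test-only convenience, per its comment. A hedged usage sketch (the
quorum address and topic name here are hypothetical):

    // creates "test_topic" with partitions = 2 and replicationFactor = 1 only if it is absent
    CreateTopicUtils.ensureTopicReady("localhost:2181", "test_topic");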

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java
deleted file mode 100644
index 23e94c3..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/ISpoutSpecLCM.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.spout;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-/**
- * Topic-to-stream metadata lifecycle methods.
- * One topic may spawn multiple streams; the metadata changes include:
- * 1. add/remove a stream
- * 2. a changed grouping strategy for a specific stream, e.g.
- *    ex1: this stream has more alert bolts than before, so this spout would take more traffic
- *    ex2: this stream has fewer alert bolts than before, so this spout would take less traffic
- */
-public interface ISpoutSpecLCM {
-    /**
-     * Stream metadata is used by the SPOUT to filter traffic and route it to the following group-by bolts.
-     * @param metadata spout spec for this topic
-     * @param sds stream definitions keyed by stream id
-     */
-    void update(SpoutSpec metadata, Map<String, StreamDefinition> sds);
-}
\ No newline at end of file
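
A minimal sketch of an ISpoutSpecLCM implementor (the class below is hypothetical; the real
implementor in this module is SpoutOutputCollectorWrapper, further down):

    // Reacts to coordinator-pushed metadata changes by swapping in the new spec.
    public class RecordingSpoutSpecLCM implements ISpoutSpecLCM {
        private volatile SpoutSpec current;

        @Override
        public void update(SpoutSpec metadata, Map<String, StreamDefinition> sds) {
            this.current = metadata; // volatile write: spout threads see the new routing spec
        }
    }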

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
deleted file mode 100644
index c786c01..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/KafkaMessageIdWrapper.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.spout;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-/**
- * Wraps the original Kafka message id together with its source topic.
- */
-public class KafkaMessageIdWrapper {
-    public Object id;
-    public String topic;
-    private static final ObjectMapper objectMapper = new ObjectMapper();
-
-    public KafkaMessageIdWrapper(Object o){
-        this.id = o;
-    }
-
-    public String toString(){
-        try {
-            return String.format("KafkaMessageIdWrapper[topic=%s, id=%s]", topic, objectMapper.writeValueAsString(id));
-        } catch (JsonProcessingException e) {
-            throw new IllegalStateException(e);
-        }
-    }
-}
\ No newline at end of file
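
A short usage sketch for the wrapper (messageId, the topic name, and the logger are all
assumed to exist in the caller):

    KafkaMessageIdWrapper wrapped = new KafkaMessageIdWrapper(messageId);
    wrapped.topic = "test_topic";     // hypothetical topic
    LOG.debug("acking {}", wrapped);  // toString() JSON-encodes the wrapped id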

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
deleted file mode 100644
index 223f1b5..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/SchemeBuilder.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.spout;
-
-import backtype.storm.spout.Scheme;
-
-
-/**
- * All Scheme implementations should satisfy the following conditions:
- * 1) implement the Scheme interface
- * 2) have a single constructor that takes the topic name as its parameter
- */
-public class SchemeBuilder {
-    public static Scheme buildFromClsName(String clsName, String topic) throws Exception {
-        Object o = Class.forName(clsName).getConstructor(String.class).newInstance(topic);
-        return (Scheme)o;
-    }
-}
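
A hedged usage sketch; the scheme class name below is hypothetical and must satisfy the two
conditions stated above:

    // Reflectively instantiate a Scheme whose constructor takes the topic name.
    Scheme scheme = SchemeBuilder.buildFromClsName(
            "org.apache.eagle.alert.engine.scheme.JsonScheme", "test_topic"); // may throw Exception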

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
deleted file mode 100644
index b37f7b3..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.spout;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
-import org.apache.eagle.alert.coordination.model.StreamRepartitionStrategy;
-import org.apache.eagle.alert.coordination.model.Tuple2StreamConverter;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
-import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
-import org.apache.eagle.alert.utils.StreamIdConversion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.spout.SpoutOutputCollector;
-
-/**
- * Intercepts messages sent from within KafkaSpout and selects downstream bolts based on metadata.
- * This is topic-based: each topic has one SpoutOutputCollectorWrapper.
- */
-public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements ISpoutSpecLCM, SerializationMetadataProvider {
-    private static final Logger LOG = LoggerFactory.getLogger(SpoutOutputCollectorWrapper.class);
-
-    private final ISpoutOutputCollector delegate;
-    private final String topic;
-    private final PartitionedEventSerializer serializer;
-    private int numOfRouterBolts;
-
-    private volatile List<StreamRepartitionMetadata> streamRepartitionMetadataList;
-    private volatile Tuple2StreamConverter converter;
-    private CorrelationSpout spout;
-    private volatile Map<String, StreamDefinition> sds;
-
-    /**
-     * @param delegate        actual SpoutOutputCollector that sends data to the following bolts
-     * @param topic           topic for this KafkaSpout to handle
-     * @param numGroupbyBolts number of bolts following this spout
-     * @param serializer      serializer for PartitionedEvent; if null, events are emitted unserialized
-     */
-    public SpoutOutputCollectorWrapper(CorrelationSpout spout,
-                                       ISpoutOutputCollector delegate,
-                                       String topic,
-                                       SpoutSpec spoutSpec,
-                                       int numGroupbyBolts,
-                                       Map<String, StreamDefinition> sds, PartitionedEventSerializer serializer) {
-        super(delegate);
-        this.spout = spout;
-        this.delegate = delegate;
-        this.topic = topic;
-        this.streamRepartitionMetadataList = spoutSpec.getStreamRepartitionMetadataMap().get(topic);
-        this.converter = new Tuple2StreamConverter(spoutSpec.getTuple2StreamMetadataMap().get(topic));
-        this.numOfRouterBolts = numGroupbyBolts;
-        this.sds = sds;
-        this.serializer = serializer;
-    }
-
-    /**
-     * How to assert that numTotalGroupbyBolts >= numOfRouterBolts? Otherwise there is a runtime issue.
-     * By default, a tuple includes 2 fields:
-     * field 1: topic name
-     * field 2: map of key/value
-     */
-    @SuppressWarnings("rawtypes")
-    @Override
-    public List<Integer> emit(List<Object> tuple, Object messageId) {
-        if (!sanityCheck()) {
-            LOG.error(
-                    "spout collector for topic {} sees invalid monitored metadata; has this data source been removed? Trigger message id {}",
-                    topic, messageId);
-            return null;
-        }
-
-        KafkaMessageIdWrapper newMessageId = new KafkaMessageIdWrapper(messageId);
-        newMessageId.topic = topic;
-        /*
-            phase 1: tuple-to-stream conversion
-            if this topic multiplexes multiple streams, then retrieve the individual streams
-        */
-        List<Object> convertedTuple = converter.convert(tuple);
-        if(convertedTuple == null) {
-            LOG.warn("source data {} can't be converted to a stream, ignore this message", tuple);
-            spout.ack(newMessageId);
-            return null;
-        }
-        Map m = (Map)convertedTuple.get(3);
-        Object streamId = convertedTuple.get(1);
-
-        StreamDefinition sd = sds.get(streamId);
-        if(sd == null){
-            LOG.warn("StreamDefinition {} is not found within {}, ignore this message", streamId, sds);
-            spout.ack(newMessageId);
-            return null;
-        }
-
-        StreamEvent event = convertToStreamEventByStreamDefinition((Long)convertedTuple.get(2), m, sd);
-        /*
-            phase 2: stream repartition
-        */
-        for(StreamRepartitionMetadata md : streamRepartitionMetadataList) {
-            // one stream may have multiple group-by strategies; each strategy is for a specific group-by
-            for(StreamRepartitionStrategy groupingStrategy : md.groupingStrategies){
-                int hash = 0;
-                if(groupingStrategy.getPartition().getType().equals(StreamPartition.Type.GROUPBY)) {
-                    hash = getRoutingHashByGroupingStrategy(m, groupingStrategy);
-                }else if(groupingStrategy.getPartition().getType().equals(StreamPartition.Type.SHUFFLE)){
-                    hash = Math.abs((int)System.currentTimeMillis());
-                }
-                int mod = hash % groupingStrategy.numTotalParticipatingRouterBolts;
-                // filter out messages that do not belong to this spout instance
-                if (mod >= groupingStrategy.startSequence && mod < groupingStrategy.startSequence + numOfRouterBolts) {
-                    // the framework takes care of field grouping instead of using storm's internal field grouping
-                    String sid = StreamIdConversion.generateStreamIdBetween(spout.getSpoutName(), spout.getRouteBoltName() + (hash % numOfRouterBolts));
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Emitted tuple: {} with message Id: {}, with topic {}, to streamId {}", convertedTuple, messageId, topic, sid);
-                    }
-                    // send message to StreamRouterBolt
-                    PartitionedEvent pEvent = new PartitionedEvent(event, groupingStrategy.partition, hash);
-                    if(this.serializer == null){
-                        delegate.emit(sid, Collections.singletonList(pEvent), newMessageId);
-                    }else {
-                        try {
-                            delegate.emit(sid, Collections.singletonList(serializer.serialize(pEvent)), newMessageId);
-                        } catch (IOException e) {
-                            LOG.error("Failed to serialize {}", pEvent, e);
-                            throw new RuntimeException(e);
-                        }
-                    }
-                }else{
-                    // ******* short-cut ack ********
-                    // simply ack messages not processed in this topology, because the KafkaSpout implementation
-                    // requires _pending to be empty before moving to the next offsets.
-                    if(LOG.isDebugEnabled()){
-                        LOG.debug("Message filtered with mod {} not within range {} and {} for message {}", mod, groupingStrategy.startSequence,
-                                groupingStrategy.startSequence + numOfRouterBolts, tuple);
-                    }
-                    spout.ack(newMessageId);
-                }
-            }
-        }
-
-        return null;
-    }
-
-    @SuppressWarnings("rawtypes")
-    private int getRoutingHashByGroupingStrategy(Map data, 
StreamRepartitionStrategy gs){
-        // calculate hash value for values from group-by fields
-        HashCodeBuilder hashCodeBuilder = new HashCodeBuilder();
-        for(String groupingField : gs.partition.getColumns()) {
-            if(data.get(groupingField) != null){
-                hashCodeBuilder.append(data.get(groupingField));
-            } else {
-                LOG.warn("Required GroupBy fields {} not found: {}", 
gs.partition.getColumns(), data);
-            }
-        }
-        int hash = hashCodeBuilder.toHashCode();
-        hash = Math.abs(hash);
-        return hash;
-    }
-
-    private boolean sanityCheck() {
-        boolean isOk = true;
-        if (streamRepartitionMetadataList == null) {
-            LOG.error("streamRepartitionMetadataList is null!");
-            isOk = false;
-        }
-        if (converter == null) {
-            LOG.error("tuple2StreamMetadata is null!");
-            isOk = false;
-        }
-        return isOk;
-    }
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private StreamEvent convertToStreamEventByStreamDefinition(long timestamp, 
Map m, StreamDefinition sd){
-        return 
StreamEvent.Builder().timestamep(timestamp).attributes(m,sd).build();
-    }
-
-    /**
-     * The SpoutSpec may be changed; this class responds to changes of tuple2StreamMetadataMap and streamRepartitionMetadataMap.
-     * @param spoutSpec new spout spec pushed by the coordinator
-     * @param sds new stream definitions keyed by stream id
-     */
-    @Override
-    public void update(SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) {
-        this.streamRepartitionMetadataList = spoutSpec.getStreamRepartitionMetadataMap().get(topic);
-        this.converter = new Tuple2StreamConverter(spoutSpec.getTuple2StreamMetadataMap().get(topic));
-        this.sds = sds;
-    }
-
-    @Override
-    public StreamDefinition getStreamDefinition(String streamId) {
-        return this.sds.get(streamId);
-    }
-}
\ No newline at end of file
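
The routing decision inside emit() above reduces to a modulo-range check. A self-contained
sketch of that predicate (parameter names mirror the strategy fields; the example numbers
are hypothetical):

    // A spout keeps an event only if its hash lands in this spout's window of router bolts.
    static boolean acceptedByThisSpout(int hash, int numTotalParticipatingRouterBolts,
                                       int startSequence, int numOfRouterBolts) {
        int mod = hash % numTotalParticipatingRouterBolts;
        return mod >= startSequence && mod < startSequence + numOfRouterBolts;
    }
    // e.g. 6 total router bolts, this spout covering [2, 4):
    //   hash 14 -> mod 2 -> emitted downstream
    //   hash 16 -> mod 4 -> short-cut acked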

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java
deleted file mode 100644
index f526cad..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/CompressionUtils.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.utils;
-
-import com.google.common.io.ByteStreams;
-
-import java.io.*;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
-
-public class CompressionUtils {
-    public static byte[] compress(byte[] source) throws IOException {
-        if (source == null || source.length == 0) {
-            return source;
-        }
-        ByteArrayInputStream sourceStream = new ByteArrayInputStream(source);
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(source.length / 2);
-        try (OutputStream compressor = new GZIPOutputStream(outputStream)) {
-            ByteStreams.copy(sourceStream, compressor);
-            compressor.close();
-        }
-        try {
-            return outputStream.toByteArray();
-        } finally {
-            sourceStream.close();
-            outputStream.close();
-        }
-    }
-
-    public static byte[] decompress(byte[] compressed) throws IOException{
-        if (compressed == null || compressed.length == 0) {
-            return compressed;
-        }
-        ByteArrayInputStream sourceStream = new ByteArrayInputStream(compressed);
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(compressed.length * 2);
-        try (GZIPInputStream compressor = new GZIPInputStream(sourceStream)) {
-            ByteStreams.copy(compressor, outputStream);
-            compressor.close();
-        }
-        try {
-            return outputStream.toByteArray();
-        } finally {
-            sourceStream.close();
-            outputStream.close();
-        }
-    }
-}
\ No newline at end of file
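
A gzip round-trip sketch for the utility above (the payload is arbitrary example data;
java.util.Arrays and java.nio.charset.StandardCharsets imports assumed):

    byte[] original = "hello eagle".getBytes(StandardCharsets.UTF_8);
    byte[] packed   = CompressionUtils.compress(original);
    byte[] unpacked = CompressionUtils.decompress(packed);
    assert Arrays.equals(original, unpacked); // the round trip is lossless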

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java
deleted file mode 100644
index a576404..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/MetadataSerDeser.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.utils;
-
-import java.io.InputStream;
-
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * JSON serialization/deserialization helpers for alert-engine metadata specs.
- */
-public class MetadataSerDeser {
-    private static final Logger LOG = LoggerFactory.getLogger(MetadataSerDeser.class);
-
-    @SuppressWarnings("rawtypes")
-    public static <K> K deserialize(InputStream is, TypeReference typeRef){
-        ObjectMapper mapper = new ObjectMapper();
-        try {
-            K spec = mapper.readValue(is, typeRef);
-            return spec;
-        }catch(Exception ex){
-            LOG.error("error in deserializing metadata of type {} from input 
stream", new TypeReference<K>(){}.getType().getTypeName(), ex);
-        }
-        return null;
-    }
-
-    public static <K> K deserialize(InputStream is, Class<K> cls){
-        ObjectMapper mapper = new ObjectMapper();
-        mapper.configure(JsonParser.Feature.ALLOW_COMMENTS,true);
-        try {
-            K spec = mapper.readValue(is, cls);
-            return spec;
-        }catch(Exception ex){
-            LOG.error("Got error to deserialize metadata of type {} from input 
stream", new TypeReference<K>(){}.getType().getTypeName(), ex);
-        }
-        return null;
-    }
-
-    @SuppressWarnings("rawtypes")
-    public static <K> K deserialize(String json, TypeReference typeRef){
-        ObjectMapper mapper = new ObjectMapper();
-        try {
-            K spec = mapper.readValue(json, typeRef);
-            return spec;
-        }catch(Exception ex){
-            LOG.error("error in deserializing metadata of type {} from {}", 
new TypeReference<K>(){}.getType().getTypeName(), json, ex);
-        }
-        return null;
-    }
-
-    public static <K> K deserialize(String json, Class<K> cls){
-        ObjectMapper mapper = new ObjectMapper();
-        try {
-            K spec = mapper.readValue(json, cls);
-            return spec;
-        }catch(Exception ex){
-            LOG.error("error in deserializing metadata of type {} from {}", 
new TypeReference<K>(){}.getType().getTypeName(), json, ex);
-        }
-        return null;
-    }
-
-    public static <K> String serialize(K spec){
-        ObjectMapper mapper = new ObjectMapper();
-        try{
-            String json = mapper.writeValueAsString(spec);
-            return json;
-        }catch(Exception ex){
-            LOG.error("error in serializing object {} with type {}", spec, new 
TypeReference<K>(){}.getType().getTypeName(), ex);
-        }
-        return null;
-    }
-}
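
A hedged usage sketch of the TypeReference overload above (the JSON literal is illustrative):

    Map<String, StreamDefinition> sds = MetadataSerDeser.deserialize(
            "{}", new TypeReference<Map<String, StreamDefinition>>() {});
    // on malformed JSON the helper logs the error and returns null instead of throwing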

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java
deleted file mode 100644
index f4652a3..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/utils/SerializableUtils.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.utils;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-import org.xerial.snappy.SnappyInputStream;
-import org.xerial.snappy.SnappyOutputStream;
-
-/**
- * Utilities for working with Serializables.
- *
- * Derived from "com.google.cloud.dataflow.sdk.util.SerializableUtils":
- * https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/SerializableUtils.java
- */
-public class SerializableUtils {
-  /**
-   * Serializes the argument into a Snappy-compressed array of bytes, and returns it.
-   *
-   * @throws IllegalArgumentException if there are errors when serializing
-   */
-  public static byte[] serializeToCompressedByteArray(Object value) {
-    try {
-      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-      try (ObjectOutputStream oos = new ObjectOutputStream(new SnappyOutputStream(buffer))) {
-        oos.writeObject(value);
-      }
-      return buffer.toByteArray();
-    } catch (IOException exn) {
-      throw new IllegalArgumentException(
-          "unable to serialize " + value,
-          exn);
-    }
-  }
-
-  /**
-   * Serializes the argument into an array of bytes, and returns it.
-   *
-   * @throws IllegalArgumentException if there are errors when serializing
-   */
-  public static byte[] serializeToByteArray(Object value) {
-    try {
-      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-      try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) {
-        oos.writeObject(value);
-      }
-      return buffer.toByteArray();
-    } catch (IOException exn) {
-      throw new IllegalArgumentException("unable to serialize " + value, exn);
-    }
-  }
-
-  /**
-   * Deserializes an object from the given array of bytes, e.g., as
-   * serialized using {@link #serializeToByteArray}, and returns it.
-   *
-   * @throws IllegalArgumentException if there are errors when
-   * deserializing, using the provided description to identify what
-   * was being deserialized
-   */
-  public static Object deserializeFromByteArray(byte[] encodedValue,
-                                                          String description) {
-    try {
-      try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(encodedValue))) {
-        return ois.readObject();
-      }
-    } catch (IOException | ClassNotFoundException exn) {
-      throw new IllegalArgumentException(
-          "unable to deserialize " + description,
-          exn);
-    }
-  }
-
-  /**
-   * Deserializes an object from the given array of bytes, e.g., as
-   * serialized using {@link #serializeToCompressedByteArray}, and returns it.
-   *
-   * @throws IllegalArgumentException if there are errors when
-   * deserializing, using the provided description to identify what
-   * was being deserialized
-   */
-  public static Object deserializeFromCompressedByteArray(byte[] encodedValue,
-                                                          String description) {
-    try {
-      try (ObjectInputStream ois = new ObjectInputStream(
-          new SnappyInputStream(new ByteArrayInputStream(encodedValue)))) {
-        return ois.readObject();
-      }
-    } catch (IOException | ClassNotFoundException exn) {
-      throw new IllegalArgumentException(
-          "unable to deserialize " + description,
-          exn);
-    }
-  }
-
-  public static <T extends Serializable> T ensureSerializable(T value) {
-    @SuppressWarnings("unchecked")
-    T copy = (T) deserializeFromCompressedByteArray(serializeToCompressedByteArray(value),
-        value.toString());
-    return copy;
-  }
-
-  public static <T extends Serializable> T clone(T value) {
-    @SuppressWarnings("unchecked")
-    T copy = (T) deserializeFromCompressedByteArray(serializeToCompressedByteArray(value),
-        value.toString());
-    return copy;
-  }
-}
\ No newline at end of file
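
A round-trip sketch for the utility above (the value is arbitrary example data):

    // Serializes with Snappy compression, deserializes, and returns the copy;
    // throws IllegalArgumentException if the value is not actually serializable.
    String copy = SerializableUtils.ensureSerializable("some serializable value");
    assert "some serializable value".equals(copy);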

