http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java
deleted file mode 100644
index 189e2a5..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/internal/Topology.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordination.model.internal;
-
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * @since Mar 24, 2016 Logically one unit topology consists of S spouts, G
- *        groupby bolts, A alertBolts normally S=1 Physically each spout is
- *        composed of s spout nodes, each groupby bolt is composed of g groupby
- *        nodes, and each alert bolt is composed of a alert nodes
- */
-public class Topology {
-
-    private String name;
-    // number of logical nodes
-    private int numOfSpout;
-    private int numOfAlertBolt;
-    private int numOfGroupBolt;
-    private int numOfPublishBolt;
-    private String spoutId;
-    private String pubBoltId;
-    @Deprecated
-    private Set<String> groupNodeIds;
-    @Deprecated
-    private Set<String> alertBoltIds;
-
-    // number of physical nodes for each logic bolt
-    private int spoutParallelism = 1;
-    private int groupParallelism = 1;
-    private int alertParallelism = 1;
-
-    private String clusterName;
-
-    public Topology() {
-    }
-
-    public Topology(String name, int group, int alert) {
-        this.name = name;
-        this.numOfSpout = 1;
-        this.numOfGroupBolt = group;
-        this.numOfAlertBolt = alert;
-        groupNodeIds = new HashSet<String>(group);
-        alertBoltIds = new HashSet<String>(alert);
-
-        spoutParallelism = 1;
-        groupParallelism = 1;
-        alertParallelism = 1;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public int getNumOfSpout() {
-        return numOfSpout;
-    }
-
-    public void setNumOfSpout(int numOfSpout) {
-        this.numOfSpout = numOfSpout;
-    }
-
-    public int getNumOfAlertBolt() {
-        return numOfAlertBolt;
-    }
-
-    public void setNumOfAlertBolt(int numOfAlertBolt) {
-        this.numOfAlertBolt = numOfAlertBolt;
-    }
-
-    public int getNumOfGroupBolt() {
-        return numOfGroupBolt;
-    }
-
-    public void setNumOfGroupBolt(int numOfGroupBolt) {
-        this.numOfGroupBolt = numOfGroupBolt;
-    }
-
-    public String getSpoutId() {
-        return spoutId;
-    }
-
-    public void setSpoutId(String spoutId) {
-        this.spoutId = spoutId;
-    }
-
-    public String getPubBoltId() {
-        return pubBoltId;
-    }
-
-    public void setPubBoltId(String pubBoltId) {
-        this.pubBoltId = pubBoltId;
-    }
-
-    public Set<String> getGroupNodeIds() {
-        return groupNodeIds;
-    }
-
-    public void setGroupNodeIds(Set<String> groupNodeIds) {
-        this.groupNodeIds = groupNodeIds;
-    }
-
-    public Set<String> getAlertBoltIds() {
-        return alertBoltIds;
-    }
-
-    public void setAlertBoltIds(Set<String> alertBoltIds) {
-        this.alertBoltIds = alertBoltIds;
-    }
-
-    public int getNumOfPublishBolt() {
-        return numOfPublishBolt;
-    }
-
-    public void setNumOfPublishBolt(int numOfPublishBolt) {
-        this.numOfPublishBolt = numOfPublishBolt;
-    }
-
-    public int getSpoutParallelism() {
-        return spoutParallelism;
-    }
-
-    public void setSpoutParallelism(int spoutParallelism) {
-        this.spoutParallelism = spoutParallelism;
-    }
-
-    public int getGroupParallelism() {
-        return groupParallelism;
-    }
-
-    public void setGroupParallelism(int groupParallelism) {
-        this.groupParallelism = groupParallelism;
-    }
-
-    public int getAlertParallelism() {
-        return alertParallelism;
-    }
-
-    public void setAlertParallelism(int alertParallelism) {
-        this.alertParallelism = alertParallelism;
-    }
-
-    public String getClusterName() {
-        return clusterName;
-    }
-
-    public void setClusterName(String clusterName) {
-        this.clusterName = clusterName;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IStreamCodec.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IStreamCodec.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IStreamCodec.java
deleted file mode 100644
index 3272d28..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/IStreamCodec.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.codec;
-
-import org.apache.eagle.alert.engine.model.StreamEvent;
-
-/**
- * @since Apr 5, 2016
- *
- */
-public interface IStreamCodec {
-
-    StreamEvent decode(byte[] contents);
-
-    byte[] encode(StreamEvent tuple);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockEventCodec.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockEventCodec.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockEventCodec.java
deleted file mode 100644
index 49726e9..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockEventCodec.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.codec;
-
-import org.apache.eagle.alert.engine.model.StreamEvent;
-
-/**
- * This is used for event codec.
- * 
- * @since Apr 5, 2016
- *
- */
-public class SherlockEventCodec implements IStreamCodec {
-
-    @Override
-    public StreamEvent decode(byte[] contents) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public byte[] encode(StreamEvent tuple) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockMetricCodec.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockMetricCodec.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockMetricCodec.java
deleted file mode 100644
index 0e875d8..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/codec/SherlockMetricCodec.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.codec;
-
-import org.apache.eagle.alert.engine.model.StreamEvent;
-
-/**
- * @since Apr 5, 2016
- *
- */
-public class SherlockMetricCodec implements IStreamCodec {
-
-    @Override
-    public StreamEvent decode(byte[] contents) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public byte[] encode(StreamEvent tuple) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
deleted file mode 100644
index f97eb2b..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-/**
- * @since Apr 5, 2016
- *
- */
-public class PolicyDefinition implements Serializable{
-    private static final long serialVersionUID = 377581499339572414L;
-    // unique identifier
-    private String name;
-    private String description;
-    private List<String> inputStreams = new ArrayList<String>();
-    private List<String> outputStreams = new ArrayList<String>();
-
-    private Definition definition;
-
-    // one stream only have one partition in one policy, since we don't 
support stream alias
-    private List<StreamPartition> partitionSpec = new 
ArrayList<StreamPartition>();
-
-    // runtime configuration for policy, these are user-invisible
-    private int parallelismHint = 1;
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-    public List<String> getInputStreams() {
-        return inputStreams;
-    }
-
-    public void setInputStreams(List<String> inputStreams) {
-        this.inputStreams = inputStreams;
-    }
-
-    public List<String> getOutputStreams() {
-        return outputStreams;
-    }
-
-    public void setOutputStreams(List<String> outputStreams) {
-        this.outputStreams = outputStreams;
-    }
-
-    public Definition getDefinition() {
-        return definition;
-    }
-
-    public void setDefinition(Definition definition) {
-        this.definition = definition;
-    }
-
-    public List<StreamPartition> getPartitionSpec() {
-        return partitionSpec;
-    }
-
-    public void setPartitionSpec(List<StreamPartition> partitionSpec) {
-        this.partitionSpec = partitionSpec;
-    }
-
-    public void addPartition(StreamPartition par) {
-        this.partitionSpec.add(par);
-    }
-
-    public int getParallelismHint() {
-        return parallelismHint;
-    }
-
-    public void setParallelismHint(int parallelism) {
-        this.parallelismHint = parallelism;
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder().
-                append(name).
-                append(description).
-                append(inputStreams).
-                append(outputStreams).
-                append(definition).
-                append(partitionSpec).
-//                append(parallelismHint).
-                build();
-    }
-
-    @Override
-    public boolean equals(Object that){
-        if(that == this)
-            return true;
-        if(! (that instanceof PolicyDefinition))
-            return false;
-        PolicyDefinition another = (PolicyDefinition)that;
-        if(another.name.equals(this.name) &&
-                another.description.equals(this.description) &&
-                CollectionUtils.isEqualCollection(another.inputStreams, 
this.inputStreams) &&
-                CollectionUtils.isEqualCollection(another.outputStreams, 
this.outputStreams) &&
-                another.definition.equals(this.definition) &&
-                CollectionUtils.isEqualCollection(another.partitionSpec, 
this.partitionSpec) 
-//                && another.parallelismHint == this.parallelismHint
-                ) {
-            return true;
-        }
-        return false;
-    }
-
-    public static class Definition implements Serializable{
-        private static final long serialVersionUID = -622366527887848346L;
-
-        public String type;
-        public String value;
-
-        public Definition(String type,String value){
-            this.type = type;
-            this.value = value;
-        }
-
-        public Definition() {
-            this.type = null;
-            this.value = null;
-        }
-
-        @Override
-        public int hashCode() {
-            return new HashCodeBuilder().append(type).append(value).build();
-        }
-
-        @Override
-        public boolean equals(Object that){
-            if(that == this)
-                return true;
-            if(!(that instanceof Definition))
-                return false;
-            Definition another = (Definition)that;
-            if(another.type.equals(this.type) &&
-                    another.value.equals(this.value))
-                return true;
-            return false;
-        }
-
-        public String getType() {
-            return type;
-        }
-
-        public void setType(String type) {
-            this.type = type;
-        }
-
-        public String getValue() {
-            return value;
-        }
-
-        public void setValue(String value) {
-            this.value = value;
-        }
-
-        @Override
-        public String toString() {
-            return String.format("{type=\"%s\",value=\"%s\"",type,value);
-        }
-    }
-
-    @Override
-    public String toString() {
-        return 
String.format("{name=\"%s\",definition=%s}",this.getName(),this.getDefinition().toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
deleted file mode 100644
index d8b4f28..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * @since Apr 11, 2016
- *
- */
-public class Publishment {
-
-    private String name;
-    private String type;
-    private List<String> policyIds;
-    private String dedupIntervalMin;
-    private Map<String, String> properties;
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getType() {
-        return type;
-    }
-
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    public List<String> getPolicyIds() {
-        return policyIds;
-    }
-
-    public void setPolicyIds(List<String> policyIds) {
-        this.policyIds = policyIds;
-    }
-
-    public String getDedupIntervalMin() {
-        return dedupIntervalMin;
-    }
-
-    public void setDedupIntervalMin(String dedupIntervalMin) {
-        this.dedupIntervalMin = dedupIntervalMin;
-    }
-
-    public Map<String, String> getProperties() {
-        return properties;
-    }
-
-    public void setProperties(Map<String, String> properties) {
-        this.properties = properties;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj instanceof Publishment) {
-            Publishment p = (Publishment) obj;
-            return (Objects.equals(name, p.getName()) &&
-                    Objects.equals(type, p.getType()) &&
-                    Objects.equals(dedupIntervalMin, p.getDedupIntervalMin()) 
&&
-                    Objects.equals(policyIds, p.getPolicyIds()) &&
-                    properties.equals(p.getProperties()));
-        }
-        return false;
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder()
-                .append(name)
-                .append(type)
-                .append(dedupIntervalMin)
-                .append(policyIds)
-                .append(properties)
-                .build();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
deleted file mode 100644
index f34b971..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.coordinator;
-
-import java.util.Objects;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-public class PublishmentType {
-
-    private String type;
-    private String className;
-    private String description;
-    private String fields;
-
-    public String getType() {
-        return type;
-    }
-
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    public String getClassName(){
-        return className;
-    }
-    public void setClassName(String className){
-        this.className = className;
-    }
-
-    public String getDescription(){
-        return description;
-    }
-    public void setDescription(String description){
-        this.description = description;
-    }
-
-    public String getFields() {
-        return fields;
-    }
-
-    public void setFields(String fields) {
-        this.fields = fields;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj instanceof PublishmentType) {
-            PublishmentType p = (PublishmentType) obj;
-            return (Objects.equals(className, p.getClassName()) &&
-                    Objects.equals(type, p.type) && 
-                    Objects.equals(description, p.getDescription()) &&
-                    Objects.equals(fields, p.getFields()));
-        }
-        return false;
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder()
-                .append(className)
-                .append(type)
-                .append(description)
-                .append(fields)
-                .build();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
deleted file mode 100644
index dc44571..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import java.io.Serializable;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-public class StreamColumn implements Serializable{
-    private static final long serialVersionUID = -5457861313624389106L;
-    private String name;
-    private Type type;
-    private Object defaultValue;
-    private boolean required;
-    private String description;
-
-    public String toString(){
-        return String.format("StreamColumn=name[%s], type=[%s], 
defaultValue=[%s], required=[%s]", name, type, defaultValue, required);
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public Type getType() {
-        return type;
-    }
-
-    public void setType(Type type) {
-        this.type = type;
-    }
-
-    public Object getDefaultValue() {
-        return defaultValue;
-    }
-
-    public void setDefaultValue(Object defaultValue) {
-        this.defaultValue = defaultValue;
-    }
-
-    public boolean isRequired() {
-        return required;
-    }
-
-    public void setRequired(boolean required) {
-        this.required = required;
-    }
-    
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-
-    public enum Type implements Serializable{
-        STRING("string"), INT("int"), LONG("long"), FLOAT("float"), 
DOUBLE("double"), BOOL("bool"), OBJECT("object");
-
-        private final String name;
-
-        Type(String name){
-            this.name = name;
-        }
-
-        @Override
-        public String toString() {
-            return name;
-        }
-
-        @JsonCreator
-        public static Type getEnumFromValue(String value) {
-            for (Type testEnum : values()) {
-                if (testEnum.name.equals(value)) {
-                    return testEnum;
-                }
-            }
-            throw new IllegalArgumentException();
-        }
-    }
-
-    public static class Builder {
-        private StreamColumn column;
-
-        public Builder(){
-            column = new StreamColumn();
-        }
-        public Builder name(String name){
-            column.setName(name);
-            return this;
-        }
-        public Builder type(Type type){
-            column.setType(type);
-            return this;
-        }
-        public Builder defaultValue(Object defaultValue){
-            column.setDefaultValue(defaultValue);
-            return this;
-        }
-        public Builder required(boolean required){
-            column.setRequired(required);
-            return this;
-        }
-
-        public StreamColumn build(){
-            return column;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
deleted file mode 100644
index cd5773a..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This is actually a data source schema.
- * 
- * @since Apr 5, 2016
- *
- */
-public class StreamDefinition implements Serializable{
-    private static final long serialVersionUID = 2352202882328931825L;
-    private String streamId;
-    private String dataSource;
-    private String description;
-    private boolean validate;
-    private boolean timeseries;
-
-    private List<StreamColumn> columns = new ArrayList<StreamColumn>();
-
-    public String toString(){
-        return String.format("StreamDefinition[streamId=%s, dataSource=%s, 
description=%s, validate=%s, timeseries=%s, columns=%s",
-                streamId,
-                dataSource,
-                description,
-                validate,
-                timeseries,
-                columns);
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-
-    public void setStreamId(String streamId) {
-        this.streamId = streamId;
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-    public boolean isValidate() {
-        return validate;
-    }
-
-    public void setValidate(boolean validate) {
-        this.validate = validate;
-    }
-
-    public boolean isTimeseries() {
-        return timeseries;
-    }
-
-    public void setTimeseries(boolean timeseries) {
-        this.timeseries = timeseries;
-    }
-
-    public List<StreamColumn> getColumns() {
-        return columns;
-    }
-
-    public void setColumns(List<StreamColumn> columns) {
-        this.columns = columns;
-    }
-
-    public String getDataSource() {
-        return dataSource;
-    }
-
-    public void setDataSource(String dataSource) {
-        this.dataSource = dataSource;
-    }
-
-    public int getColumnIndex(String column){
-        int i=0;
-        for(StreamColumn col:this.getColumns()){
-            if(col.getName().equals(column)) return i;
-            i++;
-        }
-        return -1;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
deleted file mode 100644
index 7b96024..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamPartition.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import java.io.Serializable;
-import java.util.*;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-/**
- * StreamPartition defines how a data stream is partitioned and sorted
- * streamId is used for distinguishing different streams which are spawned 
from the same data source
- * type defines how to partition data among slots within one slotqueue
- * columns are fields based on which stream is grouped
- * sortSpec defines how data is sorted
- */
-public class StreamPartition implements Serializable {
-    private static final long serialVersionUID = -3361648309136926040L;
-
-    private String streamId;
-    private Type type;
-    private List<String> columns = new ArrayList<>();
-    private StreamSortSpec sortSpec;
-
-    public StreamPartition() {
-    }
-
-    public StreamPartition(StreamPartition o) {
-        this.streamId = o.streamId;
-        this.type = o.type;
-        this.columns = new ArrayList<String>(o.columns);
-        this.sortSpec = o.sortSpec == null ? null : new 
StreamSortSpec(o.sortSpec);
-    }
-
-    @Override
-    public boolean equals(Object other) {
-        if (other == this)
-            return true;
-        if (!(other instanceof StreamPartition)) {
-            return false;
-        }
-        StreamPartition sp = (StreamPartition) other;
-        return Objects.equals(streamId, sp.streamId) && Objects.equals(type, 
sp.type)
-                && CollectionUtils.isEqualCollection(columns, sp.columns) && 
Objects.equals(sortSpec, sp.sortSpec);
-    }
-
-    @Override
-    public int hashCode() {
-        return new 
HashCodeBuilder().append(streamId).append(type).append(columns).append(sortSpec).build();
-    }
-
-    public void setType(Type type) {
-        this.type = type;
-    }
-
-    public Type getType(){
-        return this.type;
-    }
-
-    public enum Type{
-        GLOBAL("GLOBAL",0), GROUPBY("GROUPBY",1), SHUFFLE("SHUFFLE",2);
-        private final String name;
-        private final int index;
-        Type(String name, int index){
-            this.name = name;
-            this.index = index;
-        }
-        @Override
-        public String toString() {
-            return this.name;
-        }
-        public static Type locate(String type){
-            Type _type = _NAME_TYPE.get(type.toUpperCase());
-            if(_type == null)
-                throw new IllegalStateException("Illegal type name: "+type);
-            return _type;
-        }
-
-        public static Type locate(int index){
-            Type _type = _INDEX_TYPE.get(index);
-            if(_type == null)
-                throw new IllegalStateException("Illegal type index: "+index);
-            return _type;
-        }
-
-        private static final Map<String,Type> _NAME_TYPE = new HashMap<>();
-        private static final Map<Integer,Type> _INDEX_TYPE = new TreeMap<>();
-        static {
-            _NAME_TYPE.put(GLOBAL.name,GLOBAL);
-            _NAME_TYPE.put(GROUPBY.name,GROUPBY);
-            _NAME_TYPE.put(SHUFFLE.name,SHUFFLE);
-
-            _INDEX_TYPE.put(GLOBAL.index,GLOBAL);
-            _INDEX_TYPE.put(GROUPBY.index,GLOBAL);
-            _INDEX_TYPE.put(SHUFFLE.index,GLOBAL);
-        }
-    }
-
-    public List<String> getColumns() {
-        return columns;
-    }
-
-    public void setColumns(List<String> columns) {
-        this.columns = columns;
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-
-    public void setStreamId(String streamId) {
-        this.streamId = streamId;
-    }
-
-    public StreamSortSpec getSortSpec() {
-        return sortSpec;
-    }
-
-    public void setSortSpec(StreamSortSpec sortSpec) {
-        this.sortSpec = sortSpec;
-    }
-
-    @Override
-    public String toString() {
-        return 
String.format("StreamPartition[streamId=%s,type=%s,columns=[%s],sortSpec=[%s]]",this.getStreamId(),this.getType(),
 StringUtils.join(this.getColumns(),","), sortSpec);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
deleted file mode 100644
index ee20f81..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamSortSpec.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-
-import java.io.Serializable;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.eagle.alert.utils.TimePeriodUtils;
-import org.joda.time.Period;
-
-/**
- * streamId is the key
- */
-public class StreamSortSpec implements Serializable{
-    private static final long serialVersionUID = 3626506441441584937L;
-    private String windowPeriod="";
-    private int windowMargin = 30 * 1000; // 30 seconds by default
-
-    public StreamSortSpec() {}
-
-    public StreamSortSpec(StreamSortSpec spec) {
-        this.windowPeriod = spec.windowPeriod;
-        this.windowMargin = spec.windowMargin;
-    }
-
-    public String getWindowPeriod() {
-        return windowPeriod;
-    }
-
-    public int getWindowPeriodMillis() {
-        if(windowPeriod!=null) {
-            return 
TimePeriodUtils.getMillisecondsOfPeriod(Period.parse(windowPeriod));
-        } else return 0;
-    }
-
-    public void setWindowPeriod(String windowPeriod) {
-        this.windowPeriod = windowPeriod;
-    }
-    public void setWindowPeriodMillis(int windowPeriodMillis) {
-        this.windowPeriod = Period.millis(windowPeriodMillis).toString();
-    }
-
-    public void setWindowPeriod2(Period period) {
-        this.windowPeriod = period.toString();
-    }
-
-
-    public int getWindowMargin() {
-        return windowMargin;
-    }
-
-    public void setWindowMargin(int windowMargin) {
-        this.windowMargin = windowMargin;
-    }
-
-    @Override
-    public int hashCode(){
-        return new HashCodeBuilder().
-                append(windowPeriod).
-                append(windowMargin).toHashCode();
-    }
-
-    @Override
-    public boolean equals(Object that){
-        if(this == that)
-            return true;
-        if(!(that instanceof StreamSortSpec)){
-            return false;
-        }
-
-        StreamSortSpec another = (StreamSortSpec)that;
-        return 
-                another.windowPeriod.equals(this.windowPeriod) &&
-                another.windowMargin == this.windowMargin;
-    }
-
-    @Override
-    public String toString(){
-        return String.format("StreamSortSpec[windowPeriod=%s,windowMargin=%d]",
-                this.getWindowPeriod(),
-                this.getWindowMargin());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
deleted file mode 100644
index 6cafb16..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamingCluster.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.coordinator;
-
-import java.util.Map;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- * @since Apr 5, 2016
- *
- */
-public class StreamingCluster {
-    public static enum StreamingType {
-        STORM
-    }
-
-    @JsonProperty
-    private String name;
-    @JsonProperty
-    private String zone;
-    @JsonProperty
-    private StreamingType type;
-    @JsonProperty
-    private String description;
-    /**
-     * key - nimbus for storm
-     */
-    @JsonProperty
-    private Map<String, String> deployments;
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getZone() {
-        return zone;
-    }
-
-    public void setZone(String zone) {
-        this.zone = zone;
-    }
-
-    public StreamingType getType() {
-        return type;
-    }
-
-    public void setType(StreamingType type) {
-        this.type = type;
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public void setDescription(String description) {
-        this.description = description;
-    }
-
-    public Map<String, String> getDeployments() {
-        return deployments;
-    }
-
-    public void setDeployments(Map<String, String> deployments) {
-        this.deployments = deployments;
-    }
-
-    public static final String NIMBUS_HOST = "nimbusHost";
-    public static final String NIMBUS_THRIFT_PORT = "nimbusThriftPort";
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
deleted file mode 100644
index f36d3cb..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.model;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.utils.DateTimeUtil;
-
-/**
- * streamId stands for alert type instead of source event streamId
- */
-public class AlertStreamEvent extends StreamEvent {
-    private static final long serialVersionUID = 2392131134670106397L;
-
-    // TODO: Keep policy name only instead of policy entity
-    private PolicyDefinition policy;
-    private StreamDefinition schema;
-    private String createdBy;
-    private long createdTime;
-
-    public PolicyDefinition getPolicy() {
-        return policy;
-    }
-
-    public void setPolicy(PolicyDefinition policy) {
-        this.policy = policy;
-    }
-
-
-    public String getPolicyId() {
-        return policy.getName();
-    }
-
-    @Override
-    public String toString() {
-        List<String> dataStrings = new ArrayList<>(this.getData().length);
-        for(Object obj: this.getData()){
-            if(obj!=null) {
-                dataStrings.add(obj.toString());
-            }else{
-                dataStrings.add(null);
-            }
-        }
-        return 
String.format("AlertStreamEvent[stream=%S,timestamp=%s,data=[%s], policy=%s, 
createdBy=%s]",
-                this.getStreamId(), 
DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()), 
StringUtils.join(dataStrings,","),this.getPolicy().getName(),this.getCreatedBy());
-    }
-
-    public String getCreatedBy() {
-        return createdBy;
-    }
-
-    public void setCreatedBy(String createdBy) {
-        this.createdBy = createdBy;
-    }
-
-    public StreamDefinition getSchema() {
-        return schema;
-    }
-
-    public void setSchema(StreamDefinition schema) {
-        this.schema = schema;
-    }
-
-    public long getCreatedTime() {
-        return createdTime;
-    }
-
-    public void setCreatedTime(long createdTime) {
-        this.createdTime = createdTime;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
deleted file mode 100644
index cfed3e2..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/PartitionedEvent.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.model;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-
-import backtype.storm.tuple.Tuple;
-
-/**
- * This is a critical data structure across spout, router bolt and alert bolt
- * partition[StreamPartition] defines how one incoming data stream is 
partitioned, sorted
- * partitionKey[long] is java hash value of groupby fields. The groupby fields 
are defined in StreamPartition
- * event[StreamEvent] is actual data
- */
-public class PartitionedEvent implements Serializable{
-    private static final long serialVersionUID = -3840016190614238593L;
-    private StreamPartition partition;
-    private long partitionKey;
-    private StreamEvent event;
-
-    /**
-     * Used for bolt-internal but not inter-bolts,
-     * will not pass across bolts
-     */
-    private transient Tuple anchor;
-
-    public PartitionedEvent(){
-        this.event = null;
-        this.partition = null;
-        this.partitionKey = 0L;
-    }
-
-    public PartitionedEvent(StreamEvent event, StreamPartition partition, int 
partitionKey) {
-        this.event = event;
-        this.partition = partition;
-        this.partitionKey = partitionKey;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if(obj == this) return true;
-        if(obj == null) return false;
-        if(obj instanceof PartitionedEvent){
-            PartitionedEvent another = (PartitionedEvent) obj;
-            return !(this.partitionKey != another.getPartitionKey()
-                    || !Objects.equals(this.event, another.getEvent())
-                    || !Objects.equals(this.partition, another.getPartition())
-                    || !Objects.equals(this.anchor, another.anchor));
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    public int hashCode() {
-        return new HashCodeBuilder()
-                .append(partitionKey)
-                .append(event)
-                .append(partition)
-                .build();
-    }
-
-    public StreamEvent getEvent() {
-        return event;
-    }
-
-    public void setEvent(StreamEvent event) {
-        this.event = event;
-    }
-
-    public StreamPartition getPartition() {
-        return partition;
-    }
-
-    public void setPartition(StreamPartition partition) {
-        this.partition = partition;
-    }
-
-    public void setPartitionKey(long partitionKey){
-        this.partitionKey = partitionKey;
-    }
-
-    public long getPartitionKey(){
-        return this.partitionKey;
-    }
-
-    public String toString(){
-       return String.format("PartitionedEvent[partition=%s,event=%s,key=%s", 
partition, event,partitionKey);
-    }
-
-    public long getTimestamp() {
-        return (event != null) ? event.getTimestamp() : 0L;
-    }
-
-    public String getStreamId(){
-        return (event != null) ? event.getStreamId() : null;
-    }
-
-    public Object[] getData(){
-        return event!=null ? event.getData() : null;
-    }
-
-    public boolean isSortRequired(){
-        return isPartitionRequired() && 
this.getPartition().getSortSpec()!=null;
-    }
-
-    public boolean isPartitionRequired(){
-        return this.getPartition()!=null;
-    }
-
-    public PartitionedEvent copy() {
-        PartitionedEvent copied = new PartitionedEvent();
-        copied.setEvent(this.getEvent());
-        copied.setPartition(this.partition);
-        copied.setPartitionKey(this.partitionKey);
-        return copied;
-    }
-
-    public Tuple getAnchor() {
-        return anchor;
-    }
-
-    public void setAnchor(Tuple anchor) {
-        this.anchor = anchor;
-    }
-
-    public PartitionedEvent withAnchor(Tuple tuple){
-        this.setAnchor(tuple);
-        return this;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
deleted file mode 100644
index 3e4e1df..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEvent.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.model;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.utils.DateTimeUtil;
-
-/**
- * @since Apr 5, 2016
- *
- */
-public class StreamEvent implements Serializable {
-    private static final long serialVersionUID = 2765116509856609763L;
-
-    private String streamId;
-    private Object[] data;
-    private long timestamp;
-
-    public StreamEvent(){}
-
-    public StreamEvent(String streamId,long timestamp,Object[] data){
-        this.setStreamId(streamId);
-        this.setTimestamp(timestamp);
-        this.setData(data);
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-
-    public void setStreamId(String streamId) {
-        this.streamId = streamId;
-    }
-
-    public Object[] getData() {
-        return data;
-    }
-
-    public void setData(Object[] data) {
-        this.data = data;
-    }
-
-    public long getTimestamp() {
-        return timestamp;
-    }
-
-    public void setTimestamp(long timestamp) {
-        this.timestamp = timestamp;
-    }
-
-    @Override
-    public int hashCode() {
-        return new 
HashCodeBuilder().append(streamId).append(timestamp).append(data).build();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if(obj == this) return true;
-        if(obj instanceof StreamEvent){
-            StreamEvent another = (StreamEvent) obj;
-            return Objects.equals(this.streamId,another.streamId) && 
this.timestamp == another.timestamp && 
Arrays.deepEquals(this.data,another.data);
-        }
-        return false;
-    }
-
-    @Override
-    public String toString() {
-        List<String> dataStrings = new ArrayList<>();
-        if(this.getData() != null) {
-            for (Object obj : this.getData()) {
-                if (obj != null) {
-                    dataStrings.add(obj.toString());
-                } else {
-                    dataStrings.add(null);
-                }
-            }
-        }
-        return 
String.format("StreamEvent[stream=%S,timestamp=%s,data=[%s]]",this.getStreamId(),
 DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()), 
StringUtils.join(dataStrings,","));
-    }
-
-    public static StreamEventBuilder Builder(){
-        return new StreamEventBuilder();
-    }
-
-    /**
-     * @return cloned new event object
-     */
-    public StreamEvent copy(){
-        StreamEvent newEvent = new StreamEvent();
-        newEvent.setTimestamp(this.getTimestamp());
-        newEvent.setData(this.getData());
-        newEvent.setStreamId(this.getStreamId());
-        return newEvent;
-    }
-
-    public void copyFrom(StreamEvent event){
-        this.setTimestamp(event.getTimestamp());
-        this.setData(event.getData());
-        this.setStreamId(event.getStreamId());
-    }
-
-    /**
-     * @param column
-     * @param streamDefinition
-     * @return
-     */
-    public Object[] getData(StreamDefinition streamDefinition,List<String> 
column) {
-        ArrayList<Object> result = new ArrayList<>(column.size());
-        for (String colName : column) {
-            
result.add(this.getData()[streamDefinition.getColumnIndex(colName)]);
-        }
-        return result.toArray();
-    }
-
-    public Object[] getData(StreamDefinition streamDefinition,String ... 
column) {
-        ArrayList<Object> result = new ArrayList<>(column.length);
-        for (String colName : column) {
-            
result.add(this.getData()[streamDefinition.getColumnIndex(colName)]);
-        }
-        return result.toArray();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
deleted file mode 100644
index 136fd8b..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/StreamEventBuilder.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.model;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamEventBuilder{
-    private final static Logger LOG = 
LoggerFactory.getLogger(StreamEventBuilder.class);
-
-    private StreamEvent instance;
-    private StreamDefinition streamDefinition;
-    public StreamEventBuilder(){
-        instance = new StreamEvent();
-    }
-
-    public StreamEventBuilder schema(StreamDefinition streamDefinition){
-        this.streamDefinition = streamDefinition;
-        if(instance.getStreamId() == null) 
instance.setStreamId(streamDefinition.getStreamId());
-        return this;
-    }
-
-    public StreamEventBuilder streamId(String streamId){
-        instance.setStreamId(streamId);
-        return this;
-    }
-
-    public StreamEventBuilder attributes(Map<String,Object> data, 
StreamDefinition streamDefinition){
-        this.schema(streamDefinition);
-        List<StreamColumn> columnList = streamDefinition.getColumns();
-        if(columnList!=null && columnList.size() > 0){
-            List<Object> values = new ArrayList<>(columnList.size());
-            for (StreamColumn column : columnList) {
-                
values.add(data.getOrDefault(column.getName(),column.getDefaultValue()));
-            }
-            instance.setData(values.toArray());
-        } else if(LOG.isDebugEnabled()){
-            LOG.warn("All data [{}] are ignored as no columns defined in 
schema {}",data,streamDefinition);
-        }
-        return this;
-    }
-
-    public StreamEventBuilder attributes(Map<String,Object> data){
-        return attributes(data,this.streamDefinition);
-    }
-
-    public StreamEventBuilder attributes(Object ... data){
-        instance.setData(data);
-        return this;
-    }
-
-    public StreamEventBuilder timestamep(long timestamp){
-        instance.setTimestamp(timestamp);
-        return this;
-    }
-
-    public StreamEvent build(){
-        if(instance.getStreamId() == null){
-            throw new IllegalArgumentException("streamId is null of event: " + 
instance);
-        }
-        return instance;
-    }
-
-    public StreamEventBuilder copyFrom(StreamEvent event) {
-        this.instance.copyFrom(event);
-        return this;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
deleted file mode 100644
index 06a99f4..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/IMetricSystem.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package org.apache.eagle.alert.metric;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.metric.sink.MetricSink;
-import org.apache.eagle.alert.metric.source.MetricSource;
-
-import com.codahale.metrics.MetricRegistry;
-import com.typesafe.config.Config;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public interface IMetricSystem {
-
-    /**
-     * Initialize
-     */
-    void start();
-
-    /**
-     * Schedule reporter
-     */
-    void schedule();
-
-    /**
-     * Close and stop all resources and services
-     */
-    void stop();
-
-    /**
-     * Manual report metric
-     */
-    void report();
-
-    /**
-     *
-     * @param sink metric sink
-     */
-    void register(MetricSink sink,Config config);
-
-    /**
-     *
-     * @param source metric source
-     */
-    void register(MetricSource source);
-
-    void tags(Map<String,Object> metricTags);
-
-    MetricRegistry registry();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
deleted file mode 100644
index b91c606..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/MetricSystem.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metric;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.eagle.alert.metric.sink.MetricSink;
-import org.apache.eagle.alert.metric.sink.MetricSinkRepository;
-import org.apache.eagle.alert.metric.source.MetricSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.codahale.metrics.MetricRegistry;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-public class MetricSystem implements IMetricSystem {
-    private final Config config;
-    private Map<MetricSink,Config> sinks = new HashMap<>();
-//    private Map<String,MetricSource> sources = new HashMap<>();
-    private MetricRegistry registry = new MetricRegistry();
-    private boolean running;
-    private boolean initialized;
-    private final static Logger LOG = 
LoggerFactory.getLogger(MetricSystem.class);
-    private final Map<String, Object> metricTags = new HashMap<>();
-
-    public MetricSystem(Config config){
-        this.config = config;
-    }
-
-    public static MetricSystem load(Config config){
-        MetricSystem instance = new MetricSystem(config);
-        instance.loadFromConfig();
-        return instance;
-    }
-
-    @Override
-    public void tags(Map<String,Object> metricTags){
-        this.metricTags.putAll(metricTags);
-    }
-
-    @Override
-    public void start() {
-        if(initialized)
-            throw new IllegalStateException("Attempting to initialize a 
MetricsSystem that is already intialized");
-        sinks.forEach((sink,conf) -> 
sink.prepare(conf.withValue("tags",ConfigFactory.parseMap(metricTags).root()),registry));
-        initialized = true;
-    }
-
-    @Override
-    public void schedule() {
-        if(running){
-           throw  new IllegalStateException("Attempting to start a 
MetricsSystem that is already running");
-        }
-
-        sinks.keySet().forEach((sink)->sink.start(5, TimeUnit.SECONDS));
-        running = true;
-    }
-
-    public void loadFromConfig(){
-        loadSinksFromConfig();
-    }
-
-    private void loadSinksFromConfig(){
-        Config sinkCls = config.hasPath("metric.sink") ? 
config.getConfig("metric.sink") : null;
-        if(sinkCls == null){
-            // do nothing
-        }else{
-            for(String sinkType:sinkCls.root().unwrapped().keySet()){
-                
register(MetricSinkRepository.createSink(sinkType),config.getConfig("metric.sink."+sinkType));
-            }
-        }
-    }
-
-    @Override
-    public void stop() {
-        sinks.keySet().forEach(MetricSink::stop);
-    }
-
-    @Override
-    public void report() {
-        sinks.keySet().forEach(MetricSink::report);
-    }
-
-    @Override
-    public void register(MetricSink sink,Config config) {
-        LOG.debug("Register {}",sink);
-        sinks.put(sink,config);
-    }
-
-    @Override
-    public void register(MetricSource source) {
-        registry().registerAll(source.registry());
-    }
-
-    @Override
-    public MetricRegistry registry() {
-        return registry;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java
deleted file mode 100644
index b5e6c63..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/entity/MetricEvent.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metric.entity;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.eagle.alert.utils.DateTimeUtil;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
-
-public class MetricEvent extends TreeMap<String,Object>{
-
-    private static final long serialVersionUID = 6862373651636342744L;
-
-    public static Builder of(String name){
-        return new Builder(name);
-    }
-
-    /**
-     * TODO: Refactor according to ConsoleReporter
-     */
-    public static class Builder{
-        private final String name;
-        private MetricEvent instance;
-        public Builder(String name){
-            this.instance = new MetricEvent();
-            this.name = name;
-        }
-
-        public Builder from(Counter value) {
-//            this.instance.put("type","counter");
-            this.instance.put("count",value.getCount());
-            return this;
-        }
-
-        public MetricEvent build(){
-            this.instance.put("name",name);
-            if(!this.instance.containsKey("timestamp")){
-                this.instance.put("timestamp", 
DateTimeUtil.getCurrentTimestamp());
-            }
-            return this.instance;
-        }
-
-        @SuppressWarnings({ "rawtypes", "unchecked" })
-        public Builder from(Gauge gauge) {
-            Object value = gauge.getValue();
-            if( value instanceof Map){
-                Map<? extends String, ?> map = (Map<? extends String, ?>) 
value;
-                this.instance.putAll(map);
-            } else {
-                this.instance.put("value", value);
-            }
-            return this;
-        }
-
-        public Builder from(Histogram value) {
-            this.instance.put("count",value.getCount());
-            Snapshot snapshot = value.getSnapshot();
-            this.instance.put("min", snapshot.getMin());
-            this.instance.put("max", snapshot.getMax());
-            this.instance.put("mean", snapshot.getMean());
-            this.instance.put("stddev", snapshot.getStdDev());
-            this.instance.put("median", snapshot.getMedian());
-            this.instance.put("75thPercentile", snapshot.get75thPercentile());
-            this.instance.put("95thPercentile", snapshot.get95thPercentile());
-            this.instance.put("98thPercentile", snapshot.get98thPercentile());
-            this.instance.put("99thPercentile", snapshot.get99thPercentile());
-            this.instance.put("999thPercentile", 
snapshot.get999thPercentile());
-            return this;
-        }
-
-        public Builder from(Meter value) {
-            this.instance.put("value",value.getCount());
-            this.instance.put("15MinRate",value.getFifteenMinuteRate());
-            this.instance.put("5MinRate",value.getFiveMinuteRate());
-            this.instance.put("mean",value.getMeanRate());
-            this.instance.put("1MinRate",value.getOneMinuteRate());
-            return this;
-        }
-
-        public Builder from(Timer value) {
-//            this.instance.put("type","timer");
-            this.instance.put("value",value.getCount());
-            this.instance.put("15MinRate",value.getFifteenMinuteRate());
-            this.instance.put("5MinRate",value.getFiveMinuteRate());
-            this.instance.put("mean",value.getMeanRate());
-            this.instance.put("1MinRate",value.getOneMinuteRate());
-            return this;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/reporter/KafkaReporter.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/reporter/KafkaReporter.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/reporter/KafkaReporter.java
deleted file mode 100644
index 42bf5d5..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/reporter/KafkaReporter.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metric.reporter;
-
-import java.util.Map;
-import java.util.Properties;
-import java.util.SortedMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.eagle.alert.metric.entity.MetricEvent;
-import org.apache.eagle.alert.utils.ByteUtils;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricFilter;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.Timer;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-
-public class KafkaReporter extends ScheduledReporter {
-       private final static Logger LOG = 
LoggerFactory.getLogger(KafkaReporter.class);
-       private final String topic;
-       private final Properties properties;
-       private final Producer<byte[], String> producer;
-       private final Map<String, Object> additionalFields;
-
-       protected KafkaReporter(MetricRegistry registry, MetricFilter filter, 
TimeUnit rateUnit, TimeUnit durationUnit, String topic, Properties config, 
Map<String, Object> additionalFields) {
-               super(registry, "kafka-reporter", filter, rateUnit, 
durationUnit);
-               this.topic = topic;
-               this.properties = new Properties();
-               Preconditions.checkNotNull(topic,"topic should not be null");
-//             properties.put("bootstrap.servers", brokerList);
-//             properties.put("metadata.broker.list", brokerList);
-               properties.put("key.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
-               properties.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-               properties.put("request.required.acks", "1");
-               
properties.put("key.deserializer","org.apache.kafka.common.serialization.ByteArraySerializer");
-               
properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
-               if(config != null) {
-                       LOG.info(config.toString());
-                       properties.putAll(config);
-               }
-               this.additionalFields = additionalFields;
-               this.producer = new KafkaProducer<>(properties);
-               LOG.info("Initialized kafka-reporter");
-       }
-
-       @SuppressWarnings("rawtypes")
-    @Override
-       public void report(SortedMap<String, Gauge> gauges, SortedMap<String, 
Counter> counters, SortedMap<String, Histogram> histograms, SortedMap<String, 
Meter> meters, SortedMap<String, Timer> timers) {
-               for(SortedMap.Entry<String, Gauge> entry:gauges.entrySet()){
-                       
onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
-               }
-               for(SortedMap.Entry<String, Counter> entry:counters.entrySet()){
-                       
onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
-               }
-               for(SortedMap.Entry<String, Histogram> 
entry:histograms.entrySet()){
-                       
onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
-               }
-               for(SortedMap.Entry<String, Meter> entry:meters.entrySet()){
-                       
onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
-               }
-               for(SortedMap.Entry<String, Timer> entry:timers.entrySet()){
-                       
onMetricEvent(MetricEvent.of(entry.getKey()).from(entry.getValue()).build());
-               }
-       }
-
-       private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
-       private void onMetricEvent(MetricEvent event){
-               try {
-                       if(additionalFields!=null){
-                               event.putAll(additionalFields);
-                       }
-                       // TODO: Support configurable partition key
-                       byte[] key = ByteUtils.intToBytes(event.hashCode());
-                       ProducerRecord<byte[],String> record  = new 
ProducerRecord<>(topic, key, OBJECT_MAPPER.writeValueAsString(event));
-                       // TODO: Support configuration timeout
-                       this.producer.send(record).get(5,TimeUnit.SECONDS);
-               } catch (JsonProcessingException e) {
-                       LOG.error("Failed to serialize {} as json",event,e);
-               } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
-                       LOG.error("Failed to produce message to topic 
{}",topic,e);
-               }
-       }
-
-       @Override
-       public void stop() {
-               this.producer.close();
-               super.stop();
-       }
-
-       @Override
-       public void close() {
-               this.producer.close();
-               super.close();
-       }
-
-       public static Builder forRegistry(MetricRegistry registry){
-               return new Builder(registry);
-       }
-
-       public static class Builder{
-               private final MetricRegistry registry;
-               private TimeUnit rateUnit;
-               private TimeUnit durationUnit;
-               private MetricFilter filter;
-               private String topic;
-               private Properties properties;
-               private Map<String, Object> additionalFields;
-
-               private Builder(MetricRegistry registry) {
-                       this.registry = registry;
-                       this.rateUnit = TimeUnit.SECONDS;
-                       this.durationUnit = TimeUnit.MILLISECONDS;
-                       this.filter = MetricFilter.ALL;
-               }
-
-               /**
-                * Convert rates to the given time unit.
-                *
-                * @param rateUnit a unit of time
-                * @return {@code this}
-                */
-               public Builder convertRatesTo(TimeUnit rateUnit) {
-                       this.rateUnit = rateUnit;
-                       return this;
-               }
-
-               /**
-                * Convert durations to the given time unit.
-                *
-                * @param durationUnit a unit of time
-                * @return {@code this}
-                */
-               public Builder convertDurationsTo(TimeUnit durationUnit) {
-                       this.durationUnit = durationUnit;
-                       return this;
-               }
-
-               /**
-                * Only report metrics which match the given filter.
-                *
-                * @param filter a {@link MetricFilter}
-                * @return {@code this}
-                */
-               public Builder filter(MetricFilter filter) {
-                       this.filter = filter;
-                       return this;
-               }
-
-               public Builder topic(String topic){
-                       this.topic = topic;
-                       return this;
-               }
-
-               public Builder config(Properties properties){
-                       this.properties = properties;
-                       return this;
-               }
-
-               /**
-                * Builds a {@link ConsoleReporter} with the given properties.
-                *
-                * @return a {@link ConsoleReporter}
-                */
-               public KafkaReporter build() {
-                       if(topic == null && properties!=null) topic = 
properties.getProperty("topic");
-                       return new 
KafkaReporter(registry,filter,rateUnit,durationUnit,topic,properties,additionalFields);
-               }
-
-               @SuppressWarnings("serial")
-        public Builder config(Config config) {
-                       this.config(new Properties(){{
-                               putAll(config.root().unwrapped());
-                       }});
-                       return this;
-               }
-
-               public Builder addFields(Map<String, Object> tags) {
-                       this.additionalFields = tags;
-                       return this;
-               }
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ConsoleSink.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ConsoleSink.java
 
b/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ConsoleSink.java
deleted file mode 100644
index fd6cc41..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-common/src/main/java/org/apache/eagle/alert/metric/sink/ConsoleSink.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package org.apache.eagle.alert.metric.sink;
-
-import java.util.concurrent.TimeUnit;
-
-import com.codahale.metrics.ConsoleReporter;
-import com.codahale.metrics.MetricRegistry;
-import com.typesafe.config.Config;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public class ConsoleSink implements MetricSink {
-    private ConsoleReporter reporter;
-    @Override
-    public void prepare(Config config, MetricRegistry registry) {
-        reporter = ConsoleReporter.forRegistry(registry).build();
-    }
-
-    @Override
-    public void start(long period,TimeUnit unit) {
-        reporter.start(period, unit);
-    }
-
-    @Override
-    public void stop() {
-        reporter.stop();
-        reporter.close();
-    }
-
-    @Override
-    public void report() {
-        reporter.report();
-    }
-}
\ No newline at end of file

Reply via email to