Repository: incubator-eagle
Updated Branches:
  refs/heads/master 91aa216a8 -> 3ee73e8d5


add metric topology for offline metric collection


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/34fa6d90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/34fa6d90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/34fa6d90

Branch: refs/heads/master
Commit: 34fa6d90a8cfe21a8ad85ce852b648a4a05cc363
Parents: 6f29955
Author: sunlibin <abnersunli...@gmail.com>
Authored: Sat Nov 21 18:04:54 2015 +0800
Committer: sunlibin <abnersunli...@gmail.com>
Committed: Mon Nov 30 11:48:27 2015 +0800

----------------------------------------------------------------------
 .../apache/eagle/executor/AlertExecutor.java    |  10 +-
 .../config/RunningJobCrawlConfig.java           |  14 +-
 .../storm/kafka/KafkaSourcedSpoutProvider.java  |   4 -
 .../storm/kafka/KafkaSourcedSpoutScheme.java    |   1 -
 .../impl/storm/zookeeper/ZKStateConfig.java     |  28 ++++
 .../eagle/datastream/StreamProducer.scala       |   4 +
 .../org/apache/eagle/metric/CountingMetric.java |   8 +-
 .../java/org/apache/eagle/metric/Metric.java    |   5 +-
 .../manager/EagleMetricReportManager.java       |  45 +++++++
 .../metric/report/EagleSerivceMetricReport.java |  61 ---------
 .../metric/report/EagleServiceMetricReport.java |  60 +++++++++
 .../metric/report/MetricEntityConvert.java      |   2 +-
 .../eagle/metric/report/MetricReport.java       |   4 +-
 eagle-security/eagle-metric-collection/pom.xml  |  95 ++++++++++++++
 .../metric/kafka/EagleMetricCollectorMain.java  | 127 ++++++++++++++++++
 .../eagle/metric/kafka/KafkaConsumerOffset.java |  27 ++++
 .../kafka/KafkaConsumerOffsetFetcher.java       |  70 ++++++++++
 .../metric/kafka/KafkaLatestOffsetFetcher.java  |  98 ++++++++++++++
 .../kafka/KafkaMessageDistributionExecutor.java | 126 ++++++++++++++++++
 .../metric/kafka/KafkaOffsetCheckerConfig.java  |  50 +++++++
 .../kafka/KafkaOffsetSourceSpoutProvider.java   |  53 ++++++++
 .../eagle/metric/kafka/KafkaOffsetSpout.java    | 131 +++++++++++++++++++
 .../src/main/resources/application.conf         |  39 ++++++
 .../src/main/resources/log4j.properties         |  39 ++++++
 .../src/test/java/TestKafkaOffset.java          |   2 +
 .../auditlog/HdfsAuditLogProcessorMain.java     |   1 -
 ...HiveJobRunningSourcedStormSpoutProvider.java |   2 +-
 eagle-security/pom.xml                          |   1 +
 eagle-topology-assembly/pom.xml                 |   5 +
 29 files changed, 1021 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
index 4f0f4b3..d86a846 100644
--- 
a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
+++ 
b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
@@ -28,7 +28,7 @@ import org.apache.eagle.datastream.JavaStormStreamExecutor2;
 import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.metric.CountingMetric;
 import org.apache.eagle.metric.Metric;
-import org.apache.eagle.metric.report.EagleSerivceMetricReport;
+import org.apache.eagle.metric.report.EagleServiceMetricReport;
 import com.sun.jersey.client.impl.CopyOnWriteHashMap;
 import com.typesafe.config.Config;
 import org.apache.eagle.alert.policy.*;
@@ -73,7 +73,7 @@ public class AlertExecutor extends 
JavaStormStreamExecutor2<String, AlertAPIEnti
        private Map<String, Map<String, String>> dimensionsMap; // cache it for 
performance
        private Map<String, String> baseDimensions;
        private Thread metricReportThread;
-       private EagleSerivceMetricReport metricReport;
+       private EagleServiceMetricReport metricReport;
 
        public AlertExecutor(String alertExecutorId, PolicyPartitioner 
partitioner, int numPartitions, int partitionSeq,
                          AlertDefinitionDAO alertDefinitionDao, String[] 
sourceStreams){
@@ -122,7 +122,7 @@ public class AlertExecutor extends 
JavaStormStreamExecutor2<String, AlertAPIEnti
                                          
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : 
null;
                String password = 
config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) ?
                                          
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : 
null;
-               this.metricReport = new 
EagleSerivceMetricReport(eagleServiceHost, eagleServicePort, username, 
password);
+               this.metricReport = new 
EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, 
password);
 
                metricMap = new ConcurrentHashMap<String, Metric>();
                baseDimensions = new HashMap<String, String>();
@@ -172,7 +172,7 @@ public class AlertExecutor extends 
JavaStormStreamExecutor2<String, AlertAPIEnti
                }
                
                policyEvaluators = new CopyOnWriteHashMap<>();
-               // for efficency, we don't put single policy evaluator 
+               // for efficiency, we don't put single policy evaluator
                policyEvaluators.putAll(tmpPolicyEvaluators);
                DynamicPolicyLoader policyLoader = 
DynamicPolicyLoader.getInstance();
                
@@ -258,7 +258,7 @@ public class AlertExecutor extends 
JavaStormStreamExecutor2<String, AlertAPIEnti
                                                long previous = 
metric.getTimestamp();
                                                if (current > previous + 
MERITE_GRANULARITY) {
                                                        metricList.add(metric);
-                                                       metricMap.put(name, new 
CountingMetric(trim(current, MERITE_GRANULARITY), metric.getDemensions(), 
metric.getMetricName()));
+                                                       metricMap.put(name, new 
CountingMetric(trim(current, MERITE_GRANULARITY), metric.getDimensions(), 
metric.getMetricName()));
                                                }
                                        }
                                }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
 
b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
index b17a41d..79a8928 100644
--- 
a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
+++ 
b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/config/RunningJobCrawlConfig.java
@@ -16,10 +16,11 @@
  */
 package org.apache.eagle.jobrunning.config;
 
-import java.io.Serializable;
-
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
 import org.apache.eagle.job.JobPartitioner;
 
+import java.io.Serializable;
+
 public class RunningJobCrawlConfig implements Serializable{
        private static final long serialVersionUID = 1L;
        public RunningJobEndpointConfig endPointConfig;
@@ -49,13 +50,4 @@ public class RunningJobCrawlConfig implements Serializable{
         public Class<? extends JobPartitioner> partitionerCls;
         public int numTotalPartitions = 1;
     }
-    
-       public static class ZKStateConfig implements Serializable{
-               private static final long serialVersionUID = 1L;
-               public String zkQuorum;
-        public String zkRoot;
-        public int zkSessionTimeoutMs;
-        public int zkRetryTimes;
-        public int zkRetryInterval;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
index 06d37ef..373b3ca 100644
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
+++ 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
@@ -51,10 +51,6 @@ public class KafkaSourcedSpoutProvider extends 
AbstractStormSpoutProvider{
                String zkConnString = 
context.getString("dataSourceConfig.zkConnection");
                // transaction zkRoot
                String zkRoot = 
context.getString("dataSourceConfig.transactionZKRoot");
-               // Site
-               String site = context.getString("eagleProps.site");
-
-        //String realTopic = (site ==null)? topic : 
String.format("%s_%s",site,topic);
 
         LOG.info(String.format("Use topic id: %s",topic));
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
index 8bdbcb5..8b65c1f 100644
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
+++ 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
@@ -51,7 +51,6 @@ public class KafkaSourcedSpoutScheme implements Scheme {
        @Override
        public List<Object> deserialize(byte[] ser) {
                Object tmp = deserializer.deserialize(ser);
-               Map<String, Object> map = (Map<String, Object>)tmp;
                if(tmp == null)
                        return null;
                // the following tasks are executed within the same process of 
kafka spout

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
new file mode 100644
index 0000000..f9515f5
--- /dev/null
+++ 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/zookeeper/ZKStateConfig.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.storm.zookeeper;
+
+import java.io.Serializable;
+
+/**
+ * Plain serializable holder of ZooKeeper-related settings.
+ * All fields are public and mutable; the class declares no constructor,
+ * defaults or validation of its own.
+ */
+public class ZKStateConfig implements Serializable {
+    private static final long serialVersionUID = 1L;
+    // NOTE(review): presumably the ZooKeeper connection string - confirm against callers
+    public String zkQuorum;
+    // NOTE(review): presumably the root znode path under which state is kept - confirm
+    public String zkRoot;
+    // NOTE(review): name suggests a session timeout in milliseconds - confirm
+    public int zkSessionTimeoutMs;
+    // NOTE(review): name suggests the number of retry attempts - confirm
+    public int zkRetryTimes;
+    // NOTE(review): retry interval; unit is not visible here - confirm (likely ms)
+    public int zkRetryInterval;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
index 9fb3e22..40d4904 100644
--- 
a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
+++ 
b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
@@ -103,6 +103,10 @@ trait StreamProducer{
     ret
   }
 
+  /**
+   * Java-friendly overload of `streamUnion` that accepts a `java.util.List`.
+   *
+   * The list is wrapped as a Scala buffer before delegating so that the call
+   * resolves to the `Seq[StreamProducer]` overload. Passing `others` through
+   * unchanged would select this same overload again and recurse without end.
+   *
+   * @param others the producers to union with this one
+   * @return the producer returned by the `Seq`-based `streamUnion`
+   */
+  def streamUnion(others : util.List[StreamProducer]) : StreamProducer = {
+    streamUnion(scala.collection.JavaConverters.asScalaBufferConverter(others).asScala)
+  }
+
   def streamUnion(others : Seq[StreamProducer]) : StreamProducer = {
     val ret = StreamUnionProducer(incrementAndGetId(), others)
     hookupDAG(graph, this, ret)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java
 
b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java
index f4d5cd5..4f65b8e 100644
--- 
a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java
+++ 
b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/CountingMetric.java
@@ -25,16 +25,20 @@ import com.google.common.util.concurrent.AtomicDouble;
  */
 public class CountingMetric extends Metric{
 
+    public CountingMetric(long timestamp, Map<String, String> dimensions, 
String metricName, double value) {
+        super(timestamp, dimensions, metricName, new AtomicDouble(value));
+    }
+
     public CountingMetric(long timestamp, Map<String, String> dimensions, 
String metricName, AtomicDouble value) {
        super(timestamp, dimensions, metricName, value);
     }
-  
+
     public CountingMetric(long timestamp, Map<String, String> dimensions, 
String metricName) {
           this(timestamp, dimensions, metricName, new AtomicDouble(0.0));
     }
 
     public CountingMetric(CountingMetric metric) {
-        this(metric.timestamp, new HashMap<String, String>(metric.dimensions), 
metric.metricName, metric.value);
+        this(metric.timestamp, new HashMap<>(metric.dimensions), 
metric.metricName, metric.value);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java 
b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java
index 993906e..616c82b 100644
--- a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java
+++ b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/Metric.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.metric;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -32,7 +33,7 @@ public abstract class Metric implements MetricOperator{
     
     public Metric(long timestamp, Map<String, String> dimensions, String 
metricName, AtomicDouble value) {
        this.timestamp = timestamp;
-       this.dimensions = dimensions;
+       this.dimensions = new HashMap<>(dimensions);
        this.metricName = metricName;
           this.value = value;
     }
@@ -45,7 +46,7 @@ public abstract class Metric implements MetricOperator{
         return timestamp;
      }
 
-     public Map<String, String> getDemensions() {
+     public Map<String, String> getDimensions() {
         return dimensions;
      }
    

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
 
b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
new file mode 100644
index 0000000..b63944d
--- /dev/null
+++ 
b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/manager/EagleMetricReportManager.java
@@ -0,0 +1,45 @@
+package org.apache.eagle.metric.manager;
+
+import org.apache.eagle.metric.Metric;
+import org.apache.eagle.metric.report.MetricReport;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Singleton registry of named {@link MetricReport} instances that can forward
+ * a batch of metrics to every registered report.
+ */
+public class EagleMetricReportManager {
+
+    // Single shared instance, created when the class is initialised.
+    private static EagleMetricReportManager manager = new EagleMetricReportManager();
+    // Registered reports keyed by the name given to register().
+    private Map<String, MetricReport> metricReportMap = new ConcurrentHashMap<>();
+
+    // Private: instances are only obtained through getInstance().
+    private EagleMetricReportManager() {
+
+    }
+
+    /** Returns the shared manager instance. */
+    public static EagleMetricReportManager getInstance () {
+        return manager;
+    }
+
+    /**
+     * Stores {@code report} under {@code name} unless that name is already taken.
+     *
+     * @return true if the report was stored, false if the name was already registered
+     */
+    public boolean register(String name, MetricReport report) {
+        // Cheap check first, without taking the lock.
+        if (metricReportMap.get(name) != null) {
+            return false;
+        }
+        synchronized (metricReportMap) {
+            // Check again while holding the lock before inserting.
+            if (metricReportMap.get(name) != null) {
+                return false;
+            }
+            metricReportMap.put(name, report);
+            return true;
+        }
+    }
+
+    /** Returns the internal name-to-report map itself (not a copy). */
+    public Map<String, MetricReport> getRegisteredReports() {
+        return metricReportMap;
+    }
+
+    /** Passes {@code list} to every registered report while holding the map's monitor. */
+    public void emit(List<Metric> list) {
+        synchronized (this.metricReportMap) {
+            for (MetricReport registered : metricReportMap.values()) {
+                registered.emit(list);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleSerivceMetricReport.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleSerivceMetricReport.java
 
b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleSerivceMetricReport.java
deleted file mode 100644
index 31056f2..0000000
--- 
a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleSerivceMetricReport.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric.report;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.eagle.metric.Metric;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.eagle.log.entity.GenericMetricEntity;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-
-public class EagleSerivceMetricReport implements MetricReport{
-               
-    private EagleServiceClientImpl client;
-       private static final Logger LOG = 
LoggerFactory.getLogger(EagleSerivceMetricReport.class);
-
-       public EagleSerivceMetricReport(String host, int port, String username, 
String password) {
-               client = new EagleServiceClientImpl(host, port, username, 
password);
-       }
-
-    public EagleSerivceMetricReport(String host, int port) {
-        client = new EagleServiceClientImpl(host, port, null, null);
-    }
-        
-       public void emit(List<Metric> list) {
-               List<GenericMetricEntity> entities = new 
ArrayList<GenericMetricEntity>();
-               for (Metric metric : list) {
-                       entities.add(MetricEntityConvert.convert(metric));
-               }
-               try {
-                       int total = entities.size();
-                       GenericServiceAPIResponseEntity<String> response = 
client.create(entities, GenericMetricEntity.GENERIC_METRIC_SERVICE);
-            if(response.isSuccess()) {
-                LOG.info("Wrote " + total + " entities to service");
-            }else{
-                LOG.error("Failed to write " + total + " entities to service, 
due to server exception: "+ response.getException());
-            }
-               }
-               catch (Exception ex) {
-            LOG.error("Got exception while writing entities: ", ex);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java
 
b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java
new file mode 100644
index 0000000..7ff415e
--- /dev/null
+++ 
b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/EagleServiceMetricReport.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.report;
+
+import org.apache.eagle.log.entity.GenericMetricEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.metric.Metric;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class EagleServiceMetricReport implements MetricReport{
+               
+    private EagleServiceClientImpl client;
+       private static final Logger LOG = 
LoggerFactory.getLogger(EagleServiceMetricReport.class);
+
+       public EagleServiceMetricReport(String host, int port, String username, 
String password) {
+               client = new EagleServiceClientImpl(host, port, username, 
password);
+       }
+
+    public EagleServiceMetricReport(String host, int port) {
+       client = new EagleServiceClientImpl(host, port, null, null);
+    }
+
+       public void emit(List<Metric> list) {
+               List<GenericMetricEntity> entities = new 
ArrayList<GenericMetricEntity>();
+               for (Metric metric : list) {
+                       entities.add(MetricEntityConvert.convert(metric));
+               }
+               try {
+                       int total = entities.size();
+                       GenericServiceAPIResponseEntity<String> response = 
client.create(entities, GenericMetricEntity.GENERIC_METRIC_SERVICE);
+            if(response.isSuccess()) {
+                LOG.info("Wrote " + total + " entities to service");
+            }else{
+                LOG.error("Failed to write " + total + " entities to service, 
due to server exception: "+ response.getException());
+            }
+               }
+               catch (Exception ex) {
+            LOG.error("Got exception while writing entities: ", ex);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java
 
b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java
index 10f05ca..c389fa7 100644
--- 
a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java
+++ 
b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricEntityConvert.java
@@ -25,7 +25,7 @@ public class MetricEntityConvert {
         GenericMetricEntity entity = new GenericMetricEntity();
         entity.setPrefix(metric.getMetricName());
         entity.setValue(new double[]{metric.getValue().get()});
-        entity.setTags(metric.getDemensions());
+        entity.setTags(metric.getDimensions());
         entity.setTimestamp(metric.getTimestamp());
         return entity;
        }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java
 
b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java
index c03a89f..85d423b 100644
--- 
a/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java
+++ 
b/eagle-core/eagle-metric/src/main/java/org/apache/eagle/metric/report/MetricReport.java
@@ -21,6 +21,6 @@ import java.util.List;
 import org.apache.eagle.metric.Metric;
 
 public interface MetricReport {
-        
-       public void emit(List<Metric> list);
+       // The method should be thread safe
+       void emit(List<Metric> list);
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/pom.xml 
b/eagle-security/eagle-metric-collection/pom.xml
new file mode 100644
index 0000000..f2e78a6
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/pom.xml
@@ -0,0 +1,95 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>eagle</groupId>
+    <artifactId>eagle-security-parent</artifactId>
+    <version>0.3.0</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <artifactId>eagle-metric-collection</artifactId>
+  <packaging>jar</packaging>
+  <name>eagle-metric-collection</name>
+  <url>http://maven.apache.org</url>
+  <dependencies>
+      <dependency>
+          <groupId>eagle</groupId>
+          <artifactId>eagle-security-hdfs-auditlog</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.storm</groupId>
+        <artifactId>storm-core</artifactId>
+        <exclusions>
+            <exclusion>
+                <groupId>ch.qos.logback</groupId>
+                <artifactId>logback-classic</artifactId>
+            </exclusion>
+            <exclusion>
+                <groupId>log4j</groupId>
+                <artifactId>log4j</artifactId>
+            </exclusion>
+            <exclusion>
+                <groupId>org.slf4j</groupId>
+                <artifactId>log4j-over-slf4j</artifactId>
+            </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.storm</groupId>
+          <artifactId>storm-kafka</artifactId>
+          <exclusions>
+              <exclusion>
+                  <groupId>ch.qos.logback</groupId>
+                  <artifactId>logback-classic</artifactId>
+              </exclusion>
+              <exclusion>
+                  <groupId>log4j</groupId>
+                  <artifactId>log4j</artifactId>
+              </exclusion>
+              <exclusion>
+                  <groupId>org.slf4j</groupId>
+                  <artifactId>log4j-over-slf4j</artifactId>
+              </exclusion>
+          </exclusions>
+      </dependency>
+      <dependency>
+          <groupId>eagle</groupId>
+          <artifactId>eagle-alert-process</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>eagle</groupId>
+          <artifactId>eagle-stream-process-base</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>eagle</groupId>
+          <artifactId>eagle-stream-process-api</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>com.hierynomus</groupId>
+          <artifactId>sshj</artifactId>
+          <version>0.13.0</version>
+      </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
----------------------------------------------------------------------
diff --git 
a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
 
b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
new file mode 100644
index 0000000..65fe68a
--- /dev/null
+++ 
b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.topology.base.BaseRichSpout;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
+import org.apache.eagle.dataproc.util.ConfigOptionParser;
+import org.apache.eagle.datastream.ExecutionEnvironmentFactory;
+import org.apache.eagle.datastream.StormExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.kafka.BrokerHosts;
+import storm.kafka.KafkaSpout;
+import storm.kafka.SpoutConfig;
+import storm.kafka.ZkHosts;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Entry point of the metric collection topology.
+ *
+ * <p>Loads the configuration and wires two sources into a Storm execution environment:
+ * <ul>
+ *   <li>"kafkaLogLagChecker": the spout returned by {@link KafkaOffsetSourceSpoutProvider};</li>
+ *   <li>"kafkaMessageDistributionCheck": a {@link KafkaSpout} whose messages are reduced to
+ *       (user, timestamp) tuples, grouped on field 0 and handed to
+ *       {@link KafkaMessageDistributionExecutor}.</li>
+ * </ul>
+ */
+public class EagleMetricCollectorMain {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EagleMetricCollectorMain.class);
+
+    /**
+     * Builds and runs the topology.
+     *
+     * @param args command line options, parsed by {@link ConfigOptionParser}
+     * @throws Exception if option parsing, configuration lookup or topology start-up fails
+     */
+    public static void main(String[] args) throws Exception {
+        new ConfigOptionParser().load(args);
+        //System.setProperty("config.resource", "/application.local.conf");
+
+        Config config = ConfigFactory.load();
+
+        StormExecutionEnvironment env = ExecutionEnvironmentFactory.getStorm(config);
+
+        String deserClsName = config.getString("dataSourceConfig.deserializerClass");
+        // Scheme that keeps only the "user" and "timestamp" entries of each deserialized message.
+        final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
+            @Override
+            @SuppressWarnings("unchecked") // the configured deserializer is expected to produce a Map
+            public List<Object> deserialize(byte[] ser) {
+                Object tmp = deserializer.deserialize(ser);
+                if (tmp == null) {
+                    return null;
+                }
+                Map<String, Object> map = (Map<String, Object>) tmp;
+                return Arrays.asList(map.get("user"), map.get("timestamp"));
+            }
+        };
+
+        KafkaSourcedSpoutProvider kafkaMessageSpoutProvider = new KafkaSourcedSpoutProvider() {
+            @Override
+            public BaseRichSpout getSpout(Config context) {
+                // Kafka topic
+                String topic = context.getString("dataSourceConfig.topic");
+                // Kafka consumer group id
+                String groupId = context.getString("dataSourceConfig.metricCollectionConsumerId");
+                // Kafka fetch size
+                int fetchSize = context.getInt("dataSourceConfig.fetchSize");
+
+                // Kafka broker zk connection
+                String zkConnString = context.getString("dataSourceConfig.zkQuorum");
+
+                // transaction zkRoot
+                String zkRoot = context.getString("dataSourceConfig.transactionZKRoot");
+
+                LOG.info(String.format("Use topic id: %s",topic));
+
+                // Optional: ZooKeeper path under which the brokers are registered.
+                String brokerZkPath = null;
+                if (context.hasPath("dataSourceConfig.brokerZkPath")) {
+                    brokerZkPath = context.getString("dataSourceConfig.brokerZkPath");
+                }
+
+                BrokerHosts hosts;
+                if (brokerZkPath == null) {
+                    hosts = new ZkHosts(zkConnString);
+                } else {
+                    hosts = new ZkHosts(zkConnString, brokerZkPath);
+                }
+
+                SpoutConfig spoutConfig = new SpoutConfig(hosts,
+                        topic,
+                        zkRoot + "/" + topic,
+                        groupId);
+
+                // transaction zkServers: every entry is expected to look like host:port,
+                // and the port of the first entry is applied to all hosts.
+                String[] zkConnections = zkConnString.split(",");
+                List<String> zkHosts = new ArrayList<>();
+                for (String zkConnection : zkConnections) {
+                    zkHosts.add(zkConnection.split(":")[0]);
+                }
+                Integer zkPort = Integer.valueOf(zkConnections[0].split(":")[1]);
+
+                spoutConfig.zkServers = zkHosts;
+                // transaction zkPort
+                spoutConfig.zkPort = zkPort;
+                // transaction update interval
+                spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS");
+                // Kafka fetch size
+                spoutConfig.fetchSizeBytes = fetchSize;
+
+                spoutConfig.scheme = new SchemeAsMultiScheme(scheme);
+                return new KafkaSpout(spoutConfig);
+            }
+        };
+
+        env.newSource(new KafkaOffsetSourceSpoutProvider().getSpout(config))
+                .renameOutputFields(0)
+                .withName("kafkaLogLagChecker");
+        env.newSource(kafkaMessageSpoutProvider.getSpout(config))
+                .renameOutputFields(2)
+                .withName("kafkaMessageDistributionCheck")
+                .groupBy(Arrays.asList(0))
+                .flatMap(new KafkaMessageDistributionExecutor());
+        env.execute();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffset.java
----------------------------------------------------------------------
diff --git 
a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffset.java
 
b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffset.java
new file mode 100644
index 0000000..5cf0e11
--- /dev/null
+++ 
b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffset.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import java.util.Map;
+
+/**
+ * Plain data holder for one consumer-offset record.
+ *
+ * <p>{@code KafkaConsumerOffsetFetcher} binds the JSON content of a ZooKeeper partition
+ * node to this class with Jackson and reads only {@link #offset} from it.
+ * NOTE(review): the JSON layout is presumably the consumer state written by the
+ * storm-kafka spout -- confirm against the spout version in use.
+ */
+public class KafkaConsumerOffset {
+    public Map<String, String> topology;
+    // The only field read by KafkaConsumerOffsetFetcher.
+    public Long offset;
+    public Long partition;
+    public Map<String, String> broker;
+    public String topic;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffsetFetcher.java
----------------------------------------------------------------------
diff --git 
a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffsetFetcher.java
 
b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffsetFetcher.java
new file mode 100644
index 0000000..f34f195
--- /dev/null
+++ 
b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaConsumerOffsetFetcher.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reads the offsets stored in ZooKeeper for one topic and consumer group.
+ *
+ * <p>The records are looked up under {@code zkRoot/topic/group}; each child node of that
+ * path is parsed as a JSON {@link KafkaConsumerOffset}.
+ *
+ * <p>The Curator client is started in the constructor and is never closed by this class.
+ */
+public class KafkaConsumerOffsetFetcher {
+
+    public CuratorFramework curator;
+    public String zkRoot;
+    public ObjectMapper mapper;
+    public String topic;
+    public String group;
+
+    /**
+     * Creates the fetcher and starts its ZooKeeper client.
+     *
+     * @param config ZooKeeper quorum, root path, session timeout and retry settings
+     * @param topic  Kafka topic whose offsets are read
+     * @param group  consumer group whose offsets are read
+     * @throws RuntimeException wrapping any failure raised while setting up the client
+     */
+    public KafkaConsumerOffsetFetcher(ZKStateConfig config, String topic, String group) {
+        try {
+            // 15000 is the connection timeout in milliseconds.
+            this.curator = CuratorFrameworkFactory.newClient(config.zkQuorum, config.zkSessionTimeoutMs, 15000,
+                    new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval));
+            curator.start();
+            this.zkRoot = config.zkRoot;
+            mapper = new ObjectMapper();
+            Module module = new SimpleModule("offset").registerSubtypes(new NamedType(KafkaConsumerOffset.class));
+            mapper.registerModule(module);
+            this.topic = topic;
+            this.group = group;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Fetches the stored offset of every partition node.
+     *
+     * @return map from the partition node name to the offset stored in it; empty when the
+     *         {@code zkRoot/topic/group} path does not exist
+     * @throws Exception if ZooKeeper access or JSON parsing fails
+     */
+    public Map<String, Long> fetch() throws Exception {
+        Map<String, Long> map = new HashMap<String, Long>();
+        String path = zkRoot + "/" + topic + "/" + group;
+        if (curator.checkExists().forPath(path) != null) {
+            List<String> partitions = curator.getChildren().forPath(path);
+            for (String partition : partitions) {
+                String partitionPath = path + "/" + partition;
+                // Hand the raw bytes to Jackson instead of decoding them with the platform
+                // default charset first; Jackson detects the JSON encoding itself.
+                byte[] data = curator.getData().forPath(partitionPath);
+                KafkaConsumerOffset offset = mapper.readValue(data, KafkaConsumerOffset.class);
+                map.put(partition, offset.offset);
+            }
+        }
+        return map;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java
----------------------------------------------------------------------
diff --git 
a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java
 
b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java
new file mode 100644
index 0000000..de93ea3
--- /dev/null
+++ 
b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.*;
+import kafka.javaapi.consumer.SimpleConsumer;
+import java.util.*;
+
+/**
+ * Looks up the latest offset of every partition of a Kafka topic.
+ *
+ * <p>Partition metadata is requested from the configured brokers, then the leader of each
+ * partition is asked for its latest offset through a short-lived {@link SimpleConsumer}.
+ */
+public class KafkaLatestOffsetFetcher {
+
+    private List<String> brokerList;
+    private int port;
+
+    /**
+     * @param connectString comma separated {@code host:port} broker list; the port of the
+     *                      first entry is used for the metadata lookup on every listed host
+     */
+    public KafkaLatestOffsetFetcher(String connectString) {
+        brokerList = new ArrayList<>();
+        String[] brokers = connectString.split(",");
+        for (String broker : brokers) {
+            brokerList.add(broker.split(":")[0]);
+        }
+        this.port = Integer.parseInt(brokers[0].split(":")[1]);
+    }
+
+    /**
+     * Fetches the latest offset of partitions {@code 0 .. partitionCount - 1}.
+     *
+     * @param topic          topic to inspect
+     * @param partitionCount number of partitions of the topic
+     * @return map from partition id to its latest offset
+     * @throws RuntimeException if metadata or a leader is missing for a partition, or the
+     *                          offset request fails
+     */
+    public Map<Integer, Long> fetch(String topic, int partitionCount) {
+        Map<Integer, PartitionMetadata> metadatas = fetchPartitionMetadata(brokerList, port, topic, partitionCount);
+        Map<Integer, Long> ret = new HashMap<>();
+        for (int partition = 0; partition < partitionCount; partition++) {
+            PartitionMetadata metadata = metadatas.get(partition);
+            if (metadata == null) {
+                throw new RuntimeException("Can't find metadata for Topic and Partition. Exiting");
+            }
+            if (metadata.leader() == null) {
+                throw new RuntimeException("Can't find Leader for Topic and Partition. Exiting");
+            }
+            String leadBroker = metadata.leader().host();
+            // Use the port the leader itself advertises, not the port of the first
+            // configured broker: brokers are not required to share one port.
+            int leadPort = metadata.leader().port();
+            String clientName = "Client_" + topic + "_" + partition;
+            SimpleConsumer consumer = new SimpleConsumer(leadBroker, leadPort, 100000, 64 * 1024, clientName);
+            try {
+                ret.put(partition, getLatestOffset(consumer, topic, partition, clientName));
+            } finally {
+                // Release the connection even when the offset request throws.
+                consumer.close();
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Asks the broker behind {@code consumer} for the latest offset of one partition.
+     *
+     * @param consumer   consumer connected to the partition leader; not closed here
+     * @param topic      topic name
+     * @param partition  partition id
+     * @param clientName client id sent with the request
+     * @return the latest offset of the partition
+     * @throws RuntimeException if the response carries an error or no offset
+     */
+    public long getLatestOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
+        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
+        Map<TopicAndPartition, kafka.api.PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
+        // LatestTime() with a maximum of one offset yields only the newest offset.
+        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
+        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
+                kafka.api.OffsetRequest.CurrentVersion(), clientName);
+        OffsetResponse response = consumer.getOffsetsBefore(request);
+        if (response.hasError()) {
+            throw new RuntimeException("Error fetching data offset from the broker. Reason: " + response.errorCode(topic, partition) );
+        }
+        long[] offsets = response.offsets(topic, partition);
+        if (offsets.length == 0) {
+            throw new RuntimeException("No offset returned by the broker for topic [" + topic
+                    + "] partition [" + partition + "]");
+        }
+        return offsets[0];
+    }
+
+    /**
+     * Collects the partition metadata of {@code topic}, querying the brokers in order and
+     * stopping as soon as {@code partitionCount} partitions are known.
+     */
+    private Map<Integer, PartitionMetadata> fetchPartitionMetadata(List<String> brokerList, int port, String topic,
+                                                                   int partitionCount) {
+        Map<Integer, PartitionMetadata> partitionMetadata = new HashMap<>();
+        for (String broker : brokerList) {
+            SimpleConsumer consumer = null;
+            try {
+                consumer = new SimpleConsumer(broker, port, 100000, 64 * 1024, "leaderLookup");
+                List<String> topics = Collections.singletonList(topic);
+                TopicMetadataRequest req = new TopicMetadataRequest(topics);
+                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
+
+                List<TopicMetadata> metaData = resp.topicsMetadata();
+                for (TopicMetadata item : metaData) {
+                    for (PartitionMetadata part : item.partitionsMetadata()) {
+                        partitionMetadata.put(part.partitionId(), part);
+                    }
+                }
+                if (partitionMetadata.size() == partitionCount) break;
+            } catch (Exception e) {
+                // Keep the original exception as the cause so its stack trace is not lost.
+                throw new RuntimeException("Error communicating with Broker [" + broker + "] "
+                        + "to find Leader for [" + topic + "] Reason: " + e, e);
+            } finally {
+                if (consumer != null) consumer.close();
+            }
+        }
+        return partitionMetadata;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
----------------------------------------------------------------------
diff --git 
a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
 
b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
new file mode 100644
index 0000000..be6d0f7
--- /dev/null
+++ 
b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
@@ -0,0 +1,126 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.kafka;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor1;
+import org.apache.eagle.datastream.Tuple1;
+import org.apache.eagle.metric.CountingMetric;
+import org.apache.eagle.metric.Metric;
+import org.apache.eagle.metric.manager.EagleMetricReportManager;
+import org.apache.eagle.metric.report.EagleServiceMetricReport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Counts Kafka messages per user in 5-minute buckets and reports every finished bucket as
+ * a "kafka.message.user.count" metric through {@link EagleMetricReportManager}.
+ *
+ * <p>Input tuples carry the user at index 0 and the message timestamp at index 1. A
+ * user's open bucket is only emitted when a message of a later bucket arrives for that
+ * same user.
+ */
+public class KafkaMessageDistributionExecutor extends JavaStormStreamExecutor1<String> {
+
+    private Config config;
+    private Map<String, String> baseMetricDimension;
+    private Map<String, EventMetric> eventMetrics;
+    private static final long DEFAULT_METRIC_GRANULARITY = 5 * 60 * 1000;
+    private static final String metricName = "kafka.message.user.count";
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageDistributionExecutor.class);
+
+    /** Open metric of one user, together with the time of the message that created it. */
+    public static class EventMetric {
+        // Timestamp of the message that opened this metric; it is not refreshed afterwards.
+        long latestMessageTime;
+        Metric metric;
+
+        public EventMetric(long latestMessageTime, Metric metric) {
+            this.latestMessageTime = latestMessageTime;
+            this.metric = metric;
+        }
+
+        public void update(double d) {
+            this.metric.update(d);
+        }
+    }
+
+    @Override
+    public void prepareConfig(Config config) {
+        this.config = config;
+    }
+
+    /**
+     * Reads site, topic and Eagle service settings from the configuration and registers
+     * the service report under the name "metricCollectServiceReport".
+     */
+    @Override
+    public void init() {
+        String site = config.getString("dataSourceConfig.site");
+        String topic = config.getString("dataSourceConfig.topic");
+        this.baseMetricDimension = new HashMap<>();
+        this.baseMetricDimension.put("site", site);
+        this.baseMetricDimension.put("topic", topic);
+        String eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "."
+                + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+        int eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "."
+                + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+        String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "."
+                + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
+        String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "."
+                + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+
+        EagleServiceMetricReport report = new EagleServiceMetricReport(eagleServiceHost, eagleServicePort,
+                username, password);
+        EagleMetricReportManager.getInstance().register("metricCollectServiceReport", report);
+        eventMetrics = new ConcurrentHashMap<>();
+    }
+
+    /** Rounds {@code timestamp} down to a multiple of {@code granularity}. */
+    public long trimTimestamp(long timestamp, long granularity) {
+        return timestamp / granularity * granularity;
+    }
+
+    /**
+     * Opens a new metric for {@code user}, stamped with the start of the bucket that
+     * contains {@code currentMessageTime} and with an initial value of 1.
+     */
+    public void putNewMetric(long currentMessageTime, String user) {
+        Map<String ,String> dimensions = new HashMap<>();
+        dimensions.putAll(baseMetricDimension);
+        dimensions.put("user", user);
+        long trimTimestamp = trimTimestamp(currentMessageTime, DEFAULT_METRIC_GRANULARITY);
+        Metric metric = new CountingMetric(trimTimestamp, dimensions, metricName, 1);
+        eventMetrics.put(user, new EventMetric(currentMessageTime, metric));
+    }
+
+    /**
+     * Accounts for one message of {@code user} observed at {@code currentMessageTime}.
+     *
+     * <p>A message that falls into a later bucket than the user's open metric makes that
+     * metric be emitted and replaced; any other message increments the open metric.
+     */
+    public void update(long currentMessageTime, String user) {
+        EventMetric openMetric = eventMetrics.get(user);
+        if (openMetric == null) {
+            LOG.info("Got metrics for new user: " + user);
+            putNewMetric(currentMessageTime, user);
+            return;
+        }
+        // Compare bucket starts rather than raw times: the metric is stamped with the
+        // trimmed timestamp, so a message of the next bucket must not be added to it.
+        long openBucket = trimTimestamp(openMetric.latestMessageTime, DEFAULT_METRIC_GRANULARITY);
+        long messageBucket = trimTimestamp(currentMessageTime, DEFAULT_METRIC_GRANULARITY);
+        if (messageBucket > openBucket) {
+            EagleMetricReportManager.getInstance().emit(Arrays.asList(eventMetrics.remove(user).metric));
+            putNewMetric(currentMessageTime, user);
+        } else {
+            openMetric.update(1);
+        }
+    }
+
+    /**
+     * Consumes one (user, timestamp) tuple; nothing is emitted to {@code collector}.
+     * Any failure is logged and the tuple is dropped.
+     */
+    @Override
+    public void flatMap(List<Object> input, Collector<Tuple1<String>> collector) {
+        try {
+            String user = (String) input.get(0);
+            Long timestamp = (Long) (input.get(1));
+            update(timestamp, user);
+        }
+        catch (Exception ex) {
+            LOG.error("Got an exception, ex: ", ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
----------------------------------------------------------------------
diff --git 
a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
 
b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
new file mode 100644
index 0000000..5a06c82
--- /dev/null
+++ 
b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetCheckerConfig.java
@@ -0,0 +1,50 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.kafka;
+
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+
+import java.io.Serializable;
+
+/**
+ * Bundles the settings handed to {@code KafkaOffsetSpout}: ZooKeeper state settings,
+ * Kafka settings and Eagle service settings.
+ *
+ * NOTE(review): the class is Serializable, presumably so it can be shipped together with
+ * the spout that holds it -- this also requires ZKStateConfig to be Serializable; confirm.
+ */
+public class KafkaOffsetCheckerConfig implements Serializable {
+    // Kafka side of the check; populated from the dataSourceConfig.* keys by
+    // KafkaOffsetSourceSpoutProvider.
+    public static class KafkaConfig implements Serializable{
+        public String kafkaEndPoints;
+        public String topic;
+        public String site;
+        public String group;
+    }
+
+    // Host, port and credentials of the Eagle service.
+    public static class ServiceConfig implements Serializable{
+        public String serviceHost;
+        public Integer servicePort;
+        public String username;
+        public String password;
+    }
+
+    public ZKStateConfig zkConfig;
+    public KafkaConfig kafkaConfig;
+    public ServiceConfig serviceConfig;
+
+    // Stores the three settings groups as given; no copies or validation are made.
+    public KafkaOffsetCheckerConfig (ServiceConfig serviceConfig, 
ZKStateConfig zkConfig, KafkaConfig kafkaConfig) {
+        this.serviceConfig = serviceConfig;
+        this.zkConfig = zkConfig;
+        this.kafkaConfig = kafkaConfig;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
----------------------------------------------------------------------
diff --git 
a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
 
b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
new file mode 100644
index 0000000..c794632
--- /dev/null
+++ 
b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSourceSpoutProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import backtype.storm.topology.base.BaseRichSpout;
+import com.typesafe.config.Config;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds a {@link KafkaOffsetSpout} from the application configuration.
+ */
+public class KafkaOffsetSourceSpoutProvider {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetSourceSpoutProvider.class);
+
+    /**
+     * Reads the ZooKeeper, Eagle service and Kafka settings and wraps them in a
+     * {@link KafkaOffsetCheckerConfig} for a new {@link KafkaOffsetSpout}.
+     *
+     * @param config application configuration providing the {@code dataSourceConfig.*}
+     *               keys and the Eagle service keys
+     * @return the configured spout
+     */
+    public BaseRichSpout getSpout(Config config) {
+        // ZooKeeper settings.
+        ZKStateConfig zk = new ZKStateConfig();
+        zk.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
+        zk.zkRoot = config.getString("dataSourceConfig.transactionZKRoot");
+        zk.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
+        zk.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
+        zk.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
+
+        // Eagle service settings; all four keys share one prefix.
+        final String servicePrefix = EagleConfigConstants.EAGLE_PROPS + "."
+                + EagleConfigConstants.EAGLE_SERVICE + ".";
+        KafkaOffsetCheckerConfig.ServiceConfig service = new KafkaOffsetCheckerConfig.ServiceConfig();
+        service.serviceHost = config.getString(servicePrefix + EagleConfigConstants.HOST);
+        service.servicePort = config.getInt(servicePrefix + EagleConfigConstants.PORT);
+        service.username = config.getString(servicePrefix + EagleConfigConstants.USERNAME);
+        service.password = config.getString(servicePrefix + EagleConfigConstants.PASSWORD);
+
+        // Kafka settings.
+        KafkaOffsetCheckerConfig.KafkaConfig kafka = new KafkaOffsetCheckerConfig.KafkaConfig();
+        kafka.kafkaEndPoints = config.getString("dataSourceConfig.kafkaEndPoints");
+        kafka.site = config.getString("dataSourceConfig.site");
+        kafka.topic = config.getString("dataSourceConfig.topic");
+        kafka.group = config.getString("dataSourceConfig.hdfsTopologyConsumerGroupId");
+
+        return new KafkaOffsetSpout(new KafkaOffsetCheckerConfig(service, zk, kafka));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
----------------------------------------------------------------------
diff --git 
a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
 
b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
new file mode 100644
index 0000000..d6f7298
--- /dev/null
+++ 
b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaOffsetSpout.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import com.typesafe.config.Config;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
+import org.apache.eagle.metric.CountingMetric;
+import org.apache.eagle.metric.Metric;
+import org.apache.eagle.metric.manager.EagleMetricReportManager;
+import org.apache.eagle.metric.report.EagleServiceMetricReport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class KafkaOffsetSpout extends BaseRichSpout {
+       private static final long serialVersionUID = 1L;
+       private static final long DEFAULT_ROUND_INTERVALS = 5 * 60 * 1000;
+       private KafkaOffsetCheckerConfig config;
+       private KafkaConsumerOffsetFetcher consumerOffsetFetcher;
+       private KafkaLatestOffsetFetcher latestOffsetFetcher;
+       private Map<String, String> baseMetricDimension;
+       private long lastRoundTime = 0;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(KafkaOffsetSpout.class);
+
+       public KafkaOffsetSpout(KafkaOffsetCheckerConfig config) {//Config 
config, ZKStateConfig zkStateConfig, String kafkaEndPoints){
+               this.config = config;
+       }
+       
+       @SuppressWarnings("rawtypes")
+       @Override
+       public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
+               consumerOffsetFetcher = new 
KafkaConsumerOffsetFetcher(config.zkConfig, config.kafkaConfig.topic, 
config.kafkaConfig.group);
+               latestOffsetFetcher = new 
KafkaLatestOffsetFetcher(config.kafkaConfig.kafkaEndPoints);
+
+               this.baseMetricDimension = new HashMap<>();
+               this.baseMetricDimension.put("site", config.kafkaConfig.site);
+               this.baseMetricDimension.put("topic", config.kafkaConfig.topic);
+               this.baseMetricDimension.put("group", config.kafkaConfig.group);
+               String eagleServiceHost = config.serviceConfig.serviceHost;
+               Integer eagleServicePort = config.serviceConfig.servicePort;
+               String username = config.serviceConfig.serviceHost;
+               String password = config.serviceConfig.serviceHost;
+               EagleServiceMetricReport report = new 
EagleServiceMetricReport(eagleServiceHost, eagleServicePort, username, 
password);
+               
EagleMetricReportManager.getInstance().register("metricCollectServiceReport", 
report);
+       }
+
+       public Metric constructMetric(long timestamp, String partition, double 
value) {
+               Map<String, String> dimensions = new HashMap<>();
+               dimensions.putAll(baseMetricDimension);
+               dimensions.put("partition", partition);
+               String metricName = "eagle.kafka.message.consumer.lag";
+               Metric metric = new CountingMetric(timestamp, dimensions, 
metricName, value);
+               return metric;
+       }
+
+       @Override
+       public void nextTuple() {
+               Long currentTime = System.currentTimeMillis();
+               if (currentTime - lastRoundTime > DEFAULT_ROUND_INTERVALS) {
+                       try {
+                               Map<String, Long> consumedOffset = 
consumerOffsetFetcher.fetch();
+                               Map<Integer, Long> latestOffset = 
latestOffsetFetcher.fetch(config.kafkaConfig.topic, consumedOffset.size());
+                               List<Metric> list = new ArrayList<>();
+                               for (Map.Entry<String, Long> entry : 
consumedOffset.entrySet()) {
+                                       String partition = entry.getKey();
+                                       Integer partitionNumber = 
Integer.valueOf(partition.split("_")[1]);
+                                       Long lag = 
latestOffset.get(partitionNumber) - entry.getValue();
+                                       list.add(constructMetric(currentTime, 
partition, lag));
+                               }
+                               
EagleMetricReportManager.getInstance().emit(list);
+                       } catch (Exception ex) {
+                               LOG.error("Got an exception, ex: ", ex);
+                       }
+               }
+        try{
+            Thread.sleep(10 * 1000);
+        }catch(Throwable t){
+                       //Do nothing
+        }
+    }
+       
+       /**
+        * empty because framework will take care of output fields declaration
+        */
+       @Override
+       public void declareOutputFields(OutputFieldsDeclarer declarer) {
+               
+       }
+
+       @Override
+    public void ack(Object msgId) {
+    }
+
+    @Override
+    public void fail(Object msgId) {
+    }
+   
+    @Override
+    public void deactivate() {
+       
+    }
+   
+    @Override
+    public void close() {
+       
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git 
a/eagle-security/eagle-metric-collection/src/main/resources/application.conf 
b/eagle-security/eagle-metric-collection/src/main/resources/application.conf
new file mode 100644
index 0000000..4b07019
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/resources/application.conf
@@ -0,0 +1,39 @@
+# Sample configuration for the metric collection topology.
+{
+  "envContextConfig" : {
+    "env" : "storm",
+    "mode" : "local",
+    "topologyName" : "metricCollectionTopology",
+    "stormConfigFile" : "security-auditlog-storm.yaml",
+    "parallelismConfig" : {
+      "kafkaMsgConsumer" : 1
+    }
+  },
+  "dataSourceConfig": {
+    # For fetch gap
+    # site, topic, hdfsTopologyConsumerGroupId and kafkaEndPoints are the keys read
+    # into the Kafka settings when the offset spout is constructed.
+    "site" : "sandbox",
+    "topic" : "sandbox_hdfs_audit_log",
+    # NOTE(review): port 2191 differs from the 2181 shown in the commented-out
+    # transactionZKPort below - confirm this is intentional and not a typo.
+    "zkQuorum" : "localhost:2191",
+    "hdfsTopologyConsumerGroupId" : "eagle.hdfsaudit.consumer",
+    "zkSessionTimeoutMs" : 15000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 2000,
+    "zkConnectionTimeoutMS" : 15000,
+    #"fetchSize" : 1048586,
+    "deserializerClass" : 
"org.apache.eagle.security.auditlog.HdfsAuditLogKafkaDeserializer",
+    "metricCollectionConsumerId" : "eagle.metric.collection.consumer",
+    # For kafka spout
+    #"transactionZKServers" : "localhost",
+    #"transactionZKPort" : "2181",
+    "transactionZKRoot" : "/consumers",
+    #"transactionStateUpdateMS" : 2000,
+    "kafkaEndPoints" : "localhost:9092"
+  },
+  # Eagle service endpoint and credentials (presumably where collected metrics are sent - TODO confirm).
+  # NOTE(review): the username/password below are plain-text sample values; override them
+  # outside of a sandbox.
+  "eagleProps" : {
+    "eagleService": {
+      "host": "localhost",
+      "port": 38080,
+      "username": "admin",
+      "password": "secret"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/eagle-security/eagle-metric-collection/src/main/resources/log4j.properties 
b/eagle-security/eagle-metric-collection/src/main/resources/log4j.properties
new file mode 100644
index 0000000..8a0919a
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/resources/log4j.properties
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout, DRFA
+
+eagle.log.dir=./logs
+eagle.log.file=eagle.log
+
+
+#log4j.logger.eagle.security.auditlog.IPZoneDataJoinExecutor=DEBUG
+#log4j.logger.eagle.security.auditlog.FileSensitivityDataJoinExecutor=DEBUG
+#log4j.logger.eagle.executor.AlertExecutor=DEBUG
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: 
%m%n
+
+# Daily Rolling File Appender
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+log4j.appender.DRFA.DatePattern=yyyy-MM-dd
+## 30-day backup
+# log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: 
%m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
----------------------------------------------------------------------
diff --git 
a/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java 
b/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
new file mode 100644
index 0000000..bfba783
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/test/java/TestKafkaOffset.java
@@ -0,0 +1,2 @@
+/**
+ * Placeholder test class; it currently declares no test methods.
+ */
+public class TestKafkaOffset {
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
----------------------------------------------------------------------
diff --git 
a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
 
b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
index fa712d4..327cb8d 100644
--- 
a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
+++ 
b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
@@ -41,7 +41,6 @@ public class HdfsAuditLogProcessorMain {
         Config config = new ConfigOptionParser().load(args);
 
         LOG.info("Config class: " + config.getClass().getCanonicalName());
-
         if(LOG.isDebugEnabled()) LOG.debug("Config 
content:"+config.root().render(ConfigRenderOptions.concise()));
 
         StormExecutionEnvironment env = 
ExecutionEnvironmentFactory.getStorm(config);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
----------------------------------------------------------------------
diff --git 
a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
 
b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
index 16f77c3..9ddf0b2 100644
--- 
a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
+++ 
b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
@@ -21,7 +21,7 @@ import org.apache.eagle.job.JobPartitioner;
 import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
 import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig.ControlConfig;
 import 
org.apache.eagle.jobrunning.config.RunningJobCrawlConfig.RunningJobEndpointConfig;
-import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig.ZKStateConfig;
+import org.apache.eagle.dataproc.impl.storm.zookeeper.ZKStateConfig;
 import org.apache.eagle.jobrunning.storm.JobRunningSpout;
 import com.typesafe.config.Config;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-security/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/pom.xml b/eagle-security/pom.xml
index 0f45c49..599cff6 100644
--- a/eagle-security/pom.xml
+++ b/eagle-security/pom.xml
@@ -41,5 +41,6 @@
     <module>eagle-security-hdfs-securitylog</module>
     <module>eagle-security-hbase-securitylog</module>
     <module>eagle-security-hbase-web</module>
+       <module>eagle-metric-collection</module>
   </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/34fa6d90/eagle-topology-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/pom.xml b/eagle-topology-assembly/pom.xml
index 838c2f8..2132bb3 100644
--- a/eagle-topology-assembly/pom.xml
+++ b/eagle-topology-assembly/pom.xml
@@ -54,6 +54,11 @@
             <artifactId>eagle-security-hbase-securitylog</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>eagle</groupId>
+            <artifactId>eagle-metric-collection</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>

Reply via email to