http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/resources/JobCounter.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/JobCounter.conf 
b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/JobCounter.conf
new file mode 100644
index 0000000..1524e61
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/JobCounter.conf
@@ -0,0 +1,187 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#### Sample configuration:
+## counter.group0.name = groupname1
+## counter.group0.counter0.names = counterName1,counterName2,...
+## counter.group0.counter0.description = counter description...
+
+counter.group0.name = org.apache.hadoop.mapreduce.FileSystemCounter
+counter.group0.description = File System Counters
+counter.group0.counter0.names = FILE_BYTES_READ
+counter.group0.counter0.description = FILE: Number of bytes read
+counter.group0.counter1.names = FILE_BYTES_WRITTEN
+counter.group0.counter1.description = FILE: Number of bytes written
+counter.group0.counter2.names = FILE_READ_OPS
+counter.group0.counter2.description = FILE: Number of read operations
+counter.group0.counter3.names = FILE_LARGE_READ_OPS
+counter.group0.counter3.description = FILE: Number of large read operations
+counter.group0.counter4.names = FILE_WRITE_OPS
+counter.group0.counter4.description = FILE: Number of write operations
+counter.group0.counter5.names = HDFS_BYTES_READ
+counter.group0.counter5.description = HDFS: Number of bytes read
+counter.group0.counter6.names = HDFS_BYTES_WRITTEN
+counter.group0.counter6.description = HDFS: Number of bytes written
+counter.group0.counter7.names = HDFS_READ_OPS
+counter.group0.counter7.description = HDFS: Number of read operations
+counter.group0.counter8.names = HDFS_LARGE_READ_OPS
+counter.group0.counter8.description = HDFS: Number of large read operations
+counter.group0.counter9.names = HDFS_WRITE_OPS
+counter.group0.counter9.description = HDFS: Number of write operations
+
+counter.group1.name = org.apache.hadoop.mapreduce.TaskCounter
+counter.group1.description = Map-Reduce Framework
+counter.group1.counter0.names = MAP_INPUT_RECORDS
+counter.group1.counter0.description = Map input records
+counter.group1.counter1.names = MAP_OUTPUT_RECORDS
+counter.group1.counter1.description = Map output records
+counter.group1.counter2.names = SPLIT_RAW_BYTES
+counter.group1.counter2.description = Input split bytes
+counter.group1.counter3.names = SPILLED_RECORDS
+counter.group1.counter3.description = Spilled Records
+counter.group1.counter4.names = CPU_MILLISECONDS
+counter.group1.counter4.description = CPU time spent (ms)
+counter.group1.counter5.names = PHYSICAL_MEMORY_BYTES
+counter.group1.counter5.description = Physical memory (bytes) snapshot
+counter.group1.counter6.names = VIRTUAL_MEMORY_BYTES
+counter.group1.counter6.description = Virtual memory (bytes) snapshot
+counter.group1.counter7.names = COMMITTED_HEAP_BYTES
+counter.group1.counter7.description = Total committed heap usage (bytes)
+counter.group1.counter8.names = REDUCE_SHUFFLE_BYTES
+counter.group1.counter8.description = Reduce shuffle bytes (bytes)
+counter.group1.counter9.names = GC_TIME_MILLIS
+counter.group1.counter9.description = GC time milliseconds
+counter.group1.counter10.names = MAP_OUTPUT_BYTES
+counter.group1.counter10.description = map output bytes
+counter.group1.counter11.names = REDUCE_INPUT_RECORDS
+counter.group1.counter11.description = reduce input records
+counter.group1.counter12.names = COMBINE_INPUT_RECORDS
+counter.group1.counter12.description = combine input records
+counter.group1.counter13.names = COMBINE_OUTPUT_RECORDS
+counter.group1.counter13.description = combine output records
+counter.group1.counter14.names = REDUCE_INPUT_GROUPS
+counter.group1.counter14.description = reduce input groups
+counter.group1.counter15.names = REDUCE_OUTPUT_RECORDS
+counter.group1.counter15.description = reduce output records
+counter.group1.counter16.names = SHUFFLED_MAPS
+counter.group1.counter16.description = shuffled maps
+counter.group1.counter17.names = MAP_OUTPUT_MATERIALIZED_BYTES
+counter.group1.counter17.description = MAP_OUTPUT_MATERIALIZED_BYTES
+counter.group1.counter18.names = MERGED_MAP_OUTPUTS
+counter.group1.counter18.description = MERGED_MAP_OUTPUTS
+counter.group1.counter19.names = FAILED_SHUFFLE
+counter.group1.counter19.description = FAILED_SHUFFLE
+
+counter.group2.name = org.apache.hadoop.mapreduce.JobCounter
+counter.group2.description = Map-Reduce Job Counter
+counter.group2.counter0.names = MB_MILLIS_MAPS
+counter.group2.counter0.description = Total megabyte-seconds taken by all map 
tasks
+counter.group2.counter1.names = MB_MILLIS_REDUCES
+counter.group2.counter1.description = Total megabyte-seconds taken by all 
reduce tasks
+counter.group2.counter2.names = VCORES_MILLIS_MAPS
+counter.group2.counter2.description = Total vcore-seconds taken by all map 
tasks
+counter.group2.counter3.names = VCORES_MILLIS_REDUCES
+counter.group2.counter3.description = Total vcore-seconds taken by all reduce 
tasks
+counter.group2.counter4.names = OTHER_LOCAL_MAPS
+counter.group2.counter4.description = Total vcore-seconds taken by all reduce 
tasks
+counter.group2.counter5.names = DATA_LOCAL_MAPS
+counter.group2.counter5.description = Total vcore-seconds taken by all reduce 
tasks
+counter.group2.counter6.names = MILLIS_MAPS
+counter.group2.counter6.description = Total vcore-seconds taken by all reduce 
tasks
+counter.group2.counter7.names = MILLIS_REDUCES
+counter.group2.counter7.description = Total vcore-seconds taken by all reduce 
tasks
+counter.group2.counter8.names = TOTAL_LAUNCHED_MAPS
+counter.group2.counter8.description = Total vcore-seconds taken by all reduce 
tasks
+counter.group2.counter9.names = TOTAL_LAUNCHED_REDUCES
+counter.group2.counter9.description = Total vcore-seconds taken by all reduce 
tasks
+counter.group2.counter10.names = SLOTS_MILLIS_MAPS
+counter.group2.counter10.description = Total vcore-seconds taken by all reduce 
tasks
+counter.group2.counter11.names = SLOTS_MILLIS_REDUCES
+counter.group2.counter11.description = Total vcore-seconds taken by all reduce 
tasks
+counter.group2.counter12.names = RACK_LOCAL_MAPS
+counter.group2.counter12.description = Total vcore-seconds taken by all reduce 
tasks
+
+counter.group3.name = MapTaskAttemptCounter
+counter.group3.description = Reduce Task Attempt Counter Aggregation
+counter.group3.counter0.names = MAP_OUTPUT_MATERIALIZED_BYTES
+counter.group3.counter1.names = MAP_INPUT_RECORDS
+counter.group3.counter2.names = MERGED_MAP_OUTPUTS
+counter.group3.counter3.names = SPILLED_RECORDS
+counter.group3.counter4.names = MAP_OUTPUT_BYTES
+counter.group3.counter5.names = COMMITTED_HEAP_BYTES
+counter.group3.counter6.names = FAILED_SHUFFLE
+counter.group3.counter7.names = CPU_MILLISECONDS
+counter.group3.counter8.names = SPLIT_RAW_BYTES
+counter.group3.counter9.names = COMBINE_INPUT_RECORDS
+counter.group3.counter10.names = PHYSICAL_MEMORY_BYTES
+counter.group3.counter11.names = TASK_ATTEMPT_DURATION
+counter.group3.counter12.names = VIRTUAL_MEMORY_BYTES
+counter.group3.counter13.names = MAP_OUTPUT_RECORDS
+counter.group3.counter14.names = GC_TIME_MILLIS
+counter.group3.counter15.names = COMBINE_OUTPUT_RECORDS
+counter.group3.counter16.names = REDUCE_INPUT_GROUPS
+counter.group3.counter17.names = REDUCE_INPUT_RECORDS
+counter.group3.counter18.names = REDUCE_OUTPUT_RECORDS
+counter.group3.counter19.names = REDUCE_SHUFFLE_BYTES
+counter.group3.counter20.names = SHUFFLED_MAPS
+
+counter.group4.name = ReduceTaskAttemptCounter
+counter.group4.description = Reduce Task Attempt Counter Aggregation
+counter.group4.counter0.names = MAP_OUTPUT_MATERIALIZED_BYTES
+counter.group4.counter1.names = MAP_INPUT_RECORDS
+counter.group4.counter2.names = MERGED_MAP_OUTPUTS
+counter.group4.counter3.names = SPILLED_RECORDS
+counter.group4.counter4.names = MAP_OUTPUT_BYTES
+counter.group4.counter5.names = COMMITTED_HEAP_BYTES
+counter.group4.counter6.names = FAILED_SHUFFLE
+counter.group4.counter7.names = CPU_MILLISECONDS
+counter.group4.counter8.names = SPLIT_RAW_BYTES
+counter.group4.counter9.names = COMBINE_INPUT_RECORDS
+counter.group4.counter10.names = PHYSICAL_MEMORY_BYTES
+counter.group4.counter11.names = TASK_ATTEMPT_DURATION
+counter.group4.counter12.names = VIRTUAL_MEMORY_BYTES
+counter.group4.counter13.names = MAP_OUTPUT_RECORDS
+counter.group4.counter14.names = GC_TIME_MILLIS
+counter.group4.counter15.names = COMBINE_OUTPUT_RECORDS
+counter.group4.counter16.names = REDUCE_INPUT_GROUPS
+counter.group4.counter17.names = REDUCE_INPUT_RECORDS
+counter.group4.counter18.names = REDUCE_OUTPUT_RECORDS
+counter.group4.counter19.names = REDUCE_SHUFFLE_BYTES
+counter.group4.counter20.names = SHUFFLED_MAPS
+
+counter.group5.name = MapTaskAttemptFileSystemCounter
+counter.group5.description = Map Task Attempt File System Counter Aggregation
+counter.group5.counter0.names = FILE_READ_OPS
+counter.group5.counter1.names = FILE_WRITE_OPS
+counter.group5.counter2.names = FILE_BYTES_READ
+counter.group5.counter3.names = FILE_LARGE_READ_OPS
+counter.group5.counter4.names = HDFS_BYTES_READ
+counter.group5.counter5.names = FILE_BYTES_WRITTEN
+counter.group5.counter6.names = HDFS_LARGE_READ_OPS
+counter.group5.counter7.names = HDFS_BYTES_WRITTEN
+counter.group5.counter8.names = HDFS_READ_OPS
+
+counter.group6.name = ReduceTaskAttemptFileSystemCounter
+counter.group6.description = Reduce Task Attempt File System Counter 
Aggregation
+counter.group6.description = Map-Reduce Job Counter
+counter.group6.counter0.names = FILE_READ_OPS
+counter.group6.counter1.names = FILE_WRITE_OPS
+counter.group6.counter2.names = FILE_BYTES_READ
+counter.group6.counter3.names = FILE_LARGE_READ_OPS
+counter.group6.counter4.names = HDFS_BYTES_READ
+counter.group6.counter5.names = FILE_BYTES_WRITTEN
+counter.group6.counter6.names = HDFS_LARGE_READ_OPS
+counter.group6.counter7.names = HDFS_BYTES_WRITTEN
+counter.group6.counter8.names = HDFS_READ_OPS
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf 
b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
new file mode 100644
index 0000000..5c0d8f9
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  "envContextConfig" : {
+    "env" : "local",
+    "topologyName" : "mrRunningJob",
+    "stormConfigFile" : "storm.yaml",
+    "parallelismConfig" : {
+      "mrRunningJobFetchSpout" : 1,
+      "mrRunningJobParseBolt" : 10
+    },
+    "tasks" : {
+      "mrRunningJobFetchSpout" : 1,
+      "mrRunningJobParseBolt" : 10
+    },
+    "workers" : 5
+  },
+
+  "jobExtractorConfig" : {
+    "site" : "sandbox",
+    "fetchRunningJobInterval" : 60,
+    "parseJobThreadPoolSize" : 5, #job concurrent
+    "topAndBottomTaskByElapsedTime" : 50
+  },
+
+  "zookeeperConfig" : {
+    "zkQuorum" : "sandbox.hortonworks.com:2181",
+    "zkPort" : "2181",
+    "zkRoot" : "/apps/mr/running",
+    "zkSessionTimeoutMs" : 15000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 20000
+  },
+
+  "dataSourceConfig" : {
+    "rmUrls": ["http://sandbox.hortonworks.com:50030";]
+  },
+
+  "eagleProps" : {
+    "mailHost" : "abc.com",
+    "mailDebug" : "true",
+    "eagleService": {
+      "host": "sandbox.hortonworks.com",
+      "port": 9099,
+      "readTimeOutSeconds" : 20,
+      "maxFlushNum" : 500,
+      "username": "admin",
+      "password": "secret"
+    }
+  },
+
+  "MRConfigureKeys" : {
+    "jobConfigKey" : [
+    "mapreduce.map.output.compress",
+    "mapreduce.map.output.compress.codec",
+    "mapreduce.output.fileoutputformat.compress",
+    "mapreduce.output.fileoutputformat.compress.type",
+    "mapreduce.output.fileoutputformat.compress.codec",
+    "mapred.output.format.class",
+    "eagle.job.runid",
+    "eagle.job.runidfieldname",
+    "eagle.job.name",
+    "eagle.job.normalizedfieldname",
+    "eagle.alert.email",
+    "eagle.job.alertemailaddress",
+    "dataplatform.etl.info",
+    "mapreduce.map.memory.mb",
+    "mapreduce.reduce.memory.mb",
+    "mapreduce.map.java.opts",
+    "mapreduce.reduce.java.opts"
+  ]
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-running/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/log4j.properties 
b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6b8c8d6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/log4j.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout, DRFA
+
+eagle.log.dir=../logs
+eagle.log.file=eagle.log
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: 
%m%n
+
+# Daily Rolling File Appender
+ log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+ log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+ log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+## 30-day backup
+# log4j.appender.DRFA.MaxBackupIndex=30
+ log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: 
%m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/EventType.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/EventType.java
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/EventType.java
deleted file mode 100644
index 9c720c8..0000000
--- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/EventType.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.history.crawl;
-
-public enum EventType {
-    SparkListenerBlockManagerAdded, SparkListenerEnvironmentUpdate, 
SparkListenerApplicationStart,
-    SparkListenerExecutorAdded, 
SparkListenerJobStart,SparkListenerStageSubmitted, 
SparkListenerTaskStart,SparkListenerBlockManagerRemoved,
-    SparkListenerTaskEnd, SparkListenerStageCompleted, SparkListenerJobEnd, 
SparkListenerApplicationEnd,SparkListenerExecutorRemoved
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java
deleted file mode 100644
index 1f76a2f..0000000
--- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.history.crawl;
-
-import java.io.InputStream;
-
-public interface JHFInputStreamReader {
-    public void read(InputStream is) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java
deleted file mode 100644
index 3fbc769..0000000
--- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.history.crawl;
-
-import java.io.InputStream;
-
-public interface JHFParserBase {
-    /**
-     * this method will ensure to close the inputstream
-     * @param is
-     * @throws Exception
-     */
-    public void parse(InputStream is) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
deleted file mode 100644
index 99aa68d..0000000
--- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
+++ /dev/null
@@ -1,701 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.history.crawl;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import jline.internal.Log;
-import org.apache.eagle.jpm.entity.*;
-import org.apache.eagle.jpm.util.JSONUtil;
-import org.apache.eagle.jpm.util.JobNameNormalization;
-import org.apache.eagle.jpm.util.SparkEntityConstant;
-import org.apache.eagle.jpm.util.SparkJobTagName;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.service.client.impl.EagleServiceBaseClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-public class JHFSparkEventReader {
-
-    public static final int FLUSH_LIMIT = 500;
-    private static final Logger logger = 
LoggerFactory.getLogger(JHFSparkEventReader.class);
-
-    private Map<String, SparkExecutor> executors;
-    private SparkApp app;
-    private Map<Integer, SparkJob> jobs;
-    private Map<String, SparkStage> stages;
-    private Map<Integer, Set<String>> jobStageMap;
-    private Map<Integer, SparkTask> tasks;
-    private EagleServiceClientImpl client;
-    private Map<String, Map<Integer, Boolean>> stageTaskStatusMap;
-
-    private List<TaggedLogAPIEntity> createEntities;
-
-    private Config conf;
-
-    public JHFSparkEventReader(Map<String, String> baseTags, 
SparkApplicationInfo info) {
-        app = new SparkApp();
-        app.setTags(new HashMap<String, String>(baseTags));
-        app.setYarnState(info.getState());
-        app.setYarnStatus(info.getFinalStatus());
-        createEntities = new ArrayList<>();
-        jobs = new HashMap<Integer, SparkJob>();
-        stages = new HashMap<String, SparkStage>();
-        jobStageMap = new HashMap<Integer, Set<String>>();
-        tasks = new HashMap<Integer, SparkTask>();
-        executors = new HashMap<String, SparkExecutor>();
-        stageTaskStatusMap = new HashMap<>();
-        conf = ConfigFactory.load();
-        this.initiateClient();
-    }
-
-    public void read(JSONObject eventObj) throws Exception {
-        String eventType = (String) eventObj.get("Event");
-        if 
(eventType.equalsIgnoreCase(EventType.SparkListenerApplicationStart.toString()))
 {
-            handleAppStarted(eventObj);
-        } else if 
(eventType.equalsIgnoreCase(EventType.SparkListenerEnvironmentUpdate.toString()))
 {
-            handleEnvironmentSet(eventObj);
-        } else if 
(eventType.equalsIgnoreCase(EventType.SparkListenerExecutorAdded.toString())) {
-            handleExecutorAdd(eventObj);
-        } else if 
(eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerAdded.toString()))
 {
-            handleBlockManagerAdd(eventObj);
-        } else if 
(eventType.equalsIgnoreCase(EventType.SparkListenerJobStart.toString())) {
-            handleJobStart(eventObj);
-        } else if 
(eventType.equalsIgnoreCase(EventType.SparkListenerStageSubmitted.toString())) {
-            handleStageSubmit(eventObj);
-        } else if 
(eventType.equalsIgnoreCase(EventType.SparkListenerTaskStart.toString())) {
-            handleTaskStart(eventObj);
-        } else if 
(eventType.equalsIgnoreCase(EventType.SparkListenerTaskEnd.toString())) {
-            handleTaskEnd(eventObj);
-        } else if 
(eventType.equalsIgnoreCase(EventType.SparkListenerStageCompleted.toString())) {
-            handleStageComplete(eventObj);
-        } else if 
(eventType.equalsIgnoreCase(EventType.SparkListenerJobEnd.toString())) {
-            handleJobEnd(eventObj);
-        } else if 
(eventType.equalsIgnoreCase(EventType.SparkListenerExecutorRemoved.toString())) 
{
-            handleExecutorRemoved(eventObj);
-        } else if 
(eventType.equalsIgnoreCase(EventType.SparkListenerApplicationEnd.toString())) {
-            handleAppEnd(eventObj);
-        } else if 
(eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerRemoved.toString()))
 {
-            //nothing to do now
-        } else {
-            logger.info("Not registered event type:" + eventType);
-        }
-
-    }
-
-
-    private void handleEnvironmentSet(JSONObject event) {
-        app.setConfig(new JobConfig());
-        JSONObject sparkProps = (JSONObject) event.get("Spark Properties");
-
-        List<String> jobConfs = 
conf.getStringList("basic.jobConf.additional.info");
-        String[] props = {"spark.yarn.app.id", "spark.executor.memory", 
"spark.driver.host", "spark.driver.port",
-                "spark.driver.memory", "spark.scheduler.pool", 
"spark.executor.cores", "spark.yarn.am.memory",
-                "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", 
"spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", 
"spark.master"};
-        jobConfs.addAll(Arrays.asList(props));
-        for (String prop : jobConfs) {
-            if (sparkProps.containsKey(prop)) {
-                app.getConfig().getConfig().put(prop, (String) 
sparkProps.get(prop));
-            }
-        }
-    }
-
-    private Object getConfigVal(JobConfig config, String configName, String 
type) {
-        if (config.getConfig().containsKey(configName)) {
-            Object val = config.getConfig().get(configName);
-            if (type.equalsIgnoreCase(Integer.class.getName())) {
-                return Integer.parseInt((String) val);
-            } else {
-                return val;
-            }
-        } else {
-            if (type.equalsIgnoreCase(Integer.class.getName())) {
-                return conf.getInt("spark.defaultVal." + configName);
-            } else {
-                return conf.getString("spark.defaultVal." + configName);
-            }
-
-        }
-    }
-
-
-    private boolean isClientMode(JobConfig config) {
-        if 
(config.getConfig().get("spark.master").equalsIgnoreCase("yarn-client")) {
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-
-    private void handleAppStarted(JSONObject event) {
-        //need update all entities tag before app start
-        List<TaggedLogAPIEntity> entities = new 
ArrayList<TaggedLogAPIEntity>();
-        entities.addAll(this.executors.values());
-        entities.add(this.app);
-
-        for (TaggedLogAPIEntity entity : entities) {
-            entity.getTags().put(SparkJobTagName.SPARK_APP_ID.toString(), 
JSONUtil.getString(event, "App ID"));
-            entity.getTags().put(SparkJobTagName.SPARK_APP_NAME.toString(), 
JSONUtil.getString(event, "App Name"));
-            
entity.getTags().put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), 
JSONUtil.getString(event, "App Attempt ID"));
-            // the second argument of getNormalizeName() is changed to null 
because the original code contains sensitive text
-            // original second argument looks like: 
this.app.getConfig().getConfig().get("xxx"), "xxx" is the sensitive text
-            
entity.getTags().put(SparkJobTagName.SPARK_APP_NORM_NAME.toString(), 
this.getNormalizedName(JSONUtil.getString(event, "App Name"), null));
-            entity.getTags().put(SparkJobTagName.SPARK_USER.toString(), 
JSONUtil.getString(event, "User"));
-        }
-
-        this.app.setStartTime(JSONUtil.getLong(event, "Timestamp"));
-        this.app.setTimestamp(JSONUtil.getLong(event, "Timestamp"));
-
-    }
-
-    private void handleExecutorAdd(JSONObject event) throws Exception {
-        String executorID = (String) event.get("Executor ID");
-        SparkExecutor executor = this.initiateExecutor(executorID, 
JSONUtil.getLong(event, "Timestamp"));
-
-        JSONObject executorInfo = JSONUtil.getJSONObject(event, "Executor 
Info");
-
-    }
-
-    private void handleBlockManagerAdd(JSONObject event) throws Exception {
-        long maxMemory = JSONUtil.getLong(event, "Maximum Memory");
-        long timestamp = JSONUtil.getLong(event, "Timestamp");
-        JSONObject blockInfo = JSONUtil.getJSONObject(event, "Block Manager 
ID");
-        String executorID = JSONUtil.getString(blockInfo, "Executor ID");
-        String hostport = String.format("%s:%s", JSONUtil.getString(blockInfo, 
"Host"), JSONUtil.getLong(blockInfo, "Port"));
-
-        SparkExecutor executor = this.initiateExecutor(executorID, timestamp);
-        executor.setMaxMemory(maxMemory);
-        executor.setHostPort(hostport);
-    }
-
-    private void handleTaskStart(JSONObject event) {
-        this.initializeTask(event);
-    }
-
-    private void handleTaskEnd(JSONObject event) {
-        JSONObject taskInfo = JSONUtil.getJSONObject(event, "Task Info");
-        Integer taskId = JSONUtil.getInt(taskInfo, "Task ID");
-        SparkTask task = null;
-        if (tasks.containsKey(taskId)) {
-            task = tasks.get(taskId);
-        } else {
-            return;
-        }
-
-        task.setFailed(JSONUtil.getBoolean(taskInfo, "Failed"));
-        JSONObject taskMetrics = JSONUtil.getJSONObject(event, "Task Metrics");
-        if (null != taskMetrics) {
-            task.setExecutorDeserializeTime(JSONUtil.getLong(taskMetrics, 
"Executor Deserialize Time"));
-            task.setExecutorRunTime(JSONUtil.getLong(taskMetrics, "Executor 
Run Time"));
-            task.setJvmGcTime(JSONUtil.getLong(taskMetrics, "JVM GC Time"));
-            task.setResultSize(JSONUtil.getLong(taskMetrics, "Result Size"));
-            task.setResultSerializationTime(JSONUtil.getLong(taskMetrics, 
"Result Serialization Time"));
-            task.setMemoryBytesSpilled(JSONUtil.getLong(taskMetrics, "Memory 
Bytes Spilled"));
-            task.setDiskBytesSpilled(JSONUtil.getLong(taskMetrics, "Disk Bytes 
Spilled"));
-
-            JSONObject inputMetrics = JSONUtil.getJSONObject(taskMetrics, 
"Input Metrics");
-            if (null != inputMetrics) {
-                task.setInputBytes(JSONUtil.getLong(inputMetrics, "Bytes 
Read"));
-                task.setInputRecords(JSONUtil.getLong(inputMetrics, "Records 
Read"));
-            }
-
-            JSONObject outputMetrics = JSONUtil.getJSONObject(taskMetrics, 
"Output Metrics");
-            if (null != outputMetrics) {
-                task.setOutputBytes(JSONUtil.getLong(outputMetrics, "Bytes 
Written"));
-                task.setOutputRecords(JSONUtil.getLong(outputMetrics, "Records 
Written"));
-            }
-
-            JSONObject shuffleWriteMetrics = 
JSONUtil.getJSONObject(taskMetrics, "Shuffle Write Metrics");
-            if (null != shuffleWriteMetrics) {
-                
task.setShuffleWriteBytes(JSONUtil.getLong(shuffleWriteMetrics, "Shuffle Bytes 
Written"));
-                
task.setShuffleWriteRecords(JSONUtil.getLong(shuffleWriteMetrics, "Shuffle 
Records Written"));
-            }
-
-            JSONObject shuffleReadMetrics = 
JSONUtil.getJSONObject(taskMetrics, "Shuffle Read Metrics");
-            if (null != shuffleReadMetrics) {
-                
task.setShuffleReadLocalBytes(JSONUtil.getLong(shuffleReadMetrics, "Local Bytes 
Read"));
-                
task.setShuffleReadRemoteBytes(JSONUtil.getLong(shuffleReadMetrics, "Remote 
Bytes Read"));
-                
task.setShuffleReadRecords(JSONUtil.getLong(shuffleReadMetrics, "Total Records 
Read"));
-            }
-        } else {
-            //for tasks success without task metrics, save in the end if no 
other information
-            if (!task.isFailed()) {
-                return;
-            }
-        }
-
-        aggregateToStage(task);
-        aggregateToExecutor(task);
-        tasks.remove(taskId);
-        this.flushEntities(task, false);
-    }
-
-    private SparkTask initializeTask(JSONObject event) {
-        SparkTask task = new SparkTask();
-        task.setTags(new HashMap(this.app.getTags()));
-        task.setTimestamp(app.getTimestamp());
-
-        task.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), 
JSONUtil.getLong(event, "Stage ID").toString());
-        task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), 
JSONUtil.getLong(event, "Stage Attempt ID").toString());
-
-        JSONObject taskInfo = JSONUtil.getJSONObject(event, "Task Info");
-        task.setTaskId(JSONUtil.getInt(taskInfo, "Task ID"));
-        task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), 
JSONUtil.getInt(taskInfo, "Index").toString());
-        task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), 
JSONUtil.getInt(taskInfo, "Attempt").toString());
-        task.setLaunchTime(JSONUtil.getLong(taskInfo, "Launch Time"));
-        task.setExecutorId(JSONUtil.getString(taskInfo, "Executor ID"));
-        task.setHost(JSONUtil.getString(taskInfo, "Host"));
-        task.setTaskLocality(JSONUtil.getString(taskInfo, "Locality"));
-        task.setSpeculative(JSONUtil.getBoolean(taskInfo, "Speculative"));
-
-        tasks.put(task.getTaskId(), task);
-        return task;
-    }
-
-    private void handleJobStart(JSONObject event) {
-        SparkJob job = new SparkJob();
-        job.setTags(new HashMap(this.app.getTags()));
-        job.setTimestamp(app.getTimestamp());
-
-        Integer jobId = JSONUtil.getInt(event, "Job ID");
-        job.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), 
jobId.toString());
-        job.setSubmissionTime(JSONUtil.getLong(event, "Submission Time"));
-
-        //for complete application, no active stages/tasks
-        job.setNumActiveStages(0);
-        job.setNumActiveTasks(0);
-
-        this.jobs.put(jobId, job);
-        this.jobStageMap.put(jobId, new HashSet<String>());
-
-        JSONArray stages = JSONUtil.getJSONArray(event, "Stage Infos");
-        job.setNumStages(stages.size());
-        for (int i = 0; i < stages.size(); i++) {
-            JSONObject stageInfo = (JSONObject) stages.get(i);
-            Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID");
-            Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt 
ID");
-            String stageName = JSONUtil.getString(stageInfo, "Stage Name");
-            int numTasks = JSONUtil.getInt(stageInfo, "Number of Tasks");
-            this.initiateStage(jobId, stageId, stageAttemptId, stageName, 
numTasks);
-
-        }
-
-    }
-
-    private void handleStageSubmit(JSONObject event) {
-        JSONObject stageInfo = JSONUtil.getJSONObject(event, "Stage Info");
-        Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID");
-        Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt 
ID");
-        String key = this.generateStageKey(stageId.toString(), 
stageAttemptId.toString());
-        stageTaskStatusMap.put(key, new HashMap<Integer, Boolean>());
-
-        if (!stages.containsKey(this.generateStageKey(stageId.toString(), 
stageAttemptId.toString()))) {
-            //may be further attempt for one stage
-            String baseAttempt = this.generateStageKey(stageId.toString(), 
"0");
-            if (stages.containsKey(baseAttempt)) {
-                SparkStage stage = stages.get(baseAttempt);
-                String jobId = 
stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString());
-
-                String stageName = JSONUtil.getString(event, "Stage Name");
-                int numTasks = JSONUtil.getInt(stageInfo, "Number of Tasks");
-                this.initiateStage(Integer.parseInt(jobId), stageId, 
stageAttemptId, stageName, numTasks);
-            }
-        }
-
-    }
-
-    private void handleStageComplete(JSONObject event) {
-        JSONObject stageInfo = JSONUtil.getJSONObject(event, "Stage Info");
-        Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID");
-        Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt 
ID");
-        String key = this.generateStageKey(stageId.toString(), 
stageAttemptId.toString());
-        SparkStage stage = stages.get(key);
-        stage.setSubmitTime(JSONUtil.getLong(stageInfo, "Submission Time"));
-        stage.setCompleteTime(JSONUtil.getLong(stageInfo, "Completion Time"));
-
-        if (stageInfo.containsKey("Failure Reason")) {
-            
stage.setStatus(SparkEntityConstant.SPARK_STAGE_STATUS.FAILED.toString());
-        } else {
-            
stage.setStatus(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString());
-        }
-    }
-
-    private void handleExecutorRemoved(JSONObject event) {
-        String executorID = JSONUtil.getString(event, "Executor ID");
-        SparkExecutor executor = executors.get(executorID);
-        executor.setEndTime(JSONUtil.getLong(event, "Timestamp"));
-
-    }
-
-    private void handleJobEnd(JSONObject event) {
-        Integer jobId = JSONUtil.getInt(event, "Job ID");
-        SparkJob job = jobs.get(jobId);
-        job.setCompletionTime(JSONUtil.getLong(event, "Completion Time"));
-        JSONObject jobResult = JSONUtil.getJSONObject(event, "Job Result");
-        String result = JSONUtil.getString(jobResult, "Result");
-        if (result.equalsIgnoreCase("JobSucceeded")) {
-            
job.setStatus(SparkEntityConstant.SPARK_JOB_STATUS.SUCCEEDED.toString());
-        } else {
-            
job.setStatus(SparkEntityConstant.SPARK_JOB_STATUS.FAILED.toString());
-        }
-
-    }
-
-    private void handleAppEnd(JSONObject event) {
-        long endTime = JSONUtil.getLong(event, "Timestamp");
-        app.setEndTime(endTime);
-    }
-
-    public void clearReader() throws Exception {
-        //clear tasks
-        for (SparkTask task : tasks.values()) {
-            logger.info("Task {} does not have result or no task metrics.", 
task.getTaskId());
-            task.setFailed(true);
-            aggregateToStage(task);
-            aggregateToExecutor(task);
-            this.flushEntities(task, false);
-        }
-
-        List<SparkStage> needStoreStages = new ArrayList<>();
-        for (SparkStage stage : this.stages.values()) {
-            Integer jobId = 
Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
-            if (stage.getSubmitTime() == 0 || stage.getCompleteTime() == 0) {
-                SparkJob job = this.jobs.get(jobId);
-                job.setNumSkippedStages(job.getNumSkippedStages() + 1);
-                job.setNumSkippedTasks(job.getNumSkippedTasks() + 
stage.getNumTasks());
-            } else {
-                this.aggregateToJob(stage);
-                this.aggregateStageToApp(stage);
-                needStoreStages.add(stage);
-            }
-            String stageId = 
stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
-            String stageAttemptId = 
stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
-            this.jobStageMap.get(jobId).remove(this.generateStageKey(stageId, 
stageAttemptId));
-        }
-
-        this.flushEntities(needStoreStages, false);
-        for (SparkJob job : jobs.values()) {
-            this.aggregateJobToApp(job);
-        }
-        this.flushEntities(jobs.values(), false);
-
-        app.setExecutors(executors.values().size());
-        long executorMemory = parseExecutorMemory((String) 
this.getConfigVal(this.app.getConfig(), "spark.executor.memory", 
String.class.getName()));
-        long driverMemory = 
parseExecutorMemory(this.isClientMode(app.getConfig()) ? (String) 
this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", 
String.class.getName()) : (String) this.getConfigVal(app.getConfig(), 
"spark.driver.memory", String.class.getName()));
-        int executoreCore = (Integer) this.getConfigVal(app.getConfig(), 
"spark.executor.cores", Integer.class.getName());
-        int driverCore = this.isClientMode(app.getConfig()) ? (Integer) 
this.getConfigVal(app.getConfig(), "spark.yarn.am.cores", 
Integer.class.getName()) : (Integer) this.getConfigVal(app.getConfig(), 
"spark.driver.cores", Integer.class.getName());
-        long executorMemoryOverhead = this.getMemoryOverhead(app.getConfig(), 
executorMemory, "spark.yarn.executor.memoryOverhead");
-        long driverMemoryOverhead = this.isClientMode(app.getConfig()) ? 
this.getMemoryOverhead(app.getConfig(), driverMemory, 
"spark.yarn.am.memoryOverhead") : this.getMemoryOverhead(app.getConfig(), 
driverMemory, "spark.yarn.driver.memoryOverhead");
-
-        app.setExecMemoryBytes(executorMemory);
-        app.setDriveMemoryBytes(driverMemory);
-        app.setExecutorCores(executoreCore);
-        app.setDriverCores(driverCore);
-        app.setExecutorMemoryOverhead(executorMemoryOverhead);
-        app.setDriverMemoryOverhead(driverMemoryOverhead);
-
-        for (SparkExecutor executor : executors.values()) {
-            String executorID = 
executor.getTags().get(SparkJobTagName.SPARK_EXECUTOR_ID.toString());
-            if (executorID.equalsIgnoreCase("driver")) {
-                executor.setExecMemoryBytes(driverMemory);
-                executor.setCores(driverCore);
-                executor.setMemoryOverhead(driverMemoryOverhead);
-            } else {
-                executor.setExecMemoryBytes(executorMemory);
-                executor.setCores(executoreCore);
-                executor.setMemoryOverhead(executorMemoryOverhead);
-            }
-            if (executor.getEndTime() == 0)
-                executor.setEndTime(app.getEndTime());
-            this.aggregateExecutorToApp(executor);
-        }
-        this.flushEntities(executors.values(), false);
-        //spark code...tricky
-        app.setSkippedTasks(app.getCompleteTasks());
-        this.flushEntities(app, true);
-    }
-
-    private long getMemoryOverhead(JobConfig config, long executorMemory, 
String fieldName) {
-        long result = 0l;
-        if (config.getConfig().containsKey(fieldName)) {
-            result = 
this.parseExecutorMemory(config.getConfig().get(fieldName) + "m");
-            if(result  == 0l){
-               result = 
this.parseExecutorMemory(config.getConfig().get(fieldName));
-            }
-        }
-
-        if(result == 0l){
-            result =  
Math.max(this.parseExecutorMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")),
 executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 
100);
-        }
-        return result;
-    }
-
-    private void aggregateExecutorToApp(SparkExecutor executor) {
-        app.setTotalExecutorTime(app.getTotalExecutorTime() + 
(executor.getEndTime() - executor.getStartTime()));
-    }
-
-    private void aggregateJobToApp(SparkJob job) {
-        //aggregate job level metrics
-        app.setNumJobs(app.getNumJobs() + 1);
-        app.setTotalTasks(app.getTotalTasks() + job.getNumTask());
-        app.setCompleteTasks(app.getCompleteTasks() + 
job.getNumCompletedTasks());
-        app.setSkippedTasks(app.getSkippedTasks() + job.getNumSkippedTasks());
-        app.setFailedTasks(app.getFailedTasks() + job.getNumFailedTasks());
-        app.setTotalStages(app.getTotalStages() + job.getNumStages());
-        app.setFailedStages(app.getFailedStages() + job.getNumFailedStages());
-        app.setSkippedStages(app.getSkippedStages() + 
job.getNumSkippedStages());
-    }
-
-    private void aggregateStageToApp(SparkStage stage) {
-        //aggregate task level metrics
-        app.setDiskBytesSpilled(app.getDiskBytesSpilled() + 
stage.getDiskBytesSpilled());
-        app.setMemoryBytesSpilled(app.getMemoryBytesSpilled() + 
stage.getMemoryBytesSpilled());
-        app.setExecutorRunTime(app.getExecutorRunTime() + 
stage.getExecutorRunTime());
-        app.setJvmGcTime(app.getJvmGcTime() + stage.getJvmGcTime());
-        app.setExecutorDeserializeTime(app.getExecutorDeserializeTime() + 
stage.getExecutorDeserializeTime());
-        app.setResultSerializationTime(app.getResultSerializationTime() + 
stage.getResultSerializationTime());
-        app.setResultSize(app.getResultSize() + stage.getResultSize());
-        app.setInputRecords(app.getInputRecords() + stage.getInputRecords());
-        app.setInputBytes(app.getInputBytes() + stage.getInputBytes());
-        app.setOutputRecords(app.getOutputRecords() + 
stage.getOutputRecords());
-        app.setOutputBytes(app.getOutputBytes() + stage.getOutputBytes());
-        app.setShuffleWriteRecords(app.getShuffleWriteRecords() + 
stage.getShuffleWriteRecords());
-        app.setShuffleWriteBytes(app.getShuffleWriteBytes() + 
stage.getShuffleWriteBytes());
-        app.setShuffleReadRecords(app.getShuffleReadRecords() + 
stage.getShuffleReadRecords());
-        app.setShuffleReadBytes(app.getShuffleReadBytes() + 
stage.getShuffleReadBytes());
-    }
-
-    private void aggregateToStage(SparkTask task) {
-        String stageId = 
task.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
-        String stageAttemptId = 
task.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
-        String key = this.generateStageKey(stageId, stageAttemptId);
-        SparkStage stage = stages.get(key);
-
-        stage.setDiskBytesSpilled(stage.getDiskBytesSpilled() + 
task.getDiskBytesSpilled());
-        stage.setMemoryBytesSpilled(stage.getMemoryBytesSpilled() + 
task.getMemoryBytesSpilled());
-        stage.setExecutorRunTime(stage.getExecutorRunTime() + 
task.getExecutorRunTime());
-        stage.setJvmGcTime(stage.getJvmGcTime() + task.getJvmGcTime());
-        stage.setExecutorDeserializeTime(stage.getExecutorDeserializeTime() + 
task.getExecutorDeserializeTime());
-        stage.setResultSerializationTime(stage.getResultSerializationTime() + 
task.getResultSerializationTime());
-        stage.setResultSize(stage.getResultSize() + task.getResultSize());
-        stage.setInputRecords(stage.getInputRecords() + 
task.getInputRecords());
-        stage.setInputBytes(stage.getInputBytes() + task.getInputBytes());
-        stage.setOutputRecords(stage.getOutputRecords() + 
task.getOutputRecords());
-        stage.setOutputBytes(stage.getOutputBytes() + task.getOutputBytes());
-        stage.setShuffleWriteRecords(stage.getShuffleWriteRecords() + 
task.getShuffleWriteRecords());
-        stage.setShuffleWriteBytes(stage.getShuffleWriteBytes() + 
task.getShuffleWriteBytes());
-        stage.setShuffleReadRecords(stage.getShuffleReadRecords() + 
task.getShuffleReadRecords());
-        long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + 
task.getShuffleReadRemoteBytes();
-        stage.setShuffleReadBytes(stage.getShuffleReadBytes() + 
taskShuffleReadBytes);
-
-        boolean success = !task.isFailed();
-
-        Integer taskIndex = 
Integer.parseInt(task.getTags().get(SparkJobTagName.SPARK_TASK_INDEX.toString()));
-        if (stageTaskStatusMap.get(key).containsKey(taskIndex)) {
-            //has previous task attempt, retrieved from task index in one stage
-            boolean previousResult = 
stageTaskStatusMap.get(key).get(taskIndex);
-            success = previousResult || success;
-            if (previousResult != success) {
-                stage.setNumFailedTasks(stage.getNumFailedTasks() - 1);
-                stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
-                stageTaskStatusMap.get(key).put(taskIndex, success);
-            }
-        } else {
-            if (success) {
-                stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
-            } else {
-                stage.setNumFailedTasks(stage.getNumFailedTasks() + 1);
-            }
-            stageTaskStatusMap.get(key).put(taskIndex, success);
-        }
-
-    }
-
-    private void aggregateToExecutor(SparkTask task) {
-        String executorId = task.getExecutorId();
-        SparkExecutor executor = executors.get(executorId);
-
-        if (null != executor) {
-            executor.setTotalTasks(executor.getTotalTasks() + 1);
-            if (task.isFailed()) {
-                executor.setFailedTasks(executor.getFailedTasks() + 1);
-            } else {
-                executor.setCompletedTasks(executor.getCompletedTasks() + 1);
-            }
-            long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + 
task.getShuffleReadRemoteBytes();
-            executor.setTotalShuffleRead(executor.getTotalShuffleRead() + 
taskShuffleReadBytes);
-            executor.setTotalDuration(executor.getTotalDuration() + 
task.getExecutorRunTime());
-            executor.setTotalInputBytes(executor.getTotalInputBytes() + 
task.getInputBytes());
-            executor.setTotalShuffleWrite(executor.getTotalShuffleWrite() + 
task.getShuffleWriteBytes());
-            executor.setTotalDuration(executor.getTotalDuration() + 
task.getExecutorRunTime());
-        }
-
-    }
-
-    private void aggregateToJob(SparkStage stage) {
-        Integer jobId = 
Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
-        SparkJob job = jobs.get(jobId);
-        job.setNumCompletedTasks(job.getNumCompletedTasks() + 
stage.getNumCompletedTasks());
-        job.setNumFailedTasks(job.getNumFailedTasks() + 
stage.getNumFailedTasks());
-        job.setNumTask(job.getNumTask() + stage.getNumTasks());
-
-
-        if 
(stage.getStatus().equalsIgnoreCase(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString()))
 {
-            //if multiple attempts succeed, just count one
-            if (!hasStagePriorAttemptSuccess(stage)) {
-                job.setNumCompletedStages(job.getNumCompletedStages() + 1);
-            }
-
-        } else {
-            job.setNumFailedStages(job.getNumFailedStages() + 1);
-        }
-    }
-
-    private boolean hasStagePriorAttemptSuccess(SparkStage stage) {
-        Integer stageAttemptId = 
Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()));
-        for (Integer i = 0; i < stageAttemptId; i++) {
-            SparkStage previousStage = 
stages.get(this.generateStageKey(stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()),
 i.toString()));
-            if 
(previousStage.getStatus().equalsIgnoreCase(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString()))
 {
-                return true;
-            }
-        }
-        return false;
-    }
-
-
-    private String generateStageKey(String stageId, String stageAttemptId) {
-        return String.format("%s-%s", stageId, stageAttemptId);
-    }
-
-    private void initiateStage(Integer jobId, Integer stageId, Integer 
stageAttemptId, String name, int numTasks) {
-        SparkStage stage = new SparkStage();
-        stage.setTags(new HashMap(this.app.getTags()));
-        stage.setTimestamp(app.getTimestamp());
-        stage.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), 
jobId.toString());
-        stage.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), 
stageId.toString());
-        stage.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), 
stageAttemptId.toString());
-        stage.setName(name);
-        stage.setNumActiveTasks(0);
-        stage.setNumTasks(numTasks);
-        
stage.setSchedulingPool(this.app.getConfig().getConfig().get("spark.scheduler.pool")
 == null ? "default" : 
this.app.getConfig().getConfig().get("spark.scheduler.pool"));
-
-        String stageKey = this.generateStageKey(stageId.toString(), 
stageAttemptId.toString());
-        stages.put(stageKey, stage);
-        this.jobStageMap.get(jobId).add(stageKey);
-    }
-
-
-    private SparkExecutor initiateExecutor(String executorID, long startTime) 
throws Exception {
-        if (!executors.containsKey(executorID)) {
-            SparkExecutor executor = new SparkExecutor();
-            executor.setTags(new HashMap(this.app.getTags()));
-            
executor.getTags().put(SparkJobTagName.SPARK_EXECUTOR_ID.toString(), 
executorID);
-            executor.setStartTime(startTime);
-            executor.setTimestamp(app.getTimestamp());
-
-            this.executors.put(executorID, executor);
-        }
-
-        return this.executors.get(executorID);
-    }
-
-    private String getNormalizedName(String jobName, String assignedName) {
-        if (null != assignedName) {
-            return assignedName;
-        } else {
-            return JobNameNormalization.getInstance().normalize(jobName);
-        }
-    }
-
-    private long parseExecutorMemory(String memory) {
-
-        if (memory.endsWith("g") || memory.endsWith("G")) {
-            int executorGB = Integer.parseInt(memory.substring(0, 
memory.length() - 1));
-            return 1024l * 1024 * 1024 * executorGB;
-        } else if (memory.endsWith("m") || memory.endsWith("M")) {
-            int executorMB = Integer.parseInt(memory.substring(0, 
memory.length() - 1));
-            return 1024l * 1024 * executorMB;
-        } else if (memory.endsWith("k") || memory.endsWith("K")) {
-            int executorKB = Integer.parseInt(memory.substring(0, 
memory.length() - 1));
-            return 1024l * executorKB;
-        } else if (memory.endsWith("t") || memory.endsWith("T")) {
-            int executorTB = Integer.parseInt(memory.substring(0, 
memory.length() - 1));
-            return 1024l * 1024 * 1024 * 1024 * executorTB;
-        } else if (memory.endsWith("p") || memory.endsWith("P")) {
-            int executorPB = Integer.parseInt(memory.substring(0, 
memory.length() - 1));
-            return 1024l * 1024 * 1024 * 1024 * 1024 * executorPB;
-        }
-        Log.info("Cannot parse memory info " +  memory);
-        return 0l;
-    }
-
-    private void flushEntities(Object entity, boolean forceFlush) {
-        this.flushEntities(Arrays.asList(entity), forceFlush);
-    }
-
-    private void flushEntities(Collection entities, boolean forceFlush) {
-        this.createEntities.addAll(entities);
-
-        if (forceFlush || this.createEntities.size() >= FLUSH_LIMIT) {
-            try {
-                this.doFlush(this.createEntities);
-                this.createEntities.clear();
-            } catch (Exception e) {
-                logger.error("Fail to flush entities", e);
-            }
-
-        }
-    }
-
-    private EagleServiceBaseClient initiateClient() {
-        String host = conf.getString("eagleProps.eagle.service.host");
-        int port = conf.getInt("eagleProps.eagle.service.port");
-        String userName = conf.getString("eagleProps.eagle.service.userName");
-        String pwd = conf.getString("eagleProps.eagle.service.pwd");
-        client = new EagleServiceClientImpl(host, port, userName, pwd);
-        int timeout = conf.getInt("eagleProps.eagle.service.read_timeout");
-        client.getJerseyClient().setReadTimeout(timeout * 1000);
-
-        return client;
-    }
-
-    private void doFlush(List entities) throws Exception {
-        logger.info("start flushing entities of total number " + entities.size());
-        client.create(entities);
-        logger.info("finish flushing entities of total number " + entities.size());
-//        for(Object entity: entities){
-//            if(entity instanceof SparkApp){
-//                for (Field field : entity.getClass().getDeclaredFields()) {
-//                    field.setAccessible(true); // You might want to set modifier to public first.
-//                    Object value = field.get(entity);
-//                    if (value != null) {
-//                        System.out.println(field.getName() + "=" + value);
-//                    }
-//                }
-//            }
-//        }
-    }
-
-
-}
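
The deleted parseExecutorMemory() above turns Spark executor memory strings such as "512m" or "4g" into bytes via an if/else chain over the unit suffix. Below is a minimal standalone sketch of the same suffix-to-bytes conversion using a switch over the normalized suffix; the class and method names are illustrative only and are not part of the Eagle codebase.

    import java.util.Locale;

    // Sketch of the executor-memory parsing technique shown above: a suffix
    // (k/m/g/t/p, case-insensitive) scales the numeric prefix into bytes.
    // Names here are hypothetical, not Eagle APIs.
    public class MemoryStringParser {

        public static long parseBytes(String memory) {
            String normalized = memory.trim().toLowerCase(Locale.ROOT);
            char suffix = normalized.charAt(normalized.length() - 1);
            long multiplier;
            switch (suffix) {
                case 'k': multiplier = 1024L; break;
                case 'm': multiplier = 1024L * 1024; break;
                case 'g': multiplier = 1024L * 1024 * 1024; break;
                case 't': multiplier = 1024L * 1024 * 1024 * 1024; break;
                case 'p': multiplier = 1024L * 1024 * 1024 * 1024 * 1024; break;
                default:
                    // Unknown suffix: mirror the original behavior of returning 0.
                    return 0L;
            }
            long value = Long.parseLong(normalized.substring(0, normalized.length() - 1));
            return value * multiplier;
        }

        public static void main(String[] args) {
            System.out.println(parseBytes("512m")); // 536870912
            System.out.println(parseBytes("4g"));   // 4294967296
        }
    }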

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
deleted file mode 100644
index 75ce508..0000000
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.history.crawl;
-
-
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-public class JHFSparkParser implements JHFParserBase {
-
-    private static final Logger logger = LoggerFactory.getLogger(JHFSparkParser.class);
-
-    JHFSparkEventReader eventReader;
-
-    public JHFSparkParser(JHFSparkEventReader reader){
-        this.eventReader = reader;
-    }
-
-    @Override
-    public void parse(InputStream is) throws Exception {
-        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
-        try{
-            String line = null;
-
-            JSONParser parser = new JSONParser();
-            while((line = reader.readLine()) != null){
-                try{
-                    JSONObject eventObj = (JSONObject) parser.parse(line);
-                    this.eventReader.read(eventObj);
-                }catch(Exception e){
-                    logger.error(String.format("Fail to parse %s.", line), e);
-                }
-            }
-            this.eventReader.clearReader();
-
-        }finally {
-            if(reader != null){
-                reader.close();
-            }
-        }
-    }
-}
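
The deleted parser above relies on the fact that a Spark event log is one JSON document per line, so each line can be parsed independently and a malformed line is logged and skipped rather than aborting the whole file. A condensed sketch of the same technique follows, using try-with-resources in place of the manual finally/close; the handleEvent callback is a hypothetical stand-in for JHFSparkEventReader.read().

    import org.json.simple.JSONObject;
    import org.json.simple.parser.JSONParser;

    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;

    // Line-at-a-time JSON parsing with per-line error isolation.
    public class EventLogLineParser {

        public void parse(InputStream is) throws Exception {
            JSONParser parser = new JSONParser();
            // try-with-resources closes the reader even if parsing fails.
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    try {
                        handleEvent((JSONObject) parser.parse(line));
                    } catch (Exception e) {
                        // A bad line is reported and skipped, not fatal.
                        System.err.println("Fail to parse: " + line);
                    }
                }
            }
        }

        protected void handleEvent(JSONObject event) {
            System.out.println(event.get("Event"));
        }
    }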

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java
deleted file mode 100644
index c206b71..0000000
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.jpm.spark.history.crawl;
-
-public class SparkApplicationInfo {
-
-    private String state;
-    private String finalStatus;
-    private String queue;
-    private String name;
-    private String user;
-
-    public String getState() {
-        return state;
-    }
-
-    public void setState(String state) {
-        this.state = state;
-    }
-
-    public String getFinalStatus() {
-        return finalStatus;
-    }
-
-    public void setFinalStatus(String finalStatus) {
-        this.finalStatus = finalStatus;
-    }
-
-    public String getQueue() {
-        return queue;
-    }
-
-    public void setQueue(String queue) {
-        this.queue = queue;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getUser() {
-        return user;
-    }
-
-    public void setUser(String user) {
-        this.user = user;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkHistoryFileInputStreamReaderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkHistoryFileInputStreamReaderImpl.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkHistoryFileInputStreamReaderImpl.java
deleted file mode 100644
index 38c0a04..0000000
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkHistoryFileInputStreamReaderImpl.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.history.crawl;
-
-import org.apache.eagle.jpm.util.SparkJobTagName;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-public class SparkHistoryFileInputStreamReaderImpl implements JHFInputStreamReader {
-
-    private String site;
-    private SparkApplicationInfo app;
-
-
-    public SparkHistoryFileInputStreamReaderImpl(String site, SparkApplicationInfo app){
-        this.site = site;
-        this.app = app;
-    }
-
-    @Override
-    public void read(InputStream is) throws Exception {
-        Map<String, String> baseTags = new HashMap<>();
-        baseTags.put(SparkJobTagName.SITE.toString(), site);
-        baseTags.put(SparkJobTagName.SPARK_QUEUE.toString(), app.getQueue());
-        JHFParserBase parser = new JHFSparkParser(new JHFSparkEventReader(baseTags, this.app));
-        parser.parse(is);
-    }
-
-    public static void main(String[] args) throws Exception{
-        SparkHistoryFileInputStreamReaderImpl impl = new SparkHistoryFileInputStreamReaderImpl("apollo-phx", new SparkApplicationInfo());
-        impl.read(new FileInputStream(new File("E:\\eagle\\application_1459803563374_535667_1")));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
index 60e126e..0c475a9 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
@@ -26,7 +26,7 @@ import org.apache.curator.framework.recipes.locks.InterProcessLock;
 import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
 import org.apache.curator.retry.RetryNTimes;
 import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
-import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo;
+import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
 import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
index 9c231aa..8404eda 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/FinishedSparkJobSpout.java
@@ -71,10 +71,13 @@ public class FinishedSparkJobSpout extends BaseRichSpout {
     public void nextTuple() {
         LOG.info("Start to run tuple");
         try {
-            long fetchTime = Calendar.getInstance().getTimeInMillis();
+            Calendar calendar = Calendar.getInstance();
+            long fetchTime = calendar.getTimeInMillis();
+            calendar.setTimeInMillis(this.lastFinishAppTime);
+            LOG.info("Last finished time = {}", calendar.getTime());
             if (fetchTime - this.lastFinishAppTime > this.config.stormConfig.spoutCrawlInterval) {
-                List apps = rmFetch.getResource(Constants.ResourceType.COMPLETE_SPARK_JOB, new Long(lastFinishAppTime).toString());
-                List<AppInfo> appInfos = (null != apps ? (List<AppInfo>)apps.get(0):new ArrayList<AppInfo>());
+                List<AppInfo> appInfos = rmFetch.getResource(Constants.ResourceType.COMPLETE_SPARK_JOB, Long.toString(lastFinishAppTime));
+                //List<AppInfo> appInfos = (null != apps ? (List<AppInfo>)apps.get(0):new ArrayList<AppInfo>());
                 LOG.info("Get " + appInfos.size() + " from yarn resource manager.");
                 for (AppInfo app: appInfos) {
                     String appId = app.getId();
@@ -123,7 +126,7 @@ public class FinishedSparkJobSpout extends BaseRichSpout {
         if (this.failTimes.containsKey(appId)) {
             failTimes = this.failTimes.get(appId);
         }
-        failTimes ++;
+        failTimes++;
         if (failTimes >= FAIL_MAX_TIMES) {
             this.failTimes.remove(appId);
             zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
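
The hunk above guards the resource-manager fetch with an elapsed-time check, since a Storm spout's nextTuple() is called continuously and the crawl should only fire once per spoutCrawlInterval. A small sketch of that gating pattern, with hypothetical field and method names, is shown below.

    // Sketch of the crawl-interval guard used in nextTuple() above:
    // the expensive fetch is allowed only when enough time has elapsed.
    public class IntervalGate {

        private long lastFetchTime = 0L;
        private final long intervalMs;

        public IntervalGate(long intervalMs) {
            this.intervalMs = intervalMs;
        }

        // Returns true (and advances the clock) only when the interval has passed.
        public synchronized boolean shouldFetch(long nowMs) {
            if (nowMs - lastFetchTime > intervalMs) {
                lastFetchTime = nowMs;
                return true;
            }
            return false;
        }

        public static void main(String[] args) {
            IntervalGate gate = new IntervalGate(60_000L);
            System.out.println(gate.shouldFetch(System.currentTimeMillis())); // true
            System.out.println(gate.shouldFetch(System.currentTimeMillis())); // false
        }
    }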

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
index 4e6bf03..23d5152 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
@@ -24,18 +24,16 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Tuple;
+import org.apache.eagle.jpm.spark.crawl.JHFInputStreamReader;
+import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
+import org.apache.eagle.jpm.spark.crawl.SparkFilesystemInputStreamReaderImpl;
 import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
-import org.apache.eagle.jpm.spark.history.crawl.JHFInputStreamReader;
-import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo;
-import org.apache.eagle.jpm.spark.history.crawl.SparkHistoryFileInputStreamReaderImpl;
 import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
 import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
-import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.jpm.util.HDFSUtil;
 import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
 import org.apache.eagle.jpm.util.resourceFetch.SparkHistoryServerResourceFetcher;
 import org.apache.eagle.jpm.util.resourceFetch.model.SparkApplication;
-import org.apache.eagle.jpm.util.resourceFetch.model.SparkApplicationAttempt;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,9 +41,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 public class SparkJobParseBolt extends BaseRichBolt {
 
@@ -87,15 +83,23 @@ public class SparkJobParseBolt extends BaseRichBolt {
 
             SparkApplicationInfo info = zkState.getApplicationInfo(appId);
             //first try to get attempts under the application
-            List<SparkApplicationAttempt> attempts = this.getAttemptList(appId);
+            hdfs = HDFSUtil.getFileSystem(this.hdfsConf);
+            Set<String> inprogressSet = new HashSet<String>();
+            List<String> attemptLogNames = this.getAttemptLogNameList(appId, hdfs, inprogressSet);
 
-            if (attempts.isEmpty()) {
-                LOG.info("Application:{}( Name:{}, user: {}, queue: {}) not found on history server.", appId, info.getName(), info.getUser(), info.getQueue());
+            if (attemptLogNames.isEmpty()) {
+                LOG.info("Application:{}( Name:{}, user: {}, queue: {}) not found on history server.",
+                    appId, info.getName(), info.getUser(), info.getQueue());
             } else {
-                hdfs = HDFSUtil.getFileSystem(this.hdfsConf);
-                for (SparkApplicationAttempt attempt : attempts) {
-                    Path attemptFile = new Path(this.config.hdfsConfig.baseDir + "/" + this.getAppAttemptLogName(appId, attempt.getAttemptId()));
-                    JHFInputStreamReader reader = new SparkHistoryFileInputStreamReaderImpl(config.info.site , info);
+                for (String attemptLogName : attemptLogNames) {
+                    String extension = "";
+                    if (inprogressSet.contains(attemptLogName)) {
+                        extension = ".inprogress";
+                    }
+                    LOG.info("Attempt log name: " + attemptLogName + extension);
+
+                    Path attemptFile = getFilePath(attemptLogName, extension);
+                    JHFInputStreamReader reader = new SparkFilesystemInputStreamReaderImpl(config.info.site, info);
                     reader.read(hdfs.open(attemptFile));
                 }
             }
@@ -124,50 +128,67 @@ public class SparkJobParseBolt extends BaseRichBolt {
     }
 
     private String getAppAttemptLogName(String appId, String attemptId) {
-        return String.format("%s_%s", appId, attemptId);
+        if (attemptId.equals("0")) {
+            return appId;
+        }
+        return appId + "_" + attemptId;
     }
 
-    private List<SparkApplicationAttempt> getAttemptList(String appId) throws IOException {
-        FileSystem hdfs = null;
-        List<SparkApplicationAttempt> attempts = new ArrayList<>();
-        try {
+    private Path getFilePath(String appAttemptLogName, String extension) {
+        String attemptLogDir = this.config.hdfsConfig.baseDir + "/" + appAttemptLogName + extension;
+        return new Path(attemptLogDir);
+    }
 
-            SparkApplication app = null;
-            /*try {
-                List apps = this.historyServerFetcher.getResource(Constants.ResourceType.SPARK_JOB_DETAIL, appId);
-                if (apps != null) {
-                    app = (SparkApplication) apps.get(0);
-                    attempts = app.getAttempts();
+    private List<String> getAttemptLogNameList(String appId, FileSystem hdfs, Set<String> inprogressSet)
+            throws IOException {
+        List<String> attempts = new ArrayList<String>();
+        SparkApplication app = null;
+        /*try {
+            List apps = this.historyServerFetcher.getResource(Constants.ResourceType.SPARK_JOB_DETAIL, appId);
+            if (apps != null) {
+                app = (SparkApplication) apps.get(0);
+                attempts = app.getAttempts();
+            }
+        } catch (Exception e) {
+            LOG.warn("Fail to get application detail from history server for appId " + appId, e);
+        }*/
+
+
+        if (null == app) {
+            // history server may not have the info, just double check.
+            // TODO: if attemptId is not "1, 2, 3,...", we should change the logic.
+            // Use getResourceManagerVersion() to compare YARN/RM versions.
+            // attemptId might be: "appId_000001"
+            int attemptId = 0;
+
+            boolean exists = true;
+            while (exists) {
+                String attemptIdString = Integer.toString(attemptId);
+                String appAttemptLogName = this.getAppAttemptLogName(appId, attemptIdString);
+                LOG.info("Attempt ID: {}, App Attempt Log: {}", attemptIdString, appAttemptLogName);
+
+                String extension = "";
+                Path attemptFile = getFilePath(appAttemptLogName, extension);
+                extension = ".inprogress";
+                Path inprogressFile = getFilePath(appAttemptLogName, extension);
+                Path logFile = null;
+                // Check if history log exists.
+                if (hdfs.exists(attemptFile)) {
+                    logFile = attemptFile;
+                } else if (hdfs.exists(inprogressFile)) {
+                    logFile = inprogressFile;
+                    inprogressSet.add(appAttemptLogName);
+                } else if (attemptId > 0) {
+                    exists = false;
+                }
-            } catch (Exception e) {
-                LOG.warn("Fail to get application detail from history server for appId " + appId, e);
-            }*/
-
-
-            if (null == app) {
-                //history server may not have the info, just double check
-                hdfs = HDFSUtil.getFileSystem(this.hdfsConf);
-                Integer attemptId = 1;
-
-                boolean exists = true;
-                while (exists) {
-                    Path attemptFile = new Path(this.config.hdfsConfig.baseDir + "/" + this.getAppAttemptLogName(appId, attemptId.toString()));
-                    if (hdfs.exists(attemptFile)) {
-                        SparkApplicationAttempt attempt = new SparkApplicationAttempt();
-                        attempt.setAttemptId(attemptId.toString());
-                        attempts.add(attempt);
-                        attemptId++;
-                    } else {
-                        exists = false;
-                    }
+
+                if (logFile != null) {
+                    attempts.add(appAttemptLogName);
                 }
-            }
-            return attempts;
-        } finally {
-            if (null != hdfs) {
-                hdfs.close();
+                attemptId++;
             }
         }
+        return attempts;
+    }
 
     @Override
@@ -175,4 +196,4 @@ public class SparkJobParseBolt extends BaseRichBolt {
         super.cleanup();
         zkState.close();
     }
-}
+}
\ No newline at end of file
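
The new getAttemptLogNameList() above probes HDFS for attempt logs by name: attempt 0 maps to the bare appId, attempt N to appId_N, each candidate is checked both as a completed log and as an ".inprogress" log, and probing stops at the first missing id above 0. The sketch below captures that loop with a Predicate standing in for hdfs.exists(path), so it has no Hadoop dependency; all names are illustrative.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Predicate;

    // Probe attempt log names upward from 0 until a gap is found.
    public class AttemptLogProbe {

        public static List<String> discover(String appId, Predicate<String> exists) {
            List<String> found = new ArrayList<>();
            int attemptId = 0;
            boolean keepProbing = true;
            while (keepProbing) {
                String name = (attemptId == 0) ? appId : appId + "_" + attemptId;
                if (exists.test(name)) {
                    found.add(name);                       // completed log
                } else if (exists.test(name + ".inprogress")) {
                    found.add(name + ".inprogress");       // still being written
                } else if (attemptId > 0) {
                    keepProbing = false;                   // gap above 0: stop
                }
                attemptId++;
            }
            return found;
        }

        public static void main(String[] args) {
            List<String> logs = discover("application_1459803563374_535667",
                name -> name.endsWith("_1")); // pretend only attempt 1 exists
            System.out.println(logs);
        }
    }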

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
index 36f0836..65aaa36 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -33,12 +33,12 @@
     "zkSessionTimeoutMs" : 15000,
     "zkRetryTimes" : 3,
     "zkRetryInterval" : 20000,
-    spark.history.server.url : "http://sandbox.hortonworks.com:18080/",
+    spark.history.server.url : "http://sandbox.hortonworks.com:18080",
     spark.history.server.username : "",
     spark.history.server.pwd : "",
     rm.url:["http://sandbox.hortonworks.com:8088"] ,
     "hdfs": {
-      "baseDir": "/logs/spark-events",
+      "baseDir": "/spark-history",
       "endPoint": "hdfs://sandbox.hortonworks.com:8020",
       "principal": "",
       "keytab" : ""

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-spark-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/pom.xml b/eagle-jpm/eagle-jpm-spark-running/pom.xml
index 7bf90d4..7f5e6e8 100644
--- a/eagle-jpm/eagle-jpm-spark-running/pom.xml
+++ b/eagle-jpm/eagle-jpm-spark-running/pom.xml
@@ -29,38 +29,140 @@
   <name>eagle-jpm-spark-running</name>
   <url>http://maven.apache.org</url>
   <dependencies>
-       <dependency>
-               <groupId>org.slf4j</groupId>
-               <artifactId>slf4j-api</artifactId>
-       </dependency>
-       <dependency>
-               <groupId>org.apache.eagle</groupId>
-               <artifactId>eagle-stream-process-api</artifactId>
-        <version>${project.version}</version>
-       </dependency>
+      <dependency>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.eagle</groupId>
+          <artifactId>eagle-stream-process-api</artifactId>
+          <version>${project.version}</version>
+          <exclusions>
+              <exclusion>
+                  <groupId>org.wso2.orbit.com.lmax</groupId>
+                  <artifactId>disruptor</artifactId>
+              </exclusion>
+              <exclusion>
+                  <groupId>asm</groupId>
+                  <artifactId>asm</artifactId>
+              </exclusion>
+          </exclusions>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.curator</groupId>
+          <artifactId>curator-client</artifactId>
+          <version>${curator.version}</version>
+      </dependency>
       <dependency>
           <groupId>org.apache.eagle</groupId>
           <artifactId>eagle-stream-process-base</artifactId>
           <version>${project.version}</version>
+          <exclusions>
+              <exclusion>
+                  <groupId>org.wso2.orbit.com.lmax</groupId>
+                  <artifactId>disruptor</artifactId>
+              </exclusion>
+              <exclusion>
+                  <groupId>asm</groupId>
+                  <artifactId>asm</artifactId>
+              </exclusion>
+          </exclusions>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.eagle</groupId>
+          <artifactId>eagle-job-common</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.jsoup</groupId>
+        <artifactId>jsoup</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.storm</groupId>
+          <artifactId>storm-core</artifactId>
+          <exclusions>
+              <exclusion>
+                  <groupId>ch.qos.logback</groupId>
+                  <artifactId>logback-classic</artifactId>
+              </exclusion>
+              <exclusion>
+                  <groupId>org.ow2.asm</groupId>
+                  <artifactId>asm</artifactId>
+              </exclusion>
+          </exclusions>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.eagle</groupId>
+          <artifactId>eagle-jpm-util</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.eagle</groupId>
+          <artifactId>eagle-jpm-entity</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs-nfs</artifactId>
+          <version>${hadoop.version}</version>
+      </dependency>
+
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <version>${hadoop.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-annotations</artifactId>
+          <version>${hadoop.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+          <version>${hadoop.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-auth</artifactId>
+          <version>${hadoop.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-app</artifactId>
+          <version>${hadoop.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+          <version>${hadoop.version}</version>
       </dependency>
-       <dependency>
-               <groupId>org.apache.eagle</groupId>
-               <artifactId>eagle-job-common</artifactId>
-               <version>${project.version}</version>
-       </dependency>           
-       <dependency>
-               <groupId>org.jsoup</groupId>
-               <artifactId>jsoup</artifactId>
-       </dependency>
-       <dependency>
-               <groupId>org.apache.storm</groupId>
-               <artifactId>storm-core</artifactId>
-               <exclusions>
-               <exclusion>
-                       <groupId>ch.qos.logback</groupId>
-                       <artifactId>logback-classic</artifactId>
-               </exclusion>
-       </exclusions>
-       </dependency>
   </dependencies>
+  <build>
+      <resources>
+          <resource>
+              <directory>src/main/resources</directory>
+          </resource>
+      </resources>
+      <plugins>
+          <plugin>
+              <artifactId>maven-assembly-plugin</artifactId>
+              <configuration>
+                  <descriptor>src/assembly/eagle-jpm-spark-running-assembly.xml</descriptor>
+                  <finalName>eagle-jpm-spark-running-${project.version}</finalName>
+              </configuration>
+              <executions>
+                  <execution>
+                      <phase>package</phase>
+                      <goals>
+                          <goal>single</goal>
+                      </goals>
+                      <configuration>
+                          <tarLongFileMode>posix</tarLongFileMode>
+                      </configuration>
+                  </execution>
+              </executions>
+          </plugin>
+      </plugins>
+  </build>
 </project>
