[EAGLE-422] Eagle support for MR & Spark running job monitoring

Author: jinhuwu <jinh...@ebay.com>
Author: pkuwm <ihuizhi...@gmail.com>
Author: Zhao, Qingwen <qingwz...@ebay.com>

Closes #309 from wujinhu/develop.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/5bf2c62d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/5bf2c62d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/5bf2c62d

Branch: refs/heads/develop
Commit: 5bf2c62d264ebc7922b18d932995b4f2b46c4a1a
Parents: 5c0db6a
Author: jinhuwu <jinh...@ebay.com>
Authored: Tue Aug 9 13:25:09 2016 +0800
Committer: Zhao, Qingwen <qingwz...@ebay.com>
Committed: Tue Aug 9 13:25:09 2016 +0800

----------------------------------------------------------------------
 .../eagle/alert/engine/e2e/Integration1.java    |   4 +
 .../eagle/jobrunning/ha/HAURLSelectorImpl.java  |   9 +-
 .../eagle/jobrunning/util/InputStreamUtils.java |   5 +
 .../apache/eagle/jpm/spark/crawl/EventType.java |  24 +
 .../jpm/spark/crawl/JHFInputStreamReader.java   |  25 +
 .../eagle/jpm/spark/crawl/JHFParserBase.java    |  29 +
 .../jpm/spark/crawl/JHFSparkEventReader.java    | 728 +++++++++++++++++++
 .../eagle/jpm/spark/crawl/JHFSparkParser.java   |  64 ++
 .../jpm/spark/crawl/SparkApplicationInfo.java   |  69 ++
 .../SparkFilesystemInputStreamReaderImpl.java   |  53 ++
 .../eagle/jpm/mr/history/MRHistoryJobMain.java  |  24 +-
 .../jpm/mr/history/common/JPAConstants.java     |  95 ---
 .../eagle/jpm/mr/history/common/JobConfig.java  |  38 -
 .../history/entities/JPAEntityRepository.java   |   4 +-
 .../jpm/mr/history/entities/JobConfig.java      |  38 +
 .../mr/history/entities/JobConfigSerDeser.java  |   1 -
 .../entities/JobConfigurationAPIEntity.java     |   9 +-
 .../history/entities/JobCountersSerDeser.java   | 166 -----
 .../mr/history/entities/JobEventAPIEntity.java  |   4 +-
 .../history/entities/JobExecutionAPIEntity.java |  98 ++-
 .../entities/JobProcessTimeStampEntity.java     |   4 +-
 .../entities/TaskAttemptCounterAPIEntity.java   |   4 +-
 .../entities/TaskAttemptExecutionAPIEntity.java |   6 +-
 .../entities/TaskExecutionAPIEntity.java        |   6 +-
 .../entities/TaskFailureCountAPIEntity.java     |   4 +-
 .../jobcounter/CounterGroupDictionary.java      | 238 ------
 .../mr/history/jobcounter/CounterGroupKey.java  |  32 -
 .../jpm/mr/history/jobcounter/CounterKey.java   |  30 -
 .../history/jobcounter/JobCounterException.java |  63 --
 .../jpm/mr/history/jobcounter/JobCounters.java  |  47 --
 .../jpm/mr/history/parser/EagleJobTagName.java  |  48 --
 .../mr/history/parser/JHFEventReaderBase.java   | 161 ++--
 .../mr/history/parser/JHFMRVer1EventReader.java |   2 +-
 .../mr/history/parser/JHFMRVer2EventReader.java |   5 +-
 .../parser/JobEntityLifecycleAggregator.java    |  30 +-
 .../parser/TaskAttemptCounterListener.java      |  13 +-
 .../mr/history/parser/TaskFailureListener.java  |  17 +-
 .../src/main/resources/JobCounter.conf          |   3 +
 .../src/main/resources/application.conf         |   2 +-
 eagle-jpm/eagle-jpm-mr-running/pom.xml          | 126 ++++
 .../assembly/eagle-jpm-mr-running-assembly.xml  |  65 ++
 .../eagle/jpm/mr/running/MRRunningJobMain.java  |  96 +++
 .../running/config/MRRunningConfigManager.java  | 142 ++++
 .../running/entities/JPMEntityRepository.java   |  32 +
 .../jpm/mr/running/entities/JobConfig.java      |  25 +
 .../running/entities/JobExecutionAPIEntity.java | 437 +++++++++++
 .../entities/TaskAttemptExecutionAPIEntity.java | 135 ++++
 .../entities/TaskExecutionAPIEntity.java        | 136 ++++
 .../parser/MRJobEntityCreationHandler.java      |  98 +++
 .../jpm/mr/running/parser/MRJobParser.java      | 553 ++++++++++++++
 .../AbstractMetricsCreationListener.java        |  42 ++
 .../JobExecutionMetricsCreationListener.java    |  60 ++
 .../TaskExecutionMetricsCreationListener.java   |  45 ++
 .../mr/running/recover/MRRunningJobManager.java |  80 ++
 .../running/storm/MRRunningJobFetchSpout.java   | 171 +++++
 .../mr/running/storm/MRRunningJobParseBolt.java | 114 +++
 .../src/main/resources/JobCounter.conf          | 187 +++++
 .../src/main/resources/application.conf         |  86 +++
 .../src/main/resources/log4j.properties         |  35 +
 .../jpm/spark/history/crawl/EventType.java      |  24 -
 .../history/crawl/JHFInputStreamReader.java     |  25 -
 .../jpm/spark/history/crawl/JHFParserBase.java  |  29 -
 .../history/crawl/JHFSparkEventReader.java      | 701 ------------------
 .../jpm/spark/history/crawl/JHFSparkParser.java |  63 --
 .../history/crawl/SparkApplicationInfo.java     |  69 --
 .../SparkHistoryFileInputStreamReaderImpl.java  |  53 --
 .../status/JobHistoryZKStateManager.java        |   2 +-
 .../history/storm/FinishedSparkJobSpout.java    |  11 +-
 .../spark/history/storm/SparkJobParseBolt.java  | 127 ++--
 .../src/main/resources/application.conf         |   4 +-
 eagle-jpm/eagle-jpm-spark-running/pom.xml       | 158 +++-
 .../eagle-jpm-spark-running-assembly.xml        |  65 ++
 .../jpm/spark/running/SparkRunningJobMain.java  |  84 +++
 .../common/SparkRunningConfigManager.java       | 151 ++++
 .../eagle/jpm/spark/running/common/Util.java    |  35 +
 .../running/entities/JPMEntityRepository.java   |  30 +
 .../jpm/spark/running/entities/JobConfig.java   |  25 +
 .../spark/running/entities/SparkAppEntity.java  | 472 ++++++++++++
 .../running/entities/SparkExecutorEntity.java   | 232 ++++++
 .../spark/running/entities/SparkJobEntity.java  | 190 +++++
 .../running/entities/SparkStageEntity.java      | 298 ++++++++
 .../spark/running/entities/SparkTaskEntity.java | 289 ++++++++
 .../parser/SparkAppEntityCreationHandler.java   |  73 ++
 .../running/parser/SparkApplicationParser.java  | 647 ++++++++++++++++
 .../running/recover/SparkRunningJobManager.java |  80 ++
 .../storm/SparkRunningJobFetchSpout.java        | 178 +++++
 .../running/storm/SparkRunningJobParseBolt.java | 110 +++
 .../services/org.apache.hadoop.fs.FileSystem    |  20 +
 .../src/main/resources/application.conf         |  66 ++
 .../src/main/resources/log4j.properties         |  35 +
 .../org/apache/eagle/jpm/util/Constants.java    | 123 +++-
 .../org/apache/eagle/jpm/util/HDFSUtil.java     |   2 +-
 .../org/apache/eagle/jpm/util/MRJobTagName.java |  48 ++
 .../java/org/apache/eagle/jpm/util/Utils.java   |  89 +++
 .../util/jobcounter/CounterGroupDictionary.java | 238 ++++++
 .../jpm/util/jobcounter/CounterGroupKey.java    |  32 +
 .../eagle/jpm/util/jobcounter/CounterKey.java   |  30 +
 .../util/jobcounter/JobCounterException.java    |  63 ++
 .../eagle/jpm/util/jobcounter/JobCounters.java  |  48 ++
 .../util/jobcounter/JobCountersSerDeser.java    | 165 +++++
 .../jpm/util/jobrecover/RunningJobManager.java  | 255 +++++++
 .../util/resourceFetch/RMResourceFetcher.java   | 103 ++-
 .../jpm/util/resourceFetch/ResourceFetcher.java |   6 +-
 .../SparkHistoryServerResourceFetcher.java      |   8 +-
 .../jpm/util/resourceFetch/model/AppInfo.java   |  67 +-
 .../util/resourceFetch/model/AppsWrapper.java   |   3 +-
 .../util/resourceFetch/model/ClusterInfo.java   | 119 +++
 .../resourceFetch/model/ClusterInfoWrapper.java |  35 +
 .../resourceFetch/model/JobCounterGroup.java    |  42 ++
 .../resourceFetch/model/JobCounterItem.java     |  55 ++
 .../util/resourceFetch/model/JobCounters.java   |  42 ++
 .../resourceFetch/model/JobCountersWrapper.java |  35 +
 .../jpm/util/resourceFetch/model/MRJob.java     | 289 ++++++++
 .../util/resourceFetch/model/MRJobsWrapper.java |  37 +
 .../jpm/util/resourceFetch/model/MRTask.java    | 109 +++
 .../util/resourceFetch/model/MRTaskAttempt.java | 136 ++++
 .../model/MRTaskAttemptWrapper.java             |  37 +
 .../resourceFetch/model/MRTaskAttempts.java     |  39 +
 .../jpm/util/resourceFetch/model/MRTasks.java   |  40 +
 .../resourceFetch/model/MRTasksWrapper.java     |  37 +
 .../jpm/util/resourceFetch/model/MrJobs.java    |  39 +
 .../util/resourceFetch/model/SparkExecutor.java | 155 ++++
 .../jpm/util/resourceFetch/model/SparkJob.java  | 165 +++++
 .../util/resourceFetch/model/SparkStage.java    | 211 ++++++
 .../jpm/util/resourceFetch/model/SparkTask.java | 111 +++
 .../model/SparkTaskInputMetrics.java            |  46 ++
 .../resourceFetch/model/SparkTaskMetrics.java   | 118 +++
 .../model/SparkTaskShuffleReadMetrics.java      |  82 +++
 .../model/SparkTaskShuffleWriteMetrics.java     |  55 ++
 .../resourceFetch/model/TaskCounterGroup.java   |  45 ++
 .../resourceFetch/model/TaskCounterItem.java    |  44 ++
 .../util/resourceFetch/model/TaskCounters.java  |  45 ++
 .../model/TaskCountersWrapper.java              |  35 +
 .../url/JobListServiceURLBuilderImpl.java       |  31 +-
 .../SparkCompleteJobServiceURLBuilderImpl.java  |   9 +-
 .../url/SparkJobServiceURLBuilderImpl.java      |   4 +-
 .../jpm/util/resourceFetch/url/URLUtil.java     |  33 +
 eagle-jpm/pom.xml                               |   1 +
 138 files changed, 10775 insertions(+), 1999 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
index 8b28080..a00d75f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
@@ -147,7 +147,11 @@ public class Integration1 {
 
         ZkClient zkClient = new ZkClient(zkconfig.zkQuorum, 10000, 10000, ZKStringSerializer$.MODULE$);
         Properties topicConfiguration = new Properties();
+//        ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
+        AdminUtils.createTopic(zkClient, topic, 1, 1, topicConfiguration);// RackAwareMode.Disabled$.MODULE$);
+
         AdminUtils.createTopic(zkClient, topic, 1, 1, topicConfiguration);
+
     }
 
     public static void proactive_schedule(Config config) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java
index 21a81ed..e030cf3 100644
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/ha/HAURLSelectorImpl.java
@@ -47,13 +47,12 @@ public class HAURLSelectorImpl implements HAURLSelector {
        public boolean checkUrl(String urlString) {
                InputStream is = null;
                try {
+                       LOG.info("Getting input stream from url: " + urlString);
                        is = InputStreamUtils.getInputStream(urlString, compressionType);
-               }
-               catch (Exception ex) {
-                       LOG.info("get inputstream from url: " + urlString + " failed. ");
+               } catch (Exception ex) {
+                       LOG.error("Failed to get input stream from url: " + urlString);
                        return false;
-               }
-               finally {
+               } finally {
                        if (is != null) { try { is.close(); } catch (IOException e) {/*Do nothing*/} }
                }
                return true;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/InputStreamUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/InputStreamUtils.java b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/InputStreamUtils.java
index 62a15af..03c8ba6 100644
--- a/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/InputStreamUtils.java
+++ b/eagle-core/eagle-data-process/eagle-storm-jobrunning-spout/src/main/java/org/apache/eagle/jobrunning/util/InputStreamUtils.java
@@ -23,8 +23,12 @@ import java.net.URLConnection;
 import java.util.zip.GZIPInputStream;
 
 import org.apache.eagle.jobrunning.common.JobConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class InputStreamUtils {
+       private static final Logger LOG = LoggerFactory.getLogger(InputStreamUtils.class);
+
 
        private static final int CONNECTION_TIMEOUT = 10 * 1000;
        private static final int READ_TIMEOUT = 5 * 60 * 1000;
@@ -47,6 +51,7 @@ public class InputStreamUtils {
        
        public static InputStream getInputStream(String urlString, JobConstants.CompressionType compressionType, int timeout) throws Exception {
                final URL url = URLConnectionUtils.getUrl(urlString);
+               LOG.info("Open connection. compression type: " + compressionType + "; URL: " + url.toString());
                if (compressionType.equals(JobConstants.CompressionType.GZIP)) {
                        return openGZIPInputStream(url, timeout);
                }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java
new file mode 100644
index 0000000..1ba15b7
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/EventType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.crawl;
+
+public enum EventType {
+    SparkListenerBlockManagerAdded, SparkListenerEnvironmentUpdate, SparkListenerApplicationStart,
+    SparkListenerExecutorAdded, SparkListenerJobStart, SparkListenerStageSubmitted, SparkListenerTaskStart, SparkListenerBlockManagerRemoved,
+    SparkListenerTaskEnd, SparkListenerStageCompleted, SparkListenerJobEnd, SparkListenerApplicationEnd, SparkListenerExecutorRemoved
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java
new file mode 100644
index 0000000..feeee7b
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFInputStreamReader.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.crawl;
+
+import java.io.InputStream;
+
+public interface JHFInputStreamReader {
+    void read(InputStream is) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java
new file mode 100644
index 0000000..48701f7
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFParserBase.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.crawl;
+
+import java.io.InputStream;
+
+public interface JHFParserBase {
+    /**
+     * Parses the job history content from the given input stream.
+     * This method guarantees the input stream is closed before it returns.
+     * @param is the input stream to parse
+     * @throws Exception if parsing fails
+     */
+    void parse(InputStream is) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
new file mode 100644
index 0000000..d5fda5a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkEventReader.java
@@ -0,0 +1,728 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.crawl;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.jpm.entity.*;
+import org.apache.eagle.jpm.util.JSONUtil;
+import org.apache.eagle.jpm.util.JobNameNormalization;
+import org.apache.eagle.jpm.util.SparkEntityConstant;
+import org.apache.eagle.jpm.util.SparkJobTagName;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.service.client.impl.EagleServiceBaseClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class JHFSparkEventReader {
+    private static final Logger LOG = LoggerFactory.getLogger(JHFSparkEventReader.class);
+
+    public static final int FLUSH_LIMIT = 500;
+    private long firstTaskLaunchTime;
+
+    private Map<String, SparkExecutor> executors;
+    private SparkApp app;
+    private Map<Integer, SparkJob> jobs;
+    private Map<String, SparkStage> stages;
+    private Map<Integer, Set<String>> jobStageMap;
+    private Map<Integer, SparkTask> tasks;
+    private EagleServiceClientImpl client;
+    private Map<String, Map<Integer, Boolean>> stageTaskStatusMap;
+
+    private List<TaggedLogAPIEntity> createEntities;
+
+    private Config conf;
+
+    public JHFSparkEventReader(Map<String, String> baseTags, SparkApplicationInfo info) {
+        app = new SparkApp();
+        app.setTags(new HashMap<String, String>(baseTags));
+        app.setYarnState(info.getState());
+        app.setYarnStatus(info.getFinalStatus());
+        createEntities = new ArrayList<>();
+        jobs = new HashMap<Integer, SparkJob>();
+        stages = new HashMap<String, SparkStage>();
+        jobStageMap = new HashMap<Integer, Set<String>>();
+        tasks = new HashMap<Integer, SparkTask>();
+        executors = new HashMap<String, SparkExecutor>();
+        stageTaskStatusMap = new HashMap<>();
+        conf = ConfigFactory.load();
+        this.initiateClient();
+    }
+
+    public SparkApp getApp() {
+        return this.app;
+    }
+
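+    //Dispatches a single Spark history event JSON object to the handler for its event type.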
+    public void read(JSONObject eventObj) throws Exception {
+        String eventType = (String) eventObj.get("Event");
+        LOG.info("Event type: " + eventType);
+        if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationStart.toString())) {
+            handleAppStarted(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerEnvironmentUpdate.toString())) {
+            handleEnvironmentSet(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerExecutorAdded.toString())) {
+            handleExecutorAdd(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerAdded.toString())) {
+            handleBlockManagerAdd(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerJobStart.toString())) {
+            handleJobStart(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerStageSubmitted.toString())) {
+            handleStageSubmit(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerTaskStart.toString())) {
+            handleTaskStart(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerTaskEnd.toString())) {
+            handleTaskEnd(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerStageCompleted.toString())) {
+            handleStageComplete(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerJobEnd.toString())) {
+            handleJobEnd(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerExecutorRemoved.toString())) {
+            handleExecutorRemoved(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerApplicationEnd.toString())) {
+            handleAppEnd(eventObj);
+        } else if (eventType.equalsIgnoreCase(EventType.SparkListenerBlockManagerRemoved.toString())) {
+            //nothing to do now
+        } else {
+            LOG.info("Unregistered event type: " + eventType);
+        }
+    }
+
+
+    private void handleEnvironmentSet(JSONObject event) {
+        app.setConfig(new JobConfig());
+        JSONObject sparkProps = (JSONObject) event.get("Spark Properties");
+
+        //Config.getStringList() returns an unmodifiable list, so copy it before adding entries
+        List<String> jobConfs = new ArrayList<>(conf.getStringList("basic.jobConf.additional.info"));
+        String[] props = {"spark.yarn.app.id", "spark.executor.memory", "spark.driver.host", "spark.driver.port",
+                "spark.driver.memory", "spark.scheduler.pool", "spark.executor.cores", "spark.yarn.am.memory",
+                "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"};
+        jobConfs.addAll(Arrays.asList(props));
+        for (String prop : jobConfs) {
+            if (sparkProps.containsKey(prop)) {
+                app.getConfig().getConfig().put(prop, (String) sparkProps.get(prop));
+            }
+        }
+    }
+
+    private Object getConfigVal(JobConfig config, String configName, String type) {
+        if (config.getConfig().containsKey(configName)) {
+            Object val = config.getConfig().get(configName);
+            if (type.equalsIgnoreCase(Integer.class.getName())) {
+                return Integer.parseInt((String) val);
+            } else {
+                return val;
+            }
+        } else {
+            if (type.equalsIgnoreCase(Integer.class.getName())) {
+                return conf.getInt("spark.defaultVal." + configName);
+            } else {
+                return conf.getString("spark.defaultVal." + configName);
+            }
+        }
+    }
+
+
+    private boolean isClientMode(JobConfig config) {
+        return config.getConfig().get("spark.master").equalsIgnoreCase("yarn-client");
+    }
+
+
+    private void handleAppStarted(JSONObject event) {
+        //need to update the tags of all entities created before the app start event
+        List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
+        entities.addAll(this.executors.values());
+        entities.add(this.app);
+
+        long appStartTime = JSONUtil.getLong(event, "Timestamp");
+        for (TaggedLogAPIEntity entity : entities) {
+            entity.getTags().put(SparkJobTagName.SPARK_APP_ID.toString(), JSONUtil.getString(event, "App ID"));
+            entity.getTags().put(SparkJobTagName.SPARK_APP_NAME.toString(), JSONUtil.getString(event, "App Name"));
+            // In yarn-client mode, attemptId is not available in the log, so we set attemptId = 1.
+            String attemptId = isClientMode(this.app.getConfig()) ? "1" : JSONUtil.getString(event, "App Attempt ID");
+            entity.getTags().put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), attemptId);
+            // The second argument of getNormalizedName() is set to null because the original code contains sensitive text.
+            // The original second argument looks like: this.app.getConfig().getConfig().get("xxx"), where "xxx" is the sensitive text.
+            entity.getTags().put(SparkJobTagName.SPARK_APP_NORM_NAME.toString(), this.getNormalizedName(JSONUtil.getString(event, "App Name"), null));
+            entity.getTags().put(SparkJobTagName.SPARK_USER.toString(), JSONUtil.getString(event, "User"));
+
+            entity.setTimestamp(appStartTime);
+        }
+
+        this.app.setStartTime(appStartTime);
+    }
+
+    private void handleExecutorAdd(JSONObject event) throws Exception {
+        String executorID = (String) event.get("Executor ID");
+        //registers the executor; the returned entity and the "Executor Info" payload are not used further here
+        SparkExecutor executor = this.initiateExecutor(executorID, JSONUtil.getLong(event, "Timestamp"));
+
+        JSONObject executorInfo = JSONUtil.getJSONObject(event, "Executor Info");
+    }
+
+    private void handleBlockManagerAdd(JSONObject event) throws Exception {
+        long maxMemory = JSONUtil.getLong(event, "Maximum Memory");
+        long timestamp = JSONUtil.getLong(event, "Timestamp");
+        JSONObject blockInfo = JSONUtil.getJSONObject(event, "Block Manager ID");
+        String executorID = JSONUtil.getString(blockInfo, "Executor ID");
+        String hostport = String.format("%s:%s", JSONUtil.getString(blockInfo, "Host"), JSONUtil.getLong(blockInfo, "Port"));
+
+        SparkExecutor executor = this.initiateExecutor(executorID, timestamp);
+        executor.setMaxMemory(maxMemory);
+        executor.setHostPort(hostport);
+    }
+
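+    //A task start event only registers the task; its metrics arrive with the corresponding task end event.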
+    private void handleTaskStart(JSONObject event) {
+        this.initializeTask(event);
+    }
+
+    private void handleTaskEnd(JSONObject event) {
+        JSONObject taskInfo = JSONUtil.getJSONObject(event, "Task Info");
+        Integer taskId = JSONUtil.getInt(taskInfo, "Task ID");
+        SparkTask task = null;
+        if (tasks.containsKey(taskId)) {
+            task = tasks.get(taskId);
+        } else {
+            return;
+        }
+
+        task.setFailed(JSONUtil.getBoolean(taskInfo, "Failed"));
+        JSONObject taskMetrics = JSONUtil.getJSONObject(event, "Task Metrics");
+        if (null != taskMetrics) {
+            task.setExecutorDeserializeTime(JSONUtil.getLong(taskMetrics, "Executor Deserialize Time"));
+            task.setExecutorRunTime(JSONUtil.getLong(taskMetrics, "Executor Run Time"));
+            task.setJvmGcTime(JSONUtil.getLong(taskMetrics, "JVM GC Time"));
+            task.setResultSize(JSONUtil.getLong(taskMetrics, "Result Size"));
+            task.setResultSerializationTime(JSONUtil.getLong(taskMetrics, "Result Serialization Time"));
+            task.setMemoryBytesSpilled(JSONUtil.getLong(taskMetrics, "Memory Bytes Spilled"));
+            task.setDiskBytesSpilled(JSONUtil.getLong(taskMetrics, "Disk Bytes Spilled"));
+
+            JSONObject inputMetrics = JSONUtil.getJSONObject(taskMetrics, "Input Metrics");
+            if (null != inputMetrics) {
+                task.setInputBytes(JSONUtil.getLong(inputMetrics, "Bytes Read"));
+                task.setInputRecords(JSONUtil.getLong(inputMetrics, "Records Read"));
+            }
+
+            JSONObject outputMetrics = JSONUtil.getJSONObject(taskMetrics, "Output Metrics");
+            if (null != outputMetrics) {
+                task.setOutputBytes(JSONUtil.getLong(outputMetrics, "Bytes Written"));
+                task.setOutputRecords(JSONUtil.getLong(outputMetrics, "Records Written"));
+            }
+
+            JSONObject shuffleWriteMetrics = JSONUtil.getJSONObject(taskMetrics, "Shuffle Write Metrics");
+            if (null != shuffleWriteMetrics) {
+                task.setShuffleWriteBytes(JSONUtil.getLong(shuffleWriteMetrics, "Shuffle Bytes Written"));
+                task.setShuffleWriteRecords(JSONUtil.getLong(shuffleWriteMetrics, "Shuffle Records Written"));
+            }
+
+            JSONObject shuffleReadMetrics = JSONUtil.getJSONObject(taskMetrics, "Shuffle Read Metrics");
+            if (null != shuffleReadMetrics) {
+                task.setShuffleReadLocalBytes(JSONUtil.getLong(shuffleReadMetrics, "Local Bytes Read"));
+                task.setShuffleReadRemoteBytes(JSONUtil.getLong(shuffleReadMetrics, "Remote Bytes Read"));
+                task.setShuffleReadRecords(JSONUtil.getLong(shuffleReadMetrics, "Total Records Read"));
+            }
+        } else {
+            //successful tasks without task metrics are saved at the end if no other information arrives
+            if (!task.isFailed()) {
+                return;
+            }
+        }
+
+        aggregateToStage(task);
+        aggregateToExecutor(task);
+        tasks.remove(taskId);
+        this.flushEntities(task, false);
+    }
+
+
+    private SparkTask initializeTask(JSONObject event) {
+        SparkTask task = new SparkTask();
+        task.setTags(new HashMap(this.app.getTags()));
+        task.setTimestamp(app.getTimestamp());
+
+        task.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), JSONUtil.getLong(event, "Stage ID").toString());
+        task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), JSONUtil.getLong(event, "Stage Attempt ID").toString());
+
+        JSONObject taskInfo = JSONUtil.getJSONObject(event, "Task Info");
+        int taskId = JSONUtil.getInt(taskInfo, "Task ID");
+        task.setTaskId(taskId);
+
+        task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), JSONUtil.getInt(taskInfo, "Index").toString());
+        task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), JSONUtil.getInt(taskInfo, "Attempt").toString());
+        long launchTime = JSONUtil.getLong(taskInfo, "Launch Time");
+        if (taskId == 0) {
+            this.setFirstTaskLaunchTime(launchTime);
+        }
+        task.setLaunchTime(launchTime);
+        task.setExecutorId(JSONUtil.getString(taskInfo, "Executor ID"));
+        task.setHost(JSONUtil.getString(taskInfo, "Host"));
+        task.setTaskLocality(JSONUtil.getString(taskInfo, "Locality"));
+        task.setSpeculative(JSONUtil.getBoolean(taskInfo, "Speculative"));
+
+        tasks.put(task.getTaskId(), task);
+        return task;
+    }
+
+    private void setFirstTaskLaunchTime(long launchTime) {
+        this.firstTaskLaunchTime = launchTime;
+    }
+
+    private long getFirstTaskLaunchTime() {
+        return this.firstTaskLaunchTime;
+    }
+
+
+    private void handleJobStart(JSONObject event) {
+        SparkJob job = new SparkJob();
+        job.setTags(new HashMap(this.app.getTags()));
+        job.setTimestamp(app.getTimestamp());
+
+        Integer jobId = JSONUtil.getInt(event, "Job ID");
+        job.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), jobId.toString());
+        job.setSubmissionTime(JSONUtil.getLong(event, "Submission Time"));
+
+        //a completed application has no active stages or tasks
+        job.setNumActiveStages(0);
+        job.setNumActiveTasks(0);
+
+        this.jobs.put(jobId, job);
+        this.jobStageMap.put(jobId, new HashSet<String>());
+
+        JSONArray stages = JSONUtil.getJSONArray(event, "Stage Infos");
+        job.setNumStages(stages.size());
+        for (int i = 0; i < stages.size(); i++) {
+            JSONObject stageInfo = (JSONObject) stages.get(i);
+            Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID");
+            Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID");
+            String stageName = JSONUtil.getString(stageInfo, "Stage Name");
+            int numTasks = JSONUtil.getInt(stageInfo, "Number of Tasks");
+            this.initiateStage(jobId, stageId, stageAttemptId, stageName, numTasks);
+        }
+    }
+
+    private void handleStageSubmit(JSONObject event) {
+        JSONObject stageInfo = JSONUtil.getJSONObject(event, "Stage Info");
+        Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID");
+        Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID");
+        String key = this.generateStageKey(stageId.toString(), stageAttemptId.toString());
+        stageTaskStatusMap.put(key, new HashMap<Integer, Boolean>());
+
+        if (!stages.containsKey(key)) {
+            //may be a further attempt of an already-seen stage
+            String baseAttempt = this.generateStageKey(stageId.toString(), "0");
+            if (stages.containsKey(baseAttempt)) {
+                SparkStage stage = stages.get(baseAttempt);
+                String jobId = stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString());
+
+                //"Stage Name" lives inside the "Stage Info" object, not the top-level event
+                String stageName = JSONUtil.getString(stageInfo, "Stage Name");
+                int numTasks = JSONUtil.getInt(stageInfo, "Number of Tasks");
+                this.initiateStage(Integer.parseInt(jobId), stageId, stageAttemptId, stageName, numTasks);
+            }
+        }
+    }
+
+    private void handleStageComplete(JSONObject event) {
+        JSONObject stageInfo = JSONUtil.getJSONObject(event, "Stage Info");
+        Integer stageId = JSONUtil.getInt(stageInfo, "Stage ID");
+        Integer stageAttemptId = JSONUtil.getInt(stageInfo, "Stage Attempt ID");
+        String key = this.generateStageKey(stageId.toString(), stageAttemptId.toString());
+        SparkStage stage = stages.get(key);
+
+        // If "Submission Time" is not available, use the "Launch Time" of "Task ID" = 0.
+        Long submissionTime = JSONUtil.getLong(stageInfo, "Submission Time");
+        if (submissionTime == null) {
+            submissionTime = this.getFirstTaskLaunchTime();
+        }
+        stage.setSubmitTime(submissionTime);
+        stage.setCompleteTime(JSONUtil.getLong(stageInfo, "Completion Time"));
+
+        if (stageInfo.containsKey("Failure Reason")) {
+            stage.setStatus(SparkEntityConstant.SPARK_STAGE_STATUS.FAILED.toString());
+        } else {
+            stage.setStatus(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString());
+        }
+    }
+
+    private void handleExecutorRemoved(JSONObject event) {
+        String executorID = JSONUtil.getString(event, "Executor ID");
+        SparkExecutor executor = executors.get(executorID);
+        executor.setEndTime(JSONUtil.getLong(event, "Timestamp"));
+    }
+
+    private void handleJobEnd(JSONObject event) {
+        Integer jobId = JSONUtil.getInt(event, "Job ID");
+        SparkJob job = jobs.get(jobId);
+        job.setCompletionTime(JSONUtil.getLong(event, "Completion Time"));
+        JSONObject jobResult = JSONUtil.getJSONObject(event, "Job Result");
+        String result = JSONUtil.getString(jobResult, "Result");
+        if (result.equalsIgnoreCase("JobSucceeded")) {
+            job.setStatus(SparkEntityConstant.SPARK_JOB_STATUS.SUCCEEDED.toString());
+        } else {
+            job.setStatus(SparkEntityConstant.SPARK_JOB_STATUS.FAILED.toString());
+        }
+    }
+
+    private void handleAppEnd(JSONObject event) {
+        long endTime = JSONUtil.getLong(event, "Timestamp");
+        app.setEndTime(endTime);
+    }
+
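+    //Called after the last event has been read: flushes leftover tasks, rolls stages up into jobs and
+    //jobs into the app, derives memory/core settings from the job config, and force-flushes the app entity.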
+    public void clearReader() throws Exception {
+        //flush tasks that never received a task end event
+        for (SparkTask task : tasks.values()) {
+            LOG.info("Task {} has no result or task metrics.", task.getTaskId());
+            task.setFailed(true);
+            aggregateToStage(task);
+            aggregateToExecutor(task);
+            this.flushEntities(task, false);
+        }
+
+        List<SparkStage> needStoreStages = new ArrayList<>();
+        for (SparkStage stage : this.stages.values()) {
+            Integer jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
+            if (stage.getSubmitTime() == 0 || stage.getCompleteTime() == 0) {
+                SparkJob job = this.jobs.get(jobId);
+                job.setNumSkippedStages(job.getNumSkippedStages() + 1);
+                job.setNumSkippedTasks(job.getNumSkippedTasks() + stage.getNumTasks());
+            } else {
+                this.aggregateToJob(stage);
+                this.aggregateStageToApp(stage);
+                needStoreStages.add(stage);
+            }
+            String stageId = stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
+            String stageAttemptId = stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
+            this.jobStageMap.get(jobId).remove(this.generateStageKey(stageId, stageAttemptId));
+        }
+
+        this.flushEntities(needStoreStages, false);
+        for (SparkJob job : jobs.values()) {
+            this.aggregateJobToApp(job);
+        }
+        this.flushEntities(jobs.values(), false);
+
+        app.setExecutors(executors.values().size());
+        long executorMemory = parseExecutorMemory((String) this.getConfigVal(this.app.getConfig(), "spark.executor.memory", String.class.getName()));
+        long driverMemory = parseExecutorMemory(this.isClientMode(app.getConfig())
+                ? (String) this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", String.class.getName())
+                : (String) this.getConfigVal(app.getConfig(), "spark.driver.memory", String.class.getName()));
+        int executorCore = (Integer) this.getConfigVal(app.getConfig(), "spark.executor.cores", Integer.class.getName());
+        int driverCore = this.isClientMode(app.getConfig())
+                ? (Integer) this.getConfigVal(app.getConfig(), "spark.yarn.am.cores", Integer.class.getName())
+                : (Integer) this.getConfigVal(app.getConfig(), "spark.driver.cores", Integer.class.getName());
+        long executorMemoryOverhead = this.getMemoryOverhead(app.getConfig(), executorMemory, "spark.yarn.executor.memoryOverhead");
+        long driverMemoryOverhead = this.isClientMode(app.getConfig())
+                ? this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.am.memoryOverhead")
+                : this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.driver.memoryOverhead");
+
+        app.setExecMemoryBytes(executorMemory);
+        app.setDriveMemoryBytes(driverMemory);
+        app.setExecutorCores(executorCore);
+        app.setDriverCores(driverCore);
+        app.setExecutorMemoryOverhead(executorMemoryOverhead);
+        app.setDriverMemoryOverhead(driverMemoryOverhead);
+
+        for (SparkExecutor executor : executors.values()) {
+            String executorID = executor.getTags().get(SparkJobTagName.SPARK_EXECUTOR_ID.toString());
+            if (executorID.equalsIgnoreCase("driver")) {
+                executor.setExecMemoryBytes(driverMemory);
+                executor.setCores(driverCore);
+                executor.setMemoryOverhead(driverMemoryOverhead);
+            } else {
+                executor.setExecMemoryBytes(executorMemory);
+                executor.setCores(executorCore);
+                executor.setMemoryOverhead(executorMemoryOverhead);
+            }
+            if (executor.getEndTime() == 0) {
+                executor.setEndTime(app.getEndTime());
+            }
+            this.aggregateExecutorToApp(executor);
+        }
+        this.flushEntities(executors.values(), false);
+        //mirrors Spark's own (tricky) accounting of skipped tasks
+        app.setSkippedTasks(app.getCompleteTasks());
+        this.flushEntities(app, true);
+    }
+
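+    //When the overhead is not configured, it defaults to max(configured minimum, memory * per-field factor / 100).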
+    private long getMemoryOverhead(JobConfig config, long executorMemory, String fieldName) {
+        long result = 0L;
+        if (config.getConfig().containsKey(fieldName)) {
+            //the setting is usually a plain MB number; fall back to parsing it as-is if appending "m" fails
+            result = this.parseExecutorMemory(config.getConfig().get(fieldName) + "m");
+            if (result == 0L) {
+                result = this.parseExecutorMemory(config.getConfig().get(fieldName));
+            }
+        }
+
+        if (result == 0L) {
+            result = Math.max(this.parseExecutorMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")),
+                    executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100);
+        }
+        return result;
+    }
+
+    private void aggregateExecutorToApp(SparkExecutor executor) {
+        app.setTotalExecutorTime(app.getTotalExecutorTime() + (executor.getEndTime() - executor.getStartTime()));
+    }
+
+    private void aggregateJobToApp(SparkJob job) {
+        //aggregate job level metrics
+        app.setNumJobs(app.getNumJobs() + 1);
+        app.setTotalTasks(app.getTotalTasks() + job.getNumTask());
+        app.setCompleteTasks(app.getCompleteTasks() + job.getNumCompletedTasks());
+        app.setSkippedTasks(app.getSkippedTasks() + job.getNumSkippedTasks());
+        app.setFailedTasks(app.getFailedTasks() + job.getNumFailedTasks());
+        app.setTotalStages(app.getTotalStages() + job.getNumStages());
+        app.setFailedStages(app.getFailedStages() + job.getNumFailedStages());
+        app.setSkippedStages(app.getSkippedStages() + job.getNumSkippedStages());
+    }
+
+    private void aggregateStageToApp(SparkStage stage) {
+        //aggregate task level metrics
+        app.setDiskBytesSpilled(app.getDiskBytesSpilled() + stage.getDiskBytesSpilled());
+        app.setMemoryBytesSpilled(app.getMemoryBytesSpilled() + stage.getMemoryBytesSpilled());
+        app.setExecutorRunTime(app.getExecutorRunTime() + stage.getExecutorRunTime());
+        app.setJvmGcTime(app.getJvmGcTime() + stage.getJvmGcTime());
+        app.setExecutorDeserializeTime(app.getExecutorDeserializeTime() + stage.getExecutorDeserializeTime());
+        app.setResultSerializationTime(app.getResultSerializationTime() + stage.getResultSerializationTime());
+        app.setResultSize(app.getResultSize() + stage.getResultSize());
+        app.setInputRecords(app.getInputRecords() + stage.getInputRecords());
+        app.setInputBytes(app.getInputBytes() + stage.getInputBytes());
+        app.setOutputRecords(app.getOutputRecords() + stage.getOutputRecords());
+        app.setOutputBytes(app.getOutputBytes() + stage.getOutputBytes());
+        app.setShuffleWriteRecords(app.getShuffleWriteRecords() + stage.getShuffleWriteRecords());
+        app.setShuffleWriteBytes(app.getShuffleWriteBytes() + stage.getShuffleWriteBytes());
+        app.setShuffleReadRecords(app.getShuffleReadRecords() + stage.getShuffleReadRecords());
+        app.setShuffleReadBytes(app.getShuffleReadBytes() + stage.getShuffleReadBytes());
+    }
+
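+    //Rolls task-level metrics up into the owning stage and tracks per-task-index success across attempts.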
+    private void aggregateToStage(SparkTask task) {
+        String stageId = task.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
+        String stageAttemptId = task.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
+        String key = this.generateStageKey(stageId, stageAttemptId);
+        SparkStage stage = stages.get(key);
+
+        stage.setDiskBytesSpilled(stage.getDiskBytesSpilled() + task.getDiskBytesSpilled());
+        stage.setMemoryBytesSpilled(stage.getMemoryBytesSpilled() + task.getMemoryBytesSpilled());
+        stage.setExecutorRunTime(stage.getExecutorRunTime() + task.getExecutorRunTime());
+        stage.setJvmGcTime(stage.getJvmGcTime() + task.getJvmGcTime());
+        stage.setExecutorDeserializeTime(stage.getExecutorDeserializeTime() + task.getExecutorDeserializeTime());
+        stage.setResultSerializationTime(stage.getResultSerializationTime() + task.getResultSerializationTime());
+        stage.setResultSize(stage.getResultSize() + task.getResultSize());
+        stage.setInputRecords(stage.getInputRecords() + task.getInputRecords());
+        stage.setInputBytes(stage.getInputBytes() + task.getInputBytes());
+        stage.setOutputRecords(stage.getOutputRecords() + task.getOutputRecords());
+        stage.setOutputBytes(stage.getOutputBytes() + task.getOutputBytes());
+        stage.setShuffleWriteRecords(stage.getShuffleWriteRecords() + task.getShuffleWriteRecords());
+        stage.setShuffleWriteBytes(stage.getShuffleWriteBytes() + task.getShuffleWriteBytes());
+        stage.setShuffleReadRecords(stage.getShuffleReadRecords() + task.getShuffleReadRecords());
+        long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
+        stage.setShuffleReadBytes(stage.getShuffleReadBytes() + taskShuffleReadBytes);
+
+        boolean success = !task.isFailed();
+
+        Integer taskIndex = Integer.parseInt(task.getTags().get(SparkJobTagName.SPARK_TASK_INDEX.toString()));
+        if (stageTaskStatusMap.get(key).containsKey(taskIndex)) {
+            //has previous task attempt, retrieved from task index in one stage
+            boolean previousResult = stageTaskStatusMap.get(key).get(taskIndex);
+            success = previousResult || success;
+            if (previousResult != success) {
+                stage.setNumFailedTasks(stage.getNumFailedTasks() - 1);
+                stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
+                stageTaskStatusMap.get(key).put(taskIndex, success);
+            }
+        } else {
+            if (success) {
+                stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
+            } else {
+                stage.setNumFailedTasks(stage.getNumFailedTasks() + 1);
+            }
+            stageTaskStatusMap.get(key).put(taskIndex, success);
+        }
+    }
+
+    private void aggregateToExecutor(SparkTask task) {
+        String executorId = task.getExecutorId();
+        SparkExecutor executor = executors.get(executorId);
+
+        if (null != executor) {
+            executor.setTotalTasks(executor.getTotalTasks() + 1);
+            if (task.isFailed()) {
+                executor.setFailedTasks(executor.getFailedTasks() + 1);
+            } else {
+                executor.setCompletedTasks(executor.getCompletedTasks() + 1);
+            }
+            long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
+            executor.setTotalShuffleRead(executor.getTotalShuffleRead() + taskShuffleReadBytes);
+            executor.setTotalDuration(executor.getTotalDuration() + task.getExecutorRunTime());
+            executor.setTotalInputBytes(executor.getTotalInputBytes() + task.getInputBytes());
+            executor.setTotalShuffleWrite(executor.getTotalShuffleWrite() + task.getShuffleWriteBytes());
+        }
+    }
+
+    private void aggregateToJob(SparkStage stage) {
+        Integer jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
+        SparkJob job = jobs.get(jobId);
+        job.setNumCompletedTasks(job.getNumCompletedTasks() + stage.getNumCompletedTasks());
+        job.setNumFailedTasks(job.getNumFailedTasks() + stage.getNumFailedTasks());
+        job.setNumTask(job.getNumTask() + stage.getNumTasks());
+
+        if (stage.getStatus().equalsIgnoreCase(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString())) {
+            //if multiple attempts succeed, just count one
+            if (!hasStagePriorAttemptSuccess(stage)) {
+                job.setNumCompletedStages(job.getNumCompletedStages() + 1);
+            }
+        } else {
+            job.setNumFailedStages(job.getNumFailedStages() + 1);
+        }
+    }
+
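+    //Returns true if an earlier attempt of the same stage has already completed successfully.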
+    private boolean hasStagePriorAttemptSuccess(SparkStage stage) {
+        Integer stageAttemptId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()));
+        for (Integer i = 0; i < stageAttemptId; i++) {
+            SparkStage previousStage = stages.get(this.generateStageKey(stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()), i.toString()));
+            if (previousStage.getStatus().equalsIgnoreCase(SparkEntityConstant.SPARK_STAGE_STATUS.COMPLETE.toString())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+
+    private String generateStageKey(String stageId, String stageAttemptId) {
+        return String.format("%s-%s", stageId, stageAttemptId);
+    }
+
+    private void initiateStage(Integer jobId, Integer stageId, Integer stageAttemptId, String name, int numTasks) {
+        SparkStage stage = new SparkStage();
+        stage.setTags(new HashMap(this.app.getTags()));
+        stage.setTimestamp(app.getTimestamp());
+        stage.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), jobId.toString());
+        stage.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), stageId.toString());
+        stage.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), stageAttemptId.toString());
+        stage.setName(name);
+        stage.setNumActiveTasks(0);
+        stage.setNumTasks(numTasks);
+        stage.setSchedulingPool(this.app.getConfig().getConfig().get("spark.scheduler.pool") == null ? "default" : this.app.getConfig().getConfig().get("spark.scheduler.pool"));
+
+        String stageKey = this.generateStageKey(stageId.toString(), stageAttemptId.toString());
+        stages.put(stageKey, stage);
+        this.jobStageMap.get(jobId).add(stageKey);
+    }
+
+
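+    //Creates and caches an executor entity on first sight; later calls return the cached instance.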
+    private SparkExecutor initiateExecutor(String executorID, long startTime) throws Exception {
+        if (!executors.containsKey(executorID)) {
+            SparkExecutor executor = new SparkExecutor();
+            executor.setTags(new HashMap(this.app.getTags()));
+            executor.getTags().put(SparkJobTagName.SPARK_EXECUTOR_ID.toString(), executorID);
+            executor.setStartTime(startTime);
+            executor.setTimestamp(app.getTimestamp());
+
+            this.executors.put(executorID, executor);
+        }
+
+        return this.executors.get(executorID);
+    }
+
+    private String getNormalizedName(String jobName, String assignedName) {
+        if (null != assignedName) {
+            return assignedName;
+        } else {
+            return JobNameNormalization.getInstance().normalize(jobName);
+        }
+    }
+
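+    //Converts a Spark memory string such as "512m" or "4g" into bytes; returns 0 for an unknown suffix.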
+    private long parseExecutorMemory(String memory) {
+        if (memory.endsWith("g") || memory.endsWith("G")) {
+            int executorGB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * 1024 * 1024 * executorGB;
+        } else if (memory.endsWith("m") || memory.endsWith("M")) {
+            int executorMB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * 1024 * executorMB;
+        } else if (memory.endsWith("k") || memory.endsWith("K")) {
+            int executorKB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * executorKB;
+        } else if (memory.endsWith("t") || memory.endsWith("T")) {
+            int executorTB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * 1024 * 1024 * 1024 * executorTB;
+        } else if (memory.endsWith("p") || memory.endsWith("P")) {
+            int executorPB = Integer.parseInt(memory.substring(0, memory.length() - 1));
+            return 1024L * 1024 * 1024 * 1024 * 1024 * executorPB;
+        }
+        LOG.warn("Cannot parse memory info: " + memory);
+        return 0L;
+    }
+
+    private void flushEntities(Object entity, boolean forceFlush) {
+        this.flushEntities(Arrays.asList(entity), forceFlush);
+    }
+
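+    //Buffers entities and flushes them to the Eagle service once FLUSH_LIMIT is reached or a flush is forced.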
+    private void flushEntities(Collection entities, boolean forceFlush) {
+        this.createEntities.addAll(entities);
+
+        if (forceFlush || this.createEntities.size() >= FLUSH_LIMIT) {
+            try {
+                this.doFlush(this.createEntities);
+                this.createEntities.clear();
+            } catch (Exception e) {
+                LOG.error("Failed to flush entities", e);
+            }
+        }
+    }
+
+    private EagleServiceBaseClient initiateClient() {
+        String host = conf.getString("eagleProps.eagle.service.host");
+        int port = conf.getInt("eagleProps.eagle.service.port");
+        String userName = conf.getString("eagleProps.eagle.service.userName");
+        String pwd = conf.getString("eagleProps.eagle.service.pwd");
+        client = new EagleServiceClientImpl(host, port, userName, pwd);
+        int timeout = conf.getInt("eagleProps.eagle.service.read_timeout");
+        client.getJerseyClient().setReadTimeout(timeout * 1000);
+
+        return client;
+    }
+
+    private void doFlush(List entities) throws Exception {
+        LOG.info("start flushing entities of total number " + entities.size());
+        client.create(entities);
+        LOG.info("finish flushing entities of total number " + entities.size());
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
new file mode 100644
index 0000000..171cb0f
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/JHFSparkParser.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.crawl;
+
+
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+public class JHFSparkParser implements JHFParserBase {
+
+    private static final Logger logger = LoggerFactory.getLogger(JHFSparkParser.class);
+
+    private final JHFSparkEventReader eventReader;
+
+    public JHFSparkParser(JHFSparkEventReader reader) {
+        this.eventReader = reader;
+    }
+
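+    // Reads the Spark event log line by line; each line is a standalone JSON event handed to the event reader.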
+    @Override
+    public void parse(InputStream is) throws Exception {
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
+            String line;
+            JSONParser parser = new JSONParser();
+            while ((line = reader.readLine()) != null) {
+                try {
+                    JSONObject eventObj = (JSONObject) parser.parse(line);
+                    String eventType = (String) eventObj.get("Event");
+                    logger.info("Event type: " + eventType);
+                    this.eventReader.read(eventObj);
+                } catch (Exception e) {
+                    logger.error(String.format("Invalid json string. Failed to parse %s.", line), e);
+                }
+            }
+            this.eventReader.clearReader();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java
new file mode 100644
index 0000000..423d045
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkApplicationInfo.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.crawl;
+
+public class SparkApplicationInfo {
+
+    private String state;
+    private String finalStatus;
+    private String queue;
+    private String name;
+    private String user;
+
+    public String getState() {
+        return state;
+    }
+
+    public void setState(String state) {
+        this.state = state;
+    }
+
+    public String getFinalStatus() {
+        return finalStatus;
+    }
+
+    public void setFinalStatus(String finalStatus) {
+        this.finalStatus = finalStatus;
+    }
+
+    public String getQueue() {
+        return queue;
+    }
+
+    public void setQueue(String queue) {
+        this.queue = queue;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
new file mode 100644
index 0000000..f1d2cd1
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/crawl/SparkFilesystemInputStreamReaderImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.crawl;
+
+import org.apache.eagle.jpm.util.SparkJobTagName;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SparkFilesystemInputStreamReaderImpl implements JHFInputStreamReader {
+
+    private String site;
+    private SparkApplicationInfo app;
+
+    public SparkFilesystemInputStreamReaderImpl(String site, SparkApplicationInfo app) {
+        this.site = site;
+        this.app = app;
+    }
+
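+    // Tags every parsed entity with the site and queue, then delegates the stream to the Spark history parser.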
+    @Override
+    public void read(InputStream is) throws Exception {
+        Map<String, String> baseTags = new HashMap<>();
+        baseTags.put(SparkJobTagName.SITE.toString(), site);
+        baseTags.put(SparkJobTagName.SPARK_QUEUE.toString(), app.getQueue());
+        JHFParserBase parser = new JHFSparkParser(new JHFSparkEventReader(baseTags, this.app));
+        parser.parse(is);
+    }
+
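+    // Ad-hoc local entry point for debugging; the site name and Windows file path below are sample values.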
+    public static void main(String[] args) throws Exception {
+        SparkFilesystemInputStreamReaderImpl impl = new SparkFilesystemInputStreamReaderImpl("apollo-phx", new SparkApplicationInfo());
+        impl.read(new FileInputStream(new File("E:\\eagle\\application_1459803563374_535667_1")));
+    }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
index 7c0530d..ffa2f22 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobMain.java
@@ -23,11 +23,11 @@ import backtype.storm.LocalCluster;
 import backtype.storm.StormSubmitter;
 import backtype.storm.topology.TopologyBuilder;
 import org.apache.eagle.jpm.mr.history.common.JHFConfigManager;
-import org.apache.eagle.jpm.mr.history.common.JPAConstants;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder;
 import org.apache.eagle.jpm.mr.history.storm.HistoryJobProgressBolt;
 import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout;
+import org.apache.eagle.jpm.util.Constants;
 
 import java.util.List;
 import java.util.regex.Pattern;
@@ -37,14 +37,15 @@ public class MRHistoryJobMain {
         try {
             //1. trigger init conf
             JHFConfigManager jhfConfigManager = JHFConfigManager.getInstance(args);
+            com.typesafe.config.Config jhfAppConf = jhfConfigManager.getConfig();
 
             //2. init JobHistoryContentFilter
             JobHistoryContentFilterBuilder builder = JobHistoryContentFilterBuilder.newBuilder().acceptJobFile().acceptJobConfFile();
-            List<String> confKeyPatterns = jhfConfigManager.getConfig().getStringList("MRConfigureKeys");
-            confKeyPatterns.add(JPAConstants.JobConfiguration.CASCADING_JOB);
-            confKeyPatterns.add(JPAConstants.JobConfiguration.HIVE_JOB);
-            confKeyPatterns.add(JPAConstants.JobConfiguration.PIG_JOB);
-            confKeyPatterns.add(JPAConstants.JobConfiguration.SCOOBI_JOB);
+            List<String> confKeyPatterns = jhfAppConf.getStringList("MRConfigureKeys");
+            confKeyPatterns.add(Constants.JobConfiguration.CASCADING_JOB);
+            confKeyPatterns.add(Constants.JobConfiguration.HIVE_JOB);
+            confKeyPatterns.add(Constants.JobConfiguration.PIG_JOB);
+            confKeyPatterns.add(Constants.JobConfiguration.SCOOBI_JOB);
 
             for (String key : confKeyPatterns) {
                 builder.includeJobKeyPatterns(Pattern.compile(key));
@@ -54,10 +55,13 @@ public class MRHistoryJobMain {
             //3. init topology
             TopologyBuilder topologyBuilder = new TopologyBuilder();
             String topologyName = "mrHistoryJobTopology";
+            if (jhfAppConf.hasPath("envContextConfig.topologyName")) {
+                topologyName = jhfAppConf.getString("envContextConfig.topologyName");
+            }
             String spoutName = "mrHistoryJobExecutor";
             String boltName = "updateProcessTime";
-            int parallelism = jhfConfigManager.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName);
-            int tasks = jhfConfigManager.getConfig().getInt("envContextConfig.tasks." + spoutName);
+            int parallelism = jhfAppConf.getInt("envContextConfig.parallelismConfig." + spoutName);
+            int tasks = jhfAppConf.getInt("envContextConfig.tasks." + spoutName);
             if (parallelism > tasks) {
                 parallelism = tasks;
             }
@@ -68,8 +72,8 @@ public class MRHistoryJobMain {
             ).setNumTasks(tasks);
             topologyBuilder.setBolt(boltName, new HistoryJobProgressBolt(spoutName, jhfConfigManager), 1).setNumTasks(1).allGrouping(spoutName);
 
-            backtype.storm.Config config = new backtype.storm.Config();
-            config.setNumWorkers(jhfConfigManager.getConfig().getInt("envContextConfig.workers"));
+            Config config = new Config();
+            config.setNumWorkers(jhfAppConf.getInt("envContextConfig.workers"));
             config.put(Config.TOPOLOGY_DEBUG, true);
             if (!jhfConfigManager.getEnv().equals("local")) {
                 //cluster mode

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JPAConstants.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JPAConstants.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JPAConstants.java
deleted file mode 100755
index feb5498..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JPAConstants.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.common;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JPAConstants {
-
-    private final static Logger LOG = LoggerFactory.getLogger(JPAConstants.class);
-
-    public static final String JPA_JOB_CONFIG_SERVICE_NAME = "JobConfigService";
-    public static final String JPA_JOB_EVENT_SERVICE_NAME = "JobEventService";
-    public static final String JPA_JOB_EXECUTION_SERVICE_NAME = "JobExecutionService";
-
-    public static final String JPA_TASK_ATTEMPT_EXECUTION_SERVICE_NAME = "TaskAttemptExecutionService";
-    public static final String JPA_TASK_FAILURE_COUNT_SERVICE_NAME = "TaskFailureCountService";
-    public static final String JPA_TASK_ATTEMPT_COUNTER_SERVICE_NAME = "TaskAttemptCounterService";
-    public static final String JPA_TASK_EXECUTION_SERVICE_NAME = "TaskExecutionService";
-    public static final String JPA_JOB_PROCESS_TIME_STAMP_NAME = "JobProcessTimeStampService";
-
-    public static final String JOB_TASK_TYPE_TAG = "taskType";
-
-    public static class JobConfiguration {
-        // job type
-        public static final String SCOOBI_JOB = "scoobi.mode";
-        public static final String HIVE_JOB = "hive.query.string";
-        public static final String PIG_JOB = "pig.script";
-        public static final String CASCADING_JOB = "cascading.app.name";
-    }
-
-    /**
-     * MR task types
-     */
-    public enum TaskType {
-        SETUP, MAP, REDUCE, CLEANUP
-    }
-
-    public enum JobType {
-        CASCADING("CASCADING"),HIVE("HIVE"),PIG("PIG"),SCOOBI("SCOOBI"),
-        NOTAVALIABLE("N/A")
-        ;
-        private String value;
-        JobType(String value){
-            this.value = value;
-        }
-        @Override
-        public String toString() {
-            return this.value;
-        }
-    }
-
-    public static final String FILE_SYSTEM_COUNTER = "org.apache.hadoop.mapreduce.FileSystemCounter";
-    public static final String TASK_COUNTER = "org.apache.hadoop.mapreduce.TaskCounter";
-
-    public static final String MAP_TASK_ATTEMPT_COUNTER = "MapTaskAttemptCounter";
-    public static final String REDUCE_TASK_ATTEMPT_COUNTER = "ReduceTaskAttemptCounter";
-
-    public static final String MAP_TASK_ATTEMPT_FILE_SYSTEM_COUNTER = "MapTaskAttemptFileSystemCounter";
-    public static final String REDUCE_TASK_ATTEMPT_FILE_SYSTEM_COUNTER = "ReduceTaskAttemptFileSystemCounter";
-
-    public enum TaskAttemptCounter {
-        TASK_ATTEMPT_DURATION,
-    }
-
-
-
-    private static final String DEFAULT_JOB_CONF_NORM_JOBNAME_KEY = "eagle.job.name";
-    private static final String EAGLE_NORM_JOBNAME_CONF_KEY = "eagle.job.normalizedfieldname";
-
-    public static String JOB_CONF_NORM_JOBNAME_KEY = null;
-
-    static {
-        if (JOB_CONF_NORM_JOBNAME_KEY == null) {
-            JOB_CONF_NORM_JOBNAME_KEY = DEFAULT_JOB_CONF_NORM_JOBNAME_KEY;
-        }
-        LOG.info("Loaded " + EAGLE_NORM_JOBNAME_CONF_KEY + " : " + JOB_CONF_NORM_JOBNAME_KEY);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JobConfig.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JobConfig.java
deleted file mode 100644
index f85f7bc..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/common/JobConfig.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.common;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-public final class JobConfig {
-    private Map<String, String> config = new TreeMap<>();
-
-    public Map<String, String> getConfig() {
-        return config;
-    }
-
-    public void setConfig(Map<String, String> config) {
-        this.config = config;
-    }
-    
-    public String toString(){
-        return config.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JPAEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JPAEntityRepository.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JPAEntityRepository.java
index d49cdef..964d68a 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JPAEntityRepository.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JPAEntityRepository.java
@@ -18,8 +18,8 @@
 
 package org.apache.eagle.jpm.mr.history.entities;
 
-import org.apache.eagle.jpm.mr.history.common.JobConfig;
-import org.apache.eagle.jpm.mr.history.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.jobcounter.JobCounters;
+import org.apache.eagle.jpm.util.jobcounter.JobCountersSerDeser;
 import org.apache.eagle.log.entity.repo.EntityRepository;
 
 public class JPAEntityRepository extends EntityRepository {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfig.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfig.java
new file mode 100644
index 0000000..f1dc375
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfig.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.eagle.jpm.mr.history.entities;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+public final class JobConfig {
+    private Map<String, String> config = new TreeMap<>();
+
+    public Map<String, String> getConfig() {
+        return config;
+    }
+
+    public void setConfig(Map<String, String> config) {
+        this.config = config;
+    }
+
+    public String toString() {
+        return config.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigSerDeser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigSerDeser.java
index 65f535f..8776f1f 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigSerDeser.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigSerDeser.java
@@ -18,7 +18,6 @@
 
 package org.apache.eagle.jpm.mr.history.entities;
 
-import org.apache.eagle.jpm.mr.history.common.JobConfig;
 import org.apache.eagle.log.entity.meta.EntitySerDeser;
 import org.apache.hadoop.hbase.util.Bytes;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigurationAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigurationAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigurationAPIEntity.java
index 44fa98c..295cc68 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigurationAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobConfigurationAPIEntity.java
@@ -18,8 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.entities;
 
-import org.apache.eagle.jpm.mr.history.common.JPAConstants;
-import org.apache.eagle.jpm.mr.history.common.JobConfig;
+import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.log.entity.meta.*;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
@@ -27,12 +26,12 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
 @Table("eaglejpa")
 @ColumnFamily("f")
 @Prefix("jconf")
-@Service(JPAConstants.JPA_JOB_CONFIG_SERVICE_NAME)
+@Service(Constants.JPA_JOB_CONFIG_SERVICE_NAME)
 @TimeSeries(true)
 @Partition({"site"})
 @Indexes({
-        @Index(name="Index_1_jobId", columns = { "jobID" }, unique = true),
-        @Index(name="Index_2_normJobName", columns = { "normJobName" }, unique 
= false)
+        @Index(name="Index_1_jobId", columns = { "jobId" }, unique = true),
+        @Index(name="Index_2_jobDefId", columns = { "jobDefId" }, unique = 
false)
 })
 public class JobConfigurationAPIEntity extends JobBaseAPIEntity {
     

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobCountersSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobCountersSerDeser.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobCountersSerDeser.java
deleted file mode 100755
index 01044bb..0000000
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobCountersSerDeser.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.eagle.jpm.mr.history.entities;
-
-import org.apache.eagle.jpm.mr.history.jobcounter.*;
-import org.apache.eagle.log.entity.meta.EntitySerDeser;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.util.Arrays;
-import java.util.Map;
-import java.util.TreeMap;
-
-public class JobCountersSerDeser implements EntitySerDeser<JobCounters> {
-
-    private CounterGroupDictionary dictionary = null;
-
-    @Override
-    public JobCounters deserialize(byte[] bytes) {
-        JobCounters counters = new JobCounters();
-        final int length = bytes.length;
-        if (length < 4) {
-            return counters;
-        }
-
-        final Map<String, Map<String, Long> > groupMap = counters.getCounters();
-        int pos = 0;
-        final int totalGroups = Bytes.toInt(bytes, pos);
-        pos += 4;
-        
-        for (int i = 0; i < totalGroups; ++i) {
-            final int groupIndex = Bytes.toInt(bytes, pos);
-            pos += 4;
-            final int totalCounters = Bytes.toInt(bytes, pos);
-            pos += 4;
-            final int nextGroupPos = pos + (totalCounters * 12);
-            try {
-                final CounterGroupKey groupKey = getCounterGroup(groupIndex);
-                if (groupKey == null) {
-                    throw new JobCounterException("Group index " + groupIndex + " is not defined");
-                }
-                final Map<String, Long> counterMap = new TreeMap<String, Long>();
-                groupMap.put(groupKey.getName(), counterMap);
-                for (int j = 0; j < totalCounters; ++j) {
-                    final int counterIndex = Bytes.toInt(bytes, pos);
-                    pos += 4;
-                    final long value = Bytes.toLong(bytes, pos);
-                    pos += 8;
-                    final CounterKey counterKey = groupKey.getCounterKeyByID(counterIndex);
-                    if (counterKey == null) {
-                        continue;
-                    }
-                    counterMap.put(counterKey.getNames().get(0), value);
-                }
-            } catch (JobCounterException ex) {
-                // skip the group
-                pos = nextGroupPos;
-            }
-        }
-        return counters;
-    }
-
-    @Override
-    public byte[] serialize(JobCounters counters) {
-        
-        final Map<String, Map<String, Long>> groupMap = counters.getCounters();
-        int totalSize = 4;
-        for (Map<String, Long> counterMap : groupMap.values()) {
-            final int counterCount = counterMap.size();
-            totalSize += counterCount * 12 + 8;
-        }
-        byte[] buffer = new byte[totalSize];
-
-        int totalGroups = 0;
-        int pos = 0;
-        int totalGroupNumberPos = pos;
-        pos += 4;
-        int nextGroupPos = pos;
-        
-        for (Map.Entry<String, Map<String, Long>> entry : groupMap.entrySet()) {
-            final String groupName = entry.getKey();
-            final Map<String, Long> counterMap = entry.getValue();
-            try {
-                nextGroupPos = pos = serializeGroup(buffer, pos, groupName, counterMap);
-                ++totalGroups;
-            } catch (JobCounterException ex) {
-                pos = nextGroupPos;
-            }
-        }
-        
-        Bytes.putInt(buffer, totalGroupNumberPos, totalGroups);
-        if (pos < totalSize) {
-            buffer = Arrays.copyOf(buffer, pos);
-        }
-        return buffer;
-    }
-
-    @Override
-    public Class<JobCounters> type() {
-        return JobCounters.class;
-    }
-
-    private int serializeGroup(byte[] buffer, int currentPos, String groupName, Map<String, Long> counterMap) throws JobCounterException {
-        int pos = currentPos;
-        final CounterGroupKey groupKey = getCounterGroup(groupName);
-        if (groupKey == null) {
-            throw new JobCounterException("Group name " + groupName + " is not 
defined");
-        }
-        Bytes.putInt(buffer, pos, groupKey.getIndex());
-        pos += 4;
-        int totalCounterNumberPos = pos;
-        pos += 4;
-        int totalCounters = 0;
-        
-        for (Map.Entry<String, Long> entry : counterMap.entrySet()) {
-            final String counterName = entry.getKey();
-            final CounterKey counterKey = groupKey.getCounterKeyByName(counterName);
-            if (counterKey == null) {
-                continue;
-            }
-            final Long counterValue = entry.getValue();
-            Bytes.putInt(buffer, pos, counterKey.getIndex());
-            pos += 4;
-            Bytes.putLong(buffer, pos, counterValue);
-            pos += 8;
-            ++totalCounters;
-        }
-        Bytes.putInt(buffer, totalCounterNumberPos, totalCounters);
-        return pos;
-    }
-
-    private CounterGroupKey getCounterGroup(String groupName) throws JobCounterException {
-        if (dictionary == null) {
-            dictionary = CounterGroupDictionary.getInstance();
-        }
-        final CounterGroupKey groupKey = dictionary.getCounterGroupByName(groupName);
-        if (groupKey == null) {
-            throw new JobCounterException("Invalid counter group name: " + 
groupName);
-        }
-        return groupKey;
-    }
-
-    private CounterGroupKey getCounterGroup(int groupIndex) throws JobCounterException {
-        if (dictionary == null) {
-            dictionary = CounterGroupDictionary.getInstance();
-        }
-        return dictionary.getCounterGroupByIndex(groupIndex);
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/5bf2c62d/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobEventAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobEventAPIEntity.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobEventAPIEntity.java
index 3639ad0..31dd480 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobEventAPIEntity.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/entities/JobEventAPIEntity.java
@@ -18,7 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history.entities;
 
-import org.apache.eagle.jpm.mr.history.common.JPAConstants;
+import org.apache.eagle.jpm.util.Constants;
 import org.apache.eagle.log.entity.meta.*;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
@@ -26,7 +26,7 @@ import org.codehaus.jackson.map.annotate.JsonSerialize;
 @Table("eaglejpa")
 @ColumnFamily("f")
 @Prefix("jevent")
-@Service(JPAConstants.JPA_JOB_EVENT_SERVICE_NAME)
+@Service(Constants.JPA_JOB_EVENT_SERVICE_NAME)
 @TimeSeries(true)
 @Partition({"site"})
 public class JobEventAPIEntity extends JobBaseAPIEntity {

