http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
new file mode 100644
index 0000000..211d6b7
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkJobEntity.java
@@ -0,0 +1,191 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+import java.util.List;
+
+@Table("eagleSparkRunningJobs")
+@ColumnFamily("f")
+@Prefix("sparkJob")
+@Service(Constants.RUNNING_SPARK_JOB_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", 
"jobId","user", "queue"})
+@Partition({"site"})
+public class SparkJobEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private long submissionTime;
+    @Column("b")
+    private long completionTime;
+    @Column("c")
+    private int numStages = 0;
+    @Column("d")
+    private String status;
+    @Column("e")
+    private int numTask = 0;
+    @Column("f")
+    private int numActiveTasks = 0;
+    @Column("g")
+    private int numCompletedTasks = 0;
+    @Column("h")
+    private int numSkippedTasks = 0;
+    @Column("i")
+    private int numFailedTasks = 0;
+    @Column("j")
+    private int numActiveStages = 0;
+    @Column("k")
+    private int numCompletedStages = 0;
+    @Column("l")
+    private int numSkippedStages = 0;
+    @Column("m")
+    private int numFailedStages = 0;
+    @Column("n")
+    private List<Integer> stages;
+
+    public List<Integer> getStages() {
+        return stages;
+    }
+
+    public void setStages(List<Integer> stages) {
+        this.stages = stages;
+        this.valueChanged("stages");
+    }
+
+    public long getSubmissionTime() {
+        return submissionTime;
+    }
+
+    public long getCompletionTime() {
+        return completionTime;
+    }
+
+    public int getNumStages() {
+        return numStages;
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public int getNumTask() {
+        return numTask;
+    }
+
+    public int getNumActiveTasks() {
+        return numActiveTasks;
+    }
+
+    public int getNumCompletedTasks() {
+        return numCompletedTasks;
+    }
+
+    public int getNumSkippedTasks() {
+        return numSkippedTasks;
+    }
+
+    public int getNumFailedTasks() {
+        return numFailedTasks;
+    }
+
+    public int getNumActiveStages() {
+        return numActiveStages;
+    }
+
+    public int getNumCompletedStages() {
+        return numCompletedStages;
+    }
+
+    public int getNumSkippedStages() {
+        return numSkippedStages;
+    }
+
+    public int getNumFailedStages() {
+        return numFailedStages;
+    }
+
+    public void setSubmissionTime(long submissionTime) {
+        this.submissionTime = submissionTime;
+        this.valueChanged("submissionTime");
+    }
+
+    public void setCompletionTime(long completionTime) {
+        this.completionTime = completionTime;
+        this.valueChanged("completionTime");
+    }
+
+    public void setNumStages(int numStages) {
+        this.numStages = numStages;
+        this.valueChanged("numStages");
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+        this.valueChanged("status");
+    }
+
+    public void setNumTask(int numTask) {
+        this.numTask = numTask;
+        this.valueChanged("numTask");
+    }
+
+    public void setNumActiveTasks(int numActiveTasks) {
+        this.numActiveTasks = numActiveTasks;
+        this.valueChanged("numActiveTasks");
+    }
+
+    public void setNumCompletedTasks(int numCompletedTasks) {
+        this.numCompletedTasks = numCompletedTasks;
+        this.valueChanged("numCompletedTasks");
+    }
+
+    public void setNumSkippedTasks(int numSkippedTasks) {
+        this.numSkippedTasks = numSkippedTasks;
+        this.valueChanged("numSkippedTasks");
+    }
+
+    public void setNumFailedTasks(int numFailedTasks) {
+        this.numFailedTasks = numFailedTasks;
+        this.valueChanged("numFailedTasks");
+    }
+
+    public void setNumActiveStages(int numActiveStages) {
+        this.numActiveStages = numActiveStages;
+        this.valueChanged("numActiveStages");
+    }
+
+    public void setNumCompletedStages(int numCompletedStages) {
+        this.numCompletedStages = numCompletedStages;
+        this.valueChanged("numCompletedStages");
+    }
+
+    public void setNumSkippedStages(int numSkippedStages) {
+        this.numSkippedStages = numSkippedStages;
+        this.valueChanged("numSkippedStages");
+    }
+
+    public void setNumFailedStages(int numFailedStages) {
+        this.numFailedStages = numFailedStages;
+        this.valueChanged("numFailedStages");
+    }
+}
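
[Editorial note, not part of the patch: SparkJobEntity follows Eagle's TaggedLogAPIEntity pattern, in which every setter reports the modified field through valueChanged() so that only changed qualifiers are serialized when the entity is flushed. A minimal, hypothetical usage sketch; the tag values and the write path are illustrative only:

    // Populate a running-job entity; each setter marks its field dirty.
    SparkJobEntity job = new SparkJobEntity();
    Map<String, String> tags = new HashMap<>();
    tags.put("site", "sandbox");                              // example tag value
    tags.put("sparkAppId", "application_1462000000000_0001"); // example tag value
    job.setTags(tags);
    job.setTimestamp(System.currentTimeMillis());
    job.setSubmissionTime(System.currentTimeMillis());
    job.setStatus("RUNNING");
    // the entity would then be written through the Eagle service client
]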

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
new file mode 100644
index 0000000..0194132
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkStageEntity.java
@@ -0,0 +1,299 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@Table("eagleSparkRunningStages")
+@ColumnFamily("f")
+@Prefix("sparkStage")
+@Service(Constants.RUNNING_SPARK_STAGE_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId", 
"stageId","stageAttemptId","user", "queue"})
+@Partition({"site"})
+public class SparkStageEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private String status;
+    @Column("b")
+    private int numActiveTasks = 0;
+    @Column("c")
+    private int numCompletedTasks = 0;
+    @Column("d")
+    private int numFailedTasks = 0;
+    @Column("e")
+    private long executorRunTime = 0L;
+    @Column("f")
+    private long inputBytes = 0L;
+    @Column("g")
+    private long inputRecords = 0L;
+    @Column("h")
+    private long outputBytes = 0L;
+    @Column("i")
+    private long outputRecords = 0L;
+    @Column("j")
+    private long shuffleReadBytes = 0L;
+    @Column("k")
+    private long shuffleReadRecords = 0L;
+    @Column("l")
+    private long shuffleWriteBytes = 0L;
+    @Column("m")
+    private long shuffleWriteRecords = 0L;
+    @Column("n")
+    private long memoryBytesSpilled = 0L;
+    @Column("o")
+    private long diskBytesSpilled = 0L;
+    @Column("p")
+    private String name;
+    @Column("q")
+    private String schedulingPool;
+    @Column("r")
+    private long submitTime;
+    @Column("s")
+    private long completeTime;
+    @Column("t")
+    private int numTasks;
+    @Column("u")
+    private long executorDeserializeTime;
+    @Column("v")
+    private long resultSize;
+    @Column("w")
+    private long jvmGcTime;
+    @Column("x")
+    private long resultSerializationTime;
+
+    public String getStatus() {
+        return status;
+    }
+
+    public int getNumActiveTasks() {
+        return numActiveTasks;
+    }
+
+    public int getNumCompletedTasks() {
+        return numCompletedTasks;
+    }
+
+    public int getNumFailedTasks() {
+        return numFailedTasks;
+    }
+
+    public long getExecutorRunTime() {
+        return executorRunTime;
+    }
+
+    public long getInputBytes() {
+        return inputBytes;
+    }
+
+    public long getInputRecords() {
+        return inputRecords;
+    }
+
+    public long getOutputBytes() {
+        return outputBytes;
+    }
+
+    public long getOutputRecords() {
+        return outputRecords;
+    }
+
+    public long getShuffleReadBytes() {
+        return shuffleReadBytes;
+    }
+
+    public long getShuffleReadRecords() {
+        return shuffleReadRecords;
+    }
+
+    public long getShuffleWriteBytes() {
+        return shuffleWriteBytes;
+    }
+
+    public long getShuffleWriteRecords() {
+        return shuffleWriteRecords;
+    }
+
+    public long getMemoryBytesSpilled() {
+        return memoryBytesSpilled;
+    }
+
+    public long getDiskBytesSpilled() {
+        return diskBytesSpilled;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getSchedulingPool() {
+        return schedulingPool;
+    }
+
+    public long getSubmitTime() {
+        return submitTime;
+    }
+
+    public long getCompleteTime() {
+        return completeTime;
+    }
+
+    public int getNumTasks() {
+        return numTasks;
+    }
+
+    public long getExecutorDeserializeTime() {
+        return executorDeserializeTime;
+    }
+
+    public long getResultSize() {
+        return resultSize;
+    }
+
+    public long getJvmGcTime() {
+        return jvmGcTime;
+    }
+
+    public long getResultSerializationTime() {
+        return resultSerializationTime;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+        this.valueChanged("status");
+    }
+
+    public void setNumActiveTasks(int numActiveTasks) {
+        this.numActiveTasks = numActiveTasks;
+        this.valueChanged("numActiveTasks");
+    }
+
+    public void setNumCompletedTasks(int numCompletedTasks) {
+        this.numCompletedTasks = numCompletedTasks;
+        this.valueChanged("numCompletedTasks");
+    }
+
+    public void setNumFailedTasks(int numFailedTasks) {
+        this.numFailedTasks = numFailedTasks;
+        this.valueChanged("numFailedTasks");
+    }
+
+    public void setExecutorRunTime(long executorRunTime) {
+        this.executorRunTime = executorRunTime;
+        this.valueChanged("executorRunTime");
+    }
+
+    public void setInputBytes(long inputBytes) {
+        this.inputBytes = inputBytes;
+        this.valueChanged("inputBytes");
+    }
+
+    public void setInputRecords(long inputRecords) {
+        this.inputRecords = inputRecords;
+        this.valueChanged("inputRecords");
+    }
+
+    public void setOutputBytes(long outputBytes) {
+        this.outputBytes = outputBytes;
+        this.valueChanged("outputBytes");
+    }
+
+    public void setOutputRecords(long outputRecords) {
+        this.outputRecords = outputRecords;
+        this.valueChanged("outputRecords");
+    }
+
+    public void setShuffleReadBytes(long shuffleReadBytes) {
+        this.shuffleReadBytes = shuffleReadBytes;
+        this.valueChanged("shuffleReadBytes");
+    }
+
+    public void setShuffleReadRecords(long shuffleReadRecords) {
+        this.shuffleReadRecords = shuffleReadRecords;
+        this.valueChanged("shuffleReadRecords");
+    }
+
+    public void setShuffleWriteBytes(long shuffleWriteBytes) {
+        this.shuffleWriteBytes = shuffleWriteBytes;
+        this.valueChanged("shuffleWriteBytes");
+    }
+
+    public void setShuffleWriteRecords(long shuffleWriteRecords) {
+        this.shuffleWriteRecords = shuffleWriteRecords;
+        this.valueChanged("shuffleWriteRecords");
+    }
+
+    public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+        this.memoryBytesSpilled = memoryBytesSpilled;
+        this.valueChanged("memoryBytesSpilled");
+    }
+
+    public void setDiskBytesSpilled(long diskBytesSpilled) {
+        this.diskBytesSpilled = diskBytesSpilled;
+        this.valueChanged("diskBytesSpilled");
+    }
+
+    public void setName(String name) {
+        this.name = name;
+        this.valueChanged("name");
+    }
+
+    public void setSchedulingPool(String schedulingPool) {
+        this.schedulingPool = schedulingPool;
+        this.valueChanged("schedulingPool");
+    }
+
+    public void setSubmitTime(long submitTime) {
+        this.submitTime = submitTime;
+        this.valueChanged("submitTime");
+    }
+
+    public void setCompleteTime(long completeTime) {
+        this.completeTime = completeTime;
+        this.valueChanged("completeTime");
+    }
+
+    public void setNumTasks(int numTasks) {
+        this.numTasks = numTasks;
+        valueChanged("numTasks");
+    }
+
+    public void setExecutorDeserializeTime(long executorDeserializeTime) {
+        this.executorDeserializeTime = executorDeserializeTime;
+        valueChanged("executorDeserializeTime");
+    }
+
+    public void setResultSize(long resultSize) {
+        this.resultSize = resultSize;
+        valueChanged("resultSize");
+    }
+
+    public void setJvmGcTime(long jvmGcTime) {
+        this.jvmGcTime = jvmGcTime;
+        valueChanged("jvmGcTime");
+    }
+
+    public void setResultSerializationTime(long resultSerializationTime) {
+        this.resultSerializationTime = resultSerializationTime;
+        valueChanged("resultSerializationTime");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
new file mode 100644
index 0000000..6522c3c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/SparkTaskEntity.java
@@ -0,0 +1,290 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+
+@Table("eagleSparkRunningTasks")
+@ColumnFamily("f")
+@Prefix("sparkTask")
+@Service(Constants.RUNNING_SPARK_TASK_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site","sparkAppId", "sparkAppAttemptId", "sparkAppName", "jobId", 
"jobName", "stageId","stageAttemptId","taskIndex","taskAttemptId","user", 
"queue"})
+@Partition({"site"})
+public class SparkTaskEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private int taskId;
+    @Column("b")
+    private long launchTime;
+    @Column("c")
+    private String executorId;
+    @Column("d")
+    private String host;
+    @Column("e")
+    private String taskLocality;
+    @Column("f")
+    private boolean speculative;
+    @Column("g")
+    private long executorDeserializeTime;
+    @Column("h")
+    private long executorRunTime;
+    @Column("i")
+    private long resultSize;
+    @Column("j")
+    private long jvmGcTime;
+    @Column("k")
+    private long resultSerializationTime;
+    @Column("l")
+    private long memoryBytesSpilled;
+    @Column("m")
+    private long diskBytesSpilled;
+    @Column("n")
+    private long inputBytes;
+    @Column("o")
+    private long inputRecords;
+    @Column("p")
+    private long outputBytes;
+    @Column("q")
+    private long outputRecords;
+    @Column("r")
+    private long shuffleReadRemoteBytes;
+    @Column("x")
+    private long shuffleReadLocalBytes;
+    @Column("s")
+    private long shuffleReadRecords;
+    @Column("t")
+    private long shuffleWriteBytes;
+    @Column("u")
+    private long shuffleWriteRecords;
+    @Column("v")
+    private boolean failed;
+
+    public int getTaskId() {
+        return taskId;
+    }
+
+    public long getLaunchTime() {
+        return launchTime;
+    }
+
+    public String getExecutorId() {
+        return executorId;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public String getTaskLocality() {
+        return taskLocality;
+    }
+
+    public boolean isSpeculative() {
+        return speculative;
+    }
+
+    public long getExecutorDeserializeTime() {
+        return executorDeserializeTime;
+    }
+
+    public long getExecutorRunTime() {
+        return executorRunTime;
+    }
+
+    public long getResultSize() {
+        return resultSize;
+    }
+
+    public long getJvmGcTime() {
+        return jvmGcTime;
+    }
+
+    public long getResultSerializationTime() {
+        return resultSerializationTime;
+    }
+
+    public long getMemoryBytesSpilled() {
+        return memoryBytesSpilled;
+    }
+
+    public long getDiskBytesSpilled() {
+        return diskBytesSpilled;
+    }
+
+    public long getInputBytes() {
+        return inputBytes;
+    }
+
+    public long getInputRecords() {
+        return inputRecords;
+    }
+
+    public long getOutputBytes() {
+        return outputBytes;
+    }
+
+    public long getOutputRecords() {
+        return outputRecords;
+    }
+
+    public long getShuffleReadRecords() {
+        return shuffleReadRecords;
+    }
+
+    public long getShuffleWriteBytes() {
+        return shuffleWriteBytes;
+    }
+
+    public long getShuffleWriteRecords() {
+        return shuffleWriteRecords;
+    }
+
+    public boolean isFailed() {
+        return failed;
+    }
+
+    public long getShuffleReadRemoteBytes() {
+        return shuffleReadRemoteBytes;
+    }
+
+    public long getShuffleReadLocalBytes() {
+        return shuffleReadLocalBytes;
+    }
+
+    public void setFailed(boolean failed) {
+        this.failed = failed;
+        valueChanged("failed");
+    }
+
+    public void setTaskId(int taskId) {
+        this.taskId = taskId;
+        valueChanged("taskId");
+    }
+
+    public void setLaunchTime(long launchTime) {
+        this.launchTime = launchTime;
+        valueChanged("launchTime");
+    }
+
+    public void setExecutorId(String executorId) {
+        this.executorId = executorId;
+        valueChanged("executorId");
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+        this.valueChanged("host");
+    }
+
+    public void setTaskLocality(String taskLocality) {
+        this.taskLocality = taskLocality;
+        this.valueChanged("taskLocality");
+    }
+
+    public void setSpeculative(boolean speculative) {
+        this.speculative = speculative;
+        this.valueChanged("speculative");
+    }
+
+    public void setExecutorDeserializeTime(long executorDeserializeTime) {
+        this.executorDeserializeTime = executorDeserializeTime;
+        this.valueChanged("executorDeserializeTime");
+    }
+
+    public void setExecutorRunTime(long executorRunTime) {
+        this.executorRunTime = executorRunTime;
+        this.valueChanged("executorRunTime");
+    }
+
+    public void setResultSize(long resultSize) {
+        this.resultSize = resultSize;
+        this.valueChanged("resultSize");
+    }
+
+    public void setJvmGcTime(long jvmGcTime) {
+        this.jvmGcTime = jvmGcTime;
+        this.valueChanged("jvmGcTime");
+    }
+
+    public void setResultSerializationTime(long resultSerializationTime) {
+        this.resultSerializationTime = resultSerializationTime;
+        this.valueChanged("resultSerializationTime");
+    }
+
+    public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+        this.memoryBytesSpilled = memoryBytesSpilled;
+        this.valueChanged("memoryBytesSpilled");
+    }
+
+    public void setDiskBytesSpilled(long diskBytesSpilled) {
+        this.diskBytesSpilled = diskBytesSpilled;
+        this.valueChanged("diskBytesSpilled");
+    }
+
+    public void setInputBytes(long inputBytes) {
+        this.inputBytes = inputBytes;
+        this.valueChanged("inputBytes");
+    }
+
+    public void setInputRecords(long inputRecords) {
+        this.inputRecords = inputRecords;
+        this.valueChanged("inputRecords");
+    }
+
+    public void setOutputBytes(long outputBytes) {
+        this.outputBytes = outputBytes;
+        this.valueChanged("outputBytes");
+    }
+
+    public void setOutputRecords(long outputRecords) {
+        this.outputRecords = outputRecords;
+        this.valueChanged("outputRecords");
+    }
+
+
+    public void setShuffleReadRecords(long shuffleReadRecords) {
+        this.shuffleReadRecords = shuffleReadRecords;
+        this.valueChanged("shuffleReadRecords");
+    }
+
+    public void setShuffleWriteBytes(long shuffleWriteBytes) {
+        this.shuffleWriteBytes = shuffleWriteBytes;
+        this.valueChanged("shuffleWriteBytes");
+    }
+
+    public void setShuffleWriteRecords(long shuffleWriteRecords) {
+        this.shuffleWriteRecords = shuffleWriteRecords;
+        this.valueChanged("shuffleWriteRecords");
+    }
+
+    public void setShuffleReadRemoteBytes(long shuffleReadRemoteBytes) {
+        this.shuffleReadRemoteBytes = shuffleReadRemoteBytes;
+        this.valueChanged("shuffleReadRemoteBytes");
+    }
+
+    public void setShuffleReadLocalBytes(long shuffleReadLocalBytes) {
+        this.shuffleReadLocalBytes = shuffleReadLocalBytes;
+        this.valueChanged("shuffleReadLocalBytes");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
index 0fc74d7..284eeee 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
@@ -82,8 +82,6 @@ public class SparkHistoryJobAppConfig implements Serializable {
         this.eagleInfo.host = config.getString("eagleProps.eagle.service.host");
         this.eagleInfo.port = config.getInt("eagleProps.eagle.service.port");
 
-        this.stormConfig.topologyName = config.getString("storm.name");
-        this.stormConfig.workerNo = config.getInt("storm.worker.num");
         this.stormConfig.timeoutSec = config.getInt("storm.messageTimeoutSec");
         this.stormConfig.spoutPending = config.getInt("storm.pendingSpout");
         this.stormConfig.spoutCrawlInterval = config.getInt("storm.spoutCrawlInterval");
@@ -117,9 +115,7 @@ public class SparkHistoryJobAppConfig implements Serializable {
     }
 
     public static class StormConfig implements Serializable {
-        public int workerNo;
         public int timeoutSec;
-        public String topologyName;
         public int spoutPending;
         public int spoutCrawlInterval;
     }
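
[Editorial note, not part of the patch: after this change the topology name and worker count are no longer read here; the remaining Storm settings are still loaded through Typesafe Config. A hedged sketch of the surviving lookups, with key names exactly as in the hunk above:

    Config config = ConfigFactory.load();   // com.typesafe.config
    StormConfig stormConfig = new StormConfig();
    stormConfig.timeoutSec = config.getInt("storm.messageTimeoutSec");
    stormConfig.spoutPending = config.getInt("storm.pendingSpout");
    stormConfig.spoutCrawlInterval = config.getInt("storm.spoutCrawlInterval");
]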

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java
new file mode 100644
index 0000000..b73b52e
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFInputStreamReader.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+import java.io.InputStream;
+
+public interface JHFInputStreamReader {
+    void read(InputStream is) throws Exception;
+}
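
[Editorial note, not part of the patch: a minimal, hypothetical implementer of JHFInputStreamReader. The real readers in this package parse Spark history JSON; this one only counts lines to illustrate the contract:

    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    public class LineCountingReader implements JHFInputStreamReader {
        @Override
        public void read(InputStream is) throws Exception {
            // Reads the stream line by line; closing is left to the caller,
            // since JHFParserBase (below) is where the close contract lives.
            BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
            long lines = 0;
            while (reader.readLine() != null) {
                lines++;
            }
            System.out.println("read " + lines + " lines");
        }
    }
]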

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java
new file mode 100644
index 0000000..047e2d5
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFParserBase.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+import java.io.InputStream;
+
+public interface JHFParserBase {
+    /**
+     * Parses the given input stream; implementations must ensure the stream is closed.
+     * @param is the input stream to parse
+     * @throws Exception if parsing fails
+     */
+    void parse(InputStream is) throws Exception;
+}
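
[Editorial note, not part of the patch: a hypothetical skeleton honoring the close-the-stream contract documented above, using try/finally (try-with-resources would work equally well):

    import java.io.InputStream;

    public class NoOpParser implements JHFParserBase {
        @Override
        public void parse(InputStream is) throws Exception {
            try {
                // consume events from the stream here
            } finally {
                if (is != null) {
                    is.close();   // per the interface contract: always close the stream
                }
            }
        }
    }
]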

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
new file mode 100644
index 0000000..571620a
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.eagle.jpm.spark.entity.*;
+import org.apache.eagle.jpm.util.*;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.service.client.EagleServiceClientException;
+import org.apache.eagle.service.client.impl.EagleServiceBaseClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+public class JHFSparkEventReader {
+    private static final Logger LOG = LoggerFactory.getLogger(JHFSparkEventReader.class);
+
+    private static final int FLUSH_LIMIT = 500;
+    private long firstTaskLaunchTime;
+    private long lastEventTime;
+
+    private Map<String, SparkExecutor> executors;
+    private SparkApp app;
+    private Map<Integer, SparkJob> jobs;
+    private Map<String, SparkStage> stages;
+    private Map<Integer, Set<String>> jobStageMap;
+    private Map<Long, SparkTask> tasks;
+    private EagleServiceClientImpl client;
+    private Map<String, Map<Integer, Boolean>> stageTaskStatusMap;
+
+    private List<TaggedLogAPIEntity> createEntities;
+
+    private Config conf;
+
+    public JHFSparkEventReader(Map<String, String> baseTags, 
SparkApplicationInfo info) {
+        app = new SparkApp();
+        app.setTags(new HashMap<String, String>(baseTags));
+        app.setYarnState(info.getState());
+        app.setYarnStatus(info.getFinalStatus());
+        createEntities = new ArrayList<>();
+        jobs = new HashMap<Integer, SparkJob>();
+        stages = new HashMap<String, SparkStage>();
+        jobStageMap = new HashMap<Integer, Set<String>>();
+        tasks = new HashMap<Long, SparkTask>();
+        executors = new HashMap<String, SparkExecutor>();
+        stageTaskStatusMap = new HashMap<>();
+        conf = ConfigFactory.load();
+        this.initiateClient();
+    }
+
+    public SparkApp getApp() {
+        return this.app;
+    }
+
+    public void read(JSONObject eventObj) {
+        String eventType = (String) eventObj.get("Event");
+        if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerApplicationStart.toString())) {
+            handleAppStarted(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerEnvironmentUpdate.toString())) {
+            handleEnvironmentSet(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerExecutorAdded.toString())) {
+            handleExecutorAdd(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerBlockManagerAdded.toString())) {
+            handleBlockManagerAdd(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerJobStart.toString())) {
+            handleJobStart(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerStageSubmitted.toString())) {
+            handleStageSubmit(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerTaskStart.toString())) {
+            handleTaskStart(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerTaskEnd.toString())) {
+            handleTaskEnd(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerStageCompleted.toString())) {
+            handleStageComplete(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerJobEnd.toString())) {
+            handleJobEnd(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerExecutorRemoved.toString())) {
+            handleExecutorRemoved(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerApplicationEnd.toString())) {
+            handleAppEnd(eventObj);
+        } else if (eventType.equalsIgnoreCase(SparkEventType.SparkListenerBlockManagerRemoved.toString())) {
+            // nothing to do for now
+        } else {
+            LOG.info("Not registered event type: " + eventType);
+        }
+    }
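
[Editorial note, not part of the patch: each line of a Spark event log is a JSON object whose "Event" field drives the dispatch above. A hypothetical driver; JSONParser (org.json.simple.parser.JSONParser) is an assumption here, and in the real crawler the reader is constructed with baseTags and SparkApplicationInfo:

    JSONParser parser = new JSONParser();
    JSONObject event = (JSONObject) parser.parse(
        "{\"Event\":\"SparkListenerApplicationEnd\",\"Timestamp\":1462000000000}");
    reader.read(event);   // routes to handleAppEnd(...)
]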
+
+    private void handleEnvironmentSet(JSONObject event) {
+        app.setConfig(new JobConfig());
+        JSONObject sparkProps = (JSONObject) event.get("Spark Properties");
+
+        String[] additionalJobConf = conf.getString("basic.jobConf.additional.info").split(",\\s*");
+        String[] props = {"spark.yarn.app.id", "spark.executor.memory", "spark.driver.host", "spark.driver.port",
+            "spark.driver.memory", "spark.scheduler.pool", "spark.executor.cores", "spark.yarn.am.memory",
+            "spark.yarn.am.cores", "spark.yarn.executor.memoryOverhead", "spark.yarn.driver.memoryOverhead", "spark.yarn.am.memoryOverhead", "spark.master"};
+        String[] jobConf = (String[]) ArrayUtils.addAll(additionalJobConf, props);
+        for (String prop : jobConf) {
+            if (sparkProps.containsKey(prop)) {
+                app.getConfig().getConfig().put(prop, (String) sparkProps.get(prop));
+            }
+        }
+    }
+
+    private Object getConfigVal(JobConfig config, String configName, String type) {
+        if (config.getConfig().containsKey(configName)) {
+            Object val = config.getConfig().get(configName);
+            if (type.equalsIgnoreCase(Integer.class.getName())) {
+                return Integer.parseInt((String) val);
+            } else {
+                return val;
+            }
+        } else {
+            if (type.equalsIgnoreCase(Integer.class.getName())) {
+                return conf.getInt("spark.defaultVal." + configName);
+            } else {
+                return conf.getString("spark.defaultVal." + configName);
+            }
+        }
+    }
+
+    private boolean isClientMode(JobConfig config) {
+        return config.getConfig().get("spark.master").equalsIgnoreCase("yarn-client");
+    }
+
+    private void handleAppStarted(JSONObject event) {
+        // need to update the tags of all entities before the app starts
+        List<TaggedLogAPIEntity> entities = new ArrayList<TaggedLogAPIEntity>();
+        entities.addAll(this.executors.values());
+        entities.add(this.app);
+
+        long appStartTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
+        for (TaggedLogAPIEntity entity : entities) {
+            entity.getTags().put(SparkJobTagName.SPARK_APP_ID.toString(), JSONUtils.getString(event, "App ID"));
+            entity.getTags().put(SparkJobTagName.SPARK_APP_NAME.toString(), JSONUtils.getString(event, "App Name"));
+            // In yarn-client mode, attemptId is not available in the log, so we set attemptId = 1.
+            String attemptId = isClientMode(this.app.getConfig()) ? "1" : JSONUtils.getString(event, "App Attempt ID");
+            entity.getTags().put(SparkJobTagName.SPARK_APP_ATTEMPT_ID.toString(), attemptId);
+            // The second argument of getNormalizedName() is set to null because the original code contained sensitive text;
+            // it originally looked like: this.app.getConfig().getConfig().get("xxx"), where "xxx" is the sensitive text.
+            entity.getTags().put(SparkJobTagName.SPARK_APP_NORM_NAME.toString(), this.getNormalizedName(JSONUtils.getString(event, "App Name"), null));
+            entity.getTags().put(SparkJobTagName.SPARK_USER.toString(), JSONUtils.getString(event, "User"));
+
+            entity.setTimestamp(appStartTime);
+        }
+
+        this.app.setStartTime(appStartTime);
+        this.lastEventTime = appStartTime;
+    }
+
+    private void handleExecutorAdd(JSONObject event) {
+        String executorID = (String) event.get("Executor ID");
+        long executorAddTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
+        this.lastEventTime = executorAddTime;
+        SparkExecutor executor = this.initiateExecutor(executorID, executorAddTime);
+
+        JSONObject executorInfo = JSONUtils.getJSONObject(event, "Executor Info");
+    }
+
+    private void handleBlockManagerAdd(JSONObject event) {
+        long maxMemory = JSONUtils.getLong(event, "Maximum Memory");
+        long timestamp = JSONUtils.getLong(event, "Timestamp", lastEventTime);
+        this.lastEventTime = timestamp;
+        JSONObject blockInfo = JSONUtils.getJSONObject(event, "Block Manager ID");
+        String executorID = JSONUtils.getString(blockInfo, "Executor ID");
+        String hostAndPort = JSONUtils.getString(blockInfo, "Host") + ":" + JSONUtils.getLong(blockInfo, "Port");
+
+        SparkExecutor executor = this.initiateExecutor(executorID, timestamp);
+        executor.setMaxMemory(maxMemory);
+        executor.setHostPort(hostAndPort);
+    }
+
+    private void handleTaskStart(JSONObject event) {
+        this.initializeTask(event);
+    }
+
+    private void handleTaskEnd(JSONObject event) {
+        JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info");
+        long taskId = JSONUtils.getLong(taskInfo, "Task ID");
+        SparkTask task = tasks.get(taskId);
+        if (task == null) {
+            return;
+        }
+
+        task.setFailed(JSONUtils.getBoolean(taskInfo, "Failed"));
+        JSONObject taskMetrics = JSONUtils.getJSONObject(event, "Task Metrics");
+        if (null != taskMetrics) {
+            task.setExecutorDeserializeTime(JSONUtils.getLong(taskMetrics, "Executor Deserialize Time", lastEventTime));
+            task.setExecutorRunTime(JSONUtils.getLong(taskMetrics, "Executor Run Time", lastEventTime));
+            task.setJvmGcTime(JSONUtils.getLong(taskMetrics, "JVM GC Time", lastEventTime));
+            task.setResultSize(JSONUtils.getLong(taskMetrics, "Result Size"));
+            task.setResultSerializationTime(JSONUtils.getLong(taskMetrics, "Result Serialization Time", lastEventTime));
+            task.setMemoryBytesSpilled(JSONUtils.getLong(taskMetrics, "Memory Bytes Spilled"));
+            task.setDiskBytesSpilled(JSONUtils.getLong(taskMetrics, "Disk Bytes Spilled"));
+
+            JSONObject inputMetrics = JSONUtils.getJSONObject(taskMetrics, "Input Metrics");
+            if (null != inputMetrics) {
+                task.setInputBytes(JSONUtils.getLong(inputMetrics, "Bytes Read"));
+                task.setInputRecords(JSONUtils.getLong(inputMetrics, "Records Read"));
+            }
+
+            JSONObject outputMetrics = JSONUtils.getJSONObject(taskMetrics, "Output Metrics");
+            if (null != outputMetrics) {
+                task.setOutputBytes(JSONUtils.getLong(outputMetrics, "Bytes Written"));
+                task.setOutputRecords(JSONUtils.getLong(outputMetrics, "Records Written"));
+            }
+
+            JSONObject shuffleWriteMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Write Metrics");
+            if (null != shuffleWriteMetrics) {
+                task.setShuffleWriteBytes(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Bytes Written"));
+                task.setShuffleWriteRecords(JSONUtils.getLong(shuffleWriteMetrics, "Shuffle Records Written"));
+            }
+
+            JSONObject shuffleReadMetrics = JSONUtils.getJSONObject(taskMetrics, "Shuffle Read Metrics");
+            if (null != shuffleReadMetrics) {
+                task.setShuffleReadLocalBytes(JSONUtils.getLong(shuffleReadMetrics, "Local Bytes Read"));
+                task.setShuffleReadRemoteBytes(JSONUtils.getLong(shuffleReadMetrics, "Remote Bytes Read"));
+                task.setShuffleReadRecords(JSONUtils.getLong(shuffleReadMetrics, "Total Records Read"));
+            }
+        } else {
+            // for tasks that succeeded without task metrics, keep them until the end in case no other information arrives
+            if (!task.isFailed()) {
+                return;
+            }
+        }
+
+        aggregateToStage(task);
+        aggregateToExecutor(task);
+        tasks.remove(taskId);
+        this.flushEntities(task, false);
+    }
+
+    private SparkTask initializeTask(JSONObject event) {
+        SparkTask task = new SparkTask();
+        task.setTags(new HashMap<>(this.app.getTags()));
+        task.setTimestamp(app.getTimestamp());
+
+        task.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage ID")));
+        task.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Long.toString(JSONUtils.getLong(event, "Stage Attempt ID")));
+
+        JSONObject taskInfo = JSONUtils.getJSONObject(event, "Task Info");
+        long taskId = JSONUtils.getLong(taskInfo, "Task ID");
+        task.setTaskId(taskId);
+
+        task.getTags().put(SparkJobTagName.SPARK_TASK_INDEX.toString(), Long.toString(JSONUtils.getLong(taskInfo, "Index")));
+        task.getTags().put(SparkJobTagName.SPARK_TASK_ATTEMPT_ID.toString(), Integer.toString(JSONUtils.getInt(taskInfo, "Attempt")));
+        long launchTime = JSONUtils.getLong(taskInfo, "Launch Time", lastEventTime);
+        this.lastEventTime = launchTime;
+        if (taskId == 0) {
+            this.setFirstTaskLaunchTime(launchTime);
+        }
+        task.setLaunchTime(launchTime);
+        task.setExecutorId(JSONUtils.getString(taskInfo, "Executor ID"));
+        task.setHost(JSONUtils.getString(taskInfo, "Host"));
+        task.setTaskLocality(JSONUtils.getString(taskInfo, "Locality"));
+        task.setSpeculative(JSONUtils.getBoolean(taskInfo, "Speculative"));
+
+        tasks.put(task.getTaskId(), task);
+        return task;
+    }
+
+    private void setFirstTaskLaunchTime(long launchTime) {
+        this.firstTaskLaunchTime = launchTime;
+    }
+
+    private void handleJobStart(JSONObject event) {
+        SparkJob job = new SparkJob();
+        job.setTags(new HashMap<>(this.app.getTags()));
+        job.setTimestamp(app.getTimestamp());
+
+        int jobId = JSONUtils.getInt(event, "Job ID");
+        job.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), 
Integer.toString(jobId));
+        long submissionTime = JSONUtils.getLong(event, "Submission Time", 
lastEventTime);
+        job.setSubmissionTime(submissionTime);
+        this.lastEventTime = submissionTime;
+
+        //for complete application, no active stages/tasks
+        job.setNumActiveStages(0);
+        job.setNumActiveTasks(0);
+
+        this.jobs.put(jobId, job);
+        this.jobStageMap.put(jobId, new HashSet<String>());
+
+        JSONArray stages = JSONUtils.getJSONArray(event, "Stage Infos");
+        int stagesSize = (stages == null ? 0 : stages.size());
+        job.setNumStages(stagesSize);
+        for (int i = 0; i < stagesSize; i++) {
+            JSONObject stageInfo = (JSONObject) stages.get(i);
+            int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
+            int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt 
ID");
+            String stageName = JSONUtils.getString(stageInfo, "Stage Name");
+            int numTasks = JSONUtils.getInt(stageInfo, "Number of Tasks");
+            this.initiateStage(jobId, stageId, stageAttemptId, stageName, 
numTasks);
+        }
+    }
+
+    private void handleStageSubmit(JSONObject event) {
+        JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info");
+        int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
+        int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
+        String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
+        stageTaskStatusMap.put(key, new HashMap<Integer, Boolean>());
+
+        if (!stages.containsKey(key)) {
+            // may be a further attempt of an already-seen stage
+            String baseAttempt = this.generateStageKey(Integer.toString(stageId), "0");
+            if (stages.containsKey(baseAttempt)) {
+                SparkStage stage = stages.get(baseAttempt);
+                String jobId = stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString());
+
+                String stageName = JSONUtils.getString(event, "Stage Name");
+                int numTasks = JSONUtils.getInt(stageInfo, "Number of Tasks");
+                this.initiateStage(Integer.parseInt(jobId), stageId, stageAttemptId, stageName, numTasks);
+            }
+        }
+    }
+
+    private void handleStageComplete(JSONObject event) {
+        JSONObject stageInfo = JSONUtils.getJSONObject(event, "Stage Info");
+        int stageId = JSONUtils.getInt(stageInfo, "Stage ID");
+        int stageAttemptId = JSONUtils.getInt(stageInfo, "Stage Attempt ID");
+        String key = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
+        SparkStage stage = stages.get(key);
+
+        // If "Submission Time" is not available, use the "Launch Time" of "Task ID" = 0.
+        Long submissionTime = JSONUtils.getLong(stageInfo, "Submission Time", firstTaskLaunchTime);
+        stage.setSubmitTime(submissionTime);
+
+        long completeTime = JSONUtils.getLong(stageInfo, "Completion Time", lastEventTime);
+        stage.setCompleteTime(completeTime);
+        this.lastEventTime = completeTime;
+
+        if (stageInfo != null && stageInfo.containsKey("Failure Reason")) {
+            stage.setStatus(SparkEntityConstant.SparkStageStatus.FAILED.toString());
+        } else {
+            stage.setStatus(SparkEntityConstant.SparkStageStatus.COMPLETE.toString());
+        }
+    }
+
+    private void handleExecutorRemoved(JSONObject event) {
+        String executorID = JSONUtils.getString(event, "Executor ID");
+        SparkExecutor executor = executors.get(executorID);
+        long removedTime = JSONUtils.getLong(event, "Timestamp", 
lastEventTime);
+        executor.setEndTime(removedTime);
+        this.lastEventTime = removedTime;
+    }
+
+    private void handleJobEnd(JSONObject event) {
+        int jobId = JSONUtils.getInt(event, "Job ID");
+        SparkJob job = jobs.get(jobId);
+
+        long completionTime = JSONUtils.getLong(event, "Completion Time", 
lastEventTime);
+        job.setCompletionTime(completionTime);
+        this.lastEventTime = completionTime;
+
+        JSONObject jobResult = JSONUtils.getJSONObject(event, "Job Result");
+        String result = JSONUtils.getString(jobResult, "Result");
+        if (result.equalsIgnoreCase("JobSucceeded")) {
+            
job.setStatus(SparkEntityConstant.SparkJobStatus.SUCCEEDED.toString());
+        } else {
+            
job.setStatus(SparkEntityConstant.SparkJobStatus.FAILED.toString());
+        }
+    }
+
+    private void handleAppEnd(JSONObject event) {
+        long endTime = JSONUtils.getLong(event, "Timestamp", lastEventTime);
+        app.setEndTime(endTime);
+        this.lastEventTime = endTime;
+    }
+
+    public void clearReader() throws Exception {
+        //clear tasks
+        for (SparkTask task : tasks.values()) {
+            LOG.info("Task {} does not have result or no task metrics.", 
task.getTaskId());
+            task.setFailed(true);
+            aggregateToStage(task);
+            aggregateToExecutor(task);
+            this.flushEntities(task, false);
+        }
+
+        List<SparkStage> needStoreStages = new ArrayList<>();
+        for (SparkStage stage : this.stages.values()) {
+            int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
+            if (stage.getSubmitTime() == 0 || stage.getCompleteTime() == 0) {
+                SparkJob job = this.jobs.get(jobId);
+                job.setNumSkippedStages(job.getNumSkippedStages() + 1);
+                job.setNumSkippedTasks(job.getNumSkippedTasks() + stage.getNumTasks());
+            } else {
+                this.aggregateToJob(stage);
+                this.aggregateStageToApp(stage);
+                needStoreStages.add(stage);
+            }
+            String stageId = stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
+            String stageAttemptId = stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
+            this.jobStageMap.get(jobId).remove(this.generateStageKey(stageId, stageAttemptId));
+        }
+
+        this.flushEntities(needStoreStages, false);
+        for (SparkJob job : jobs.values()) {
+            this.aggregateJobToApp(job);
+        }
+        this.flushEntities(jobs.values(), false);
+
+        app.setExecutors(executors.values().size());
+
+        long executorMemory = Utils.parseMemory((String) this.getConfigVal(this.app.getConfig(), "spark.executor.memory", String.class.getName()));
+        long driverMemory = Utils.parseMemory(this.isClientMode(app.getConfig())
+            ? (String) this.getConfigVal(this.app.getConfig(), "spark.yarn.am.memory", String.class.getName())
+            : (String) this.getConfigVal(app.getConfig(), "spark.driver.memory", String.class.getName()));
+
+        int executorCore = (Integer) this.getConfigVal(app.getConfig(), "spark.executor.cores", Integer.class.getName());
+        int driverCore = this.isClientMode(app.getConfig())
+            ? (Integer) this.getConfigVal(app.getConfig(), "spark.yarn.am.cores", Integer.class.getName())
+            : (Integer) this.getConfigVal(app.getConfig(), "spark.driver.cores", Integer.class.getName());
+
+        long executorMemoryOverhead = this.getMemoryOverhead(app.getConfig(), executorMemory, "spark.yarn.executor.memoryOverhead");
+        long driverMemoryOverhead = this.isClientMode(app.getConfig())
+            ? this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.am.memoryOverhead")
+            : this.getMemoryOverhead(app.getConfig(), driverMemory, "spark.yarn.driver.memoryOverhead");
+
+        app.setExecMemoryBytes(executorMemory);
+        app.setDriveMemoryBytes(driverMemory);
+        app.setExecutorCores(executorCore);
+        app.setDriverCores(driverCore);
+        app.setExecutorMemoryOverhead(executorMemoryOverhead);
+        app.setDriverMemoryOverhead(driverMemoryOverhead);
+
+        for (SparkExecutor executor : executors.values()) {
+            String executorID = executor.getTags().get(SparkJobTagName.SPARK_EXECUTOR_ID.toString());
+            if (executorID.equalsIgnoreCase("driver")) {
+                executor.setExecMemoryBytes(driverMemory);
+                executor.setCores(driverCore);
+                executor.setMemoryOverhead(driverMemoryOverhead);
+            } else {
+                executor.setExecMemoryBytes(executorMemory);
+                executor.setCores(executorCore);
+                executor.setMemoryOverhead(executorMemoryOverhead);
+            }
+            if (app.getEndTime() <= 0L) {
+                app.setEndTime(this.lastEventTime);
+            }
+            if (executor.getEndTime() <= 0L) {
+                executor.setEndTime(app.getEndTime());
+            }
+            this.aggregateExecutorToApp(executor);
+        }
+        this.flushEntities(executors.values(), false);
+        // mirrors Spark's own code here; tricky
+        app.setSkippedTasks(app.getCompleteTasks());
+        this.flushEntities(app, true);
+    }
+
+    private long getMemoryOverhead(JobConfig config, long executorMemory, String fieldName) {
+        long result = 0L;
+        String fieldValue = config.getConfig().get(fieldName);
+        if (fieldValue != null) {
+            result = Utils.parseMemory(fieldValue + "m");
+            if (result == 0L) {
+                result = Utils.parseMemory(fieldValue);
+            }
+        }
+
+        if (result == 0L) {
+            result = Math.max(
+                    Utils.parseMemory(conf.getString("spark.defaultVal.spark.yarn.overhead.min")),
+                    executorMemory * conf.getInt("spark.defaultVal." + fieldName + ".factor") / 100);
+        }
+        return result;
+    }
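
[Editorial note, not part of the patch: when the overhead key is absent, the fallback above is max(configured minimum, memory * factor / 100). A worked example; the 384m minimum and the factor of 10 are assumed defaults, not values from this commit:

    long minOverhead = 384L * 1024 * 1024;        // assume spark.defaultVal.spark.yarn.overhead.min = 384m
    long executorMemory = 2048L * 1024 * 1024;    // a 2 GB executor
    long overhead = Math.max(minOverhead, executorMemory * 10 / 100);
    // => max(384 MB, ~204.8 MB) = 384 MB of overhead
]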
+
+    private void aggregateExecutorToApp(SparkExecutor executor) {
+        long totalExecutorTime = app.getTotalExecutorTime() + executor.getEndTime() - executor.getStartTime();
+        if (totalExecutorTime < 0L) {
+            totalExecutorTime = 0L;
+        }
+        app.setTotalExecutorTime(totalExecutorTime);
+    }
+
+    private void aggregateJobToApp(SparkJob job) {
+        // aggregate job-level metrics
+        app.setNumJobs(app.getNumJobs() + 1);
+        app.setTotalTasks(app.getTotalTasks() + job.getNumTask());
+        app.setCompleteTasks(app.getCompleteTasks() + job.getNumCompletedTasks());
+        app.setSkippedTasks(app.getSkippedTasks() + job.getNumSkippedTasks());
+        app.setFailedTasks(app.getFailedTasks() + job.getNumFailedTasks());
+        app.setTotalStages(app.getTotalStages() + job.getNumStages());
+        app.setFailedStages(app.getFailedStages() + job.getNumFailedStages());
+        app.setSkippedStages(app.getSkippedStages() + job.getNumSkippedStages());
+    }
+
+    private void aggregateStageToApp(SparkStage stage) {
+        // aggregate task-level metrics
+        app.setDiskBytesSpilled(app.getDiskBytesSpilled() + stage.getDiskBytesSpilled());
+        app.setMemoryBytesSpilled(app.getMemoryBytesSpilled() + stage.getMemoryBytesSpilled());
+        app.setExecutorRunTime(app.getExecutorRunTime() + stage.getExecutorRunTime());
+        app.setJvmGcTime(app.getJvmGcTime() + stage.getJvmGcTime());
+        app.setExecutorDeserializeTime(app.getExecutorDeserializeTime() + stage.getExecutorDeserializeTime());
+        app.setResultSerializationTime(app.getResultSerializationTime() + stage.getResultSerializationTime());
+        app.setResultSize(app.getResultSize() + stage.getResultSize());
+        app.setInputRecords(app.getInputRecords() + stage.getInputRecords());
+        app.setInputBytes(app.getInputBytes() + stage.getInputBytes());
+        app.setOutputRecords(app.getOutputRecords() + stage.getOutputRecords());
+        app.setOutputBytes(app.getOutputBytes() + stage.getOutputBytes());
+        app.setShuffleWriteRecords(app.getShuffleWriteRecords() + stage.getShuffleWriteRecords());
+        app.setShuffleWriteBytes(app.getShuffleWriteBytes() + stage.getShuffleWriteBytes());
+        app.setShuffleReadRecords(app.getShuffleReadRecords() + stage.getShuffleReadRecords());
+        app.setShuffleReadBytes(app.getShuffleReadBytes() + stage.getShuffleReadBytes());
+    }
+
+    private void aggregateToStage(SparkTask task) {
+        String stageId = task.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString());
+        String stageAttemptId = task.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString());
+        String key = this.generateStageKey(stageId, stageAttemptId);
+        SparkStage stage = stages.get(key);
+
+        stage.setDiskBytesSpilled(stage.getDiskBytesSpilled() + task.getDiskBytesSpilled());
+        stage.setMemoryBytesSpilled(stage.getMemoryBytesSpilled() + task.getMemoryBytesSpilled());
+        stage.setExecutorRunTime(stage.getExecutorRunTime() + task.getExecutorRunTime());
+        stage.setJvmGcTime(stage.getJvmGcTime() + task.getJvmGcTime());
+        stage.setExecutorDeserializeTime(stage.getExecutorDeserializeTime() + task.getExecutorDeserializeTime());
+        stage.setResultSerializationTime(stage.getResultSerializationTime() + task.getResultSerializationTime());
+        stage.setResultSize(stage.getResultSize() + task.getResultSize());
+        stage.setInputRecords(stage.getInputRecords() + task.getInputRecords());
+        stage.setInputBytes(stage.getInputBytes() + task.getInputBytes());
+        stage.setOutputRecords(stage.getOutputRecords() + task.getOutputRecords());
+        stage.setOutputBytes(stage.getOutputBytes() + task.getOutputBytes());
+        stage.setShuffleWriteRecords(stage.getShuffleWriteRecords() + task.getShuffleWriteRecords());
+        stage.setShuffleWriteBytes(stage.getShuffleWriteBytes() + task.getShuffleWriteBytes());
+        stage.setShuffleReadRecords(stage.getShuffleReadRecords() + task.getShuffleReadRecords());
+        long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
+        stage.setShuffleReadBytes(stage.getShuffleReadBytes() + taskShuffleReadBytes);
+
+        boolean success = !task.isFailed();
+
+        Integer taskIndex = Integer.parseInt(task.getTags().get(SparkJobTagName.SPARK_TASK_INDEX.toString()));
+        if (stageTaskStatusMap.get(key).containsKey(taskIndex)) {
+            //a previous attempt of this task index within the stage has already been counted
+            boolean previousResult = stageTaskStatusMap.get(key).get(taskIndex);
+            success = previousResult || success;
+            if (previousResult != success) {
+                //this attempt flipped the task from failed to succeeded, so move it between counters
+                stage.setNumFailedTasks(stage.getNumFailedTasks() - 1);
+                stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
+                stageTaskStatusMap.get(key).put(taskIndex, success);
+            }
+        } else {
+            if (success) {
+                stage.setNumCompletedTasks(stage.getNumCompletedTasks() + 1);
+            } else {
+                stage.setNumFailedTasks(stage.getNumFailedTasks() + 1);
+            }
+            stageTaskStatusMap.get(key).put(taskIndex, success);
+        }
+    }
+
+    private void aggregateToExecutor(SparkTask task) {
+        String executorId = task.getExecutorId();
+        SparkExecutor executor = executors.get(executorId);
+
+        if (null != executor) {
+            executor.setTotalTasks(executor.getTotalTasks() + 1);
+            if (task.isFailed()) {
+                executor.setFailedTasks(executor.getFailedTasks() + 1);
+            } else {
+                executor.setCompletedTasks(executor.getCompletedTasks() + 1);
+            }
+            long taskShuffleReadBytes = task.getShuffleReadLocalBytes() + task.getShuffleReadRemoteBytes();
+            executor.setTotalShuffleRead(executor.getTotalShuffleRead() + taskShuffleReadBytes);
+            executor.setTotalDuration(executor.getTotalDuration() + task.getExecutorRunTime());
+            executor.setTotalInputBytes(executor.getTotalInputBytes() + task.getInputBytes());
+            executor.setTotalShuffleWrite(executor.getTotalShuffleWrite() + task.getShuffleWriteBytes());
+        }
+    }
+
+    private void aggregateToJob(SparkStage stage) {
+        int jobId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_JOB_ID.toString()));
+        SparkJob job = jobs.get(jobId);
+        job.setNumCompletedTasks(job.getNumCompletedTasks() + stage.getNumCompletedTasks());
+        job.setNumFailedTasks(job.getNumFailedTasks() + stage.getNumFailedTasks());
+        job.setNumTask(job.getNumTask() + stage.getNumTasks());
+
+        if (stage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) {
+            //if multiple attempts succeed, just count one
+            if (!hasStagePriorAttemptSuccess(stage)) {
+                job.setNumCompletedStages(job.getNumCompletedStages() + 1);
+            }
+        } else {
+            job.setNumFailedStages(job.getNumFailedStages() + 1);
+        }
+    }
+
+    private boolean hasStagePriorAttemptSuccess(SparkStage stage) {
+        int stageAttemptId = Integer.parseInt(stage.getTags().get(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString()));
+        for (int i = 0; i < stageAttemptId; i++) {
+            SparkStage previousStage = stages.get(this.generateStageKey(
+                    stage.getTags().get(SparkJobTagName.SPARK_SATGE_ID.toString()), Integer.toString(i)));
+            if (previousStage.getStatus().equalsIgnoreCase(SparkEntityConstant.SparkStageStatus.COMPLETE.toString())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+
+    private String generateStageKey(String stageId, String stageAttemptId) {
+        return stageId + "-" + stageAttemptId;
+    }
+
+    private void initiateStage(int jobId, int stageId, int stageAttemptId, String name, int numTasks) {
+        SparkStage stage = new SparkStage();
+        stage.setTags(new HashMap<>(this.app.getTags()));
+        stage.setTimestamp(app.getTimestamp());
+        stage.getTags().put(SparkJobTagName.SPARK_JOB_ID.toString(), Integer.toString(jobId));
+        stage.getTags().put(SparkJobTagName.SPARK_SATGE_ID.toString(), Integer.toString(stageId));
+        stage.getTags().put(SparkJobTagName.SPARK_STAGE_ATTEMPT_ID.toString(), Integer.toString(stageAttemptId));
+        stage.setName(name);
+        stage.setNumActiveTasks(0);
+        stage.setNumTasks(numTasks);
+        stage.setSchedulingPool(this.app.getConfig().getConfig().get("spark.scheduler.pool") == null
+                ? "default" : this.app.getConfig().getConfig().get("spark.scheduler.pool"));
+
+        String stageKey = this.generateStageKey(Integer.toString(stageId), Integer.toString(stageAttemptId));
+        stages.put(stageKey, stage);
+        this.jobStageMap.get(jobId).add(stageKey);
+    }
+
+
+    private SparkExecutor initiateExecutor(String executorID, long startTime) {
+        if (!executors.containsKey(executorID)) {
+            SparkExecutor executor = new SparkExecutor();
+            executor.setTags(new HashMap<>(this.app.getTags()));
+            executor.getTags().put(SparkJobTagName.SPARK_EXECUTOR_ID.toString(), executorID);
+            executor.setStartTime(startTime);
+            executor.setTimestamp(app.getTimestamp());
+
+            this.executors.put(executorID, executor);
+        }
+
+        return this.executors.get(executorID);
+    }
+
+    private String getNormalizedName(String jobName, String assignedName) {
+        if (null != assignedName) {
+            return assignedName;
+        } else {
+            return JobNameNormalization.getInstance().normalize(jobName);
+        }
+    }
+
+    private void flushEntities(Object entity, boolean forceFlush) {
+        this.flushEntities(Collections.singletonList(entity), forceFlush);
+    }
+
+    private void flushEntities(Collection entities, boolean forceFlush) {
+        this.createEntities.addAll(entities);
+
+        if (forceFlush || this.createEntities.size() >= FLUSH_LIMIT) {
+            try {
+                this.doFlush(this.createEntities);
+                this.createEntities.clear();
+            } catch (Exception e) {
+                LOG.error("Fail to flush entities", e);
+            }
+
+        }
+    }
+
+    private EagleServiceBaseClient initiateClient() {
+        String host = conf.getString("eagleProps.eagle.service.host");
+        int port = conf.getInt("eagleProps.eagle.service.port");
+        String userName = conf.getString("eagleProps.eagle.service.username");
+        String pwd = conf.getString("eagleProps.eagle.service.password");
+        client = new EagleServiceClientImpl(host, port, userName, pwd);
+        int timeout = conf.getInt("eagleProps.eagle.service.read.timeout");
+        client.getJerseyClient().setReadTimeout(timeout * 1000);
+
+        return client;
+    }
+
+    private void doFlush(List entities) throws IOException, EagleServiceClientException {
+        int size = (entities == null ? 0 : entities.size());
+        client.create(entities);
+        LOG.info("finish flushing entities of total number " + size);
+    }
+}
\ No newline at end of file
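
A note on the memory-overhead resolution above: getMemoryOverhead() prefers an
explicitly configured overhead (a bare number is read as megabytes), and only
when nothing usable is configured does it fall back to
max(spark.defaultVal.spark.yarn.overhead.min, executorMemory * factor / 100).
A minimal self-contained sketch of that fallback order, with a simplified
parseMemory standing in for Utils.parseMemory (an assumption for illustration;
the real helper understands more suffixes):

    public class MemoryOverheadSketch {
        // Reads "384m"-style values as bytes; returns 0 for anything it cannot parse.
        static long parseMemory(String value) {
            if (value == null || !value.endsWith("m")) {
                return 0L;
            }
            try {
                return Long.parseLong(value.substring(0, value.length() - 1)) * 1024L * 1024L;
            } catch (NumberFormatException e) {
                return 0L;
            }
        }

        // Explicit setting wins; otherwise max(minimum overhead, memory * factor / 100).
        static long overhead(String explicit, long executorMemoryBytes, long minBytes, int factorPercent) {
            long result = 0L;
            if (explicit != null) {
                result = parseMemory(explicit + "m"); // a bare number means MB
                if (result == 0L) {
                    result = parseMemory(explicit);   // value already carried a suffix
                }
            }
            if (result == 0L) {
                result = Math.max(minBytes, executorMemoryBytes * factorPercent / 100);
            }
            return result;
        }

        public static void main(String[] args) {
            long mb = 1024L * 1024L;
            // Explicit 512 (MB) beats the computed default.
            System.out.println(overhead("512", 4096 * mb, 384 * mb, 10) == 512 * mb);
            // No explicit value: max(384m, 10% of 8g) -> the 10% term wins.
            System.out.println(overhead(null, 8192 * mb, 384 * mb, 10) == 8192 * mb * 10 / 100);
        }
    }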

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
new file mode 100644
index 0000000..b1dd09c
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkParser.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+public class JHFSparkParser implements JHFParserBase {
+
+    private static final Logger logger = LoggerFactory.getLogger(JHFSparkParser.class);
+
+    private boolean isValidJson;
+
+    private JHFSparkEventReader eventReader;
+
+    public JHFSparkParser(JHFSparkEventReader reader) {
+        this.eventReader = reader;
+    }
+
+    @Override
+    public void parse(InputStream is) throws Exception {
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
+            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+                isValidJson = true;
+                JSONObject eventObj = parseAndValidateJSON(line);
+                if (isValidJson) {
+                    try {
+                        this.eventReader.read(eventObj);
+                    } catch (Exception e) {
+                        logger.error("Fail to read spark event object", e);
+                    }
+                }
+            }
+
+            this.eventReader.clearReader();
+        }
+    }
+
+    private JSONObject parseAndValidateJSON(String line) {
+        JSONObject eventObj = null;
+        JSONParser parser = new JSONParser();
+        try {
+            eventObj = (JSONObject) parser.parse(line);
+        } catch (ParseException ex) {
+            isValidJson = false;
+            logger.error(String.format("Invalid json string. Fail to parse %s.", line), ex);
+        }
+        return eventObj;
+    }
+}
\ No newline at end of file
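
JHFSparkParser treats a Spark event log as one JSON document per line and skips
lines that fail to parse rather than aborting the whole file. A small
stand-alone illustration of that per-line tolerance, using the same json-simple
parser (the two sample lines are made up for the example):

    import org.json.simple.JSONObject;
    import org.json.simple.parser.JSONParser;
    import org.json.simple.parser.ParseException;

    public class PerLineJsonDemo {
        public static void main(String[] args) {
            String[] lines = {
                "{\"Event\":\"SparkListenerApplicationStart\",\"App Name\":\"demo\"}",
                "not json at all"   // should be skipped, not fatal
            };
            JSONParser parser = new JSONParser();
            for (String line : lines) {
                try {
                    JSONObject obj = (JSONObject) parser.parse(line);
                    System.out.println("event: " + obj.get("Event"));
                } catch (ParseException ex) {
                    System.err.println("skipping invalid line: " + line);
                }
            }
        }
    }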

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java
new file mode 100644
index 0000000..c206b71
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkApplicationInfo.java
@@ -0,0 +1,69 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+public class SparkApplicationInfo {
+
+    private String state;
+    private String finalStatus;
+    private String queue;
+    private String name;
+    private String user;
+
+    public String getState() {
+        return state;
+    }
+
+    public void setState(String state) {
+        this.state = state;
+    }
+
+    public String getFinalStatus() {
+        return finalStatus;
+    }
+
+    public void setFinalStatus(String finalStatus) {
+        this.finalStatus = finalStatus;
+    }
+
+    public String getQueue() {
+        return queue;
+    }
+
+    public void setQueue(String queue) {
+        this.queue = queue;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
new file mode 100644
index 0000000..0144410
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.history.crawl;
+
+import org.apache.eagle.jpm.util.SparkJobTagName;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SparkFilesystemInputStreamReaderImpl implements JHFInputStreamReader {
+
+    private String site;
+    private SparkApplicationInfo app;
+
+
+    public SparkFilesystemInputStreamReaderImpl(String site, SparkApplicationInfo app) {
+        this.site = site;
+        this.app = app;
+    }
+
+    @Override
+    public void read(InputStream is) throws Exception {
+        Map<String, String> baseTags = new HashMap<>();
+        baseTags.put(SparkJobTagName.SITE.toString(), site);
+        baseTags.put(SparkJobTagName.SPARK_QUEUE.toString(), app.getQueue());
+        JHFParserBase parser = new JHFSparkParser(new JHFSparkEventReader(baseTags, this.app));
+        parser.parse(is);
+    }
+
+    public static void main(String[] args) throws Exception {
+        //ad-hoc local test entry point: parse a single Spark event log from the local filesystem
+        SparkFilesystemInputStreamReaderImpl impl = new SparkFilesystemInputStreamReaderImpl("apollo-phx", new SparkApplicationInfo());
+        impl.read(new FileInputStream(new File("E:\\eagle\\application_1459803563374_535667_1")));
+    }
+
+}
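
For anything beyond the ad-hoc main() above, a try-with-resources wrapper keeps
stream handling explicit (the site name and path here are placeholders, not
values from this commit):

    try (InputStream in = new FileInputStream(new File("/tmp/spark-eventlog"))) {
        new SparkFilesystemInputStreamReaderImpl("sandbox", new SparkApplicationInfo()).read(in);
    }

The parser also closes the stream itself through its buffered reader, so the
wrapper is defensive rather than strictly required.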

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
index 9fafc1f..0bb65df 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/status/JobHistoryZKStateManager.java
@@ -19,7 +19,7 @@
 
 package org.apache.eagle.jpm.spark.history.status;
 
-import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
+import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo;
 import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -233,10 +233,11 @@ public class JobHistoryZKStateManager {
                     curator.setData().forPath(path, status.toString().getBytes("UTF-8"));
                 }
             } else {
-                LOG.error("Failed to update for application with path: " + path);
+                LOG.warn("failed to update status to {} because path {} does not exist", status, path);
+                //throw new RuntimeException("Failed to update for application with path: " + path);
             }
         } catch (Exception e) {
-            LOG.error("fail to update application status", e);
+            LOG.error("fail to update application status to {}", status, e);
             throw new RuntimeException(e);
         } finally {
             try {
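
The net effect of this hunk is an update-if-present policy: a missing znode is
now a warning rather than an error. The underlying Curator idiom, sketched
under the assumption of an already-started CuratorFramework client named
curator:

    // Only write when the znode exists; callers treat a missing path as benign.
    if (curator.checkExists().forPath(path) != null) {
        curator.setData().forPath(path, status.toString().getBytes("UTF-8"));
    } else {
        LOG.warn("failed to update status to {} because path {} does not exist", status, path);
    }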

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
index e88c62f..0351de3 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
@@ -19,9 +19,9 @@
 
 package org.apache.eagle.jpm.spark.history.storm;
 
-import org.apache.eagle.jpm.spark.crawl.JHFInputStreamReader;
-import org.apache.eagle.jpm.spark.crawl.SparkApplicationInfo;
-import org.apache.eagle.jpm.spark.crawl.SparkFilesystemInputStreamReaderImpl;
+import org.apache.eagle.jpm.spark.history.crawl.JHFInputStreamReader;
+import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo;
+import org.apache.eagle.jpm.spark.history.crawl.SparkFilesystemInputStreamReaderImpl;
 import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig;
 import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
 import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
@@ -106,9 +106,12 @@ public class SparkHistoryJobParseBolt extends BaseRichBolt {
             zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
             LOG.info("Successfully parse application {}", appId);
             collector.ack(tuple);
+        } catch (RuntimeException e) {
+            LOG.warn("Fail to process application {} due to RuntimeException, ignoring it", appId, e);
+            zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
+            collector.ack(tuple);
         } catch (Exception e) {
-            LOG.error("Fail to process application {}", appId, e);
+            LOG.error("Fail to process application {}, will retry", appId, e);
             collector.fail(tuple);
             collector.fail(tuple);
         }
     }
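
In Storm's at-least-once model, ack() marks a tuple done while fail() sends it
back to the spout for replay. The new error-handling split therefore reads:
RuntimeExceptions are treated as permanent for this application (mark FINISHED,
ack, move on), while checked exceptions are treated as transient and retried. A
condensed view, reusing the names from the diff:

    try {
        // parse the application's history file ...
        collector.ack(tuple);                       // success: done
    } catch (RuntimeException e) {
        zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.FINISHED);
        collector.ack(tuple);                       // permanent: do not replay
    } catch (Exception e) {
        collector.fail(tuple);                      // transient: spout will re-emit
    }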

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
index 5602b4c..4c50607 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
@@ -91,10 +91,11 @@ public class SparkHistoryJobSpout extends BaseRichSpout {
                 LOG.info("emit " + appId);
                 zkState.updateApplicationStatus(appId, ZKStateConstant.AppStatus.SENT_FOR_PARSE);
             }
-            LOG.info("{} apps sent.", appIds.size());
 
             if (appIds.isEmpty()) {
-                this.takeRest(60);
+                this.takeRest(10);
+            } else {
+                LOG.info("{} apps sent.", appIds.size());
             }
         } catch (Exception e) {
             LOG.error("Fail to run next tuple", e);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
index 26842b8..b94c603 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
@@ -153,24 +153,6 @@
             <value>http://sandbox.hortonworks.com:8088</value>
         </property>
         <property>
-            <name>storm.mode</name>
-            <displayName>mode</displayName>
-            <description>Storm Mode: local or cluster</description>
-            <value>local</value>
-        </property>
-        <property>
-            <name>storm.worker.num</name>
-            <displayName>worker.num</displayName>
-            <description>The number of workers</description>
-            <value>2</value>
-        </property>
-        <property>
-            <name>name</name>
-            <displayName>name</displayName>
-            <description>Name of the topology</description>
-            <value>sparkHistoryJob</value>
-        </property>
-        <property>
             <name>storm.messageTimeoutSec</name>
             <displayName>messageTimeoutSec</displayName>
             <description>Message timeout (in seconds)</description>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
index 58dd552..4c22b15 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -15,6 +15,9 @@
 
 
 {
+  "appId": "sparkHistoryJob",
+  "mode": "CLUSTER",
+  "workers" : 3,
   "basic":{
     "cluster":"sandbox",
     "dataCenter":"sandbox",
@@ -45,8 +48,6 @@
     }
   },
   "storm":{
-    worker.num: 2,
-    "name":"sparkHistoryJob",
     "messageTimeoutSec": 3000,
     "pendingSpout": 1000,
     "spoutCrawlInterval": 10000,#in ms
@@ -72,7 +73,5 @@
       spark.yarn.am.memoryOverhead.factor: 10,
       spark.yarn.overhead.min: "384m"
     }
-  },
-  "appId": "sparkHistoryJob",
-  "mode": "CLUSTER"
+  }
 }
\ No newline at end of file
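
With appId, mode, and workers promoted to the top level, the topology settings
are read straight off the root of the config. A minimal sketch, assuming the
Typesafe Config loader already used by the conf.getString/getInt calls
elsewhere in this module:

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    Config conf = ConfigFactory.load();        // picks up application.conf
    String appId = conf.getString("appId");    // "sparkHistoryJob"
    String mode = conf.getString("mode");      // "CLUSTER"
    int workers = conf.getInt("workers");      // 3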

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
deleted file mode 100644
index 5d1cfaa..0000000
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.running.entities;
-
-import org.apache.eagle.log.entity.repo.EntityRepository;
-
-public class JPMEntityRepository extends EntityRepository {
-    public JPMEntityRepository() {
-        entitySet.add(SparkAppEntity.class);
-        entitySet.add(SparkJobEntity.class);
-        entitySet.add(SparkStageEntity.class);
-        entitySet.add(SparkTaskEntity.class);
-        entitySet.add(SparkExecutorEntity.class);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/3110c72e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
deleted file mode 100644
index e18f1e7..0000000
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfig.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.jpm.spark.running.entities;
-
-import java.io.Serializable;
-import java.util.HashMap;
-
-public class JobConfig extends HashMap<String, String> implements Serializable {
-}
-
