http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
new file mode 100644
index 0000000..4e6bf03
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkJobParseBolt.java
@@ -0,0 +1,178 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.storm;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
+import org.apache.eagle.jpm.spark.history.crawl.JHFInputStreamReader;
+import org.apache.eagle.jpm.spark.history.crawl.SparkApplicationInfo;
+import 
org.apache.eagle.jpm.spark.history.crawl.SparkHistoryFileInputStreamReaderImpl;
+import org.apache.eagle.jpm.spark.history.status.JobHistoryZKStateManager;
+import org.apache.eagle.jpm.spark.history.status.ZKStateConstant;
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.HDFSUtil;
+import org.apache.eagle.jpm.util.resourceFetch.ResourceFetcher;
+import 
org.apache.eagle.jpm.util.resourceFetch.SparkHistoryServerResourceFetcher;
+import org.apache.eagle.jpm.util.resourceFetch.model.SparkApplication;
+import org.apache.eagle.jpm.util.resourceFetch.model.SparkApplicationAttempt;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class SparkJobParseBolt extends BaseRichBolt {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SparkJobParseBolt.class);
+
+    private OutputCollector collector;
+    private ResourceFetcher historyServerFetcher;
+    private SparkHistoryCrawlConfig config;
+    private JobHistoryZKStateManager zkState;
+    private Configuration hdfsConf;
+
+    public SparkJobParseBolt(SparkHistoryCrawlConfig config) {
+        this.config = config;
+    }
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector outputCollector) {
+        this.collector = outputCollector;
+        this.hdfsConf  = new Configuration();
+        this.hdfsConf.set("fs.defaultFS", config.hdfsConfig.endpoint);
+        this.hdfsConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+        this.hdfsConf.set("hdfs.kerberos.principal", 
config.hdfsConfig.principal);
+        this.hdfsConf.set("hdfs.keytab.file", config.hdfsConfig.keytab);
+        this.historyServerFetcher = new 
SparkHistoryServerResourceFetcher(config.jobHistoryConfig.historyServerUrl,
+                config.jobHistoryConfig.historyServerUserName, 
config.jobHistoryConfig.historyServerUserPwd);
+        this.zkState = new JobHistoryZKStateManager(config);
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        String appId = tuple.getStringByField("appId");
+        FileSystem hdfs = null;
+        try {
+            if (!zkState.hasApplication(appId)) {
+                //may already be processed due to some reason
+                collector.ack(tuple);
+                return;
+            }
+
+            SparkApplicationInfo info = zkState.getApplicationInfo(appId);
+            //first try to get attempts under the application
+            List<SparkApplicationAttempt> attempts = 
this.getAttemptList(appId);
+
+            if (attempts.isEmpty()) {
+                LOG.info("Application:{}( Name:{}, user: {}, queue: {}) not 
found on history server.", appId, info.getName(), info.getUser(), 
info.getQueue());
+            } else {
+                hdfs = HDFSUtil.getFileSystem(this.hdfsConf);
+                for (SparkApplicationAttempt attempt : attempts) {
+                    Path attemptFile = new Path(this.config.hdfsConfig.baseDir 
+ "/" + this.getAppAttemptLogName(appId, attempt.getAttemptId()));
+                    JHFInputStreamReader reader = new 
SparkHistoryFileInputStreamReaderImpl(config.info.site , info);
+                    reader.read(hdfs.open(attemptFile));
+                }
+            }
+
+            zkState.updateApplicationStatus(appId, 
ZKStateConstant.AppStatus.FINISHED);
+            LOG.info("Successfully parse application {}", appId);
+            collector.ack(tuple);
+        } catch (Exception e) {
+            LOG.error("Fail to process application {}", appId, e);
+            zkState.updateApplicationStatus(appId, 
ZKStateConstant.AppStatus.FAILED);
+            collector.fail(tuple);
+        } finally {
+            if (null != hdfs) {
+                try {
+                    hdfs.close();
+                } catch (Exception e) {
+                    LOG.error("Fail to close hdfs");
+                }
+            }
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) 
{
+
+    }
+
+    private String getAppAttemptLogName(String appId, String attemptId) {
+        return String.format("%s_%s", appId, attemptId);
+    }
+
+    private List<SparkApplicationAttempt> getAttemptList(String appId) throws 
IOException {
+        FileSystem hdfs = null;
+        List<SparkApplicationAttempt> attempts = new ArrayList<>();
+        try {
+
+            SparkApplication app = null;
+            /*try {
+                List apps = 
this.historyServerFetcher.getResource(Constants.ResourceType.SPARK_JOB_DETAIL, 
appId);
+                if (apps != null) {
+                    app = (SparkApplication) apps.get(0);
+                    attempts = app.getAttempts();
+                }
+            } catch (Exception e) {
+                LOG.warn("Fail to get application detail from history server 
for appId " + appId, e);
+            }*/
+
+
+            if (null == app) {
+                //history server may not have the info, just double check
+                hdfs = HDFSUtil.getFileSystem(this.hdfsConf);
+                Integer attemptId = 1;
+
+                boolean exists = true;
+                while (exists) {
+                    Path attemptFile = new Path(this.config.hdfsConfig.baseDir 
+ "/" + this.getAppAttemptLogName(appId, attemptId.toString()));
+                    if (hdfs.exists(attemptFile)) {
+                        SparkApplicationAttempt attempt = new 
SparkApplicationAttempt();
+                        attempt.setAttemptId(attemptId.toString());
+                        attempts.add(attempt);
+                        attemptId++;
+                    } else {
+                        exists = false;
+                    }
+                }
+            }
+            return attempts;
+        } finally {
+            if (null != hdfs) {
+                hdfs.close();
+            }
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        super.cleanup();
+        zkState.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
new file mode 100644
index 0000000..eb30f5e
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/TestHDFS.java
@@ -0,0 +1,47 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.spark.history.storm;
+
+import org.apache.eagle.jpm.spark.history.config.SparkHistoryCrawlConfig;
+import org.apache.eagle.jpm.util.HDFSUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHDFS {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestHDFS.class);
+    public static void main(String[] args) throws Exception{
+        SparkHistoryCrawlConfig config = new SparkHistoryCrawlConfig();
+
+        Configuration conf  = new Configuration();
+        conf.set("fs.defaultFS", config.hdfsConfig.endpoint);
+        conf.set("hdfs.kerberos.principal", config.hdfsConfig.principal);
+        conf.set("hdfs.keytab.file", config.hdfsConfig.keytab);
+
+        FileSystem hdfs = HDFSUtil.getFileSystem(conf);
+        Path path = new Path("/logs/spark-events/local-1463002514438");
+        boolean exists = hdfs.exists(path);
+        LOG.info("File exists:{}", exists);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
new file mode 100644
index 0000000..21686a6
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.hdfs.DistributedFileSystem
+org.apache.hadoop.hdfs.web.HftpFileSystem
+org.apache.hadoop.hdfs.web.HsftpFileSystem
+org.apache.hadoop.hdfs.web.WebHdfsFileSystem
+org.apache.hadoop.hdfs.web.SWebHdfsFileSystem
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf 
b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
new file mode 100644
index 0000000..36f0836
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+{
+  "basic":{
+    "cluster":"sandbox",
+    "datacenter":"sandbox",
+    jobConf.additional.info: []
+  },
+  "eagleProps":{
+    eagle.service.host:"sandbox.hortonworks.com",
+    eagle.service.port: 9099,
+    eagle.service.userName: "admin",
+    eagle.service.pwd : "secret",
+    eagle.service.read_timeout : 2
+  },
+  "dataSourceConfig":{
+    "zkQuorum" : "sandbox.hortonworks.com:2181",
+    "zkRoot" : "/sparkJobHistory",
+    "zkSessionTimeoutMs" : 15000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 20000,
+    spark.history.server.url : "http://sandbox.hortonworks.com:18080/";,
+    spark.history.server.username : "",
+    spark.history.server.pwd : "",
+    rm.url:["http://sandbox.hortonworks.com:8088";] ,
+    "hdfs": {
+      "baseDir": "/logs/spark-events",
+      "endPoint": "hdfs://sandbox.hortonworks.com:8020",
+      "principal": "",
+      "keytab" : ""
+      }
+  },
+  "storm":{
+    "mode": "local",
+    "workerNo": 2,
+    "name":"sparkHistory",
+    "messageTimeoutSec":  3000,
+    "pendingSpout": 1000,
+    "spoutCrawlInterval": 10000,#in ms
+    "parallelismConfig" : {
+      "sparkHistoryJobSpout" : 1,
+      "sparkHistoryJobBolt" : 6
+    },
+    "tasks" : {
+      "sparkHistoryJobSpout" : 1,
+      "sparkHistoryJobBolt" : 6
+    }
+  },
+  "spark":{
+    "defaultVal":{
+      spark.executor.memory:"1g",
+      spark.driver.memory: "1g",
+      spark.driver.cores:1,
+      spark.executor.cores:1,
+      spark.yarn.am.memory:"512m",
+      spark.yarn.am.cores:1,
+      spark.yarn.executor.memoryOverhead.factor: 10,
+      spark.yarn.driver.memoryOverhead.factor: 10,
+      spark.yarn.am.memoryOverhead.factor: 10,
+      spark.yarn.overhead.min: "384m"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-history/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/log4j.properties 
b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/log4j.properties
new file mode 100644
index 0000000..6b8c8d6
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/log4j.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout, DRFA
+
+eagle.log.dir=../logs
+eagle.log.file=eagle.log
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: 
%m%n
+
+# Daily Rolling File Appender
+ log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+ log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+ log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+## 30-day backup
+# log4j.appender.DRFA.MaxBackupIndex=30
+ log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: 
%m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-spark-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-running/pom.xml 
b/eagle-jpm/eagle-jpm-spark-running/pom.xml
new file mode 100644
index 0000000..7bf90d4
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-spark-running/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd"; 
xmlns="http://maven.apache.org/POM/4.0.0";
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.eagle</groupId>
+    <artifactId>eagle-jpm-parent</artifactId>
+    <version>0.5.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <artifactId>eagle-jpm-spark-running</artifactId>
+  <name>eagle-jpm-spark-running</name>
+  <url>http://maven.apache.org</url>
+  <dependencies>
+       <dependency>
+               <groupId>org.slf4j</groupId>
+               <artifactId>slf4j-api</artifactId>
+       </dependency>
+       <dependency>
+               <groupId>org.apache.eagle</groupId>
+               <artifactId>eagle-stream-process-api</artifactId>
+        <version>${project.version}</version>
+       </dependency>
+      <dependency>
+          <groupId>org.apache.eagle</groupId>
+          <artifactId>eagle-stream-process-base</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+       <dependency>
+               <groupId>org.apache.eagle</groupId>
+               <artifactId>eagle-job-common</artifactId>
+               <version>${project.version}</version>
+       </dependency>           
+       <dependency>
+               <groupId>org.jsoup</groupId>
+               <artifactId>jsoup</artifactId>
+       </dependency>
+       <dependency>
+               <groupId>org.apache.storm</groupId>
+               <artifactId>storm-core</artifactId>
+               <exclusions>
+               <exclusion>
+                       <groupId>ch.qos.logback</groupId>
+                       <artifactId>logback-classic</artifactId>
+               </exclusion>
+       </exclusions>
+       </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-util/pom.xml b/eagle-jpm/eagle-jpm-util/pom.xml
new file mode 100644
index 0000000..ed7f658
--- /dev/null
+++ b/eagle-jpm/eagle-jpm-util/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd"; 
xmlns="http://maven.apache.org/POM/4.0.0";
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.eagle</groupId>
+    <artifactId>eagle-jpm-parent</artifactId>
+    <version>0.5.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <artifactId>eagle-jpm-util</artifactId>
+  <name>eagle-jpm-util</name>
+  <url>http://maven.apache.org</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.googlecode.json-simple</groupId>
+      <artifactId>json-simple</artifactId>
+      <version>1.1.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <version>${storm.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>ch.qos.logback</groupId>
+          <artifactId>logback-classic</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>log4j-over-slf4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <version>1.9</version>
+    </dependency>
+
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
new file mode 100644
index 0000000..0792f15
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/Constants.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.util;
+
+public class Constants {
+
+    public final static String SPARK_APP_SERVICE_ENDPOINT_NAME = 
"SparkAppService";
+    public final static String SPARK_JOB_SERVICE_ENDPOINT_NAME = 
"SparkJobService";
+    public final static String SPARK_STAGE_SERVICE_ENDPOINT_NAME = 
"SparkStageService";
+    public final static String SPARK_TASK_SERVICE_ENDPOINT_NAME = 
"SparkTaskService";
+    public final static String SPARK_EXECUTOR_SERVICE_ENDPOINT_NAME = 
"SparkExecutorService";
+
+    public static final String APPLICATION_PREFIX = "application";
+    public static final String JOB_PREFIX = "job";
+    public static final String V2_APPS_URL = "ws/v1/cluster/apps";
+    public static final String ANONYMOUS_PARAMETER = "anonymous=true";
+
+    public static final String V2_APPS_RUNNING_URL = 
"ws/v1/cluster/apps?state=RUNNING";
+    public static final String V2_APPS_COMPLETED_URL = 
"ws/v1/cluster/apps?state=FINISHED";
+
+    public static final String SPARK_APPS_URL ="api/v1/applications";
+
+    public enum CompressionType {
+        GZIP, NONE
+    }
+    public enum JobState {
+        RUNNING, COMPLETED, ALL
+    }
+
+    public enum ResourceType {
+         COMPLETE_SPARK_JOB, SPARK_JOB_DETAIL
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
new file mode 100644
index 0000000..8adb001
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/HDFSUtil.java
@@ -0,0 +1,44 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+
+public class HDFSUtil {
+
+    public static FileSystem getFileSystem(Configuration conf) throws 
IOException {
+        HDFSUtil.login(conf);
+       return FileSystem.get(conf);
+    }
+
+    public static void login(Configuration kConfig) throws IOException {
+        if(kConfig.get("hdfs.kerberos.principal") == null || 
kConfig.get("hdfs.kerberos.principal").isEmpty()){
+            return;
+        }
+       kConfig.setBoolean("hadoop.security.authorization", true);
+       kConfig.set("hadoop.security.authentication", "kerberos");
+       UserGroupInformation.setConfiguration(kConfig);
+       
UserGroupInformation.loginUserFromKeytab(kConfig.get("hdfs.kerberos.principal"),
 kConfig.get("hdfs.keytab.file"));
+     }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java
new file mode 100644
index 0000000..8080147
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JSONUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.util;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+public class JSONUtil {
+
+    public static String getString(JSONObject obj, String field) {
+        if (obj.containsKey(field)) {
+            return (String) obj.get(field);
+        }
+        return null;
+    }
+
+    public static Integer getInt(JSONObject obj, String field){
+        if(obj.containsKey(field)){
+            return ((Long)obj.get(field)).intValue();
+        }
+        return null;
+    }
+
+    public static Long getLong(JSONObject obj, String field){
+        if(obj.containsKey(field)){
+            return (Long)obj.get(field);
+        }
+        return null;
+    }
+
+    public static Boolean getBoolean(JSONObject obj, String field){
+        if(obj.containsKey(field)){
+            return (Boolean)obj.get(field);
+        }
+        return null;
+    }
+
+    public static JSONObject getJSONObject(JSONObject obj, String field){
+        if(obj.containsKey(field)){
+            return (JSONObject)obj.get(field);
+        }
+        return null;
+    }
+
+    public static JSONArray getJSONArray(JSONObject obj, String field){
+        if(obj.containsKey(field)){
+            return (JSONArray)obj.get(field);
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java
new file mode 100644
index 0000000..c5cc82f
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/JobNameNormalization.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.util;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class JobNameNormalization {
+       private static Logger logger = 
LoggerFactory.getLogger(JobNameNormalization.class);
+       private static JobNameNormalization instance = new 
JobNameNormalization();
+       private static final String JOB_NAME_NORMALIZATION_RULES_KEY = 
"job.name.normalization.rules.key";
+       private static final String PARAMETERIZED_PREFIX = "\\$";
+       private static final String MULTIPLE_RULE_DILIMITER = ";";
+       /**
+        * map from source string to target string
+        * source string is regular expression, for example 
^(.*)[0-9]{4}/[0-9]{2}/[0-9]{2}/[0-9]{2}$
+        * target string is parameterized string, for example $1, $2
+        */
+       private List<JobNameNormalizationRule> _rules = new 
ArrayList<JobNameNormalizationRule>();
+       
+       private enum NormalizationOp{
+               REPLACE("=>");
+               private String value;
+               private NormalizationOp(String value){
+                       this.value = value;
+               }
+               public String toString(){
+                       return value;
+               }
+       }
+       
+       static class JobNameNormalizationRule{
+               Pattern pattern;
+               NormalizationOp op;
+               String target;
+       }
+       
+       private JobNameNormalization(){
+               try{
+                       // load normalization rules
+                       Config conf = ConfigFactory.load();
+                       String key = 
JOB_NAME_NORMALIZATION_RULES_KEY.toLowerCase();
+                       String value = conf.getString(key);
+                       if(value == null){
+                               logger.info("no job name normalization rules 
are loaded");
+                               return;
+                       }
+                       // multiple rules are concatenated with semicolon, i.e. 
;
+                       String rules[] = value.split(MULTIPLE_RULE_DILIMITER);
+                       for(String rule : rules){
+                               rule = rule.trim();
+                               logger.info("jobNormalizationRule is loaded " + 
rule);
+                               addRule(rule);
+                       }
+               }catch(Exception ex){
+                       logger.error("fail loading job name normalization 
rules", ex);
+                       throw new RuntimeException(ex);
+               }
+       }
+       
+       public static JobNameNormalization getInstance(){
+               return instance;
+       }
+       
+       private void addRule(String rule){
+               for(NormalizationOp op : NormalizationOp.values()){
+                       // split the rule to be source and target string
+                       String elements[] = rule.split(op.toString());
+                       if(elements == null || elements.length != 2) return;
+                       JobNameNormalizationRule r = new 
JobNameNormalizationRule();
+                       r.pattern = Pattern.compile(elements[0].trim());
+                       r.op = op;
+                       r.target = elements[1].trim();
+                       _rules.add(r);
+                       break;  //once one Op is matched, exit
+               }
+       
+       }
+       
+       public String normalize(String jobName){
+               String normalizedJobName = jobName;
+               // go through each rules and do actions
+               for(JobNameNormalizationRule rule : _rules){
+                       Pattern p = rule.pattern;
+                       Matcher m = p.matcher(jobName);
+                       if(m.find()){
+                               normalizedJobName = rule.target;
+                               int c = m.groupCount();
+                               for(int i=1; i<c+1; i++){
+                                       normalizedJobName = 
normalizedJobName.replaceAll(PARAMETERIZED_PREFIX+String.valueOf(i), 
m.group(i));
+                               }
+                       }
+               }
+               return normalizedJobName;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java
new file mode 100644
index 0000000..35014b1
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkEntityConstant.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.util;
+
+public class SparkEntityConstant {
+
+    public enum SPARK_STAGE_STATUS{
+        COMPLETE, FAILED
+    }
+
+    public enum SPARK_JOB_STATUS{
+        SUCCEEDED, FAILED
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkJobTagName.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkJobTagName.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkJobTagName.java
new file mode 100644
index 0000000..1d38eea
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/SparkJobTagName.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.jpm.util;
+
+public enum SparkJobTagName {
+    SITE("site"),
+    SPARK_APP_ID("sprkAppId"),
+    SPARK_APP_ATTEMPT_ID("sprkAppAttemptId"),
+    SPARK_APP_NAME("sprkAppName"),
+    SPARK_APP_NORM_NAME("normSprkAppName"),
+    SPARK_JOB_ID("jobId"),
+    SPARK_SATGE_ID("stageId"),
+    SPARK_STAGE_ATTEMPT_ID("stageAttemptId"),
+    SPARK_TASK_INDEX("taskIndex"),
+    SPARK_TASK_ATTEMPT_ID("taskAttemptId"),
+    SPARK_USER("user"),
+    SPARK_QUEUE("queue"),
+    SPARK_EXECUTOR_ID("executorId");
+
+
+    private String tagName;
+    private SparkJobTagName(String tagName){
+        this.tagName = tagName;
+    }
+
+    public String toString(){
+        return this.tagName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
new file mode 100644
index 0000000..d5147b6
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/RMResourceFetcher.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * 
+ */
+package org.apache.eagle.jpm.util.resourceFetch;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourceFetch.ha.HAURLSelector;
+import org.apache.eagle.jpm.util.resourceFetch.ha.HAURLSelectorImpl;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppInfo;
+import org.apache.eagle.jpm.util.resourceFetch.model.AppsWrapper;
+import 
org.apache.eagle.jpm.util.resourceFetch.url.JobListServiceURLBuilderImpl;
+import org.apache.eagle.jpm.util.resourceFetch.url.ServiceURLBuilder;
+import 
org.apache.eagle.jpm.util.resourceFetch.url.SparkCompleteJobServiceURLBuilderImpl;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+public class RMResourceFetcher implements ResourceFetcher{
+       
+       private static final Logger LOG = 
LoggerFactory.getLogger(RMResourceFetcher.class);
+       private final HAURLSelector selector;
+       private final ServiceURLBuilder jobListServiceURLBuilder;
+       private final ServiceURLBuilder sparkCompleteJobServiceURLBuilder;
+       
+       private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+       
+       static {
+               
OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
+       }
+       
+       public RMResourceFetcher(String[] RMBasePaths) {
+               this.jobListServiceURLBuilder = new 
JobListServiceURLBuilderImpl();
+               this.sparkCompleteJobServiceURLBuilder = new 
SparkCompleteJobServiceURLBuilderImpl();
+
+               this.selector = new HAURLSelectorImpl(RMBasePaths, 
jobListServiceURLBuilder, Constants.CompressionType.GZIP);
+       }
+       
+       private void checkUrl() throws IOException {
+               if 
(!selector.checkUrl(jobListServiceURLBuilder.build(selector.getSelectedUrl(), 
Constants.JobState.RUNNING.name()))) {
+                       selector.reSelectUrl();
+               }
+       }
+       
+       private List<Object> doFetchSparkFinishApplicationsList(String 
lastFinishTime) throws Exception {
+               List<AppInfo> result = null;
+               InputStream is = null;
+               try {
+                       checkUrl();
+                       final String urlString = 
sparkCompleteJobServiceURLBuilder.build(selector.getSelectedUrl(), 
lastFinishTime);
+                       LOG.info("Going to call yarn api to fetch finished 
spark job list: " + urlString);
+                       is = InputStreamUtils.getInputStream(urlString, null, 
Constants.CompressionType.GZIP);
+                       final AppsWrapper appWrapper = OBJ_MAPPER.readValue(is, 
AppsWrapper.class);
+                       if (appWrapper != null && appWrapper.getApps() != null
+                                       && appWrapper.getApps().getApp() != 
null) {
+                               result = appWrapper.getApps().getApp();
+                               return Arrays.asList((Object)result);
+                       }
+                       return null;
+               }finally {
+                       if (is != null) { try {is.close();} catch (Exception 
e){} }
+               }
+       }
+       
+
+       
+       public List<Object> getResource(Constants.ResourceType resoureType, 
Object... parameter) throws Exception{
+               switch(resoureType) {
+                       case COMPLETE_SPARK_JOB:
+                               return 
doFetchSparkFinishApplicationsList((String)parameter[0]);
+
+                       default:
+                               throw new Exception("Not support ressourceType 
:" + resoureType);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java
new file mode 100644
index 0000000..b21d030
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ResourceFetcher.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch;
+
+import org.apache.eagle.jpm.util.Constants;
+
+import java.util.List;
+
+public interface ResourceFetcher {
+
+       List<Object> getResource(Constants.ResourceType resoureType, Object... 
parameter) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java
new file mode 100644
index 0000000..c13bee0
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/SparkHistoryServerResourceFetcher.java
@@ -0,0 +1,81 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util.resourceFetch;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourceFetch.model.SparkApplication;
+import org.apache.eagle.jpm.util.resourceFetch.url.ServiceURLBuilder;
+import 
org.apache.eagle.jpm.util.resourceFetch.url.SparkJobServiceURLBuilderImpl;
+import org.apache.commons.codec.binary.Base64;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+public class SparkHistoryServerResourceFetcher implements ResourceFetcher{
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SparkHistoryServerResourceFetcher.class);
+
+    private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
+
+    private String historyServerURL;
+    private final ServiceURLBuilder sparkDetailJobServiceURLBuilder;
+    private String auth;
+
+    static {
+        OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, 
true);
+    }
+
+    public SparkHistoryServerResourceFetcher(String historyServerURL, String 
userName, String pwd){
+        this.historyServerURL = historyServerURL;
+        this.sparkDetailJobServiceURLBuilder = new 
SparkJobServiceURLBuilderImpl();
+        this.auth = "Basic " + new String(new 
Base64().encode(String.format("%s:%s", userName, pwd).getBytes()));;
+    }
+
+    private List<Object> doFetchSparkApplicationDetail(String appId) throws 
Exception {
+        InputStream is = null;
+        try {
+            final String urlString = 
sparkDetailJobServiceURLBuilder.build(this.historyServerURL, appId);
+            LOG.info("Going to call spark history server api to fetch spark 
job: " + urlString);
+            is = InputStreamUtils.getInputStream(urlString, auth, 
Constants.CompressionType.NONE);
+            SparkApplication app = OBJ_MAPPER.readValue(is, 
SparkApplication.class);
+            return Arrays.asList((Object)app);
+        } catch (FileNotFoundException e) {
+            return null;
+        } finally {
+            if (is != null) { try {is.close();} catch (Exception e) { } }
+        }
+    }
+
+    public List<Object> getResource(Constants.ResourceType resoureType, 
Object... parameter) throws Exception{
+        switch(resoureType) {
+            case SPARK_JOB_DETAIL:
+                return doFetchSparkApplicationDetail((String)parameter[0]);
+            default:
+                throw new Exception("Not support resourceType :" + 
resoureType);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java
new file mode 100644
index 0000000..6d3fa45
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/InputStreamUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.connection;
+
+import org.apache.eagle.jpm.util.Constants;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.zip.GZIPInputStream;
+
+public class InputStreamUtils {
+
+       private static final int CONNECTION_TIMEOUT = 10 * 1000;
+       private static final int READ_TIMEOUT = 5 * 60 * 1000;
+       private static final String GZIP_HTTP_HEADER = "Accept-Encoding";
+       private static final String GZIP_COMPRESSION = "gzip";
+       
+       private static InputStream openGZIPInputStream(URL url, String auth, 
int timeout) throws IOException {
+               final URLConnection connection = url.openConnection();
+               connection.setConnectTimeout(CONNECTION_TIMEOUT);
+               connection.setReadTimeout(timeout);
+               connection.addRequestProperty(GZIP_HTTP_HEADER, 
GZIP_COMPRESSION);
+               if (null != auth){
+                       connection.setRequestProperty ("Authorization", auth);
+               }
+               return new GZIPInputStream(connection.getInputStream());
+       }
+       
+       private static InputStream openInputStream(URL url, String auth, int 
timeout) throws IOException {
+               URLConnection connection = url.openConnection();
+               connection.setConnectTimeout(timeout);
+               if (null != auth){
+                       connection.setRequestProperty ("Authorization", auth);
+               }
+
+               return connection.getInputStream();
+       }
+       
+       public static InputStream getInputStream(String urlString, String auth, 
Constants.CompressionType compressionType, int timeout) throws Exception {
+               final URL url = URLConnectionUtils.getUrl(urlString);
+               if (compressionType.equals(Constants.CompressionType.GZIP)) {
+                       return openGZIPInputStream(url, auth, timeout);
+               }
+               else { // CompressionType.NONE
+                       return openInputStream(url, auth, timeout);
+               }
+       }
+       
+       public static InputStream getInputStream(String urlString, String auth, 
Constants.CompressionType compressionType) throws Exception {
+               return getInputStream(urlString, auth, compressionType, 
READ_TIMEOUT);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java
new file mode 100644
index 0000000..2e7b248
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/JobUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.connection;
+
+import org.apache.eagle.jpm.util.Constants;
+
+public class JobUtils {
+       
+       public static String checkAndAddLastSlash(String urlBase) {
+               if (!urlBase.endsWith("/")) {
+                       return urlBase + "/";
+               }
+               return urlBase;
+       }
+       
+       public static String getJobIDByAppID(String appID) {
+               if (appID.startsWith(Constants.APPLICATION_PREFIX)) {
+                       return appID.replace(Constants.APPLICATION_PREFIX, 
Constants.JOB_PREFIX);
+               }
+               return null;
+       }
+
+       public static String getAppIDByJobID(String jobID) {
+               if (jobID.startsWith(Constants.JOB_PREFIX)) {
+                       return jobID.replace(Constants.JOB_PREFIX, 
Constants.APPLICATION_PREFIX);
+               }
+               return null;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java
new file mode 100644
index 0000000..d340d7b
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/connection/URLConnectionUtils.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.connection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.*;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+
+public final class URLConnectionUtils {
+       //TODO: change some public method to private
+    private static final Logger LOG = 
LoggerFactory.getLogger(URLConnectionUtils.class);
+       
+       public static URLConnection getConnection(String url) throws Exception {
+               if (url.startsWith("https://";)) {
+                       return getHTTPSConnection(url);
+               } else if (url.startsWith("http://";)) {
+                       return getHTTPConnection(url);
+               }
+               throw new Exception("Invalid input argument url: " + url);
+       }
+
+       public static URLConnection getHTTPConnection(String urlString) throws 
Exception {
+               final URL url = new URL(urlString);
+               return url.openConnection();
+       }
+
+       public static URL getUrl(String urlString) throws Exception  {
+               if(urlString.toLowerCase().contains("https")){
+                       return getHTTPSUrl(urlString);
+               }else if (urlString.toLowerCase().contains("http")) {
+                       return getURL(urlString);
+               }
+               throw new Exception("Invalid input argument url: " + urlString);
+       }
+       
+       public static URL getURL(String urlString) throws MalformedURLException 
{
+               return new URL(urlString);
+       }
+       
+       public static URL getHTTPSUrl(String urlString) throws 
MalformedURLException, NoSuchAlgorithmException, KeyManagementException  {
+       // Create a trust manager that does not validate certificate chains   
+        final TrustManager[] trustAllCerts = new TrustManager[] {new 
TrustAllX509TrustManager()};
+        // Install the all-trusting trust manager   
+        final SSLContext sc = SSLContext.getInstance("SSL");   
+        sc.init(null, trustAllCerts, new java.security.SecureRandom());   
+        HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());  
 
+        // Create all-trusting host name verifier   
+        final HostnameVerifier allHostsValid = new HostnameVerifier() {   
+            public boolean verify(String hostname, SSLSession session) {   
+                return true;   
+            }   
+        };
+        HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid);
+        return new URL(urlString);
+       }
+
+       public static URLConnection getHTTPSConnection(String urlString) throws 
IOException, KeyManagementException, NoSuchAlgorithmException  {
+               final URL url = getHTTPSUrl(urlString);
+               return url.openConnection();
+       }
+       
+       public static class TrustAllX509TrustManager implements 
X509TrustManager {
+               @Override
+               public void checkClientTrusted(
+                               java.security.cert.X509Certificate[] chain, 
String authType)
+                               throws CertificateException {
+               }
+
+               @Override
+               public void checkServerTrusted(
+                               java.security.cert.X509Certificate[] chain, 
String authType)
+                               throws CertificateException {
+               }
+
+               @Override
+               public java.security.cert.X509Certificate[] 
getAcceptedIssuers() {
+                       return null;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelector.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelector.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelector.java
new file mode 100644
index 0000000..6eea7e3
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelector.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.ha;
+
+import java.io.IOException;
+
+public interface HAURLSelector {
+       
+       boolean checkUrl(String url);
+               
+       void reSelectUrl() throws IOException;
+       
+       String getSelectedUrl();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java
new file mode 100644
index 0000000..6518ca1
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/ha/HAURLSelectorImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.ha;
+
+import org.apache.eagle.jpm.util.Constants;
+import org.apache.eagle.jpm.util.resourceFetch.connection.InputStreamUtils;
+import org.apache.eagle.jpm.util.resourceFetch.url.ServiceURLBuilder;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+public class HAURLSelectorImpl implements HAURLSelector {
+
+       private final String[] urls;
+       private volatile String selectedUrl;
+       private final ServiceURLBuilder builder;
+       
+       private volatile boolean reselectInProgress;
+       private final Constants.CompressionType compressionType;
+       private static final long MAX_RETRY_TIME = 3;
+       private static final Logger LOG = 
LoggerFactory.getLogger(HAURLSelectorImpl.class);
+       
+       public HAURLSelectorImpl(String[] urls, ServiceURLBuilder builder, 
Constants.CompressionType compressionType) {
+               this.urls = urls;
+               this.compressionType = compressionType;
+               this.builder = builder;
+       }
+       
+       public boolean checkUrl(String urlString) {
+               InputStream is = null;
+               try {
+                       is = InputStreamUtils.getInputStream(urlString, null, 
compressionType);
+               }
+               catch (Exception ex) {
+                       LOG.info("get inputstream from url: " + urlString + " 
failed. ");
+                       return false;
+               }
+               finally {
+                       if (is != null) { try { is.close(); } catch 
(IOException e) {/*Do nothing*/} }
+               }
+               return true;
+       }
+
+       @Override
+       public String getSelectedUrl() {
+               if (selectedUrl == null) {
+                       selectedUrl = urls[0];
+               }
+               return selectedUrl;
+       }
+       
+       @Override
+       public void reSelectUrl() throws IOException {
+               if (reselectInProgress) return;
+               synchronized(this) {
+                       if (reselectInProgress) return;
+                       reselectInProgress = true;
+                       try {
+                               LOG.info("Going to reselect url");
+                               for (int i = 0; i < urls.length; i++) {         
+                                       String urlToCheck = urls[i];
+                                       LOG.info("Going to try url :" + 
urlToCheck);
+                                       for (int time = 0; time < 
MAX_RETRY_TIME; time++) {
+                                               if 
(checkUrl(builder.build(urlToCheck, Constants.JobState.RUNNING.name()))) {
+                                                       selectedUrl = urls[i];
+                                                       LOG.info("Successfully 
switch to new url : " + selectedUrl);
+                                                       return;
+                                               }
+                                               LOG.info("try url " + 
urlToCheck + "fail for " + (time+1) + " times, sleep 5 seconds before try 
again. ");
+                                               try {
+                                                       Thread.sleep(5 * 1000);
+                                               }
+                                               catch (InterruptedException ex) 
{ /* Do Nothing */}
+                                       }
+                               }
+                               throw new IOException("No alive url found: "+ 
StringUtils.join(";", Arrays.asList(this.urls)));
+                       }
+                       finally {
+                               reselectInProgress = false;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java
new file mode 100644
index 0000000..463ce1e
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppInfo.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AppInfo {
+       String id;
+       String user;
+       String name;
+       String queue;
+       String state;
+       String finalStatus;
+       double progress;
+       String trackingUI;
+       String trackingUrl;
+       String diagnostics;
+       String clusterId;
+       String applicationType;
+       long startedTime;
+       long finishedTime;
+       long elapsedTime;
+       String amContainerLogs;
+       String amHostHttpAddress;
+       
+       public String getId() {
+               return id;
+       }
+       public void setId(String id) {
+               this.id = id;
+       }
+       public String getUser() {
+               return user;
+       }
+       public void setUser(String user) {
+               this.user = user;
+       }
+       public String getName() {
+               return name;
+       }
+       public void setName(String name) {
+               this.name = name;
+       }
+       public String getQueue() {
+               return queue;
+       }
+       public void setQueue(String queue) {
+               this.queue = queue;
+       }
+       public String getState() {
+               return state;
+       }
+       public void setState(String state) {
+               this.state = state;
+       }
+       public String getFinalStatus() {
+               return finalStatus;
+       }
+       public void setFinalStatus(String finalStatus) {
+               this.finalStatus = finalStatus;
+       }
+       public double getProgress() {
+               return progress;
+       }
+       public void setProgress(double progress) {
+               this.progress = progress;
+       }
+       public String getTrackingUI() {
+               return trackingUI;
+       }
+       public void setTrackingUI(String trackingUI) {
+               this.trackingUI = trackingUI;
+       }
+       public String getTrackingUrl() {
+               return trackingUrl;
+       }
+       public void setTrackingUrl(String trackingUrl) {
+               this.trackingUrl = trackingUrl;
+       }
+       public String getDiagnostics() {
+               return diagnostics;
+       }
+       public void setDiagnostics(String diagnostics) {
+               this.diagnostics = diagnostics;
+       }
+       public String getClusterId() {
+               return clusterId;
+       }
+       public void setClusterId(String clusterId) {
+               this.clusterId = clusterId;
+       }
+       public String getApplicationType() {
+               return applicationType;
+       }
+       public void setApplicationType(String applicationType) {
+               this.applicationType = applicationType;
+       }
+       public long getStartedTime() {
+               return startedTime;
+       }
+       public void setStartedTime(long startedTime) {
+               this.startedTime = startedTime;
+       }
+       public long getFinishedTime() {
+               return finishedTime;
+       }
+       public void setFinishedTime(long finishedTime) {
+               this.finishedTime = finishedTime;
+       }
+       public long getElapsedTime() {
+               return elapsedTime;
+       }
+       public void setElapsedTime(long elapsedTime) {
+               this.elapsedTime = elapsedTime;
+       }
+       public String getAmContainerLogs() {
+               return amContainerLogs;
+       }
+       public void setAmContainerLogs(String amContainerLogs) {
+               this.amContainerLogs = amContainerLogs;
+       }
+       public String getAmHostHttpAddress() {
+               return amHostHttpAddress;
+       }
+       public void setAmHostHttpAddress(String amHostHttpAddress) {
+               this.amHostHttpAddress = amHostHttpAddress;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java
new file mode 100644
index 0000000..741fa1d
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/Applications.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.util.List;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Applications {
+
+       private List<AppInfo> app;
+
+       public List<AppInfo> getApp() {
+               return app;
+       }
+
+       public void setApp(List<AppInfo> app) {
+               this.app = app;
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java
new file mode 100644
index 0000000..d791685
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/AppsWrapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AppsWrapper {
+       
+       private Applications apps;
+
+       public Applications getApps() {
+               return apps;
+       }
+
+       public void setApps(Applications apps) {
+               this.apps = apps;
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplication.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplication.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplication.java
new file mode 100644
index 0000000..5d25d84
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplication.java
@@ -0,0 +1,57 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util.resourceFetch.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.util.List;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SparkApplication {
+    String id;
+    String name;
+    List<SparkApplicationAttempt> attempts;
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public List<SparkApplicationAttempt> getAttempts() {
+        return attempts;
+    }
+
+    public void setAttempts(List<SparkApplicationAttempt> attempts) {
+        this.attempts = attempts;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationAttempt.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationAttempt.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationAttempt.java
new file mode 100644
index 0000000..6e91c03
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationAttempt.java
@@ -0,0 +1,73 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util.resourceFetch.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SparkApplicationAttempt {
+    String attemptId;
+    String sparkUser;
+    String startTime;
+    String endTime;
+    boolean completed;
+
+    public String getAttemptId() {
+        return attemptId;
+    }
+
+    public void setAttemptId(String attemptId) {
+        this.attemptId = attemptId;
+    }
+
+    public String getSparkUser() {
+        return sparkUser;
+    }
+
+    public void setSparkUser(String sparkUser) {
+        this.sparkUser = sparkUser;
+    }
+
+    public String getStartTime() {
+        return startTime;
+    }
+
+    public void setStartTime(String startTime) {
+        this.startTime = startTime;
+    }
+
+    public String getEndTime() {
+        return endTime;
+    }
+
+    public void setEndTime(String endTime) {
+        this.endTime = endTime;
+    }
+
+    public boolean isCompleted() {
+        return completed;
+    }
+
+    public void setCompleted(boolean completed) {
+        this.completed = completed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationWrapper.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationWrapper.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationWrapper.java
new file mode 100644
index 0000000..5508863
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/model/SparkApplicationWrapper.java
@@ -0,0 +1,38 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util.resourceFetch.model;
+
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SparkApplicationWrapper {
+
+    SparkApplication app;
+
+    public SparkApplication getApp() {
+        return app;
+    }
+
+    public void setApp(SparkApplication app) {
+        this.app = app;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/JobListServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/JobListServiceURLBuilderImpl.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/JobListServiceURLBuilderImpl.java
new file mode 100644
index 0000000..3e21e8d
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/JobListServiceURLBuilderImpl.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.url;
+
+import org.apache.eagle.jpm.util.Constants;
+
+public class JobListServiceURLBuilderImpl implements ServiceURLBuilder {
+       
+       public String build(String ... parameters) {
+               // {rmUrl}/ws/v1/cluster/apps?state=RUNNING 
+               String jobState = parameters[1];
+               if (jobState.equals(Constants.JobState.RUNNING.name())) {
+                       return parameters[0] + "/" + 
Constants.V2_APPS_RUNNING_URL + "&" + Constants.ANONYMOUS_PARAMETER;
+               }
+               else if (jobState.equals(Constants.JobState.COMPLETED.name())) {
+                       return parameters[0] + "/" + 
Constants.V2_APPS_COMPLETED_URL + "&" + Constants.ANONYMOUS_PARAMETER;
+               }
+               else if (jobState.equals(Constants.JobState.ALL.name())) {
+                       return parameters[0]  + "/" + Constants.V2_APPS_URL + 
"&" + Constants.ANONYMOUS_PARAMETER;
+               }
+               return null;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/ServiceURLBuilder.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/ServiceURLBuilder.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/ServiceURLBuilder.java
new file mode 100644
index 0000000..597e359
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/ServiceURLBuilder.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.jpm.util.resourceFetch.url;
+
+public interface ServiceURLBuilder {
+       String build(String... parameters);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fe509125/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java
new file mode 100644
index 0000000..d4d235f
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourceFetch/url/SparkCompleteJobServiceURLBuilderImpl.java
@@ -0,0 +1,29 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.jpm.util.resourceFetch.url;
+
+import org.apache.eagle.jpm.util.Constants;
+
+public class SparkCompleteJobServiceURLBuilderImpl implements 
ServiceURLBuilder {
+
+    public String build(String... parameters) {
+        return 
String.format("%s/%s?applicationTypes=SPARK&state=FINISHED&finishedTimeBegin=%s&%s",
 parameters[0], Constants.V2_APPS_URL, parameters[1], 
Constants.ANONYMOUS_PARAMETER);
+    }
+}


Reply via email to