Repository: incubator-eagle
Updated Branches:
  refs/heads/master 6dbdb4f72 -> 71a4bb013


[EAGLE-634] clean up configuration for MR running feeder

Author: wujinhu <wujinhu...@126.com>

Closes #530 from wujinhu/EAGLE-634.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/71a4bb01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/71a4bb01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/71a4bb01

Branch: refs/heads/master
Commit: 71a4bb013c2acabb54e03d8988f5b07c4923384c
Parents: 6dbdb4f
Author: wujinhu <wujinhu...@126.com>
Authored: Wed Oct 19 12:05:09 2016 +0800
Committer: wujinhu <wujinhu...@126.com>
Committed: Wed Oct 19 12:05:09 2016 +0800

----------------------------------------------------------------------
 eagle-jpm/eagle-jpm-mr-running/pom.xml          |  15 ---
 .../jpm/mr/running/MRRunningJobApplication.java |  26 ++--
 .../jpm/mr/running/MRRunningJobConfig.java      |  51 +++-----
 .../parser/MRJobEntityCreationHandler.java      |   3 +-
 .../jpm/mr/running/parser/MRJobParser.java      |  15 +--
 .../running/storm/MRRunningJobFetchSpout.java   |   7 +-
 .../mr/running/storm/MRRunningJobParseBolt.java |   7 +-
 ....running.MRRunningJobApplicationProvider.xml | 122 +++++--------------
 .../src/main/resources/application.conf         |  47 +++----
 9 files changed, 88 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/71a4bb01/eagle-jpm/eagle-jpm-mr-running/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/pom.xml 
b/eagle-jpm/eagle-jpm-mr-running/pom.xml
index aff1a90..34dcedc 100644
--- a/eagle-jpm/eagle-jpm-mr-running/pom.xml
+++ b/eagle-jpm/eagle-jpm-mr-running/pom.xml
@@ -37,21 +37,6 @@
             <artifactId>slf4j-api</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-data-process</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.wso2.orbit.com.lmax</groupId>
-                    <artifactId>disruptor</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>asm</groupId>
-                    <artifactId>asm</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
             <groupId>org.jsoup</groupId>
             <artifactId>jsoup</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/71a4bb01/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
index e8abf30..16e8ea7 100644
--- 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
+++ 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobApplication.java
@@ -27,7 +27,6 @@ import backtype.storm.tuple.Fields;
 import com.typesafe.config.Config;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 public class MRRunningJobApplication extends StormApplication {
@@ -51,33 +50,24 @@ public class MRRunningJobApplication extends 
StormApplication {
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         String spoutName = "mrRunningJobFetchSpout";
         String boltName = "mrRunningJobParseBolt";
-        int parallelism = 
mrRunningJobConfig.getConfig().getInt("envContextConfig.parallelismConfig." + 
spoutName);
-        int tasks = 
mrRunningJobConfig.getConfig().getInt("envContextConfig.tasks." + spoutName);
-        if (parallelism > tasks) {
-            parallelism = tasks;
-        }
+        int tasks = mrRunningJobConfig.getConfig().getInt("stormConfig." + 
spoutName + "Tasks");
+
         topologyBuilder.setSpout(
             spoutName,
-            new MRRunningJobFetchSpout(
-                mrRunningJobConfig.getJobExtractorConfig(),
-                mrRunningJobConfig.getEndpointConfig(),
-                mrRunningJobConfig.getZkStateConfig()),
-            parallelism
+            new MRRunningJobFetchSpout(mrRunningJobConfig.getEndpointConfig(),
+                    mrRunningJobConfig.getZkStateConfig()),
+            tasks
         ).setNumTasks(tasks);
 
-        parallelism = 
mrRunningJobConfig.getConfig().getInt("envContextConfig.parallelismConfig." + 
boltName);
-        tasks = 
mrRunningJobConfig.getConfig().getInt("envContextConfig.tasks." + boltName);
-        if (parallelism > tasks) {
-            parallelism = tasks;
-        }
+        tasks = mrRunningJobConfig.getConfig().getInt("stormConfig." + 
boltName + "Tasks");
+
         topologyBuilder.setBolt(boltName,
             new MRRunningJobParseBolt(
                 mrRunningJobConfig.getEagleServiceConfig(),
                 mrRunningJobConfig.getEndpointConfig(),
-                mrRunningJobConfig.getJobExtractorConfig(),
                 mrRunningJobConfig.getZkStateConfig(),
                 confKeyKeys),
-            parallelism).setNumTasks(tasks).fieldsGrouping(spoutName, new 
Fields("appId"));
+                tasks).setNumTasks(tasks).fieldsGrouping(spoutName, new 
Fields("appId"));
         return topologyBuilder.createTopology();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/71a4bb01/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
index 93bcd0c..2fe91d2 100644
--- 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
+++ 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/MRRunningJobConfig.java
@@ -41,12 +41,6 @@ public class MRRunningJobConfig implements Serializable {
 
     private EagleServiceConfig eagleServiceConfig;
 
-    public JobExtractorConfig getJobExtractorConfig() {
-        return jobExtractorConfig;
-    }
-
-    private JobExtractorConfig jobExtractorConfig;
-
     public EndpointConfig getEndpointConfig() {
         return endpointConfig;
     }
@@ -59,27 +53,21 @@ public class MRRunningJobConfig implements Serializable {
         public int zkSessionTimeoutMs;
         public int zkRetryTimes;
         public int zkRetryInterval;
-        public String zkPort;
     }
 
     public static class EagleServiceConfig implements Serializable {
         public String eagleServiceHost;
         public int eagleServicePort;
         public int readTimeoutSeconds;
-        public int maxFlushNum;
         public String username;
         public String password;
     }
 
-    public static class JobExtractorConfig implements Serializable {
+    public static class EndpointConfig implements Serializable {
         public String site;
+        public String[] rmUrls;
         public int fetchRunningJobInterval;
         public int parseJobThreadPoolSize;
-        public int topAndBottomTaskByElapsedTime;
-    }
-
-    public static class EndpointConfig implements Serializable {
-        public String[] rmUrls;
     }
 
     public Config getConfig() {
@@ -92,7 +80,6 @@ public class MRRunningJobConfig implements Serializable {
 
     private MRRunningJobConfig() {
         this.eagleServiceConfig = new EagleServiceConfig();
-        this.jobExtractorConfig = new JobExtractorConfig();
         this.endpointConfig = new EndpointConfig();
         this.zkStateConfig = new ZKStateConfig();
     }
@@ -116,32 +103,28 @@ public class MRRunningJobConfig implements Serializable {
         this.config = config;
 
         //parse eagle zk
-        this.zkStateConfig.zkQuorum = 
config.getString("zookeeperConfig.zkQuorum");
-        this.zkStateConfig.zkPort = config.getString("zookeeperConfig.zkPort");
-        this.zkStateConfig.zkSessionTimeoutMs = 
config.getInt("zookeeperConfig.zkSessionTimeoutMs");
-        this.zkStateConfig.zkRetryTimes = 
config.getInt("zookeeperConfig.zkRetryTimes");
-        this.zkStateConfig.zkRetryInterval = 
config.getInt("zookeeperConfig.zkRetryInterval");
-        this.zkStateConfig.zkRoot = config.getString("zookeeperConfig.zkRoot");
+        this.zkStateConfig.zkQuorum = config.getString("zookeeper.zkQuorum");
+        this.zkStateConfig.zkSessionTimeoutMs = 
config.getInt("zookeeper.zkSessionTimeoutMs");
+        this.zkStateConfig.zkRetryTimes = 
config.getInt("zookeeper.zkRetryTimes");
+        this.zkStateConfig.zkRetryInterval = 
config.getInt("zookeeper.zkRetryInterval");
+        this.zkStateConfig.zkRoot = config.getString("zookeeper.zkRoot");
 
         // parse eagle service endpoint
-        this.eagleServiceConfig.eagleServiceHost = 
config.getString("eagleProps.eagleService.host");
-        String port = config.getString("eagleProps.eagleService.port");
+        this.eagleServiceConfig.eagleServiceHost = 
config.getString("service.host");
+        String port = config.getString("service.port");
         this.eagleServiceConfig.eagleServicePort = Integer.parseInt(port);
-        this.eagleServiceConfig.username = 
config.getString("eagleProps.eagleService.username");
-        this.eagleServiceConfig.password = 
config.getString("eagleProps.eagleService.password");
-        this.eagleServiceConfig.readTimeoutSeconds = 
config.getInt("eagleProps.eagleService.readTimeOutSeconds");
-        this.eagleServiceConfig.maxFlushNum = 
config.getInt("eagleProps.eagleService.maxFlushNum");
-        //parse job extractor
-        this.jobExtractorConfig.site = 
config.getString("jobExtractorConfig.site");
-        this.jobExtractorConfig.fetchRunningJobInterval = 
config.getInt("jobExtractorConfig.fetchRunningJobInterval");
-        this.jobExtractorConfig.parseJobThreadPoolSize = 
config.getInt("jobExtractorConfig.parseJobThreadPoolSize");
-        this.jobExtractorConfig.topAndBottomTaskByElapsedTime = 
config.getInt("jobExtractorConfig.topAndBottomTaskByElapsedTime");
+        this.eagleServiceConfig.username = 
config.getString("service.username");
+        this.eagleServiceConfig.password = 
config.getString("service.password");
+        this.eagleServiceConfig.readTimeoutSeconds = 
config.getInt("service.readTimeOutSeconds");
 
         //parse data source config
-        this.endpointConfig.rmUrls = 
config.getString("dataSourceConfig.rmUrls").split(",");
+        this.endpointConfig.rmUrls = 
config.getString("endpointConfig.rmUrls").split(",");
+        this.endpointConfig.site = config.getString("siteId");
+        this.endpointConfig.fetchRunningJobInterval = 
config.getInt("endpointConfig.fetchRunningJobInterval");
+        this.endpointConfig.parseJobThreadPoolSize = 
config.getInt("endpointConfig.parseJobThreadPoolSize");
 
         LOG.info("Successfully initialized MRRunningJobConfig");
-        LOG.info("site: " + this.jobExtractorConfig.site);
+        LOG.info("site: " + this.endpointConfig.site);
         LOG.info("eagle.service.host: " + 
this.eagleServiceConfig.eagleServiceHost);
         LOG.info("eagle.service.port: " + 
this.eagleServiceConfig.eagleServicePort);
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/71a4bb01/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
index 1a0fb61..ad80fd6 100644
--- 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
+++ 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobEntityCreationHandler.java
@@ -40,6 +40,7 @@ public class MRJobEntityCreationHandler {
     private MRRunningJobConfig.EagleServiceConfig eagleServiceConfig;
     private JobExecutionMetricsCreationListener jobMetricsListener;
     private TaskExecutionMetricsCreationListener taskMetricsListener;
+    private static final int MAX_FLUSH_NUM = 1000;
 
     public MRJobEntityCreationHandler(MRRunningJobConfig.EagleServiceConfig 
eagleServiceConfig) {
         this.eagleServiceConfig = eagleServiceConfig;
@@ -61,7 +62,7 @@ public class MRJobEntityCreationHandler {
             metricEntities = 
jobMetricsListener.generateMetrics((JobExecutionAPIEntity) entity);
             entities.addAll(metricEntities);
         }
-        if (entities.size() >= eagleServiceConfig.maxFlushNum) {
+        if (entities.size() >= MAX_FLUSH_NUM) {
             this.flush();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/71a4bb01/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index e5e3444..797bf21 100644
--- 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -77,18 +77,19 @@ public class MRJobParser implements Runnable {
     private boolean first;
     private Set<String> finishedTaskIds;
     private List<String> configKeys;
-    private MRRunningJobConfig.JobExtractorConfig jobExtractorConfig;
+    private MRRunningJobConfig.EndpointConfig endpointConfig;
+    private static final int TOP_BOTTOM_TASKS_BY_ELASPED_TIME = 10;
 
     static {
         OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, 
true);
     }
 
-    public MRJobParser(MRRunningJobConfig.JobExtractorConfig 
jobExtractorConfig,
+    public MRJobParser(MRRunningJobConfig.EndpointConfig endpointConfig,
                        MRRunningJobConfig.EagleServiceConfig 
eagleServiceConfig,
                        AppInfo app, Map<String, JobExecutionAPIEntity> 
mrJobMap,
                        MRRunningJobManager runningJobManager, ResourceFetcher 
rmResourceFetcher,
                        List<String> configKeys) {
-        this.jobExtractorConfig = jobExtractorConfig;
+        this.endpointConfig = endpointConfig;
         this.app = app;
         this.mrJobEntityMap = new HashMap<>();
         this.mrJobEntityMap = mrJobMap;
@@ -99,7 +100,7 @@ public class MRJobParser implements Runnable {
 
         this.mrJobEntityCreationHandler = new 
MRJobEntityCreationHandler(eagleServiceConfig);
 
-        this.commonTags.put(MRJobTagName.SITE.toString(), 
jobExtractorConfig.site);
+        this.commonTags.put(MRJobTagName.SITE.toString(), endpointConfig.site);
         this.commonTags.put(MRJobTagName.USER.toString(), app.getUser());
         this.commonTags.put(MRJobTagName.JOB_QUEUE.toString(), app.getQueue());
         this.runningJobManager = runningJobManager;
@@ -403,7 +404,7 @@ public class MRJobParser implements Runnable {
             .filter(task -> 
task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
             .sorted(byElapsedTimeIncrease).iterator();
         int i = 0;
-        while (taskIteratorIncrease.hasNext() && i < 
jobExtractorConfig.topAndBottomTaskByElapsedTime) {
+        while (taskIteratorIncrease.hasNext() && i < 
TOP_BOTTOM_TASKS_BY_ELASPED_TIME) {
             MRTask mrTask = taskIteratorIncrease.next();
             if (mrTask.getElapsedTime() > 0) {
                 i++;
@@ -415,7 +416,7 @@ public class MRJobParser implements Runnable {
             .filter(task -> 
task.getState().equals(Constants.TaskState.SUCCEEDED.toString()))
             .sorted(byElapsedTimeDecrease).iterator();
         i = 0;
-        while (taskIteratorDecrease.hasNext() && i < 
jobExtractorConfig.topAndBottomTaskByElapsedTime) {
+        while (taskIteratorDecrease.hasNext() && i < 
TOP_BOTTOM_TASKS_BY_ELASPED_TIME) {
             MRTask mrTask = taskIteratorDecrease.next();
             if (mrTask.getElapsedTime() > 0) {
                 i++;
@@ -427,7 +428,7 @@ public class MRJobParser implements Runnable {
             .filter(task -> 
task.getState().equals(Constants.TaskState.RUNNING.toString()))
             .sorted(byElapsedTimeDecrease).iterator();
         i = 0;
-        while (taskIteratorDecrease.hasNext() && i < 
jobExtractorConfig.topAndBottomTaskByElapsedTime) {
+        while (taskIteratorDecrease.hasNext() && i < 
TOP_BOTTOM_TASKS_BY_ELASPED_TIME) {
             MRTask mrTask = taskIteratorDecrease.next();
             if (mrTask.getElapsedTime() > 0) {
                 i++;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/71a4bb01/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
index 27d1575..7c910e7 100644
--- 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
+++ 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobFetchSpout.java
@@ -41,7 +41,6 @@ import java.util.*;
 
 public class MRRunningJobFetchSpout extends BaseRichSpout {
     private static final Logger LOG = 
LoggerFactory.getLogger(MRRunningJobFetchSpout.class);
-    private MRRunningJobConfig.JobExtractorConfig jobExtractorConfig;
     private MRRunningJobConfig.EndpointConfig endpointConfig;
     private MRRunningJobConfig.ZKStateConfig zkStateConfig;
     private ResourceFetcher resourceFetcher;
@@ -50,10 +49,8 @@ public class MRRunningJobFetchSpout extends BaseRichSpout {
     private transient MRRunningJobManager runningJobManager;
     private Set<String> runningYarnApps;
 
-    public MRRunningJobFetchSpout(MRRunningJobConfig.JobExtractorConfig 
jobExtractorConfig,
-                                  MRRunningJobConfig.EndpointConfig 
endpointConfig,
+    public MRRunningJobFetchSpout(MRRunningJobConfig.EndpointConfig 
endpointConfig,
                                   MRRunningJobConfig.ZKStateConfig 
zkStateConfig) {
-        this.jobExtractorConfig = jobExtractorConfig;
         this.endpointConfig = endpointConfig;
         this.zkStateConfig = zkStateConfig;
         this.init = false;
@@ -140,7 +137,7 @@ public class MRRunningJobFetchSpout extends BaseRichSpout {
         } catch (Exception e) {
             e.printStackTrace();
         } finally {
-            Utils.sleep(jobExtractorConfig.fetchRunningJobInterval);
+            Utils.sleep(endpointConfig.fetchRunningJobInterval);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/71a4bb01/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
index 4e0cdbc..9ebc1a7 100644
--- 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
+++ 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningJobParseBolt.java
@@ -42,7 +42,6 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
     private static final Logger LOG = 
LoggerFactory.getLogger(MRRunningJobParseBolt.class);
 
     private MRRunningJobConfig.EndpointConfig endpointConfig;
-    private MRRunningJobConfig.JobExtractorConfig jobExtractorConfig;
     private MRRunningJobConfig.ZKStateConfig zkStateConfig;
     private ExecutorService executorService;
     private Map<String, MRJobParser> runningMRParsers;
@@ -53,12 +52,10 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
 
     public MRRunningJobParseBolt(MRRunningJobConfig.EagleServiceConfig 
eagleServiceConfig,
                                  MRRunningJobConfig.EndpointConfig 
endpointConfig,
-                                 MRRunningJobConfig.JobExtractorConfig 
jobExtractorConfig,
                                  MRRunningJobConfig.ZKStateConfig 
zkStateConfig,
                                  List<String> configKeys) {
         this.eagleServiceConfig = eagleServiceConfig;
         this.endpointConfig = endpointConfig;
-        this.jobExtractorConfig = jobExtractorConfig;
         this.runningMRParsers = new HashMap<>();
         this.zkStateConfig = zkStateConfig;
         this.configKeys = configKeys;
@@ -66,7 +63,7 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
 
     @Override
     public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector outputCollector) {
-        this.executorService = 
Executors.newFixedThreadPool(jobExtractorConfig.parseJobThreadPoolSize);
+        this.executorService = 
Executors.newFixedThreadPool(endpointConfig.parseJobThreadPoolSize);
 
         this.runningJobManager = new MRRunningJobManager(zkStateConfig);
         this.resourceFetcher = new RMResourceFetcher(endpointConfig.rmUrls);
@@ -81,7 +78,7 @@ public class MRRunningJobParseBolt extends BaseRichBolt {
 
         MRJobParser applicationParser;
         if (!runningMRParsers.containsKey(appInfo.getId())) {
-            applicationParser = new MRJobParser(jobExtractorConfig, 
eagleServiceConfig,
+            applicationParser = new MRJobParser(endpointConfig, 
eagleServiceConfig,
                     appInfo, mrJobs, runningJobManager, this.resourceFetcher, 
configKeys);
             runningMRParsers.put(appInfo.getId(), applicationParser);
             LOG.info("create application parser for {}", appInfo.getId());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/71a4bb01/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
 
b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
index 4b95b36..4063b3a 100644
--- 
a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
+++ 
b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.running.MRRunningJobApplicationProvider.xml
@@ -18,122 +18,60 @@
 
 <application>
     <type>MR_RUNNING_JOB_APP</type>
-    <name>MR Running Job Monitoring</name>
+    <name>Map Reduce Running Job Monitoring</name>
     <version>0.5.0-incubating</version>
     <configuration>
-        <!-- org.apache.eagle.jpm.spark.running.SparkRunningJobAppConfig -->
-        <property>
-            <name>jobExtractorConfig.site</name>
-            <displayName>Site ID</displayName>
-            <value>sandbox</value>
-        </property>
         <property>
             <name>workers</name>
-            <displayName>storm worker number</displayName>
+            <displayName>Worker Number</displayName>
+            <description>the number of storm workers will be used</description>
             <value>4</value>
         </property>
         <property>
-            
<name>envContextConfig.parallelismConfig.mrRunningJobFetchSpout</name>
-            <value>1</value>
-        </property>
-        <property>
-            
<name>envContextConfig.parallelismConfig.mrRunningJobParseBolt</name>
-            <value>8</value>
-        </property>
-        <property>
-            <name>envContextConfig.tasks.mrRunningJobFetchSpout</name>
+            <name>stormConfig.mrRunningJobFetchSpoutTasks</name>
+            <displayName>Read Task Number</displayName>
+            <description>number of tasks to fetch map reduce running jobs from 
resource manager</description>
             <value>1</value>
         </property>
         <property>
-            <name>envContextConfig.tasks.mrRunningJobParseBolt</name>
-            <value>8</value>
-        </property>
-        <property>
-            <name>zookeeperConfig.zkQuorum</name>
-            <displayName>zkQuorum</displayName>
-            <description>Zookeeper Quorum</description>
-            <value>sandbox.hortonworks.com:2181</value>
-        </property>
-        <property>
-            <name>zookeeperConfig.zkPort</name>
-            <displayName>zkPort</displayName>
-            <description>Zookeeper Port</description>
-            <value>2181</value>
-        </property>
-        <property>
-            <name>zookeeperConfig.zkSessionTimeoutMs</name>
-            <displayName>zkSessionTimeoutMs</displayName>
-            <description>Zookeeper session timeoutMs</description>
-            <value>15000</value>
-        </property>
-        <property>
-            <name>zookeeperConfig.zkRetryTimes</name>
-            <displayName>zkRetryTimes</displayName>
-            <description>zookeeperConfig.zkRetryTimes</description>
-            <value>3</value>
-        </property>
-        <property>
-            <name>zookeeperConfig.zkRetryInterval</name>
-            <displayName>zkRetryInterval</displayName>
-            <description>zookeeperConfig.zkRetryInterval</description>
-            <value>20000</value>
+            <name>stormConfig.mrRunningJobParseBoltTasks</name>
+            <displayName>Parse Task Number</displayName>
+            <description>number of tasks to parse map reduce running jobs got 
from resource manager</description>
+            <value>5</value>
         </property>
+
         <property>
-            <name>zookeeperConfig.zkRoot</name>
+            <name>zookeeper.zkRoot</name>
+            <displayName>Zookeeper Root Path</displayName>
+            <description>zkRoot that used to save context for this 
application</description>
             <value>/apps/mr/runningSandbox</value>
+            <required>true</required>
         </property>
+
         <property>
-            <name>eagleProps.eagleService.host</name>
-            <description>eagleProps.eagleService.host</description>
-            <value>sandbox.hortonworks.com</value>
-        </property>
-        <property>
-            <name>eagleProps.eagleService.port</name>
-            <description>eagleProps.eagleService.port</description>
-            <value>9099</value>
-        </property>
-        <property>
-            <name>eagleProps.eagleService.username</name>
-            <description>eagleProps.eagleService.username</description>
-            <value>admin</value>
-        </property>
-        <property>
-            <name>eagleProps.eagleService.password</name>
-            <description>eagleProps.eagleService.password</description>
-            <value>secret</value>
-        </property>
-        <property>
-            <name>eagleProps.eagleService.readTimeOutSeconds</name>
-            
<description>eagleProps.eagleService.readTimeOutSeconds</description>
-            <value>60</value>
-        </property>
-        <property>
-            <name>eagleProps.eagleService.maxFlushNum</name>
-            <description>eagleProps.eagleService.maxFlushNum</description>
-            <value>1000</value>
-        </property>
-        <property>
-            <name>jobExtractorConfig.fetchRunningJobInterval</name>
-            
<description>jobExtractorConfig.fetchRunningJobInterval</description>
+            <name>endpointConfig.fetchRunningJobInterval</name>
+            <displayName>Interval of Fetch Running Job From Resource 
Manager</displayName>
+            <description>interval of fetch map reduce running jobs from 
resource manager</description>
             <value>60</value>
         </property>
         <property>
-            <name>jobExtractorConfig.parseJobThreadPoolSize</name>
-            
<description>jobExtractorConfig.parseJobThreadPoolSize</description>
+            <name>endpointConfig.parseJobThreadPoolSize</name>
+            <displayName>Parse Job ThreadPool Size in Each Parse 
Task</displayName>
+            <description>parse job thread pool size in each parse 
task</description>
             <value>5</value>
         </property>
         <property>
-            <name>jobExtractorConfig.topAndBottomTaskByElapsedTime</name>
-            
<description>jobExtractorConfig.topAndBottomTaskByElapsedTime</description>
-            <value>5</value>
-        </property>
-        <property>
-            <name>dataSourceConfig.rmUrls</name>
-            <description>dataSourceConfig.rmUrls</description>
+            <name>endpointConfig.rmUrls</name>
+            <displayName>Resource Manager URLs</displayName>
+            <description>resource manager urls of this site</description>
             <value>http://sandbox.hortonworks.com:8088</value>
+            <required>true</required>
         </property>
+
         <property>
             <name>MRConfigureKeys.jobConfigKey</name>
+            <displayName>Map Reduce Extracted Configuration Keys</displayName>
+            <description>which configures will be extracted from map reduce 
job configurations</description>
             <value>mapreduce.map.output.compress,
                 mapreduce.map.output.compress.codec,
                 mapreduce.output.fileoutputformat.compress,
@@ -148,6 +86,8 @@
         </property>
         <property>
             <name>MRConfigureKeys.jobNameKey</name>
+            <displayName>Map Reduce Job Name Key</displayName>
+            <description>User use -Dkey=value to specify name of a job when 
submit. use this to extract job name from job configuration</description>
             <value>eagle.job.name</value>
         </property>
     </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/71a4bb01/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf 
b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
index 9c354c7..6d1be06 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/resources/application.conf
@@ -18,49 +18,38 @@
   "mode":"LOCAL",
   application.storm.nimbusHost=localhost,
   "workers" : 8,
-  "envContextConfig" : {
-    "stormConfigFile" : "storm.yaml",
-    "parallelismConfig" : {
-      "mrRunningJobFetchSpout" : 1,
-      "mrRunningJobParseBolt" : 16
-    },
-    "tasks" : {
-      "mrRunningJobFetchSpout" : 1,
-      "mrRunningJobParseBolt" : 16
-    }
-  },
+  "siteId" : "sandbox",
 
-  "jobExtractorConfig" : {
-    "site" : "sandbox",
-    "fetchRunningJobInterval" : 60,
-    "parseJobThreadPoolSize" : 6,
-    "topAndBottomTaskByElapsedTime" : 10
+  "stormConfig" : {
+    "mrRunningJobFetchSpoutTasks" : 1,
+    "mrRunningJobParseBoltTasks" : 8
   },
 
-  "zookeeperConfig" : {
+  "zookeeper" : {
     "zkQuorum" : "sandbox.hortonworks.com:2181",
     "zkPort" : "2181",
-    "zkRoot" : "/apps/mr/running",
+    "zkRoot" : "/apps/mr/runningSandbox",
     "zkSessionTimeoutMs" : 15000,
     "zkRetryTimes" : 3,
     "zkRetryInterval" : 20000
   },
 
-  "dataSourceConfig" : {
-    "rmUrls": "http://sandbox.hortonworks.com:50030";
+  "endpointConfig" : {
+    "rmUrls": "http://sandbox.hortonworks.com:50030";,
+    "fetchRunningJobInterval" : 60,
+    "parseJobThreadPoolSize" : 6,
   },
 
-  "eagleProps" : {
-    "eagleService": {
-      "host": "sandbox.hortonworks.com",
-      "port": 9099,
-      "readTimeOutSeconds" : 20,
-      "maxFlushNum" : 500,
-      "username": "admin",
-      "password": "secret"
-    }
+
+  "service": {
+    "host": "sandbox.hortonworks.com",
+    "port": 9099,
+    "readTimeOutSeconds" : 20,
+    "username": "admin",
+    "password": "secret"
   },
 
+
   "MRConfigureKeys" : {
     "jobNameKey" : "eagle.job.name",
     "jobConfigKey" : "mapreduce.map.output.compress, 
mapreduce.map.output.compress.codec, 
mapreduce.output.fileoutputformat.compress, 
mapreduce.output.fileoutputformat.compress.type, 
mapreduce.output.fileoutputformat.compress.codec, mapred.output.format.class, 
eagle.job.runid, eagle.job.runidfieldname, eagle.job.name, 
eagle.job.normalizedfieldname, eagle.alert.email, eagle.job.alertemailaddress, 
dataplatform.etl.info, mapreduce.map.memory.mb, mapreduce.reduce.memory.mb, 
mapreduce.map.java.opts, mapreduce.reduce.java.opts"

Reply via email to