Repository: eagle
Updated Branches:
  refs/heads/master 0071c79ab -> 33ad0a58b


[EAGLE-959] Add a configuration to limit the number of apps returned per
request in SparkHistoryJobApp

https://issues.apache.org/jira/browse/EAGLE-959

Author: Zhao, Qingwen <[email protected]>

Closes #875 from qingwen220/EAGLE-959.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/33ad0a58
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/33ad0a58
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/33ad0a58

Branch: refs/heads/master
Commit: 33ad0a58b34745afb01887f7b042811f042cfb61
Parents: 0071c79
Author: Zhao, Qingwen <[email protected]>
Authored: Thu Mar 16 11:18:16 2017 +0800
Committer: Zhao, Qingwen <[email protected]>
Committed: Thu Mar 16 11:18:16 2017 +0800

----------------------------------------------------------------------
 .../running/storm/MRRunningAppMetricBolt.java   | 45 +++++++++++---------
 .../spark/history/SparkHistoryJobAppConfig.java |  6 +++
 .../history/storm/SparkHistoryJobSpout.java     |  7 ++-
 ...spark.history.SparkHistoryJobAppProvider.xml |  6 +++
 .../util/resourcefetch/RMResourceFetcher.java   | 35 +++++++++------
 5 files changed, 65 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/33ad0a58/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningAppMetricBolt.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningAppMetricBolt.java
 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningAppMetricBolt.java
index 28a4e7a..aa62d30 100644
--- 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningAppMetricBolt.java
+++ 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningAppMetricBolt.java
@@ -152,26 +152,31 @@ public class MRRunningAppMetricBolt extends BaseRichBolt {
     public List<YarnAppAPIEntity> parseAcceptedApp() {
         List<YarnAppAPIEntity> acceptedApps = new ArrayList<>();
         try {
-            List<AppInfo> apps = 
fetcher.getResource(Constants.ResourceType.ACCEPTED_JOB);
-            for (AppInfo app : apps) {
-                Map<String, String> tags = new HashMap<>();
-                tags.put(AppStreamInfo.SITE, 
config.getConfig().getString("siteId"));
-                tags.put(AppStreamInfo.ID, app.getId());
-                tags.put(AppStreamInfo.QUEUE, app.getQueue());
-                tags.put(AppStreamInfo.USER, app.getUser());
-
-                YarnAppAPIEntity appAPIEntity = new YarnAppAPIEntity();
-                appAPIEntity.setTags(tags);
-                
appAPIEntity.setTrackingUrl(buildAcceptedAppTrackingURL(app.getId()));
-                appAPIEntity.setAppName(app.getName());
-                
appAPIEntity.setClusterUsagePercentage(app.getClusterUsagePercentage());
-                
appAPIEntity.setQueueUsagePercentage(app.getQueueUsagePercentage());
-                appAPIEntity.setElapsedTime(app.getElapsedTime());
-                appAPIEntity.setStartedTime(app.getStartedTime());
-                appAPIEntity.setState(app.getState());
-                appAPIEntity.setTimestamp(app.getStartedTime());
-                acceptedApps.add(appAPIEntity);
-                collector.emit(new Values("", 
convertAppToStream(appAPIEntity)));
+            List<AppInfo> apps = 
fetcher.getResource(Constants.ResourceType.ACCEPTED_JOB,
+                    config.getEndpointConfig().limitPerRequest);
+
+            if (apps != null) {
+                LOG.info("successfully fetch {} accepted jobs from {}", 
apps.size(), fetcher.getSelector().getSelectedUrl());
+                for (AppInfo app : apps) {
+                    Map<String, String> tags = new HashMap<>();
+                    tags.put(AppStreamInfo.SITE, 
config.getConfig().getString("siteId"));
+                    tags.put(AppStreamInfo.ID, app.getId());
+                    tags.put(AppStreamInfo.QUEUE, app.getQueue());
+                    tags.put(AppStreamInfo.USER, app.getUser());
+
+                    YarnAppAPIEntity appAPIEntity = new YarnAppAPIEntity();
+                    appAPIEntity.setTags(tags);
+                    
appAPIEntity.setTrackingUrl(buildAcceptedAppTrackingURL(app.getId()));
+                    appAPIEntity.setAppName(app.getName());
+                    
appAPIEntity.setClusterUsagePercentage(app.getClusterUsagePercentage());
+                    
appAPIEntity.setQueueUsagePercentage(app.getQueueUsagePercentage());
+                    appAPIEntity.setElapsedTime(app.getElapsedTime());
+                    appAPIEntity.setStartedTime(app.getStartedTime());
+                    appAPIEntity.setState(app.getState());
+                    appAPIEntity.setTimestamp(app.getStartedTime());
+                    acceptedApps.add(appAPIEntity);
+                    collector.emit(new Values("", 
convertAppToStream(appAPIEntity)));
+                }
             }
         } catch (Exception e) {
             LOG.error("fetch accepted apps failed {}", e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/eagle/blob/33ad0a58/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
index 9646fb1..d60656e 100644
--- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
+++ 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
@@ -98,6 +98,11 @@ public class SparkHistoryJobAppConfig implements 
Serializable {
         this.stormConfig.numOfSpoutTasks = 
config.getInt("topology.numOfSpoutTasks");
         this.stormConfig.numOfParserBoltExecutors = 
config.getInt("topology.numOfParseBoltExecutors");
         this.stormConfig.numOfParserBoltTasks = 
config.getInt("topology.numOfParserBoltTasks");
+        this.stormConfig.requestLimit = "";
+        if (config.hasPath("topology.requestLimit")) {
+            this.stormConfig.requestLimit = 
config.getString("topology.requestLimit");
+        }
+
     }
 
     public static class ZKStateConfig implements Serializable {
@@ -117,6 +122,7 @@ public class SparkHistoryJobAppConfig implements 
Serializable {
     public static class StormConfig implements Serializable {
         public String siteId;
         public int spoutCrawlInterval;
+        public String requestLimit;
         public int numOfSpoutExecutors;
         public int numOfSpoutTasks;
         public int numOfParserBoltExecutors;

http://git-wip-us.apache.org/repos/asf/eagle/blob/33ad0a58/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
index 02c68b9..4ca01f1 100644
--- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
+++ 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java
@@ -72,9 +72,12 @@ public class SparkHistoryJobSpout extends BaseRichSpout {
             calendar.setTimeInMillis(this.lastFinishAppTime);
             if (fetchTime - this.lastFinishAppTime > 
this.config.stormConfig.spoutCrawlInterval) {
                 LOG.info("Last finished time = {}", calendar.getTime());
-                List<AppInfo> appInfos = 
rmFetch.getResource(Constants.ResourceType.COMPLETE_SPARK_JOB, 
Long.toString(lastFinishAppTime));
+                List<AppInfo> appInfos = 
rmFetch.getResource(Constants.ResourceType.COMPLETE_SPARK_JOB,
+                        Long.toString(lastFinishAppTime),
+                        config.stormConfig.requestLimit);
                 if (appInfos != null) {
-                    LOG.info("Get " + appInfos.size() + " from yarn resource 
manager.");
+                    LOG.info("Get {} applications with limit {} from yarn 
resource manager", appInfos.size(),
+                            config.stormConfig.requestLimit);
                     for (AppInfo app : appInfos) {
                         String appId = app.getId();
                         if (!zkState.hasApplication(appId)) {

http://git-wip-us.apache.org/repos/asf/eagle/blob/33ad0a58/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
index 4c4d1cd..290bc07 100644
--- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
+++ 
b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml
@@ -59,6 +59,12 @@
             <value>10000</value>
         </property>
         <property>
+            <name>topology.requestLimit</name>
+            <displayName>limit of applications in each request</displayName>
+            <description>Limit of applications in each request</description>
+            <value>100</value>
+        </property>
+        <property>
             <name>topology.message.timeout.secs</name>
             <displayName>topology message timeout (secs)</displayName>
             <description>default timeout is 300s</description>

http://git-wip-us.apache.org/repos/asf/eagle/blob/33ad0a58/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
index 2e967bc..266c6b1 100644
--- 
a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
+++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java
@@ -19,6 +19,7 @@
  */
 package org.apache.eagle.jpm.util.resourcefetch;
 
+import com.fasterxml.jackson.databind.util.ContainerBuilder;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.jpm.util.Constants;
@@ -47,7 +48,7 @@ public class RMResourceFetcher implements 
ResourceFetcher<AppInfo> {
     private static final Logger LOG = 
LoggerFactory.getLogger(RMResourceFetcher.class);
     private final HAURLSelector selector;
     //private final ServiceURLBuilder jobListServiceURLBuilder;
-    private final ServiceURLBuilder sparkCompleteJobServiceURLBuilder;
+    //private final ServiceURLBuilder sparkCompleteJobServiceURLBuilder;
     private static final ObjectMapper OBJ_MAPPER = new ObjectMapper();
 
     static {
@@ -56,7 +57,7 @@ public class RMResourceFetcher implements 
ResourceFetcher<AppInfo> {
 
     public RMResourceFetcher(String[] rmBasePaths) {
         //this.jobListServiceURLBuilder = new JobListServiceURLBuilderImpl();
-        this.sparkCompleteJobServiceURLBuilder = new 
SparkCompleteJobServiceURLBuilderImpl();
+        //this.sparkCompleteJobServiceURLBuilder = new 
SparkCompleteJobServiceURLBuilderImpl();
         this.selector = new HAURLSelectorImpl(
                 rmBasePaths,
                 new RmActiveTestURLBuilderImpl(),
@@ -111,16 +112,27 @@ public class RMResourceFetcher implements 
ResourceFetcher<AppInfo> {
                 Constants.ANONYMOUS_PARAMETER);
     }
 
-    private String getMRFinishedJobURL(String lastFinishedTime) {
+    private String getFinishedJobURL(Constants.JobType jobType, Object... 
parameter) {
         String url = URLUtil.removeTrailingSlash(selector.getSelectedUrl());
-        return url + "/" + Constants.V2_APPS_URL
-                + 
"?applicationTypes=MAPREDUCE&state=FINISHED&finishedTimeBegin="
-                + lastFinishedTime + "&" + Constants.ANONYMOUS_PARAMETER;
+        String lastFinishedTime = (String) parameter[0];
+        String limit = "";
+        if (parameter.length > 1) {
+            limit = (String) parameter[1];
+        }
+        limit = ((limit == null || limit.isEmpty()) ? "" : "&limit=" + limit);
+        return 
String.format("%s/%s?applicationTypes=%s%s&state=FINISHED&finishedTimeBegin=%s&%s",
+                url, Constants.V2_APPS_URL, jobType, limit, lastFinishedTime, 
Constants.ANONYMOUS_PARAMETER);
     }
 
-    private String getAcceptedAppURL() {
+    private String getAcceptedAppURL(Object... parameter) {
+        String limit = "";
+        if (parameter.length > 0) {
+            limit = (String) parameter[0];
+        }
+        limit = ((limit == null || limit.isEmpty()) ? "" : "&limit=" + limit);
+
         String baseUrl = 
URLUtil.removeTrailingSlash(selector.getSelectedUrl());
-        return String.format("%s/%s?state=ACCEPTED&%s", baseUrl, 
Constants.V2_APPS_URL, Constants.ANONYMOUS_PARAMETER);
+        return String.format("%s/%s?state=ACCEPTED%s&%s", baseUrl, 
Constants.V2_APPS_URL, limit, Constants.ANONYMOUS_PARAMETER);
     }
 
     private List<AppInfo> doFetchRunningApplicationsList(Constants.JobType 
jobType,
@@ -189,7 +201,7 @@ public class RMResourceFetcher implements 
ResourceFetcher<AppInfo> {
                                                          Object... parameter) 
throws Exception {
         List<AppInfo> apps = new ArrayList<>();
         try {
-            String url = getAcceptedAppURL();
+            String url = getAcceptedAppURL(parameter);
             return doFetchApplicationsList(url, compressionType);
         } catch (Exception e) {
             LOG.error("Catch an exception when query {} : {}", 
selector.getSelectedUrl(), e.getMessage(), e);
@@ -201,14 +213,13 @@ public class RMResourceFetcher implements 
ResourceFetcher<AppInfo> {
         selector.checkUrl();
         switch (resourceType) {
             case COMPLETE_SPARK_JOB:
-                final String urlString = 
sparkCompleteJobServiceURLBuilder.build(selector.getSelectedUrl(), (String) 
parameter[0]);
-                return doFetchApplicationsList(urlString, compressionType);
+                return 
doFetchApplicationsList(getFinishedJobURL(Constants.JobType.SPARK, parameter), 
compressionType);
             case RUNNING_SPARK_JOB:
                 return doFetchRunningApplicationsList(Constants.JobType.SPARK, 
compressionType, parameter);
             case RUNNING_MR_JOB:
                 return 
doFetchRunningApplicationsList(Constants.JobType.MAPREDUCE, compressionType, 
parameter);
             case COMPLETE_MR_JOB:
-                return doFetchApplicationsList(getMRFinishedJobURL((String) 
parameter[0]), compressionType);
+                return 
doFetchApplicationsList(getFinishedJobURL(Constants.JobType.MAPREDUCE, 
parameter), compressionType);
             case ACCEPTED_JOB:
                 return doFetchAcceptedApplicationList(compressionType, 
parameter);
             default:

Reply via email to