Repository: eagle
Updated Branches:
  refs/heads/master 0071c79ab -> 33ad0a58b
[EAGLE-959] Add a configuration to limit the total number of apps to be returned in SparkHistoryJobApp

https://issues.apache.org/jira/browse/EAGLE-959

Author: Zhao, Qingwen <[email protected]>

Closes #875 from qingwen220/EAGLE-959.

Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/33ad0a58
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/33ad0a58
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/33ad0a58

Branch: refs/heads/master
Commit: 33ad0a58b34745afb01887f7b042811f042cfb61
Parents: 0071c79
Author: Zhao, Qingwen <[email protected]>
Authored: Thu Mar 16 11:18:16 2017 +0800
Committer: Zhao, Qingwen <[email protected]>
Committed: Thu Mar 16 11:18:16 2017 +0800

----------------------------------------------------------------------
 .../running/storm/MRRunningAppMetricBolt.java | 45 +++++++++++---------
 .../spark/history/SparkHistoryJobAppConfig.java | 6 +++
 .../history/storm/SparkHistoryJobSpout.java | 7 ++-
 ...spark.history.SparkHistoryJobAppProvider.xml | 6 +++
 .../util/resourcefetch/RMResourceFetcher.java | 35 +++++++++------
 5 files changed, 65 insertions(+), 34 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/eagle/blob/33ad0a58/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningAppMetricBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningAppMetricBolt.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningAppMetricBolt.java
index 28a4e7a..aa62d30 100644
--- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningAppMetricBolt.java
+++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/storm/MRRunningAppMetricBolt.java
@@
-152,26 +152,31 @@ public class MRRunningAppMetricBolt extends BaseRichBolt { public List<YarnAppAPIEntity> parseAcceptedApp() { List<YarnAppAPIEntity> acceptedApps = new ArrayList<>(); try { - List<AppInfo> apps = fetcher.getResource(Constants.ResourceType.ACCEPTED_JOB); - for (AppInfo app : apps) { - Map<String, String> tags = new HashMap<>(); - tags.put(AppStreamInfo.SITE, config.getConfig().getString("siteId")); - tags.put(AppStreamInfo.ID, app.getId()); - tags.put(AppStreamInfo.QUEUE, app.getQueue()); - tags.put(AppStreamInfo.USER, app.getUser()); - - YarnAppAPIEntity appAPIEntity = new YarnAppAPIEntity(); - appAPIEntity.setTags(tags); - appAPIEntity.setTrackingUrl(buildAcceptedAppTrackingURL(app.getId())); - appAPIEntity.setAppName(app.getName()); - appAPIEntity.setClusterUsagePercentage(app.getClusterUsagePercentage()); - appAPIEntity.setQueueUsagePercentage(app.getQueueUsagePercentage()); - appAPIEntity.setElapsedTime(app.getElapsedTime()); - appAPIEntity.setStartedTime(app.getStartedTime()); - appAPIEntity.setState(app.getState()); - appAPIEntity.setTimestamp(app.getStartedTime()); - acceptedApps.add(appAPIEntity); - collector.emit(new Values("", convertAppToStream(appAPIEntity))); + List<AppInfo> apps = fetcher.getResource(Constants.ResourceType.ACCEPTED_JOB, + config.getEndpointConfig().limitPerRequest); + + if (apps != null) { + LOG.info("successfully fetch {} accepted jobs from {}", apps.size(), fetcher.getSelector().getSelectedUrl()); + for (AppInfo app : apps) { + Map<String, String> tags = new HashMap<>(); + tags.put(AppStreamInfo.SITE, config.getConfig().getString("siteId")); + tags.put(AppStreamInfo.ID, app.getId()); + tags.put(AppStreamInfo.QUEUE, app.getQueue()); + tags.put(AppStreamInfo.USER, app.getUser()); + + YarnAppAPIEntity appAPIEntity = new YarnAppAPIEntity(); + appAPIEntity.setTags(tags); + appAPIEntity.setTrackingUrl(buildAcceptedAppTrackingURL(app.getId())); + appAPIEntity.setAppName(app.getName()); + 
appAPIEntity.setClusterUsagePercentage(app.getClusterUsagePercentage()); + appAPIEntity.setQueueUsagePercentage(app.getQueueUsagePercentage()); + appAPIEntity.setElapsedTime(app.getElapsedTime()); + appAPIEntity.setStartedTime(app.getStartedTime()); + appAPIEntity.setState(app.getState()); + appAPIEntity.setTimestamp(app.getStartedTime()); + acceptedApps.add(appAPIEntity); + collector.emit(new Values("", convertAppToStream(appAPIEntity))); + } } } catch (Exception e) { LOG.error("fetch accepted apps failed {}", e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/eagle/blob/33ad0a58/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java index 9646fb1..d60656e 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java @@ -98,6 +98,11 @@ public class SparkHistoryJobAppConfig implements Serializable { this.stormConfig.numOfSpoutTasks = config.getInt("topology.numOfSpoutTasks"); this.stormConfig.numOfParserBoltExecutors = config.getInt("topology.numOfParseBoltExecutors"); this.stormConfig.numOfParserBoltTasks = config.getInt("topology.numOfParserBoltTasks"); + this.stormConfig.requestLimit = ""; + if (config.hasPath("topology.requestLimit")) { + this.stormConfig.requestLimit = config.getString("topology.requestLimit"); + } + } public static class ZKStateConfig implements Serializable { @@ -117,6 +122,7 @@ public class SparkHistoryJobAppConfig implements Serializable { public static class StormConfig implements 
Serializable { public String siteId; public int spoutCrawlInterval; + public String requestLimit; public int numOfSpoutExecutors; public int numOfSpoutTasks; public int numOfParserBoltExecutors; http://git-wip-us.apache.org/repos/asf/eagle/blob/33ad0a58/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java index 02c68b9..4ca01f1 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobSpout.java @@ -72,9 +72,12 @@ public class SparkHistoryJobSpout extends BaseRichSpout { calendar.setTimeInMillis(this.lastFinishAppTime); if (fetchTime - this.lastFinishAppTime > this.config.stormConfig.spoutCrawlInterval) { LOG.info("Last finished time = {}", calendar.getTime()); - List<AppInfo> appInfos = rmFetch.getResource(Constants.ResourceType.COMPLETE_SPARK_JOB, Long.toString(lastFinishAppTime)); + List<AppInfo> appInfos = rmFetch.getResource(Constants.ResourceType.COMPLETE_SPARK_JOB, + Long.toString(lastFinishAppTime), + config.stormConfig.requestLimit); if (appInfos != null) { - LOG.info("Get " + appInfos.size() + " from yarn resource manager."); + LOG.info("Get {} applications with limit {} from yarn resource manager", appInfos.size(), + config.stormConfig.requestLimit); for (AppInfo app : appInfos) { String appId = app.getId(); if (!zkState.hasApplication(appId)) { 
http://git-wip-us.apache.org/repos/asf/eagle/blob/33ad0a58/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml index 4c4d1cd..290bc07 100644 --- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml +++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.spark.history.SparkHistoryJobAppProvider.xml @@ -59,6 +59,12 @@ <value>10000</value> </property> <property> + <name>topology.requestLimit</name> + <displayName>limit of applications in each request</displayName> + <description>Limit of applications in each request</description> + <value>100</value> + </property> + <property> <name>topology.message.timeout.secs</name> <displayName>topology message timeout (secs)</displayName> <description>default timeout is 300s</description> http://git-wip-us.apache.org/repos/asf/eagle/blob/33ad0a58/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java index 2e967bc..266c6b1 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java +++ 
b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/RMResourceFetcher.java @@ -19,6 +19,7 @@ */ package org.apache.eagle.jpm.util.resourcefetch; +import com.fasterxml.jackson.databind.util.ContainerBuilder; import org.apache.commons.lang3.StringUtils; import org.apache.eagle.common.DateTimeUtil; import org.apache.eagle.jpm.util.Constants; @@ -47,7 +48,7 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> { private static final Logger LOG = LoggerFactory.getLogger(RMResourceFetcher.class); private final HAURLSelector selector; //private final ServiceURLBuilder jobListServiceURLBuilder; - private final ServiceURLBuilder sparkCompleteJobServiceURLBuilder; + //private final ServiceURLBuilder sparkCompleteJobServiceURLBuilder; private static final ObjectMapper OBJ_MAPPER = new ObjectMapper(); static { @@ -56,7 +57,7 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> { public RMResourceFetcher(String[] rmBasePaths) { //this.jobListServiceURLBuilder = new JobListServiceURLBuilderImpl(); - this.sparkCompleteJobServiceURLBuilder = new SparkCompleteJobServiceURLBuilderImpl(); + //this.sparkCompleteJobServiceURLBuilder = new SparkCompleteJobServiceURLBuilderImpl(); this.selector = new HAURLSelectorImpl( rmBasePaths, new RmActiveTestURLBuilderImpl(), @@ -111,16 +112,27 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> { Constants.ANONYMOUS_PARAMETER); } - private String getMRFinishedJobURL(String lastFinishedTime) { + private String getFinishedJobURL(Constants.JobType jobType, Object... 
parameter) { String url = URLUtil.removeTrailingSlash(selector.getSelectedUrl()); - return url + "/" + Constants.V2_APPS_URL - + "?applicationTypes=MAPREDUCE&state=FINISHED&finishedTimeBegin=" - + lastFinishedTime + "&" + Constants.ANONYMOUS_PARAMETER; + String lastFinishedTime = (String) parameter[0]; + String limit = ""; + if (parameter.length > 1) { + limit = (String) parameter[1]; + } + limit = ((limit == null || limit.isEmpty()) ? "" : "&limit=" + limit); + return String.format("%s/%s?applicationTypes=%s%s&state=FINISHED&finishedTimeBegin=%s&%s", + url, Constants.V2_APPS_URL, jobType, limit, lastFinishedTime, Constants.ANONYMOUS_PARAMETER); } - private String getAcceptedAppURL() { + private String getAcceptedAppURL(Object... parameter) { + String limit = ""; + if (parameter.length > 0) { + limit = (String) parameter[0]; + } + limit = ((limit == null || limit.isEmpty()) ? "" : "&limit=" + limit); + String baseUrl = URLUtil.removeTrailingSlash(selector.getSelectedUrl()); - return String.format("%s/%s?state=ACCEPTED&%s", baseUrl, Constants.V2_APPS_URL, Constants.ANONYMOUS_PARAMETER); + return String.format("%s/%s?state=ACCEPTED%s&%s", baseUrl, Constants.V2_APPS_URL, limit, Constants.ANONYMOUS_PARAMETER); } private List<AppInfo> doFetchRunningApplicationsList(Constants.JobType jobType, @@ -189,7 +201,7 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> { Object... 
parameter) throws Exception { List<AppInfo> apps = new ArrayList<>(); try { - String url = getAcceptedAppURL(); + String url = getAcceptedAppURL(parameter); return doFetchApplicationsList(url, compressionType); } catch (Exception e) { LOG.error("Catch an exception when query {} : {}", selector.getSelectedUrl(), e.getMessage(), e); @@ -201,14 +213,13 @@ public class RMResourceFetcher implements ResourceFetcher<AppInfo> { selector.checkUrl(); switch (resourceType) { case COMPLETE_SPARK_JOB: - final String urlString = sparkCompleteJobServiceURLBuilder.build(selector.getSelectedUrl(), (String) parameter[0]); - return doFetchApplicationsList(urlString, compressionType); + return doFetchApplicationsList(getFinishedJobURL(Constants.JobType.SPARK, parameter), compressionType); case RUNNING_SPARK_JOB: return doFetchRunningApplicationsList(Constants.JobType.SPARK, compressionType, parameter); case RUNNING_MR_JOB: return doFetchRunningApplicationsList(Constants.JobType.MAPREDUCE, compressionType, parameter); case COMPLETE_MR_JOB: - return doFetchApplicationsList(getMRFinishedJobURL((String) parameter[0]), compressionType); + return doFetchApplicationsList(getFinishedJobURL(Constants.JobType.MAPREDUCE, parameter), compressionType); case ACCEPTED_JOB: return doFetchAcceptedApplicationList(compressionType, parameter); default:
