Repository: eagle Updated Branches: refs/heads/master a27289fd3 -> 4a5c4a43c
[EAGLE-915] Fetch accepted MR jobs to assist queue analysis https://issues.apache.org/jira/browse/EAGLE-915 Author: Zhao, Qingwen <[email protected]> Closes #826 from qingwen220/EAGLE-915. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/4a5c4a43 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/4a5c4a43 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/4a5c4a43 Branch: refs/heads/master Commit: 4a5c4a43c1c38bea07867306028e5f3a4b856552 Parents: a27289f Author: Zhao, Qingwen <[email protected]> Authored: Tue Feb 21 14:50:33 2017 +0800 Committer: Zhao, Qingwen <[email protected]> Committed: Tue Feb 21 14:50:33 2017 +0800 ---------------------------------------------------------------------- .../hadoop/queue/HadoopQueueRunningApp.java | 29 +++-- .../queue/common/HadoopClusterConstants.java | 24 +--- .../common/YarnClusterResourceURLBuilder.java | 14 ++- .../crawler/ClusterMetricsParseListener.java | 6 +- .../queue/crawler/RunningAppParseListener.java | 66 ++++++++--- .../queue/crawler/RunningAppsCrawler.java | 5 +- .../crawler/SchedulerInfoParseListener.java | 8 +- .../model/HadoopQueueEntityRepository.java | 2 + .../hadoop/queue/model/applications/App.java | 19 ++++ .../queue/model/applications/AppStreamInfo.java | 53 +++++++++ .../model/applications/YarnAppAPIEntity.java | 111 +++++++++++++++++++ .../queue/model/scheduler/QueueStreamInfo.java | 79 +++++++++++++ .../storm/HadoopQueueMetricPersistBolt.java | 110 +++++++----------- .../queue/storm/HadoopQueueRunningSpout.java | 4 +- ...doop.queue.HadoopQueueRunningAppProvider.xml | 63 ++++++++++- .../resourcefetch/ha/AbstractURLSelector.java | 2 +- 16 files changed, 472 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java index 68ca8c7..4708baa 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java @@ -17,37 +17,52 @@ package org.apache.eagle.hadoop.queue; import backtype.storm.generated.StormTopology; -import backtype.storm.topology.BoltDeclarer; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.TopologyBuilder; import com.typesafe.config.Config; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; import org.apache.eagle.app.messaging.StormStreamSink; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataSource; import org.apache.eagle.hadoop.queue.storm.HadoopQueueMetricPersistBolt; import org.apache.eagle.hadoop.queue.storm.HadoopQueueRunningSpout; +import java.util.HashMap; +import java.util.Map; + public class HadoopQueueRunningApp extends StormApplication { public StormTopology execute(Config config, StormEnvironment environment) { HadoopQueueRunningAppConfig appConfig = new HadoopQueueRunningAppConfig(config); + String spoutName = "runningQueueSpout"; + String persistBoltName = "persistBolt"; + IRichSpout spout = new HadoopQueueRunningSpout(appConfig); - HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig); - TopologyBuilder builder = new TopologyBuilder(); + Map<HadoopClusterConstants.DataSource, String> streamMaps = new HashMap<>(); + + String acceptedAppStreamId = persistBoltName + "-to-" + DataSource.RUNNING_APPS.toString(); + String schedulerStreamId = persistBoltName + "-to-" + DataSource.SCHEDULER.toString(); + streamMaps.put(DataSource.RUNNING_APPS, acceptedAppStreamId); + streamMaps.put(DataSource.SCHEDULER, schedulerStreamId); int numOfPersistTasks = appConfig.topology.numPersistTasks; int numOfSinkTasks = appConfig.topology.numSinkTasks; int numOfSpoutTasks = 1; - String spoutName = "runningQueueSpout"; - String persistBoltName = "persistBolt"; + HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig, streamMaps); + TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks); builder.setBolt(persistBoltName, bolt, numOfPersistTasks).setNumTasks(numOfPersistTasks).shuffleGrouping(spoutName); - StormStreamSink queueSinkBolt = environment.getStreamSink("HADOOP_LEAF_QUEUE_STREAM", config); + StormStreamSink queueSinkBolt = environment.getStreamSink("HADOOP_QUEUE_STREAM", config); builder.setBolt("queueKafkaSink", queueSinkBolt, numOfSinkTasks) - .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName); + .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName, schedulerStreamId); + + StormStreamSink appSinkBolt = environment.getStreamSink("ACCEPTED_APP_STREAM", config); + builder.setBolt("appKafkaSink", appSinkBolt, numOfSinkTasks) + .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName, acceptedAppStreamId); return builder.createTopology(); } http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java index 1d64f87..159da21 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java @@ -31,6 +31,10 @@ public class HadoopClusterConstants { CLUSTER_METRIC, RUNNING_APPS, SCHEDULER } + public enum AppState { + RUNNING, ACCEPTED + } + public static class MetricName { // Metrics from running apps @@ -61,26 +65,9 @@ public class HadoopClusterConstants { } - public static class LeafQueueInfo { - public static final String TIMESTAMP = "timestamp"; - public static final String QUEUE_SITE = "site"; - public static final String QUEUE_NAME = "queue"; - public static final String QUEUE_STATE = "state"; - public static final String QUEUE_SCHEDULER = "scheduler"; - public static final String QUEUE_ABSOLUTE_CAPACITY = "absoluteCapacity"; - public static final String QUEUE_ABSOLUTE_MAX_CAPACITY = "absoluteMaxCapacity"; - public static final String QUEUE_ABSOLUTE_USED_CAPACITY = "absoluteUsedCapacity"; - public static final String QUEUE_MAX_USER_USED_CAPACITY = "maxUserUsedCapacity"; - public static final String QUEUE_USER_LIMIT_CAPACITY = "userLimitCapacity"; - public static final String QUEUE_USED_MEMORY = "memory"; - public static final String QUEUE_USED_VCORES = "vcores"; - public static final String QUEUE_NUM_ACTIVE_APPS = "numActiveApplications"; - public static final String QUEUE_NUM_PENDING_APPS = "numPendingApplications"; - public static final String QUEUE_MAX_ACTIVE_APPS = "maxActiveApplications"; - } - public static final String RUNNING_QUEUE_SERVICE_NAME = "RunningQueueService"; public static final String QUEUE_MAPPING_SERVICE_NAME = "QueueMappingService"; + public static final String ACCEPTED_APP_SERVICE_NAME = "AcceptedAppService"; // tag constants public static final String TAG_PARENT_QUEUE = "parentQueue"; @@ -90,6 +77,7 @@ public class HadoopClusterConstants { public static final String TAG_CLUSTER = "cluster"; // field constants + public static final String FIELD_DATASOURCE = "dataSource"; public static final String FIELD_DATATYPE = "dataType"; public static final String FIELD_DATA = "data"; http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java index 0ee4318..7ec24df 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java @@ -36,7 +36,19 @@ public class YarnClusterResourceURLBuilder { } public static String buildRunningAppsURL(String urlBase) { - return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "?state=RUNNING" + "&" + ANONYMOUS_PARAMETER); + return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "?state=RUNNING&" + ANONYMOUS_PARAMETER); + } + + public static String buildAcceptedAndRunningAppsURL(String urlBase) { + return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "?states=ACCEPTED,RUNNING&" + ANONYMOUS_PARAMETER); + } + + public static String buildAcceptedAppsURL(String urlBase) { + return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "?states=ACCEPTED&" + ANONYMOUS_PARAMETER); + } + + public static String buildAcceptedAppTrackingURL(String urlBase, String appId) { + return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "/" + appId); } public static String buildFinishedAppsURL(String urlBase) { http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java index d3219ef..57dd454 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java @@ -70,9 +70,7 @@ public class ClusterMetricsParseListener { entity.setValue(new double[] {0.0}); clusterMetricEntities.put(key, entity); } - if (clusterMetricCounts.get(key) == null) { - clusterMetricCounts.put(key, 0); - } + clusterMetricCounts.putIfAbsent(key, 0); updateEntityAggValue(entity, aggFunc, value, clusterMetricCounts.get(key)); clusterMetricCounts.put(key, clusterMetricCounts.get(key) + 1); } @@ -89,7 +87,7 @@ public class ClusterMetricsParseListener { public void flush() { HadoopQueueMessageId messageId = new HadoopQueueMessageId(DataType.METRIC, DataSource.CLUSTER_METRIC, System.currentTimeMillis()); List<GenericMetricEntity> metrics = new ArrayList<>(clusterMetricEntities.values()); - this.collector.emit(new ValuesArray(DataType.METRIC.name(), metrics), messageId); + this.collector.emit(new ValuesArray(DataSource.CLUSTER_METRIC, DataType.METRIC, metrics), messageId); reset(); } http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java index 364a1a7..ff54ca3 100755 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java @@ -25,8 +25,13 @@ import org.apache.eagle.common.DateTimeUtil; import org.apache.eagle.dataproc.impl.storm.ValuesArray; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataSource; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataType; +import org.apache.eagle.hadoop.queue.common.YarnClusterResourceURLBuilder; import org.apache.eagle.hadoop.queue.model.applications.App; +import org.apache.eagle.hadoop.queue.model.applications.AppStreamInfo; import org.apache.eagle.hadoop.queue.model.applications.Apps; +import org.apache.eagle.hadoop.queue.model.applications.YarnAppAPIEntity; import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId; import org.apache.eagle.log.entity.GenericMetricEntity; import backtype.storm.spout.SpoutOutputCollector; @@ -54,19 +59,29 @@ public class RunningAppParseListener { }; private String site; + private String rmUrl; private SpoutOutputCollector collector; private Map<String, GenericMetricEntity> appMetricEntities = new HashMap<>(); + private List<YarnAppAPIEntity> acceptedApps = new ArrayList<>(); - public RunningAppParseListener(String site, SpoutOutputCollector collector) { + public RunningAppParseListener(String site, SpoutOutputCollector collector, String rmUrl) { this.site = site; + this.rmUrl = rmUrl; this.collector = collector; } public void flush() { - logger.info("start sending app metrics, size: " + appMetricEntities.size()); - HadoopQueueMessageId messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.METRIC, HadoopClusterConstants.DataSource.RUNNING_APPS, System.currentTimeMillis()); + logger.info("crawled {} running app metrics", appMetricEntities.size()); + HadoopQueueMessageId messageId = new HadoopQueueMessageId(DataType.METRIC, DataSource.RUNNING_APPS, System.currentTimeMillis()); List<GenericMetricEntity> metrics = new ArrayList<>(appMetricEntities.values()); - collector.emit(new ValuesArray(HadoopClusterConstants.DataType.METRIC.name(), metrics), messageId); + collector.emit(new ValuesArray(DataSource.RUNNING_APPS, DataType.METRIC, metrics), messageId); + + logger.info("crawled {} accepted apps", acceptedApps.size()); + messageId = new HadoopQueueMessageId(DataType.ENTITY, DataSource.RUNNING_APPS, System.currentTimeMillis()); + List<YarnAppAPIEntity> entities = new ArrayList<>(acceptedApps); + collector.emit(new ValuesArray(DataSource.RUNNING_APPS, DataType.ENTITY, entities), messageId); + + acceptedApps.clear(); appMetricEntities.clear(); } @@ -97,21 +112,44 @@ public class RunningAppParseListener { public void onMetric(Apps apps, long timestamp) throws Exception { timestamp = timestamp / AGGREGATE_INTERVAL * AGGREGATE_INTERVAL; for (App app : apps.getApp()) { - Map<String, String> tags = new HashMap<>(); - tags.put(HadoopClusterConstants.TAG_USER, app.getUser()); - tags.put(HadoopClusterConstants.TAG_QUEUE, app.getQueue()); - for (AggLevel level : AggLevel.values()) { - Map<String, String> newTags = buildMetricTags(level, tags); - for (java.util.Map.Entry<String, String> entry : metrics.entrySet()) { - Method method = App.class.getMethod(entry.getValue()); - Integer value = (Integer) method.invoke(app); - String metricName = String.format(entry.getKey(), level.name); - createMetric(metricName, newTags, timestamp, value); + if (app.getState().equalsIgnoreCase(HadoopClusterConstants.AppState.ACCEPTED.toString())) { + YarnAppAPIEntity appAPIEntity = new YarnAppAPIEntity(); + appAPIEntity.setTags(buildAppTags(app)); + appAPIEntity.setTrackingUrl(YarnClusterResourceURLBuilder.buildAcceptedAppTrackingURL(rmUrl, app.getId())); + appAPIEntity.setAppName(app.getName()); + appAPIEntity.setClusterUsagePercentage(app.getClusterUsagePercentage()); + appAPIEntity.setQueueUsagePercentage(app.getQueueUsagePercentage()); + appAPIEntity.setElapsedTime(app.getElapsedTime()); + appAPIEntity.setStartedTime(app.getStartedTime()); + appAPIEntity.setState(app.getState()); + appAPIEntity.setTimestamp(app.getStartedTime()); + acceptedApps.add(appAPIEntity); + } else { + Map<String, String> tags = new HashMap<>(); + tags.put(HadoopClusterConstants.TAG_USER, app.getUser()); + tags.put(HadoopClusterConstants.TAG_QUEUE, app.getQueue()); + for (AggLevel level : AggLevel.values()) { + Map<String, String> newTags = buildMetricTags(level, tags); + for (java.util.Map.Entry<String, String> entry : metrics.entrySet()) { + Method method = App.class.getMethod(entry.getValue()); + Integer value = (Integer) method.invoke(app); + String metricName = String.format(entry.getKey(), level.name); + createMetric(metricName, newTags, timestamp, value); + } } } } } + private Map<String, String> buildAppTags(App app) { + Map<String, String> tags = new HashMap<>(); + tags.put(AppStreamInfo.SITE, this.site); + tags.put(AppStreamInfo.ID, app.getId()); + tags.put(AppStreamInfo.QUEUE, app.getQueue()); + tags.put(AppStreamInfo.USER, app.getUser()); + return tags; + } + private enum AggLevel { CLUSTER(HadoopClusterConstants.TAG_CLUSTER, ""), QUEUE(HadoopClusterConstants.TAG_QUEUE, HadoopClusterConstants.TAG_QUEUE), http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java index 3ffd371..39eec80 100755 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java @@ -36,9 +36,10 @@ public class RunningAppsCrawler implements Runnable { private String urlString; public RunningAppsCrawler(String site, String baseUrl, SpoutOutputCollector collector) { - this.urlString = YarnClusterResourceURLBuilder.buildRunningAppsURL(baseUrl); + this.urlString = YarnClusterResourceURLBuilder.buildAcceptedAndRunningAppsURL(baseUrl); + //this.urlString = YarnClusterResourceURLBuilder.buildRunningAppsURL(baseUrl); //this.urlString = YarnClusterResourceURLBuilder.buildFinishedAppsURL(baseUrl); - listener = new RunningAppParseListener(site, collector); + listener = new RunningAppParseListener(site, collector, baseUrl); } @Override http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java index 67cc5c9..165bdb1 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java @@ -21,6 +21,8 @@ package org.apache.eagle.hadoop.queue.crawler; import org.apache.eagle.dataproc.impl.storm.ValuesArray; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataSource; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataType; import org.apache.eagle.hadoop.queue.model.scheduler.*; import org.apache.eagle.hadoop.queue.model.scheduler.Queue; import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId; @@ -68,12 +70,12 @@ public class SchedulerInfoParseListener { LOG.info("Flushing {} RunningQueue metrics in memory", metricEntities.size()); HadoopQueueMessageId messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.METRIC, HadoopClusterConstants.DataSource.SCHEDULER, System.currentTimeMillis()); List<GenericMetricEntity> metrics = new ArrayList<>(metricEntities); - collector.emit(new ValuesArray(HadoopClusterConstants.DataType.METRIC.name(), metrics), messageId); + collector.emit(new ValuesArray(DataSource.SCHEDULER, DataType.METRIC, metrics), messageId); LOG.info("Flushing {} RunningQueueEntities in memory", runningQueueAPIEntities.size()); - messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.ENTITY, HadoopClusterConstants.DataSource.SCHEDULER, System.currentTimeMillis()); + messageId = new HadoopQueueMessageId(DataType.ENTITY, DataSource.SCHEDULER, System.currentTimeMillis()); List<TaggedLogAPIEntity> entities = new ArrayList<>(runningQueueAPIEntities); - collector.emit(new ValuesArray(HadoopClusterConstants.DataType.ENTITY.name(), entities), messageId); + collector.emit(new ValuesArray(DataSource.SCHEDULER, DataType.ENTITY, entities), messageId); runningQueueAPIEntities.clear(); metricEntities.clear(); http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java index 40d6e53..800bd03 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java @@ -17,6 +17,7 @@ */ package org.apache.eagle.hadoop.queue.model; +import org.apache.eagle.hadoop.queue.model.applications.YarnAppAPIEntity; import org.apache.eagle.hadoop.queue.model.scheduler.QueueStructureAPIEntity; import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity; import org.apache.eagle.log.entity.repo.EntityRepository; @@ -25,5 +26,6 @@ public class HadoopQueueEntityRepository extends EntityRepository { public HadoopQueueEntityRepository() { this.registerEntity(RunningQueueAPIEntity.class); this.registerEntity(QueueStructureAPIEntity.class); + this.registerEntity(YarnAppAPIEntity.class); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java index b1cbb42..393ede3 100755 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java @@ -51,6 +51,9 @@ public class App { private int allocatedMB; private int allocatedVCores; private int runningContainers; + // for HDP 2.7 + private double queueUsagePercentage; + private double clusterUsagePercentage; public String getId() { return id; @@ -219,4 +222,20 @@ public class App { public void setRunningContainers(int runningContainers) { this.runningContainers = runningContainers; } + + public double getQueueUsagePercentage() { + return queueUsagePercentage; + } + + public void setQueueUsagePercentage(double queueUsagePercentage) { + this.queueUsagePercentage = queueUsagePercentage; + } + + public double getClusterUsagePercentage() { + return clusterUsagePercentage; + } + + public void setClusterUsagePercentage(double clusterUsagePercentage) { + this.clusterUsagePercentage = clusterUsagePercentage; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java new file mode 100644 index 0000000..7e72023 --- /dev/null +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.hadoop.queue.model.applications; + +import java.util.HashMap; +import java.util.Map; + +public class AppStreamInfo { + public static final String SITE = "site"; + public static final String ID = "id"; + public static final String USER = "user"; + public static final String QUEUE = "queue"; + private static final String NAME = "appName"; + private static final String STATE = "state"; + private static final String STARTEDTIME = "startTime"; + private static final String ELAPSEDTIME = "elapsedTime"; + private static final String QUEUE_USAGE_PERCENTAGE = "queueUsagePercentage"; + private static final String CLUSTER_USAGE_PERCENTAGE = "clusterUsagePercentage"; + private static final String TRACKING_URL = "trackingUrl"; + + public static Map<String, Object> convertAppToStream(YarnAppAPIEntity appAPIEntity) { + Map<String, Object> queueStreamInfo = new HashMap<>(); + queueStreamInfo.put(SITE, appAPIEntity.getTags().get(SITE)); + queueStreamInfo.put(ID, appAPIEntity.getTags().get(ID)); + queueStreamInfo.put(USER, appAPIEntity.getTags().get(USER)); + queueStreamInfo.put(QUEUE, appAPIEntity.getTags().get(QUEUE)); + queueStreamInfo.put(NAME, appAPIEntity.getAppName()); + queueStreamInfo.put(STATE, appAPIEntity.getState()); + queueStreamInfo.put(ELAPSEDTIME, appAPIEntity.getElapsedTime()); + queueStreamInfo.put(STARTEDTIME, appAPIEntity.getStartedTime()); + queueStreamInfo.put(QUEUE_USAGE_PERCENTAGE, appAPIEntity.getQueueUsagePercentage()); + queueStreamInfo.put(CLUSTER_USAGE_PERCENTAGE, appAPIEntity.getClusterUsagePercentage()); + queueStreamInfo.put(TRACKING_URL, appAPIEntity.getTrackingUrl()); + + return queueStreamInfo; + } + +} http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java new file mode 100644 index 0000000..7b36523 --- /dev/null +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.hadoop.queue.model.applications; + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.*; + +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@Table("yarn_app") +@ColumnFamily("f") +@Prefix("accepted") +@Service(HadoopClusterConstants.ACCEPTED_APP_SERVICE_NAME) +@TimeSeries(true) +@Partition( {"site"}) +@Tags({"site","id","user","queue"}) +public class YarnAppAPIEntity extends TaggedLogAPIEntity { + @Column("a") + private String appName; + @Column("b") + private String state; + @Column("c") + private long startedTime; + @Column("d") + private long elapsedTime; + @Column("e") + private String trackingUrl; + @Column("f") + private double queueUsagePercentage; + @Column("g") + private double clusterUsagePercentage; + + public String getAppName() { + return appName; + } + + public void setAppName(String appName) { + this.appName = appName; + valueChanged("appName"); + } + + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + valueChanged("state"); + } + + public long getStartedTime() { + return startedTime; + } + + public void setStartedTime(long startedTime) { + this.startedTime = startedTime; + valueChanged("startedTime"); + } + + public long getElapsedTime() { + return elapsedTime; + } + + public void setElapsedTime(long elapsedTime) { + this.elapsedTime = elapsedTime; + valueChanged("elapsedTime"); + } + + public String getTrackingUrl() { + return trackingUrl; + } + + public void setTrackingUrl(String trackingUrl) { + this.trackingUrl = trackingUrl; + valueChanged("trackingUrl"); + } + + public double getQueueUsagePercentage() { + return queueUsagePercentage; + } + + public void setQueueUsagePercentage(double queueUsagePercentage) { + this.queueUsagePercentage = queueUsagePercentage; + valueChanged("queueUsagePercentage"); + } + + public double getClusterUsagePercentage() { + return clusterUsagePercentage; + } + + public void setClusterUsagePercentage(double clusterUsagePercentage) { + this.clusterUsagePercentage = clusterUsagePercentage; + valueChanged("clusterUsagePercentage"); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStreamInfo.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStreamInfo.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStreamInfo.java new file mode 100644 index 0000000..af06b27 --- /dev/null +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStreamInfo.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.hadoop.queue.model.scheduler; + +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; + +import java.util.HashMap; +import java.util.Map; + +public class QueueStreamInfo { + private static final String TIMESTAMP = "timestamp"; + private static final String QUEUE_SITE = "site"; + public static final String QUEUE_NAME = "queue"; + private static final String QUEUE_STATE = "state"; + private static final String QUEUE_SCHEDULER = "scheduler"; + private static final String QUEUE_ABSOLUTE_CAPACITY = "absoluteCapacity"; + private static final String QUEUE_ABSOLUTE_MAX_CAPACITY = "absoluteMaxCapacity"; + private static final String QUEUE_ABSOLUTE_USED_CAPACITY = "absoluteUsedCapacity"; + private static final String QUEUE_MAX_USER_USED_CAPACITY = "maxUserUsedCapacity"; + private static final String QUEUE_USER_LIMIT_CAPACITY = "userLimitCapacity"; + private static final String QUEUE_USED_MEMORY = "memory"; + private static final String QUEUE_USED_VCORES = "vcores"; + private static final String QUEUE_NUM_ACTIVE_APPS = "numActiveApplications"; + private static final String QUEUE_NUM_PENDING_APPS = "numPendingApplications"; + private static final String QUEUE_MAX_ACTIVE_APPS = "maxActiveApplications"; + + + public static Map<String, Object> convertEntityToStream(RunningQueueAPIEntity queueAPIEntity) { + Map<String, Object> queueInfoMap = new HashMap<>(); + queueInfoMap.put(QueueStreamInfo.QUEUE_SITE, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_SITE)); + queueInfoMap.put(QueueStreamInfo.QUEUE_NAME, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_QUEUE)); + queueInfoMap.put(QueueStreamInfo.QUEUE_ABSOLUTE_CAPACITY, queueAPIEntity.getAbsoluteCapacity()); + queueInfoMap.put(QueueStreamInfo.QUEUE_ABSOLUTE_MAX_CAPACITY, queueAPIEntity.getAbsoluteMaxCapacity()); + queueInfoMap.put(QueueStreamInfo.QUEUE_ABSOLUTE_USED_CAPACITY, queueAPIEntity.getAbsoluteUsedCapacity()); + queueInfoMap.put(QueueStreamInfo.QUEUE_MAX_ACTIVE_APPS, queueAPIEntity.getMaxActiveApplications()); + queueInfoMap.put(QueueStreamInfo.QUEUE_NUM_ACTIVE_APPS, queueAPIEntity.getNumActiveApplications()); + queueInfoMap.put(QueueStreamInfo.QUEUE_NUM_PENDING_APPS, queueAPIEntity.getNumPendingApplications()); + queueInfoMap.put(QueueStreamInfo.QUEUE_SCHEDULER, queueAPIEntity.getScheduler()); + queueInfoMap.put(QueueStreamInfo.QUEUE_STATE, queueAPIEntity.getState()); + queueInfoMap.put(QueueStreamInfo.QUEUE_USED_MEMORY, queueAPIEntity.getMemory()); + queueInfoMap.put(QueueStreamInfo.QUEUE_USED_VCORES, queueAPIEntity.getVcores()); + queueInfoMap.put(QueueStreamInfo.TIMESTAMP, queueAPIEntity.getTimestamp()); + + double maxUserUsedCapacity = 0; + double userUsedCapacity; + for (UserWrapper user : queueAPIEntity.getUsers().getUsers()) { + userUsedCapacity = calculateUserUsedCapacity( + queueAPIEntity.getAbsoluteUsedCapacity(), + queueAPIEntity.getMemory(), + user.getMemory()); + if (userUsedCapacity > maxUserUsedCapacity) { + maxUserUsedCapacity = userUsedCapacity; + } + + } + queueInfoMap.put(QueueStreamInfo.QUEUE_MAX_USER_USED_CAPACITY, maxUserUsedCapacity); + queueInfoMap.put(QueueStreamInfo.QUEUE_USER_LIMIT_CAPACITY, queueAPIEntity.getUserLimitFactor() * queueAPIEntity.getAbsoluteCapacity()); + return queueInfoMap; + } + + private static double calculateUserUsedCapacity(double absoluteUsedCapacity, long queueUsedMem, long userUsedMem) { + return userUsedMem * absoluteUsedCapacity / queueUsedMem; + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java index 9eb7008..43a62b7 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java @@ -27,11 +27,13 @@ import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; -import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.LeafQueueInfo; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataSource; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataType; +import org.apache.eagle.hadoop.queue.model.applications.AppStreamInfo; +import org.apache.eagle.hadoop.queue.model.applications.YarnAppAPIEntity; +import org.apache.eagle.hadoop.queue.model.scheduler.QueueStreamInfo; import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity; -import org.apache.eagle.hadoop.queue.model.scheduler.UserWrapper; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.log.entity.GenericMetricEntity; import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; import org.apache.eagle.service.client.IEagleServiceClient; import org.apache.eagle.service.client.impl.EagleServiceClientImpl; @@ -39,7 +41,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -47,18 +48,25 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt { private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueMetricPersistBolt.class); + private Map<HadoopClusterConstants.DataSource, String> streamMap; private HadoopQueueRunningAppConfig config; private IEagleServiceClient client; private OutputCollector collector; - public HadoopQueueMetricPersistBolt(HadoopQueueRunningAppConfig config) { + public HadoopQueueMetricPersistBolt(HadoopQueueRunningAppConfig config, + Map<HadoopClusterConstants.DataSource, String> streamMap) { this.config = config; + this.streamMap = streamMap; } @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { HadoopQueueRunningAppConfig.EagleProps.EagleService eagleService = config.eagleProps.eagleService; - this.client = new EagleServiceClientImpl(eagleService.host, eagleService.port, eagleService.username, eagleService.password); + this.client = new EagleServiceClientImpl( + eagleService.host, + eagleService.port, + eagleService.username, + eagleService.password); this.collector = collector; } @@ -67,30 +75,44 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt { if (input == null) { return; } - String dataType = input.getStringByField(HadoopClusterConstants.FIELD_DATATYPE); + DataSource dataSource = (DataSource) input.getValueByField(HadoopClusterConstants.FIELD_DATASOURCE); + DataType dataType = (DataType) input.getValueByField(HadoopClusterConstants.FIELD_DATATYPE); Object data = input.getValueByField(HadoopClusterConstants.FIELD_DATA); - if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.METRIC.toString())) { - List<GenericMetricEntity> metrics = (List<GenericMetricEntity>) data; - writeMetrics(metrics); - } else if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.ENTITY.toString())) { - List<TaggedLogAPIEntity> entities = (List<TaggedLogAPIEntity>) data; + + List<TaggedLogAPIEntity> entities = (List<TaggedLogAPIEntity>) data; + if (dataType.equals(DataType.METRIC)) { + writeEntities(entities, dataType, dataSource); + } else { for (TaggedLogAPIEntity entity : entities) { if (entity instanceof RunningQueueAPIEntity) { RunningQueueAPIEntity queue = (RunningQueueAPIEntity) entity; if (queue.getUsers() != null && !queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) { - collector.emit(new Values(queue.getTags().get(HadoopClusterConstants.TAG_QUEUE), - parseLeafQueueInfo(queue))); + String queueName = queue.getTags().get(HadoopClusterConstants.TAG_QUEUE); + collector.emit(streamMap.get(dataSource), + new Values(queueName, QueueStreamInfo.convertEntityToStream(queue))); } + } else if (entity instanceof YarnAppAPIEntity) { + YarnAppAPIEntity appAPIEntity = (YarnAppAPIEntity) entity; + collector.emit(streamMap.get(dataSource), + new Values(appAPIEntity.getAppName(), AppStreamInfo.convertAppToStream(appAPIEntity))); } } - writeEntities(entities); + if (!dataSource.equals(DataSource.RUNNING_APPS)) { + writeEntities(entities, dataType, dataSource); + } } this.collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields(HadoopClusterConstants.LeafQueueInfo.QUEUE_NAME, "message")); + if (streamMap != null) { + for (String stormStreamId : streamMap.values()) { + declarer.declareStream(stormStreamId, new Fields("f1", "message")); + } + } else { + declarer.declare(new Fields("f1", "message")); + } } @Override @@ -104,67 +126,17 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt { } } - private void writeEntities(List<TaggedLogAPIEntity> entities) { + private void writeEntities(List<TaggedLogAPIEntity> entities, DataType dataType, DataSource dataSource) { try { GenericServiceAPIResponseEntity response = client.create(entities); if (!response.isSuccess()) { LOG.error("Got exception from eagle service: " + response.getException()); } else { - LOG.info("Successfully wrote " + entities.size() + " RunningQueueAPIEntity entities"); + LOG.info("Successfully wrote {} items of {} for {}", entities.size(), dataType, dataSource); } } catch (Exception e) { - LOG.error("cannot create running queue entities successfully", e); + LOG.error("cannot create {} entities", entities.size(), e); } entities.clear(); } - - private void writeMetrics(List<GenericMetricEntity> entities) { - try { - GenericServiceAPIResponseEntity response = client.create(entities); - if (response.isSuccess()) { - LOG.info("Successfully wrote " + entities.size() + " GenericMetricEntity entities"); - } else { - LOG.error(response.getException()); - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - } - - private Map<String, Object> parseLeafQueueInfo(RunningQueueAPIEntity queueAPIEntity) { - Map<String, Object> queueInfoMap = new HashMap<>(); - queueInfoMap.put(LeafQueueInfo.QUEUE_SITE, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_SITE)); - queueInfoMap.put(LeafQueueInfo.QUEUE_NAME, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_QUEUE)); - queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_CAPACITY, queueAPIEntity.getAbsoluteCapacity()); - queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_MAX_CAPACITY, queueAPIEntity.getAbsoluteMaxCapacity()); - queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_USED_CAPACITY, queueAPIEntity.getAbsoluteUsedCapacity()); - queueInfoMap.put(LeafQueueInfo.QUEUE_MAX_ACTIVE_APPS, queueAPIEntity.getMaxActiveApplications()); - queueInfoMap.put(LeafQueueInfo.QUEUE_NUM_ACTIVE_APPS, queueAPIEntity.getNumActiveApplications()); - queueInfoMap.put(LeafQueueInfo.QUEUE_NUM_PENDING_APPS, queueAPIEntity.getNumPendingApplications()); - queueInfoMap.put(LeafQueueInfo.QUEUE_SCHEDULER, queueAPIEntity.getScheduler()); - queueInfoMap.put(LeafQueueInfo.QUEUE_STATE, queueAPIEntity.getState()); - queueInfoMap.put(LeafQueueInfo.QUEUE_USED_MEMORY, queueAPIEntity.getMemory()); - queueInfoMap.put(LeafQueueInfo.QUEUE_USED_VCORES, queueAPIEntity.getVcores()); - queueInfoMap.put(LeafQueueInfo.TIMESTAMP, queueAPIEntity.getTimestamp()); - - double maxUserUsedCapacity = 0; - double userUsedCapacity; - for (UserWrapper user : queueAPIEntity.getUsers().getUsers()) { - userUsedCapacity = calculateUserUsedCapacity( - queueAPIEntity.getAbsoluteUsedCapacity(), - queueAPIEntity.getMemory(), - user.getMemory()); - if (userUsedCapacity > maxUserUsedCapacity) { - maxUserUsedCapacity = userUsedCapacity; - } - - } - queueInfoMap.put(LeafQueueInfo.QUEUE_MAX_USER_USED_CAPACITY, maxUserUsedCapacity); - queueInfoMap.put(LeafQueueInfo.QUEUE_USER_LIMIT_CAPACITY, queueAPIEntity.getUserLimitFactor() * queueAPIEntity.getAbsoluteCapacity()); - return queueInfoMap; - } - - private double calculateUserUsedCapacity(double absoluteUsedCapacity, long queueUsedMem, long userUsedMem) { - return userUsedMem * absoluteUsedCapacity / queueUsedMem; - } } http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java index 530be9a..681f25e 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java @@ -51,7 +51,9 @@ public class HadoopQueueRunningSpout extends BaseRichSpout { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields(HadoopClusterConstants.FIELD_DATATYPE, HadoopClusterConstants.FIELD_DATA)); + declarer.declare(new Fields(HadoopClusterConstants.FIELD_DATASOURCE, + HadoopClusterConstants.FIELD_DATATYPE, + HadoopClusterConstants.FIELD_DATA)); } @Override http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml index 5fb041d..da22836 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml +++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml @@ -55,12 +55,18 @@ <!-- sink to kafka --> <property> - <name>dataSinkConfig.topic</name> - <displayName>dataSinkConfig.topic</displayName> + <name>dataSinkConfig.HADOOP_QUEUE_STREAM.topic</name> + <displayName>Destination(Kafka Topic) Of Queue Stream Data</displayName> <value>yarn_queue</value> <description>topic for kafka data sink</description> </property> <property> + <name>dataSinkConfig.ACCEPTED_APP_STREAM.topic</name> + <displayName>Destination(Kafka Topic) Of App Stream Data</displayName> + <value>yarn_accepted_app</value> + <description>topic for kafka data sink</description> + </property> + <property> <name>dataSinkConfig.brokerList</name> <displayName>dataSinkConfig.brokerList</displayName> <value>localhost:6667</value> @@ -106,7 +112,7 @@ </configuration> <streams> <stream> - <streamId>HADOOP_LEAF_QUEUE_STREAM</streamId> + <streamId>HADOOP_QUEUE_STREAM</streamId> <description>Hadoop Leaf Queue Info Stream</description> <validate>true</validate> <columns> @@ -172,6 +178,57 @@ </column> </columns> </stream> + <stream> + <streamId>ACCEPTED_APP_STREAM</streamId> + <description>Accepted App Info Stream</description> + <validate>true</validate> + <columns> + <column> + <name>id</name> + <type>string</type> + </column> + <column> + <name>site</name> + <type>string</type> + </column> + <column> + <name>appName</name> + <type>string</type> + </column> + <column> + <name>queue</name> + <type>string</type> + </column> + <column> + <name>state</name> + <type>string</type> + </column> + <column> + <name>user</name> + <type>string</type> + </column> + <column> + <name>trackingUrl</name> + <type>string</type> + </column> + <column> + <name>elapsedTime</name> + <type>long</type> + </column> + <column> + <name>startedTime</name> + <type>long</type> + </column> + <column> + <name>queueUsagePercentage</name> + <type>double</type> + </column> + <column> + <name>clusterUsagePercentage</name> + <type>double</type> + </column> + </columns> + </stream> </streams> <docs> <install> http://git-wip-us.apache.org/repos/asf/eagle/blob/4a5c4a43/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java index d25d05b..2a99d26 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java @@ -91,7 +91,7 @@ public abstract class AbstractURLSelector implements HAURLSelector { LOG.info("Successfully switch to new url : " + selectedUrl); return; } - LOG.info("try url " + urlToCheck + "fail for " + (time + 1) + " times, sleep 5 seconds before try again. "); + LOG.info("try url " + urlToCheck + " failed for " + (time + 1) + " times, sleep 5 seconds before try again. "); try { Thread.sleep(5 * 1000); } catch (InterruptedException ex) {
