Repository: eagle
Updated Branches:
  refs/heads/master 031835c77 -> 5e4c937e3
[EAGLE-882] Stream leaf RunningQueueAPIEntity into Kafka for queue monitoring https://issues.apache.org/jira/browse/EAGLE-882 Author: Zhao, Qingwen <[email protected]> Closes #793 from qingwen220/EAGLE-882. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/5e4c937e Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/5e4c937e Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/5e4c937e Branch: refs/heads/master Commit: 5e4c937e3479a09b7a21cd5176b7b2ebdd6c8287 Parents: 031835c Author: Zhao, Qingwen <[email protected]> Authored: Wed Jan 25 12:21:01 2017 +0800 Committer: Zhao, Qingwen <[email protected]> Committed: Wed Jan 25 12:21:01 2017 +0800 ---------------------------------------------------------------------- .../hadoop/queue/HadoopQueueRunningApp.java | 13 +- .../queue/HadoopQueueRunningAppConfig.java | 6 +- .../queue/common/HadoopClusterConstants.java | 18 ++ .../crawler/SchedulerInfoParseListener.java | 18 +- .../model/scheduler/RunningQueueAPIEntity.java | 17 +- .../queue/model/scheduler/UserWrapper.java | 16 +- .../queue/model/scheduler/UserWrappers.java | 39 ++++ .../storm/HadoopQueueMetricPersistBolt.java | 57 ++++++ ...doop.queue.HadoopQueueRunningAppProvider.xml | 186 ++++++++++++++++--- .../src/main/resources/application.conf | 14 +- .../hadoop/queue/HadoopQueueRunningAppTest.java | 2 +- .../src/test/resources/application.conf | 14 +- 12 files changed, 347 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/5e4c937e/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java index 77dc0cb..68ca8c7 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java @@ -17,11 +17,13 @@ package org.apache.eagle.hadoop.queue; import backtype.storm.generated.StormTopology; +import backtype.storm.topology.BoltDeclarer; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.TopologyBuilder; import com.typesafe.config.Config; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; +import org.apache.eagle.app.messaging.StormStreamSink; import org.apache.eagle.hadoop.queue.storm.HadoopQueueMetricPersistBolt; import org.apache.eagle.hadoop.queue.storm.HadoopQueueRunningSpout; @@ -33,14 +35,19 @@ public class HadoopQueueRunningApp extends StormApplication { HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig); TopologyBuilder builder = new TopologyBuilder(); - int numOfParserTasks = appConfig.topology.numOfParserTasks; + int numOfPersistTasks = appConfig.topology.numPersistTasks; + int numOfSinkTasks = appConfig.topology.numSinkTasks; int numOfSpoutTasks = 1; String spoutName = "runningQueueSpout"; - String boltName = "parserBolt"; + String persistBoltName = "persistBolt"; builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks); - builder.setBolt(boltName, bolt, numOfParserTasks).setNumTasks(numOfParserTasks).shuffleGrouping(spoutName); + builder.setBolt(persistBoltName, bolt, numOfPersistTasks).setNumTasks(numOfPersistTasks).shuffleGrouping(spoutName); + + StormStreamSink queueSinkBolt = environment.getStreamSink("HADOOP_LEAF_QUEUE_STREAM", config); + builder.setBolt("queueKafkaSink", queueSinkBolt, numOfSinkTasks) + 
.setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName); return builder.createTopology(); } http://git-wip-us.apache.org/repos/asf/eagle/blob/5e4c937e/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java index d398028..690ac6b 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppConfig.java @@ -36,7 +36,8 @@ public class HadoopQueueRunningAppConfig implements Serializable { } public static class Topology implements Serializable { - public int numOfParserTasks; + public int numSinkTasks; + public int numPersistTasks; } public static class DataSourceConfig implements Serializable { @@ -67,7 +68,8 @@ public class HadoopQueueRunningAppConfig implements Serializable { private void init(Config config) { this.config = config; - this.topology.numOfParserTasks = config.getInt("topology.numOfParserTasks"); + this.topology.numPersistTasks = config.getInt("topology.numPersistTasks"); + this.topology.numSinkTasks = config.getInt("topology.numSinkTasks"); this.dataSourceConfig.rMEndPoints = config.getString("dataSourceConfig.rMEndPoints"); this.dataSourceConfig.fetchIntervalSec = config.getString("dataSourceConfig.fetchIntervalSec"); http://git-wip-us.apache.org/repos/asf/eagle/blob/5e4c937e/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java ---------------------------------------------------------------------- diff --git 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java index 4a63343..9a08f05 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java @@ -61,6 +61,24 @@ public class HadoopClusterConstants { } + public static class LeafQueueInfo { + public static final String TIMESTAMP = "timestamp"; + public static final String QUEUE_SITE = "site"; + public static final String QUEUE_NAME = "queue"; + public static final String QUEUE_STATE = "state"; + public static final String QUEUE_SCHEDULER = "scheduler"; + public static final String QUEUE_ABSOLUTE_CAPACITY = "absoluteCapacity"; + public static final String QUEUE_ABSOLUTE_MAX_CAPACITY = "absoluteMaxCapacity"; + public static final String QUEUE_ABSOLUTE_USED_CAPACITY = "absoluteUsedCapacity"; + public static final String QUEUE_MAX_USER_USED_CAPACITY = "maxUserUsedCapacity"; + public static final String QUEUE_USER_LIMIT_CAPACITY = "userLimitCapacity"; + public static final String QUEUE_USED_MEMORY = "memory"; + public static final String QUEUE_USED_VCORES = "vcores"; + public static final String QUEUE_NUM_ACTIVE_APPS = "numActiveApplications"; + public static final String QUEUE_NUM_PENDING_APPS = "numPendingApplications"; + public static final String QUEUE_MAX_ACTIVE_APPS = "maxActiveApplications"; + } + public static final String RUNNING_QUEUE_SERVICE_NAME = "RunningQueueService"; // tag constants http://git-wip-us.apache.org/repos/asf/eagle/blob/5e4c937e/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java ---------------------------------------------------------------------- diff --git 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java index 214e7f6..b0452c9 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java @@ -112,15 +112,17 @@ public class SchedulerInfoParseListener { _entity.setNumPendingApplications(queue.getNumPendingApplications()); _entity.setMaxActiveApplications(queue.getMaxActiveApplications()); _entity.setTimestamp(currentTimestamp); + _entity.setUserLimitFactor(queue.getUserLimitFactor()); List<UserWrapper> userList = new ArrayList<>(); if (queue.getUsers() != null && queue.getUsers().getUser() != null) { for (User user : queue.getUsers().getUser()) { - UserWrapper newUser = new UserWrapper(user); - userList.add(newUser); + userList.add(wrapUser(user)); } } - _entity.setUsers(userList); + UserWrappers users = new UserWrappers(); + users.setUsers(userList); + _entity.setUsers(users); runningQueueAPIEntities.add(_entity); @@ -149,4 +151,14 @@ public class SchedulerInfoParseListener { } } } + + private UserWrapper wrapUser(User user) { + UserWrapper wrapper = new UserWrapper(); + wrapper.setUsername(user.getUsername()); + wrapper.setMemory(user.getResourcesUsed().getMemory()); + wrapper.setvCores(user.getResourcesUsed().getvCores()); + wrapper.setNumActiveApplications(user.getNumActiveApplications()); + wrapper.setNumPendingApplications(user.getNumPendingApplications()); + return wrapper; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/5e4c937e/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/RunningQueueAPIEntity.java ---------------------------------------------------------------------- diff --git 
a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/RunningQueueAPIEntity.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/RunningQueueAPIEntity.java index e7563d4..be4ef31 100755 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/RunningQueueAPIEntity.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/RunningQueueAPIEntity.java @@ -54,7 +54,9 @@ public class RunningQueueAPIEntity extends TaggedLogAPIEntity { @Column("j") private String scheduler; @Column("k") - private List<UserWrapper> users; + private UserWrappers users; + @Column("l") + private double userLimitFactor; public String getScheduler() { return scheduler; @@ -147,12 +149,21 @@ public class RunningQueueAPIEntity extends TaggedLogAPIEntity { valueChanged("numPendingApplications"); } - public List<UserWrapper> getUsers() { + public UserWrappers getUsers() { return users; } - public void setUsers(List<UserWrapper> users) { + public void setUsers(UserWrappers users) { this.users = users; valueChanged("users"); } + + public double getUserLimitFactor() { + return userLimitFactor; + } + + public void setUserLimitFactor(double userLimitFactor) { + this.userLimitFactor = userLimitFactor; + valueChanged("userLimitFactor"); + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/5e4c937e/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrapper.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrapper.java index 9303727..f5c15f5 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrapper.java +++ 
b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrapper.java @@ -18,22 +18,20 @@ package org.apache.eagle.hadoop.queue.model.scheduler; -public class UserWrapper { +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import java.io.Serializable; + +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class UserWrapper implements Serializable { private String username; private long memory; private long vCores; private int numPendingApplications; private int numActiveApplications; - public UserWrapper(User user) { - this.username = user.getUsername(); - this.memory = user.getResourcesUsed().getMemory(); - this.vCores = user.getResourcesUsed().getvCores(); - this.numActiveApplications = user.getNumActiveApplications(); - this.numPendingApplications = user.getNumPendingApplications(); - } - public String getUsername() { return username; } http://git-wip-us.apache.org/repos/asf/eagle/blob/5e4c937e/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrappers.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrappers.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrappers.java new file mode 100644 index 0000000..9a6bf8a --- /dev/null +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/UserWrappers.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.hadoop.queue.model.scheduler; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.io.Serializable; +import java.util.List; + +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown = true) +public class UserWrappers implements Serializable { + private List<UserWrapper> users; + + public List<UserWrapper> getUsers() { + return users; + } + + public void setUsers(List<UserWrapper> users) { + this.users = users; + } + +} http://git-wip-us.apache.org/repos/asf/eagle/blob/5e4c937e/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java index 3609184..1bafc13 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java @@ -22,10 +22,14 @@ import backtype.storm.task.OutputCollector; import 
backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; import org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.LeafQueueInfo; import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity; +import org.apache.eagle.hadoop.queue.model.scheduler.UserWrapper; import org.apache.eagle.log.entity.GenericMetricEntity; import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; import org.apache.eagle.service.client.IEagleServiceClient; @@ -33,6 +37,8 @@ import org.apache.eagle.service.client.impl.EagleServiceClientImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -67,6 +73,11 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt { writeMetrics(metrics); } else if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.ENTITY.toString())) { List<RunningQueueAPIEntity> entities = (List<RunningQueueAPIEntity>) data; + for (RunningQueueAPIEntity queue : entities) { + if (queue.getUsers() != null && !queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) { + collector.emit(new Values(queue.getTags().get(HadoopClusterConstants.TAG_QUEUE), parseLeafQueueInfo(queue))); + } + } writeEntities(entities); } this.collector.ack(input); @@ -74,7 +85,18 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(HadoopClusterConstants.LeafQueueInfo.QUEUE_NAME, "message")); + } + @Override + public void cleanup() { + if (client != null) { + try { + client.close(); + } catch 
(IOException e) { + LOG.error(e.getMessage(), e); + } + } } private void writeEntities(List<RunningQueueAPIEntity> entities) { @@ -104,5 +126,40 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt { } } + private Map<String, Object> parseLeafQueueInfo(RunningQueueAPIEntity queueAPIEntity) { + Map<String, Object> queueInfoMap = new HashMap<>(); + queueInfoMap.put(LeafQueueInfo.QUEUE_SITE, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_SITE)); + queueInfoMap.put(LeafQueueInfo.QUEUE_NAME, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_QUEUE)); + queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_CAPACITY, queueAPIEntity.getAbsoluteCapacity()); + queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_MAX_CAPACITY, queueAPIEntity.getAbsoluteMaxCapacity()); + queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_USED_CAPACITY, queueAPIEntity.getAbsoluteUsedCapacity()); + queueInfoMap.put(LeafQueueInfo.QUEUE_MAX_ACTIVE_APPS, queueAPIEntity.getMaxActiveApplications()); + queueInfoMap.put(LeafQueueInfo.QUEUE_NUM_ACTIVE_APPS, queueAPIEntity.getNumActiveApplications()); + queueInfoMap.put(LeafQueueInfo.QUEUE_NUM_PENDING_APPS, queueAPIEntity.getNumPendingApplications()); + queueInfoMap.put(LeafQueueInfo.QUEUE_SCHEDULER, queueAPIEntity.getScheduler()); + queueInfoMap.put(LeafQueueInfo.QUEUE_STATE, queueAPIEntity.getState()); + queueInfoMap.put(LeafQueueInfo.QUEUE_USED_MEMORY, queueAPIEntity.getMemory()); + queueInfoMap.put(LeafQueueInfo.QUEUE_USED_VCORES, queueAPIEntity.getVcores()); + queueInfoMap.put(LeafQueueInfo.TIMESTAMP, queueAPIEntity.getTimestamp()); + double maxUserUsedCapacity = 0; + double userUsedCapacity; + for (UserWrapper user : queueAPIEntity.getUsers().getUsers()) { + userUsedCapacity = calculateUserUsedCapacity( + queueAPIEntity.getAbsoluteUsedCapacity(), + queueAPIEntity.getMemory(), + user.getMemory()); + if (userUsedCapacity > maxUserUsedCapacity) { + maxUserUsedCapacity = userUsedCapacity; + } + + } + 
queueInfoMap.put(LeafQueueInfo.QUEUE_MAX_USER_USED_CAPACITY, maxUserUsedCapacity); + queueInfoMap.put(LeafQueueInfo.QUEUE_USER_LIMIT_CAPACITY, queueAPIEntity.getUserLimitFactor() * queueAPIEntity.getAbsoluteCapacity()); + return queueInfoMap; + } + + private double calculateUserUsedCapacity(double absoluteUsedCapacity, long queueUsedMem, long userUsedMem) { + return userUsedMem * absoluteUsedCapacity / queueUsedMem; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/5e4c937e/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml index 4984b43..4cf745c 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml +++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml @@ -17,36 +17,162 @@ --> <application> - <type>HADOOP_QUEUE_RUNNING_APP</type> - <name>Hadoop Queue Monitor</name> - <configuration> - <!-- org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig --> - <property> - <name>dataSourceConfig.rMEndPoints</name> - <displayName>Resource Manager End Points</displayName> - <description>end points of resource manager, comma-separated for multiple</description> - <value>http://sandbox.hortonworks.com:8088/</value> - <required>true</required> - </property> - <property> - <name>workers</name> - <displayName>Storm Worker Number</displayName> - <description>the number of storm worker</description> - <value>1</value> - </property> - <property> - <name>topology.numOfParserTasks</name> - 
<displayName>Parallel Tasks Per Bolt</displayName> - <description>the number of tasks that should be assigned to execute a bolt</description> - <value>2</value> - </property> - <property> - <name>dataSourceConfig.fetchIntervalSec</name> - <displayName>Fetching Metric Interval in Seconds</displayName> - <description>interval seconds of fetching metric from resource manager</description> - <value>10</value> - </property> - </configuration> + <type>HADOOP_QUEUE_RUNNING_APP</type> + <name>Hadoop Queue Monitor</name> + <configuration> + <!-- org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig --> + <property> + <name>dataSourceConfig.rMEndPoints</name> + <displayName>Resource Manager End Points</displayName> + <description>end points of resource manager, comma-separated for multiple</description> + <value>http://sandbox.hortonworks.com:8088/</value> + <required>true</required> + </property> + <property> + <name>workers</name> + <displayName>Storm Worker Number</displayName> + <description>the number of storm worker</description> + <value>1</value> + </property> + <property> + <name>topology.numPersistTasks</name> + <displayName>Tasks for Data Storage Bolt</displayName> + <description>the number of tasks that persist metrics or entities into the database</description> + <value>2</value> + </property> + <property> + <name>topology.numSinkTasks</name> + <displayName>Tasks for Stream Sink Bolt</displayName> + <description>the number of tasks that stream leaf queue info into Kafka</description> + <value>2</value> + </property> + <property> + <name>dataSourceConfig.fetchIntervalSec</name> + <displayName>Fetching Metric Interval in Seconds</displayName> + <description>interval seconds of fetching metric from resource manager</description> + <value>10</value> + </property> + + <!-- sink to kafka --> + <property> + <name>dataSinkConfig.topic</name> + <displayName>dataSinkConfig.topic</displayName> + <value>hadoop_leaf_queue</value> + <description>topic for kafka data 
sink</description> + </property> + <property> + <name>dataSinkConfig.brokerList</name> + <displayName>dataSinkConfig.brokerList</displayName> + <value>localhost:6667</value> + <description>kafka broker list</description> + <required>true</required> + </property> + <property> + <name>dataSinkConfig.serializerClass</name> + <displayName>dataSinkConfig.serializerClass</displayName> + <value>kafka.serializer.StringEncoder</value> + <description>serializer class Kafka message value</description> + </property> + <property> + <name>dataSinkConfig.keySerializerClass</name> + <displayName>dataSinkConfig.keySerializerClass</displayName> + <value>kafka.serializer.StringEncoder</value> + <description>serializer class Kafka message key</description> + </property> + <property> + <name>dataSinkConfig.producerType</name> + <displayName>dataSinkConfig.producerType</displayName> + <value>async</value> + <description>whether the messages are sent asynchronously in a background thread</description> + </property> + <property> + <name>dataSinkConfig.numBatchMessages</name> + <displayName>dataSinkConfig.numBatchMessages</displayName> + <value>4096</value> + <description>number of messages to send in one batch when using async mode</description> + </property> + <property> + <name>dataSinkConfig.maxQueueBufferMs</name> + <displayName>dataSinkConfig.maxQueueBufferMs</displayName> + <value>5000</value> + <description>maximum time to buffer data when using async mode</description> + </property> + <property> + <name>dataSinkConfig.requestRequiredAcks</name> + <displayName>dataSinkConfig.requestRequiredAcks</displayName> + <value>0</value> + <description>value controls when a produce request is considered completed</description> + </property> + </configuration> + <streams> + <stream> + <streamId>HADOOP_LEAF_QUEUE_STREAM</streamId> + <description>Hadoop Leaf Queue Info Stream</description> + <validate>true</validate> + <columns> + <column> + <name>timestamp</name> + <type>long</type> + </column> 
+ <column> + <name>site</name> + <type>string</type> + </column> + <column> + <name>queue</name> + <type>string</type> + </column> + <column> + <name>state</name> + <type>string</type> + </column> + <column> + <name>scheduler</name> + <type>string</type> + </column> + <column> + <name>absoluteCapacity</name> + <type>double</type> + </column> + <column> + <name>absoluteMaxCapacity</name> + <type>double</type> + </column> + <column> + <name>absoluteUsedCapacity</name> + <type>double</type> + </column> + <column> + <name>maxUserUsedCapacity</name> + <type>double</type> + </column> + <column> + <name>userLimitCapacity</name> + <type>double</type> + </column> + <column> + <name>memory</name> + <type>long</type> + </column> + <column> + <name>vcores</name> + <type>long</type> + </column> + <column> + <name>numActiveApplications</name> + <type>int</type> + </column> + <column> + <name>numPendingApplications</name> + <type>int</type> + </column> + <column> + <name>maxActiveApplications</name> + <type>int</type> + </column> + </columns> + </stream> + </streams> <docs> <install> </install> http://git-wip-us.apache.org/repos/asf/eagle/blob/5e4c937e/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf b/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf index e5c9b81..9d69084 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf +++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/application.conf @@ -15,7 +15,8 @@ { "topology" : { - "numOfParserTasks" : 2, + "numSinkTasks" : 2, + "numPersistTasks" : 2 }, "dataSourceConfig": { "rMEndPoints" : "http://sandbox.hortonworks.com:8088/", @@ -32,4 +33,15 @@ "mode":"LOCAL", application.storm.nimbusHost=localhost, "workers":1, + + "dataSinkConfig": { + "topic" : "hadoop_leaf_queue", + "brokerList" : "sandbox.hortonworks.com:6667", + 
"serializerClass" : "kafka.serializer.StringEncoder", + "keySerializerClass" : "kafka.serializer.StringEncoder" + "producerType" : "async", + "numBatchMessages" : "4096", + "maxQueueBufferMs" : "5000", + "requestRequiredAcks" : "0" + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/5e4c937e/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java b/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java index 32ed320..da79ea2 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java +++ b/eagle-jpm/eagle-hadoop-queue/src/test/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningAppTest.java @@ -21,7 +21,7 @@ import org.junit.Test; public class HadoopQueueRunningAppTest { @Test - public void testRun(){ + public void testRun() { new HadoopQueueRunningApp().run(ConfigFactory.load()); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/5e4c937e/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf b/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf index e5c9b81..9d69084 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf +++ b/eagle-jpm/eagle-hadoop-queue/src/test/resources/application.conf @@ -15,7 +15,8 @@ { "topology" : { - "numOfParserTasks" : 2, + "numSinkTasks" : 2, + "numPersistTasks" : 2 }, "dataSourceConfig": { "rMEndPoints" : "http://sandbox.hortonworks.com:8088/", @@ -32,4 +33,15 @@ "mode":"LOCAL", application.storm.nimbusHost=localhost, "workers":1, + + "dataSinkConfig": { + "topic" : "hadoop_leaf_queue", + 
"brokerList" : "sandbox.hortonworks.com:6667", + "serializerClass" : "kafka.serializer.StringEncoder", + "keySerializerClass" : "kafka.serializer.StringEncoder" + "producerType" : "async", + "numBatchMessages" : "4096", + "maxQueueBufferMs" : "5000", + "requestRequiredAcks" : "0" + } }
