Author: chirino
Date: Fri Aug 26 15:18:53 2011
New Revision: 1162140
URL: http://svn.apache.org/viewvc?rev=1162140&view=rev
Log:
- Realized that even Topics can own Queue so simplified Topic/Queue Metrics
down to just a DestMetricsDTO
-
Added:
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateDestMetricsDTO.java
- copied, changed from r1162139,
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestMetricsDTO.java
- copied, changed from r1162139,
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java
Removed:
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateTopicMetricsDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestinationMetricsDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicMetricsDTO.java
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1162140&r1=1162139&r2=1162140&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
Fri Aug 26 15:18:53 2011
@@ -29,6 +29,48 @@ import java.util.{Arrays, ArrayList}
import collection.mutable.{LinkedHashMap, HashMap}
import collection.{Iterable, JavaConversions}
+object DestinationMetricsSupport {
+
+ def add_destination_metrics(to:DestMetricsDTO, from:DestMetricsDTO) = {
+ to.enqueue_item_counter += from.enqueue_item_counter
+ to.enqueue_size_counter += from.enqueue_size_counter
+ to.enqueue_ts = to.enqueue_ts max from.enqueue_ts
+
+ to.dequeue_item_counter += from.dequeue_item_counter
+ to.dequeue_size_counter += from.dequeue_size_counter
+ to.dequeue_ts = to.dequeue_ts max from.dequeue_ts
+
+ to.producer_counter += from.producer_counter
+ to.consumer_counter += from.consumer_counter
+ to.producer_count += from.producer_count
+ to.consumer_count += from.consumer_count
+
+ to.nack_item_counter += from.nack_item_counter
+ to.nack_size_counter += from.nack_size_counter
+ to.nack_ts = to.nack_ts max from.nack_ts
+
+ to.expired_item_counter += from.expired_item_counter
+ to.expired_size_counter += from.expired_size_counter
+ to.expired_ts = to.expired_ts max from.expired_ts
+
+ to.queue_size += from.queue_size
+ to.queue_items += from.queue_items
+
+ to.swap_out_item_counter += from.swap_out_item_counter
+ to.swap_out_size_counter += from.swap_out_size_counter
+ to.swap_in_item_counter += from.swap_in_item_counter
+ to.swap_in_size_counter += from.swap_in_size_counter
+
+ to.swapping_in_size += from.swapping_in_size
+ to.swapping_out_size += from.swapping_out_size
+
+ to.swapped_in_items += from.swapped_in_items
+ to.swapped_in_size += from.swapped_in_size
+ to.swapped_in_size_max += from.swapped_in_size_max
+ }
+
+}
+
/**
* <p>
* </p>
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1162140&r1=1162139&r2=1162140&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri Aug 26 15:18:53 2011
@@ -207,8 +207,8 @@ class Queue(val router: LocalRouter, val
var auto_delete_after = 0
var idled_at = 0L
- def get_queue_metrics:QueueMetricsDTO = {
- val rc = new QueueMetricsDTO
+ def get_queue_metrics:DestMetricsDTO = {
+ val rc = new DestMetricsDTO
rc.enqueue_item_counter = this.enqueue_item_counter
rc.enqueue_size_counter = this.enqueue_size_counter
@@ -1827,7 +1827,7 @@ class Subscription(val queue:Queue, val
def refill_prefetch = {
- var next = if( pos.is_tail ) {
+ var cursor = if( pos.is_tail ) {
null // can't prefetch the tail..
} else if( pos.is_head ) {
pos.getNext // can't prefetch the head.
@@ -1836,18 +1836,19 @@ class Subscription(val queue:Queue, val
}
var remaining = queue.tune_consumer_buffer - acquired_size
- while( remaining>0 && next!=null ) {
- remaining -= next.size
- next.prefetch_flags = (next.prefetch_flags | PREFTCH_LOAD_FLAG).toByte
- next.load
- next = next.getNext
+ while( remaining>0 && cursor!=null ) {
+ val next = cursor.getNext
+ remaining -= cursor.size
+ cursor.prefetch_flags = (cursor.prefetch_flags |
PREFTCH_LOAD_FLAG).toByte
+ cursor.load
+ cursor = next
}
remaining = avg_advanced_size
- while( remaining>0 && next!=null ) {
- remaining -= next.size
- next.prefetch_flags = (next.prefetch_flags | PREFTCH_HOLD_FLAG).toByte
- next = next.getNext
+ while( remaining>0 && cursor!=null ) {
+ remaining -= cursor.size
+ cursor.prefetch_flags = (cursor.prefetch_flags |
PREFTCH_HOLD_FLAG).toByte
+ cursor = cursor.getNext
}
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1162140&r1=1162139&r2=1162140&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
Fri Aug 26 15:18:53 2011
@@ -195,6 +195,14 @@ class Topic(val router:LocalRouter, val
rc.metrics.dequeue_ts = rc.metrics.dequeue_ts max link.enqueue_ts
}
+ // Add in any queue metrics that the topic may own.
+ for(queue <- consumer_queues.values) {
+ val metrics = queue.get_queue_metrics
+ metrics.enqueue_item_counter = 0
+ metrics.enqueue_size_counter = 0
+ metrics.enqueue_ts = 0
+ DestinationMetricsSupport.add_destination_metrics(rc.metrics, metrics)
+ }
rc
}
Copied:
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateDestMetricsDTO.java
(from r1162139,
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java)
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateDestMetricsDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateDestMetricsDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java&r1=1162139&r2=1162140&rev=1162140&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateQueueMetricsDTO.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AggregateDestMetricsDTO.java
Fri Aug 26 15:18:53 2011
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.apollo.dto;
-import org.codehaus.jackson.annotate.JsonTypeInfo;
-
import javax.xml.bind.annotation.*;
/**
@@ -26,9 +24,9 @@ import javax.xml.bind.annotation.*;
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlRootElement(name="aggregate_queue_metrics")
+@XmlRootElement(name="aggregate_dest_metrics")
@XmlAccessorType(XmlAccessType.FIELD)
-public class AggregateQueueMetricsDTO extends QueueMetricsDTO {
+public class AggregateDestMetricsDTO extends DestMetricsDTO {
/**
* The number of objects which where aggregated.
Copied:
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestMetricsDTO.java
(from r1162139,
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java)
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestMetricsDTO.java?p2=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestMetricsDTO.java&p1=activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java&r1=1162139&r2=1162140&rev=1162140&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueMetricsDTO.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DestMetricsDTO.java
Fri Aug 26 15:18:53 2011
@@ -37,9 +37,82 @@ import javax.xml.bind.annotation.XmlRoot
* </p>
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-@XmlRootElement(name="queue_metrics")
+@XmlRootElement(name="dest_metrics")
@XmlAccessorType(XmlAccessType.FIELD)
-public class QueueMetricsDTO extends DestinationMetricsDTO {
+public class DestMetricsDTO {
+ /**
+ * The current time on the broker machine. In milliseconds since the
epoch.
+ */
+ @XmlAttribute(name="current_time")
+ public long current_time;
+
+ /**
+ * The number of messages that have been sent to the destination.
+ */
+ @XmlAttribute(name="enqueue_item_counter")
+ public long enqueue_item_counter;
+
+ /**
+ * The total size in bytes of messages that have been sent
+ * to the destination
+ */
+ @XmlAttribute(name="enqueue_size_counter")
+ public long enqueue_size_counter;
+
+ /**
+ * The time stamp of when the last message was sent to the destination.
+ */
+ @XmlAttribute(name="enqueue_ts")
+ public long enqueue_ts;
+
+ /**
+ * The number of messages that have been sent to consumers on
+ * the destination.
+ */
+ @XmlAttribute(name="dequeue_item_counter")
+ public long dequeue_item_counter;
+
+ /**
+ * The total size in bytes of messages that have been sent to consumers on
+ * the destination.
+ */
+ @XmlAttribute(name="dequeue_size_counter")
+ public long dequeue_size_counter;
+
+ /**
+ * The time stamp of when the last dequeue to a consumers occurred.
+ */
+ @XmlAttribute(name="dequeue_ts")
+ public long dequeue_ts;
+
+ /**
+ * The total number of producers that have ever sent to
+ * the destination.
+ */
+ @XmlAttribute(name="producer_counter")
+ public long producer_counter;
+
+ /**
+ * The total number of consumers that have ever subscribed to
+ * the queue.
+ */
+ @XmlAttribute(name="consumer_counter")
+ public long consumer_counter;
+
+
+ /**
+ * The current number of producers sending to the destination
+ * the queue.
+ */
+ @XmlAttribute(name="producer_count")
+ public long producer_count;
+
+ /**
+ * The current number of producers consuming from the destination.
+ */
+ @XmlAttribute(name="consumer_count")
+ public long consumer_count;
+
/**
* The number of messages which expired before they could be processed.
*/
Modified:
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java?rev=1162140&r1=1162139&r2=1162140&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueStatusDTO.java
Fri Aug 26 15:18:53 2011
@@ -37,7 +37,7 @@ public class QueueStatusDTO extends Serv
public DestinationDTO binding;
@XmlElement
- public QueueMetricsDTO metrics = new QueueMetricsDTO();
+ public DestMetricsDTO metrics = new DestMetricsDTO();
/**
* Status of the entries in the queue
Modified:
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java?rev=1162140&r1=1162139&r2=1162140&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicStatusDTO.java
Fri Aug 26 15:18:53 2011
@@ -51,7 +51,7 @@ public class TopicStatusDTO extends Serv
@XmlElement(name="dsub")
public List<String> dsubs = new ArrayList<String>();
- @XmlElement
- public TopicMetricsDTO metrics = new TopicMetricsDTO();
+ @XmlElement(name="metrics")
+ public DestMetricsDTO metrics = new DestMetricsDTO();
}
\ No newline at end of file
Modified:
activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index?rev=1162140&r1=1162139&r2=1162140&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
(original)
+++
activemq/activemq-apollo/trunk/apollo-dto/src/main/resources/org/apache/activemq/apollo/dto/jaxb.index
Fri Aug 26 15:18:53 2011
@@ -46,7 +46,6 @@ QueueConsumerLinkDTO
ValueDTO
StringListDTO
DataPageDTO
-AggregateTopicMetricsDTO
-AggregateQueueMetricsDTO
-DestinationMetricsDTO
+AggregateDestMetricsDTO
+DestMetricsDTO
AggregateConnectionMetricsDTO
\ No newline at end of file
Modified:
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1162140&r1=1162139&r2=1162140&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
Fri Aug 26 15:18:53 2011
@@ -19,7 +19,6 @@ package org.apache.activemq.apollo.web.r
import org.apache.activemq.apollo.dto._
import java.{lang => jl}
import org.fusesource.hawtdispatch._
-import org.apache.activemq.apollo.broker._
import scala.collection.Iterable
import org.apache.activemq.apollo.util.path.PathParser
import org.apache.activemq.apollo.util._
@@ -33,6 +32,7 @@ import org.josql.{QueryResults, Query}
import java.util.regex.Pattern
import javax.servlet.http.HttpServletResponse
import java.util.{Collections, ArrayList}
+import org.apache.activemq.apollo.broker._
/**
* <p>
@@ -205,8 +205,8 @@ case class BrokerResource() extends Reso
@GET
@Path("queue-metrics")
- def get_queue_metrics(): AggregateQueueMetricsDTO = {
- val rc:AggregateQueueMetricsDTO = with_broker { broker =>
+ def get_queue_metrics(): AggregateDestMetricsDTO = {
+ val rc:AggregateDestMetricsDTO = with_broker { broker =>
monitoring(broker) {
get_queue_metrics(broker)
}
@@ -217,8 +217,8 @@ case class BrokerResource() extends Reso
@GET
@Path("topic-metrics")
- def get_topic_metrics(): AggregateTopicMetricsDTO = {
- val rc:AggregateTopicMetricsDTO = with_broker { broker =>
+ def get_topic_metrics(): AggregateDestMetricsDTO = {
+ val rc:AggregateDestMetricsDTO = with_broker { broker =>
monitoring(broker) {
get_topic_metrics(broker)
}
@@ -229,8 +229,8 @@ case class BrokerResource() extends Reso
@GET
@Path("dsub-metrics")
- def get_dsub_metrics(): AggregateQueueMetricsDTO = {
- val rc:AggregateQueueMetricsDTO = with_broker { broker =>
+ def get_dsub_metrics(): AggregateDestMetricsDTO = {
+ val rc:AggregateDestMetricsDTO = with_broker { broker =>
monitoring(broker) {
get_dsub_metrics(broker)
}
@@ -239,14 +239,14 @@ case class BrokerResource() extends Reso
rc
}
- def aggregate(queue:AggregateQueueMetricsDTO,
topic:AggregateTopicMetricsDTO,
dsub:AggregateQueueMetricsDTO):AggregateQueueMetricsDTO = {
+ def aggregate(queue:AggregateDestMetricsDTO, topic:AggregateDestMetricsDTO,
dsub:AggregateDestMetricsDTO):AggregateDestMetricsDTO = {
// zero out the enqueue stats on the dsubs since they will already be
accounted for in the topic
// stats.
dsub.enqueue_item_counter = 0
dsub.enqueue_size_counter = 0
dsub.enqueue_ts = 0
- val rc = aggregate_queue_metrics(List(queue, dsub))
- add_destination_metrics(rc, topic)
+ val rc = aggregate_dest_metrics(List(queue, dsub))
+ DestinationMetricsSupport.add_destination_metrics(rc, topic)
rc.objects += topic.objects
rc.current_time = now
rc
@@ -254,119 +254,67 @@ case class BrokerResource() extends Reso
@GET
@Path("dest-metrics")
- def get_dest_metrics(): AggregateQueueMetricsDTO = {
+ def get_dest_metrics(): AggregateDestMetricsDTO = {
aggregate(get_queue_metrics(), get_topic_metrics(), get_dsub_metrics())
}
- def add_destination_metrics(to:DestinationMetricsDTO,
from:DestinationMetricsDTO) = {
- to.enqueue_item_counter += from.enqueue_item_counter
- to.enqueue_size_counter += from.enqueue_size_counter
- to.enqueue_ts = to.enqueue_ts max from.enqueue_ts
-
- to.dequeue_item_counter += from.dequeue_item_counter
- to.dequeue_size_counter += from.dequeue_size_counter
- to.dequeue_ts = to.dequeue_ts max from.dequeue_ts
-
- to.producer_counter += from.producer_counter
- to.consumer_counter += from.consumer_counter
- to.producer_count += from.producer_count
- to.consumer_count += from.consumer_count
- }
-
- def
aggregate_queue_metrics(metrics:Iterable[QueueMetricsDTO]):AggregateQueueMetricsDTO
= {
- metrics.foldLeft(new AggregateQueueMetricsDTO){ (memo, metric)=>
- add_destination_metrics(memo, metric)
-
- memo.nack_item_counter += metric.nack_item_counter
- memo.nack_size_counter += metric.nack_size_counter
- memo.nack_ts = memo.nack_ts max metric.nack_ts
-
- memo.expired_item_counter += metric.expired_item_counter
- memo.expired_size_counter += metric.expired_size_counter
- memo.expired_ts = memo.expired_ts max metric.expired_ts
-
- memo.queue_size += metric.queue_size
- memo.queue_items += metric.queue_items
-
- memo.swap_out_item_counter += metric.swap_out_item_counter
- memo.swap_out_size_counter += metric.swap_out_size_counter
- memo.swap_in_item_counter += metric.swap_in_item_counter
- memo.swap_in_size_counter += metric.swap_in_size_counter
-
- memo.swapping_in_size += metric.swapping_in_size
- memo.swapping_out_size += metric.swapping_out_size
-
- memo.swapped_in_items += metric.swapped_in_items
- memo.swapped_in_size += metric.swapped_in_size
-
- memo.swapped_in_size_max += metric.swapped_in_size_max
-
- if( metric.isInstanceOf[AggregateQueueMetricsDTO] ) {
- memo.objects += metric.asInstanceOf[AggregateQueueMetricsDTO].objects
- } else {
- memo.objects += 1
- }
- memo
- }
- }
-
- def
aggregate_topic_metrics(metrics:Iterable[TopicMetricsDTO]):AggregateTopicMetricsDTO
= {
- metrics.foldLeft(new AggregateTopicMetricsDTO){ (memo, metric)=>
- add_destination_metrics(memo, metric)
- if( metric.isInstanceOf[AggregateTopicMetricsDTO] ) {
- memo.objects += metric.asInstanceOf[AggregateTopicMetricsDTO].objects
+ def
aggregate_dest_metrics(metrics:Iterable[DestMetricsDTO]):AggregateDestMetricsDTO
= {
+ metrics.foldLeft(new AggregateDestMetricsDTO){ (to, from)=>
+ DestinationMetricsSupport.add_destination_metrics(to, from)
+ if( from.isInstanceOf[AggregateDestMetricsDTO] ) {
+ to.objects += from.asInstanceOf[AggregateDestMetricsDTO].objects
} else {
- memo.objects += 1
+ to.objects += 1
}
- memo
+ to
}
}
- def get_queue_metrics(broker:Broker):FutureResult[AggregateQueueMetricsDTO]
= {
+ def get_queue_metrics(broker:Broker):FutureResult[AggregateDestMetricsDTO] =
{
val metrics = sync_all(broker.virtual_hosts.values) { host =>
get_queue_metrics(host)
}
- metrics.map( x=>
Success(aggregate_queue_metrics(x.flatMap(_.success_option)) ))
+ metrics.map( x=>
Success(aggregate_dest_metrics(x.flatMap(_.success_option)) ))
}
- def
get_queue_metrics(host:VirtualHost):FutureResult[AggregateQueueMetricsDTO] = {
+ def
get_queue_metrics(host:VirtualHost):FutureResult[AggregateDestMetricsDTO] = {
val router:LocalRouter = host
val queues: Iterable[Queue] = router.queue_domain.destinations
val metrics = sync_all(queues) { queue =>
queue.get_queue_metrics
}
- metrics.map( x=>
Success(aggregate_queue_metrics(x.flatMap(_.success_option))) )
+ metrics.map( x=>
Success(aggregate_dest_metrics(x.flatMap(_.success_option))) )
}
- def get_topic_metrics(broker:Broker):FutureResult[AggregateTopicMetricsDTO]
= {
+ def get_topic_metrics(broker:Broker):FutureResult[AggregateDestMetricsDTO] =
{
val metrics = sync_all(broker.virtual_hosts.values) { host =>
get_topic_metrics(host)
}
- metrics.map( x=>
Success(aggregate_topic_metrics(x.flatMap(_.success_option)) ))
+ metrics.map( x=>
Success(aggregate_dest_metrics(x.flatMap(_.success_option)) ))
}
- def
get_topic_metrics(host:VirtualHost):FutureResult[AggregateTopicMetricsDTO] = {
+ def
get_topic_metrics(host:VirtualHost):FutureResult[AggregateDestMetricsDTO] = {
val router:LocalRouter = host
val topics: Iterable[Topic] = router.topic_domain.destinations
val metrics = topics.map(_.status.metrics)
- FutureResult(Success(aggregate_topic_metrics(metrics)))
+ FutureResult(Success(aggregate_dest_metrics(metrics)))
}
- def get_dsub_metrics(broker:Broker):FutureResult[AggregateQueueMetricsDTO] =
{
+ def get_dsub_metrics(broker:Broker):FutureResult[AggregateDestMetricsDTO] = {
val metrics = sync_all(broker.virtual_hosts.values) { host =>
get_dsub_metrics(host)
}
- metrics.map( x=>
Success(aggregate_queue_metrics(x.flatMap(_.success_option)) ))
+ metrics.map( x=>
Success(aggregate_dest_metrics(x.flatMap(_.success_option)) ))
}
- def
get_dsub_metrics(host:VirtualHost):FutureResult[AggregateQueueMetricsDTO] = {
+ def get_dsub_metrics(host:VirtualHost):FutureResult[AggregateDestMetricsDTO]
= {
val router:LocalRouter = host
val dsubs: Iterable[Queue] =
router.topic_domain.durable_subscriptions_by_id.values
val metrics = sync_all(dsubs) { dsub =>
dsub.get_queue_metrics
}
- metrics.map( x=>
Success(aggregate_queue_metrics(x.flatMap(_.success_option))) )
+ metrics.map( x=>
Success(aggregate_dest_metrics(x.flatMap(_.success_option))) )
}
@@ -418,8 +366,8 @@ case class BrokerResource() extends Reso
}
@GET @Path("virtual-hosts/{id}/queue-metrics")
- def virtual_host_queue_metrics(@PathParam("id") id : String):
AggregateQueueMetricsDTO = {
- val rc:AggregateQueueMetricsDTO = with_virtual_host(id) { host =>
+ def virtual_host_queue_metrics(@PathParam("id") id : String):
AggregateDestMetricsDTO = {
+ val rc:AggregateDestMetricsDTO = with_virtual_host(id) { host =>
monitoring(host) {
get_queue_metrics(host)
}
@@ -429,8 +377,8 @@ case class BrokerResource() extends Reso
}
@GET @Path("virtual-hosts/{id}/topic-metrics")
- def virtual_host_topic_metrics(@PathParam("id") id : String):
AggregateTopicMetricsDTO = {
- val rc:AggregateTopicMetricsDTO = with_virtual_host(id) { host =>
+ def virtual_host_topic_metrics(@PathParam("id") id : String):
AggregateDestMetricsDTO = {
+ val rc:AggregateDestMetricsDTO = with_virtual_host(id) { host =>
monitoring(host) {
get_topic_metrics(host)
}
@@ -440,8 +388,8 @@ case class BrokerResource() extends Reso
}
@GET @Path("virtual-hosts/{id}/dsub-metrics")
- def virtual_host_dsub_metrics(@PathParam("id") id : String):
AggregateQueueMetricsDTO = {
- val rc:AggregateQueueMetricsDTO = with_virtual_host(id) { host =>
+ def virtual_host_dsub_metrics(@PathParam("id") id : String):
AggregateDestMetricsDTO = {
+ val rc:AggregateDestMetricsDTO = with_virtual_host(id) { host =>
monitoring(host) {
get_dsub_metrics(host)
}
@@ -451,7 +399,7 @@ case class BrokerResource() extends Reso
}
@GET @Path("virtual-hosts/{id}/dest-metrics")
- def virtual_host_dest_metrics(@PathParam("id") id : String):
AggregateQueueMetricsDTO = {
+ def virtual_host_dest_metrics(@PathParam("id") id : String):
AggregateDestMetricsDTO = {
aggregate(virtual_host_queue_metrics(id), virtual_host_topic_metrics(id),
virtual_host_dsub_metrics(id))
}