Author: chirino
Date: Fri Aug 26 12:22:24 2011
New Revision: 1162079
URL: http://svn.apache.org/viewvc?rev=1162079&view=rev
Log:
Made DeliverySession extend SessionSink[Delivery] and added helper filter.
REST api now properly reports the enqueue timestamp for consumers and producers.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1162079&r1=1162078&r2=1162079&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
Fri Aug 26 12:22:24 2011
@@ -343,7 +343,7 @@ class Broker() extends BaseService {
}
- def schedule_periodic_maintenance:Unit = dispatch_queue.after(1,
TimeUnit.SECONDS) {
+ def schedule_periodic_maintenance:Unit = dispatch_queue.after(100,
TimeUnit.MILLISECONDS) {
if( service_state.is_starting_or_started ) {
now = System.currentTimeMillis
schedule_periodic_maintenance
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1162079&r1=1162078&r2=1162079&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
Fri Aug 26 12:22:24 2011
@@ -75,18 +75,9 @@ trait DeliveryConsumer extends Retained
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait DeliverySession extends Sink[Delivery] {
- /**
- * The number of deliveries accepted by this session.
- */
- def enqueue_item_counter:Long
- /**
- * The total size of the deliveries accepted by this session.
- */
- def enqueue_size_counter:Long
+trait DeliverySession extends SessionSink[Delivery] {
def producer:DeliveryProducer
def consumer:DeliveryConsumer
- def remaining_capacity:Int
def close:Unit
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1162079&r1=1162078&r2=1162079&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri Aug 26 12:22:24 2011
@@ -75,7 +75,9 @@ class Queue(val router: LocalRouter, val
ack_source.setEventHandler(^ {drain_acks});
ack_source.resume
- val session_manager = new SessionSinkMux[Delivery](messages, dispatch_queue,
Delivery)
+ val session_manager = new SessionSinkMux[Delivery](messages, dispatch_queue,
Delivery) {
+ override def time_stamp = now
+ }
// sequence numbers.. used to track what's in the store.
var message_seq_counter = 1L
@@ -294,6 +296,7 @@ class Queue(val router: LocalRouter, val
}
link.enqueue_item_counter = session.enqueue_item_counter
link.enqueue_size_counter = session.enqueue_size_counter
+ link.enqueue_ts = session.enqueue_ts
rc.producers.add(link)
}
@@ -309,8 +312,9 @@ class Queue(val router: LocalRouter, val
link.label = "unknown"
}
link.position = sub.pos.seq
- link.enqueue_item_counter = sub.total_dispatched_count
- link.enqueue_size_counter = sub.total_dispatched_size
+ link.enqueue_item_counter = sub.session.enqueue_item_counter
+ link.enqueue_size_counter = sub.session.enqueue_size_counter
+ link.enqueue_ts = sub.session.enqueue_ts
link.total_ack_count = sub.total_ack_count
link.total_nack_count = sub.total_nack_count
link.acquired_size = sub.acquired_size
@@ -734,30 +738,22 @@ class Queue(val router: LocalRouter, val
def is_persistent = tune_persistent
- def connect(p: DeliveryProducer) = new DeliverySession {
+ class QueueDeliverySession(val producer: DeliveryProducer) extends
DeliverySession with SessionSinkFilter[Delivery]{
retain
-
override def toString = Queue.this.toString
-
override def consumer = Queue.this
- override def producer = p
-
val session_max = producer.send_buffer_size
- val session = session_manager.open(producer.dispatch_queue, session_max)
+ val downstream = session_manager.open(producer.dispatch_queue, session_max)
dispatch_queue {
inbound_sessions += this
addCapacity( session_max )
}
- def remaining_capacity = session.remaining_capacity
- def enqueue_item_counter = session.accepted_count
- def enqueue_size_counter = session.accepted_size
-
def close = {
- session_manager.close(session)
+ session_manager.close(downstream)
dispatch_queue {
addCapacity( -session_max )
inbound_sessions -= this
@@ -765,27 +761,21 @@ class Queue(val router: LocalRouter, val
release
}
- // Delegate all the flow control stuff to the session
- def full = session.full
-
def offer(delivery: Delivery) = {
- if (session.full) {
+ if (downstream.full) {
false
} else {
delivery.message.retain
if( tune_persistent && delivery.uow!=null ) {
delivery.uow.retain
}
- val rc = session.offer(delivery)
+ val rc = downstream.offer(delivery)
assert(rc, "session should accept since it was not full")
true
}
}
-
- def refiller = session.refiller
-
- def refiller_=(value: Runnable) = {session.refiller = value}
}
+ def connect(p: DeliveryProducer) = new QueueDeliverySession(p)
/////////////////////////////////////////////////////////////////////
//
@@ -1720,9 +1710,6 @@ class Subscription(val queue:Queue, val
var avg_advanced_size = queue.tune_consumer_buffer
var tail_parkings = 1
- var total_dispatched_count = 0L
- var total_dispatched_size = 0L
-
var total_ack_count = 0L
var total_nack_count = 0L
@@ -1834,15 +1821,8 @@ class Subscription(val queue:Queue, val
def matches(entry:Delivery) = session.consumer.matches(entry)
def full = session.full
- def offer(delivery:Delivery) = {
- if( session.offer(delivery) ) {
- total_dispatched_count += 1
- total_dispatched_size += delivery.size
- true
- } else {
- false
- }
- }
+
+ def offer(delivery:Delivery) = session.offer(delivery)
def acquire(entry:QueueEntry) = new AcquiredQueueEntry(entry)
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1162079&r1=1162078&r2=1162079&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Fri Aug 26 12:22:24 2011
@@ -230,11 +230,35 @@ class CreditWindowFilter[T](val downstre
}
trait SessionSink[T] extends Sink[T] {
- def accepted_count:Long
- def accepted_size:Long
+ /**
+ * The number of elements accepted by this session.
+ */
+ def enqueue_item_counter:Long
+
+ /**
+ * The total size of the elements accepted by this session.
+ */
+ def enqueue_size_counter:Long
+
+ /**
+ * The total size of the elements accepted by this session.
+ */
+ def enqueue_ts:Long
+ /**
+ * An estimate of the capacity left in the session before it stops
+ * accepting more elements.
+ */
def remaining_capacity:Int
}
+trait SessionSinkFilter[T] extends SessionSink[T] with SinkFilter[T] {
+ def downstream:SessionSink[T]
+ def enqueue_item_counter = downstream.enqueue_item_counter
+ def enqueue_size_counter = downstream.enqueue_size_counter
+ def enqueue_ts = downstream.enqueue_ts
+ def remaining_capacity = downstream.remaining_capacity
+}
+
object SessionSinkMux {
val default_session_max_credits =
System.getProperty("apollo.default_session_max_credits", ""+(1024*32)).toInt
}
@@ -313,6 +337,7 @@ class SessionSinkMux[T](val downstream:S
}
}
+ def time_stamp = 0L
}
/**
@@ -326,9 +351,11 @@ class Session[T](val producer_queue:Disp
private def downstream = mux.source
@volatile
- var accepted_count = 0L
+ var enqueue_item_counter = 0L
+ @volatile
+ var enqueue_size_counter = 0L
@volatile
- var accepted_size = 0L
+ var enqueue_ts = 0L
// create a source to coalesce credit events back to the producer side...
val credit_adder = createSource(EventAggregators.INTEGER_ADD ,
producer_queue)
@@ -371,8 +398,9 @@ class Session[T](val producer_queue:Disp
} else {
val size = sizer.size(value)
- accepted_count += 1
- accepted_size += size
+ enqueue_item_counter += 1
+ enqueue_size_counter += size
+ enqueue_ts = mux.time_stamp
add_credits(-size)
downstream.merge((this, value))
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1162079&r1=1162078&r2=1162079&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
Fri Aug 26 12:22:24 2011
@@ -36,16 +36,16 @@ class Topic(val router:LocalRouter, val
var enqueue_item_counter = 0L
var enqueue_size_counter = 0L
- var enqueue_ts = System.currentTimeMillis()
+ var enqueue_ts = now
var dequeue_item_counter = 0L
var dequeue_size_counter = 0L
- var dequeue_ts = System.currentTimeMillis()
+ var dequeue_ts = now
- var proxy_sessions = new HashSet[ProxyDeliverySession]()
+ var proxy_sessions = new HashSet[TopicDeliverySession]()
implicit def
from_link(from:LinkDTO):(Long,Long,Long)=(from.enqueue_item_counter,
from.enqueue_size_counter, from.enqueue_ts)
- implicit def
from_session(from:ProxyDeliverySession):(Long,Long,Long)=(from.enqueue_item_counter,
from.enqueue_size_counter, from.enqueue_ts)
+ implicit def
from_session(from:TopicDeliverySession):(Long,Long,Long)=(from.enqueue_item_counter,
from.enqueue_size_counter, from.enqueue_ts)
def add_counters(to:LinkDTO, from:(Long,Long,Long)):Unit = {
to.enqueue_item_counter += from._1
@@ -58,7 +58,9 @@ class Topic(val router:LocalRouter, val
to.enqueue_ts = to.enqueue_ts max from._3
}
- case class ProxyDeliverySession(session:DeliverySession) extends
DeliverySession with SinkFilter[Delivery] {
+ case class TopicDeliverySession(session:DeliverySession) extends
DeliverySession with SessionSinkFilter[Delivery] {
+ def downstream = session
+
dispatch_queue {
proxy_sessions.add(this)
}
@@ -78,22 +80,10 @@ class Topic(val router:LocalRouter, val
}
}
- var enqueue_ts = now
- def offer(value: Delivery) = {
- if( session.offer(value) ) {
- enqueue_ts = now
- true
- } else {
- false
- }
- }
-
- def downstream = session
- def remaining_capacity = session.remaining_capacity
def producer = session.producer
- def enqueue_size_counter = session.enqueue_size_counter
- def enqueue_item_counter = session.enqueue_item_counter
def consumer = session.consumer
+
+ def offer(value: Delivery) = downstream.offer(value)
}
case class ProxyDeliveryConsumer(consumer:DeliveryConsumer, link:LinkDTO)
extends DeliveryConsumer {
@@ -104,7 +94,7 @@ class Topic(val router:LocalRouter, val
def is_persistent = consumer.is_persistent
def dispatch_queue = consumer.dispatch_queue
def connect(producer: DeliveryProducer) = {
- new ProxyDeliverySession(consumer.connect(producer))
+ new TopicDeliverySession(consumer.connect(producer))
}
}
@@ -113,7 +103,7 @@ class Topic(val router:LocalRouter, val
var durable_subscriptions = ListBuffer[Queue]()
var consumer_queues = HashMap[DeliveryConsumer, Queue]()
var idled_at = 0L
- val created_at = System.currentTimeMillis()
+ val created_at = now
var auto_delete_after = 0
var producer_counter = 0L
var consumer_counter = 0L
@@ -230,11 +220,11 @@ class Topic(val router:LocalRouter, val
def check_idle {
if (producers.isEmpty && consumers.isEmpty &&
durable_subscriptions.isEmpty) {
if (idled_at==0) {
- val now = System.currentTimeMillis()
- idled_at = now
+ val previously_idle_at = now
+ idled_at = previously_idle_at
if( auto_delete_after!=0 ) {
dispatch_queue.after(auto_delete_after, TimeUnit.SECONDS) {
- if( now == idled_at ) {
+ if( previously_idle_at == idled_at ) {
router.topic_domain.remove_destination(path, this)
}
}
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1162079&r1=1162078&r2=1162079&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Fri Aug 26 12:22:24 2011
@@ -23,8 +23,6 @@ import org.fusesource.hawtdispatch._
import org.fusesource.hawtbuf._
import collection.mutable.{ListBuffer, HashMap}
-import org.apache.activemq.apollo.broker._
-import BufferConversions._
import java.io.IOException
import org.apache.activemq.apollo.selector.SelectorParser
import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
@@ -33,16 +31,19 @@ import org.apache.activemq.apollo.broker
import org.apache.activemq.apollo.util._
import java.util.concurrent.TimeUnit
import java.util.Map.Entry
-import protocol._
import scala.util.continuations._
-import security.SecurityContext
-import support.advisory.AdvisorySupport
import tcp.TcpTransport
import codec.OpenWireFormat
import command._
import
org.apache.activemq.apollo.openwire.dto.{OpenwireConnectionStatusDTO,OpenwireDTO}
import org.apache.activemq.apollo.dto.{AcceptingConnectorDTO,
TopicDestinationDTO, DurableSubscriptionDestinationDTO, DestinationDTO}
import org.apache.activemq.apollo.openwire.DestinationConverter._
+import org.apache.activemq.apollo.broker._
+import BufferConversions._
+import protocol._
+import security.SecurityContext
+import support.advisory.AdvisorySupport
+
object OpenwireProtocolHandler extends Log {
def unit:Unit = {}
@@ -87,6 +88,8 @@ class OpenwireProtocolHandler extends Pr
last_command_id
}
+ def broker = connection.connector.broker
+
var producerRoutes = new LRUCache[List[DestinationDTO],
DeliveryProducerRoute](10) {
override def onCacheEviction(eldest: Entry[List[DestinationDTO],
DeliveryProducerRoute]) = {
host.router.disconnect(eldest.getKey.toArray, eldest.getValue)
@@ -195,7 +198,7 @@ class OpenwireProtocolHandler extends Pr
resumeRead
reset {
suspendRead("virtual host lookup")
- this.host = connection.connector.broker.get_default_virtual_host
+ this.host = broker.get_default_virtual_host
resumeRead
if(host==null) {
async_die("Could not find default virtual host")
@@ -767,7 +770,9 @@ class OpenwireProtocolHandler extends Pr
credit_window_filter.credit(0, info.getPrefetchSize)
- val session_manager = new SessionSinkMux[Delivery](credit_window_filter,
dispatchQueue, Delivery)
+ val session_manager = new SessionSinkMux[Delivery](credit_window_filter,
dispatchQueue, Delivery) {
+ override def time_stamp = broker.now
+ }
override def exclusive = info.isExclusive
override def browser = info.isBrowser
@@ -860,17 +865,14 @@ class OpenwireProtocolHandler extends Pr
}
}
- def connect(p:DeliveryProducer) = new DeliverySession with
SinkFilter[Delivery] {
+ class OpenwireConsumerSession(val producer:DeliveryProducer) extends
DeliverySession with SessionSinkFilter[Delivery] {
+ producer.dispatch_queue.assertExecuting()
retain
- def producer = p
- def consumer = ConsumerContext.this
+ val downstream = session_manager.open(producer.dispatch_queue,
receive_buffer_size)
var closed = false
- val downstream = session_manager.open(producer.dispatch_queue,
receive_buffer_size)
- def remaining_capacity = downstream.remaining_capacity
- def enqueue_item_counter = downstream.accepted_count
- def enqueue_size_counter = downstream.accepted_size
+ def consumer = ConsumerContext.this
def close = {
assert(producer.dispatch_queue.isExecuting)
@@ -928,6 +930,8 @@ class OpenwireProtocolHandler extends Pr
}
}
+ def connect(p:DeliveryProducer) = new OpenwireConsumerSession(p)
+
class TrackedAck(val ack:(DeliveryResult, StoreUOW)=>Unit) {
var credited = false
}
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1162079&r1=1162078&r2=1162079&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Fri Aug 26 12:22:24 2011
@@ -39,7 +39,6 @@ import org.apache.activemq.apollo.transp
import java.security.cert.X509Certificate
import collection.mutable.{ListBuffer, HashMap}
import java.io.IOException
-import collection.immutable.List._
case class RichBuffer(self:Buffer) extends Proxy {
@@ -85,6 +84,7 @@ class StompProtocolHandler extends Proto
var connection_log:Log = StompProtocolHandler
def protocol = "stomp"
+ def broker = connection.connector.broker
def decode_header(value:Buffer):String = {
var rc = new ByteArrayOutputStream(value.length)
@@ -347,7 +347,9 @@ class StompProtocolHandler extends Proto
credit_window_filter.credit(initial_credit_window._1,
initial_credit_window._2)
- val session_manager = new SessionSinkMux[Delivery](credit_window_filter,
dispatchQueue, Delivery)
+ val session_manager = new SessionSinkMux[Delivery](credit_window_filter,
dispatchQueue, Delivery) {
+ override def time_stamp = broker.now
+ }
override def dispose() = dispatchQueue {
super.dispose()
@@ -373,26 +375,17 @@ class StompProtocolHandler extends Proto
}
}
- def connect(p:DeliveryProducer) = new DeliverySession with
SinkFilter[Delivery] {
-
- // This session object should only be used from the dispatch queue
context
- // of the producer.
-
+ class StompConsumerSession(val producer:DeliveryProducer) extends
DeliverySession with SessionSinkFilter[Delivery] {
+ producer.dispatch_queue.assertExecuting()
retain
+ val downstream = session_manager.open(producer.dispatch_queue,
receive_buffer_size)
+
override def toString = "connection to
"+StompProtocolHandler.this.connection.transport.getRemoteAddress
- def producer = p
def consumer = StompConsumer.this
var closed = false
- val downstream = session_manager.open(producer.dispatch_queue,
receive_buffer_size)
-
- def remaining_capacity = downstream.remaining_capacity
- def enqueue_item_counter = downstream.accepted_count
- def enqueue_size_counter = downstream.accepted_size
-
-
def close = {
assert(producer.dispatch_queue.isExecuting)
if( !closed ) {
@@ -456,6 +449,7 @@ class StompProtocolHandler extends Proto
}
}
+ def connect(p:DeliveryProducer) = new StompConsumerSession(p)
}
// var session_manager:SessionSinkMux[StompFrame] = null
@@ -948,7 +942,7 @@ class StompProtocolHandler extends Proto
}
if( config.add_timestamp_header!=null ) {
- rc ::= (encode_header(config.add_timestamp_header),
ascii(System.currentTimeMillis().toString()))
+ rc ::= (encode_header(config.add_timestamp_header),
ascii(broker.now.toString()))
}
// Do we need to add the user id?
Modified:
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala?rev=1162079&r1=1162078&r2=1162079&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
Fri Aug 26 12:22:24 2011
@@ -119,7 +119,7 @@ case class BrokerResource() extends Reso
result.id = broker.id
result.jvm_metrics = create_jvm_metrics
- result.current_time = System.currentTimeMillis
+ result.current_time = now
result.state = broker.service_state.toString
result.state_since = broker.service_state.since
result.version = Broker.version
@@ -211,7 +211,7 @@ case class BrokerResource() extends Reso
get_queue_metrics(broker)
}
}
- rc.current_time = System.currentTimeMillis()
+ rc.current_time = now
rc
}
@@ -223,7 +223,7 @@ case class BrokerResource() extends Reso
get_topic_metrics(broker)
}
}
- rc.current_time = System.currentTimeMillis()
+ rc.current_time = now
rc
}
@@ -235,7 +235,7 @@ case class BrokerResource() extends Reso
get_dsub_metrics(broker)
}
}
- rc.current_time = System.currentTimeMillis()
+ rc.current_time = now
rc
}
@@ -248,7 +248,7 @@ case class BrokerResource() extends Reso
val rc = aggregate_queue_metrics(List(queue, dsub))
add_destination_metrics(rc, topic)
rc.objects += topic.objects
- rc.current_time = System.currentTimeMillis()
+ rc.current_time = now
rc
}
@@ -424,7 +424,7 @@ case class BrokerResource() extends Reso
get_queue_metrics(host)
}
}
- rc.current_time = System.currentTimeMillis()
+ rc.current_time = now
rc
}
@@ -435,7 +435,7 @@ case class BrokerResource() extends Reso
get_topic_metrics(host)
}
}
- rc.current_time = System.currentTimeMillis()
+ rc.current_time = now
rc
}
@@ -446,7 +446,7 @@ case class BrokerResource() extends Reso
get_dsub_metrics(host)
}
}
- rc.current_time = System.currentTimeMillis()
+ rc.current_time = now
rc
}
Modified:
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala?rev=1162079&r1=1162078&r2=1162079&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
Fri Aug 26 12:22:24 2011
@@ -275,6 +275,8 @@ abstract class Resource(parent:Resource=
}
}
+ def now =
BrokerRegistry.list.headOption.map(_.now).getOrElse(System.currentTimeMillis())
+
protected def with_broker[T](func:
(org.apache.activemq.apollo.broker.Broker)=>FutureResult[T]):FutureResult[T] = {
BrokerRegistry.list.headOption match {
case Some(broker)=>