Author: chirino
Date: Tue Nov 2 17:25:47 2010
New Revision: 1030134
URL: http://svn.apache.org/viewvc?rev=1030134&view=rev
Log:
Updating the consumer interface so that the router can tell whether it needs to
persist a message by asking the consumer whether it is persistent.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1030134&r1=1030133&r2=1030134&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
Tue Nov 2 17:25:47 2010
@@ -56,6 +56,7 @@ trait DeliveryConsumer extends Retained
def dispatchQueue:DispatchQueue;
def matches(message:Delivery):Boolean
def connect(producer:DeliveryProducer):DeliverySession
+ def is_persistent:Boolean
}
/**
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1030134&r1=1030133&r2=1030134&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Tue Nov 2 17:25:47 2010
@@ -460,6 +460,8 @@ class Queue(val host: VirtualHost, var i
def matches(delivery: Delivery) = filter.matches(delivery.message)
+ def is_persistent = tune_persistent
+
def connect(p: DeliveryProducer) = new DeliverySession {
retain
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1030134&r1=1030133&r2=1030134&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Tue Nov 2 17:25:47 2010
@@ -416,25 +416,21 @@ case class DeliveryProducerRoute(val rou
} else {
// Do we need to store the message if we have a matching consumer?
- var storeOnMatch = delivery.message.persistent && router.host.store!=null
delivery.message.retain
-
targets.foreach { target=>
// only deliver to matching consumers
if( target.consumer.matches(delivery) ) {
- if( storeOnMatch ) {
+ if( delivery.storeKey == -1L && target.consumer.is_persistent ) {
if( delivery.uow==null ) {
delivery.uow = router.host.store.createStoreUOW
} else {
delivery.uow.retain
}
delivery.storeKey =
delivery.uow.store(delivery.createMessageRecord)
- storeOnMatch = false
}
-
if( !target.offer(delivery) ) {
overflowSessions ::= target
}
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1030134&r1=1030133&r2=1030134&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Tue Nov 2 17:25:47 2010
@@ -282,6 +282,8 @@ class StompProtocolHandler extends Proto
override def connection = Some(StompProtocolHandler.this.connection)
+ def is_persistent = false
+
def matches(delivery:Delivery) = {
if( delivery.message.protocol eq StompProtocol ) {
if( selector!=null ) {