Author: chirino
Date: Wed Jan 11 19:11:17 2012
New Revision: 1230193
URL: http://svn.apache.org/viewvc?rev=1230193&view=rev
Log:
Simplify the requirements of a message (removed fields which were not needed).
Also have the delivery track the sending destination.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1230193&r1=1230192&r2=1230193&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
Wed Jan 11 19:11:17 2012
@@ -23,6 +23,7 @@ import org.apache.activemq.apollo.filter
import org.apache.activemq.apollo.broker.store.StoreUOW
import org.apache.activemq.apollo.util.Log
import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
+import org.apache.activemq.apollo.dto.DestinationDTO
object DeliveryProducer extends Log
@@ -95,16 +96,6 @@ trait DeliverySession extends SessionSin
trait Message extends Filterable with Retained {
/**
- * the globally unique id of the message
- */
- def id: AsciiBuffer
-
- /**
- * the globally unique id of the producer
- */
- def producer: AsciiBuffer
-
- /**
* the message priority.
*/
def priority:Byte
@@ -175,6 +166,11 @@ object Poisoned extends DeliveryResult
class Delivery {
/**
+ * Where the delivery is originating from.
+ */
+ var sender:DestinationDTO = _
+
+ /**
* Total size of the delivery. Used for resource allocation tracking
*/
var size:Int = 0
@@ -219,6 +215,7 @@ class Delivery {
def copy() = (new Delivery).set(this)
def set(other:Delivery) = {
+ sender = other.sender
size = other.size
seq = other.seq
message = other.message
@@ -230,7 +227,7 @@ class Delivery {
def createMessageRecord() = {
val record = message.protocol.encode(message)
- assert( record.size == size )
+ record.size = size
record.locator = storeLocator
record
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1230193&r1=1230192&r2=1230193&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Wed Jan 11 19:11:17 2012
@@ -527,6 +527,7 @@ class Queue(val router: LocalRouter, val
val entry = tail_entry
tail_entry = new QueueEntry(Queue.this, next_message_seq)
val queueDelivery = delivery.copy
+ queueDelivery.sender = destination_dto
queueDelivery.seq = entry.seq
entry.init(queueDelivery)
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1230193&r1=1230192&r2=1230193&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
Wed Jan 11 19:11:17 2012
@@ -131,7 +131,11 @@ class Topic(val router:LocalRouter, val
def producer = session.producer
def consumer = session.consumer
- def offer(value: Delivery) = downstream.offer(value)
+ def offer(value: Delivery) = {
+ val copy = value.copy();
+ copy.sender = destination_dto
+ downstream.offer(copy)
+ }
}
case class ProxyDeliveryConsumer(consumer:DeliveryConsumer, link:LinkDTO,
registered:DeliveryConsumer) extends DeliveryConsumer {
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala?rev=1230193&r1=1230192&r2=1230193&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireMessage.scala
Wed Jan 11 19:11:17 2012
@@ -41,14 +41,10 @@ class OpenwireMessage(val message:Active
def protocol = OpenwireProtocol
- def producer = ascii(message.getProducerId.toString)
-
def priority = message.getPriority
def persistent = message.isPersistent
- def id = _id
-
def expiration = message.getExpiration
def getBodyAs[T](toType : Class[T]) = {
@@ -89,10 +85,8 @@ object EndOfBrowseMessage extends Messag
def retain() {}
def release() {}
def protocol: Protocol = null
- def producer: AsciiBuffer = null
def priority: Byte = 0
def persistent: Boolean = false
- def id: AsciiBuffer = null
def expiration: Long = 0L
def getProperty(name: String): AnyRef = null
def getLocalConnectionId: AnyRef = null
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1230193&r1=1230192&r2=1230193&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
Wed Jan 11 19:11:17 2012
@@ -42,7 +42,6 @@ object StompCodec extends Log {
val rc = new MessageRecord
rc.protocol = PROTOCOL
- rc.size = frame.size
rc.expiration = message.expiration
if( frame.content.isInstanceOf[ZeroCopyContent] ) {
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1230193&r1=1230192&r2=1230193&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
Wed Jan 11 19:11:17 2012
@@ -45,11 +45,6 @@ case class StompFrameMessage(frame:Stomp
var id: AsciiBuffer = null
/**
- * the globally unique id of the producer
- */
- var producer: AsciiBuffer = null
-
- /**
* the message priority.
*/
var priority:Byte = 4;
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1230193&r1=1230192&r2=1230193&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Wed Jan 11 19:11:17 2012
@@ -128,6 +128,10 @@ class StompProtocolHandler extends Proto
}
protected def dispatchQueue:DispatchQueue = connection.dispatch_queue
+
+ def id(message:Message) = {
+ message.asInstanceOf[StompFrameMessage].id
+ }
class StompConsumer (
@@ -249,9 +253,9 @@ class StompProtocolHandler extends Proto
} else {
if( protocol_version eq V1_0 ) {
// register on the connection since 1.0 acks may not include the subscription id
- connection_ack_handlers += ( delivery.message.id-> this )
+ connection_ack_handlers += ( id(delivery.message) -> this )
}
- consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack )
+ consumer_acks += id(delivery.message) -> new TrackedAck(Some(delivery.size), delivery.ack )
}
}
@@ -337,9 +341,9 @@ class StompProtocolHandler extends Proto
} else {
if( protocol_version eq V1_0 ) {
// register on the connection since 1.0 acks may not include the subscription id
- connection_ack_handlers += ( delivery.message.id-> this )
+ connection_ack_handlers += ( id(delivery.message) -> this )
}
- consumer_acks += delivery.message.id -> new TrackedAck(Some(delivery.size), delivery.ack)
+ consumer_acks += id(delivery.message) -> new TrackedAck(Some(delivery.size), delivery.ack)
}
}
@@ -555,7 +559,7 @@ class StompProtocolHandler extends Proto
var codec:StompCodec = _
implicit def toDestinationDTO(value:AsciiBuffer):Array[DestinationDTO] = {
- val rc = destination_parser.decode_destination(value.toString)
+ val rc = destination_parser.decode_multi_destination(value.toString)
if( rc==null ) {
throw new ProtocolException("Invalid stomp destination name: "+value);
}