Author: chirino
Date: Thu Aug 25 17:51:02 2011
New Revision: 1161668
URL: http://svn.apache.org/viewvc?rev=1161668&view=rev
Log:
Fixes https://issues.apache.org/jira/browse/APLO-75 : Apollo does not set the
redelivered header on redelivered messages
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
Thu Aug 25 17:51:02 2011
@@ -140,19 +140,34 @@ object Delivery extends Sizer[Delivery]
}
sealed trait DeliveryResult
-/** message was processed, does not need redelivery */
+
+/**
+ * message was delivered and processed, does not need redelivery
+ */
+object Consumed extends DeliveryResult
+
+/**
+ * The message was delivered but not consumed, it should be redelivered to
another consumer ASAP.
+ * The redelivery counter should increment.
+ */
object Delivered extends DeliveryResult
-/** message expired before it could be processed, does not need redelivery */
+
+/**
+ * The message was not delivered so it should be redelivered to another
consumer but not effect
+ * it's redelivery counter.
+ */
+object Undelivered extends DeliveryResult
+
+/**
+ * message expired before it could be processed, does not need redelivery
+ */
object Expired extends DeliveryResult
+
/**
- * The receiver thinks the message was poison message, it was not successfully
- * processed and it should not get redelivered..
- */
+ * The receiver thinks the message was poison message, it was not successfully
+ * processed and it should not get redelivered..
+ */
object Poisoned extends DeliveryResult
-/**
- * The message was not consumed, it should be redelivered to another consumer
ASAP.
- */
-object Undelivered extends DeliveryResult
class Delivery {
@@ -183,6 +198,11 @@ class Delivery {
var uow:StoreUOW = null
/**
+ * The number of redeliveries that this message has seen.
+ */
+ var redeliveries:Short = 0
+
+ /**
* Set if the producer requires an ack to be sent back. Consumer
* should execute once the message is processed.
*/
@@ -195,6 +215,7 @@ class Delivery {
message = other.message
storeKey = other.storeKey
storeLocator = other.storeLocator
+ redeliveries = other.redeliveries
this
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Thu Aug 25 17:51:02 2011
@@ -30,6 +30,7 @@ import security.SecurityContext
import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO}
import java.util.concurrent.atomic.{AtomicReference, AtomicLong, AtomicInteger}
import org.fusesource.hawtbuf.Buffer
+import java.lang.UnsupportedOperationException
object Queue extends Log {
val subcsription_counter = new AtomicInteger(0)
@@ -575,13 +576,19 @@ class Queue(val router: LocalRouter, val
ack_source.getData.foreach {
case (entry, consumed, uow) =>
consumed match {
- case Delivered =>
+ case Consumed =>
entry.ack(uow)
- case Expired =>
+ case Expired=>
entry.entry.queue.expired(entry.entry, false)
entry.ack(uow)
- case Poisoned => entry.nack
- case Undelivered => entry.nack
+ case Delivered =>
+ entry.entry.redelivered
+ entry.nack
+ case Poisoned =>
+ entry.entry.redelivered
+ entry.nack
+ case Undelivered =>
+ entry.nack
}
if( uow!=null ) {
uow.release()
@@ -831,7 +838,7 @@ class QueueEntry(val queue:Queue, val se
def init(qer:QueueEntryRecord):QueueEntry = {
val locator = new
AtomicReference[Array[Byte]](Option(qer.message_locator).map(_.toByteArray).getOrElse(null))
- state = new Swapped(qer.message_key, locator, qer.size, qer.expiration)
+ state = new Swapped(qer.message_key, locator, qer.size, qer.expiration,
qer.redeliveries)
this
}
@@ -927,6 +934,8 @@ class QueueEntry(val queue:Queue, val se
def count = state.count
def size = state.size
def expiration = state.expiration
+ def redeliveries = state.redeliveries
+ def redelivered = state.redelivered
def messageKey = state.message_key
def is_swapped_or_swapping_out = state.is_swapped_or_swapping_out
def dispatch() = state.dispatch
@@ -966,6 +975,16 @@ class QueueEntry(val queue:Queue, val se
def expiration = 0L
/**
+ * When the entry expires or 0 if it does not expire.
+ */
+ def redeliveries:Short = throw new UnsupportedOperationException()
+
+ /**
+ * Called to increment the redelivery counter
+ */
+ def redelivered:Unit = {}
+
+ /**
* Gets number of messages that this entry represents
*/
def count = 0
@@ -1109,6 +1128,9 @@ class QueueEntry(val queue:Queue, val se
override def expiration = delivery.message.expiration
override def message_key = delivery.storeKey
override def message_locator = delivery.storeLocator
+ override def redeliveries = delivery.redeliveries
+
+ override def redelivered = delivery.redeliveries =
((delivery.redeliveries+1).min(Short.MaxValue)).toShort
var remove_pending = false
@@ -1184,7 +1206,7 @@ class QueueEntry(val queue:Queue, val se
queue.swap_out_size_counter += size
queue.swap_out_item_counter += 1
- state = new Swapped(delivery.storeKey, delivery.storeLocator, size,
expiration)
+ state = new Swapped(delivery.storeKey, delivery.storeLocator, size,
expiration, redeliveries)
if( can_combine_with_prev ) {
getPrevious.as_swapped_range.combineNext
}
@@ -1332,12 +1354,14 @@ class QueueEntry(val queue:Queue, val se
* entry is persisted, it can move into this state. This state only holds
onto the
* the massage key so that it can reload the message from the store quickly
when needed.
*/
- class Swapped(override val message_key:Long, override val
message_locator:AtomicReference[Array[Byte]], override val size:Int, override
val expiration:Long) extends EntryState {
+ class Swapped(override val message_key:Long, override val
message_locator:AtomicReference[Array[Byte]], override val size:Int, override
val expiration:Long, var _redeliveries:Short) extends EntryState {
queue.individual_swapped_items += 1
var swapping_in = false
+ override def redeliveries = _redeliveries
+ override def redelivered = _redeliveries =
((_redeliveries+1).min(Short.MaxValue)).toShort
override def count = 1
@@ -1390,6 +1414,7 @@ class QueueEntry(val queue:Queue, val se
delivery.size = messageRecord.size
delivery.storeKey = messageRecord.key
delivery.storeLocator = messageRecord.locator
+ delivery.redeliveries = redeliveries
queue.swapped_in_size += delivery.size
queue.swapped_in_items += 1
@@ -1629,6 +1654,7 @@ class Subscription(val queue:Queue, val
while( next !=null ) {
val cur = next;
next = next.getNext
+ cur.entry.redelivered
cur.nack // this unlinks the entry.
}
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Thu Aug 25 17:51:02 2011
@@ -186,11 +186,11 @@ abstract class DeliveryProducerRoute(rou
if (delivery.uow != null) {
val ack = pendingAck
delivery.uow.on_complete {
- ack(Delivered, null)
+ ack(Consumed, null)
}
} else {
- pendingAck(Delivered, null)
+ pendingAck(Consumed, null)
}
pendingAck==null
}
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Thu Aug 25 17:51:02 2011
@@ -983,12 +983,12 @@ class OpenwireProtocolHandler extends Pr
val msgid = messageAck.getLastMessageId
val consumed = messageAck.getAckType match {
- case MessageAck.DELIVERED_ACK_TYPE => Delivered
- case MessageAck.INDIVIDUAL_ACK_TYPE => Delivered
- case MessageAck.STANDARD_ACK_TYPE => Delivered
+ case MessageAck.DELIVERED_ACK_TYPE => Consumed
+ case MessageAck.INDIVIDUAL_ACK_TYPE => Consumed
+ case MessageAck.STANDARD_ACK_TYPE => Consumed
case MessageAck.POSION_ACK_TYPE => Poisoned
- case MessageAck.REDELIVERED_ACK_TYPE => Undelivered
- case MessageAck.UNMATCHED_ACK_TYPE => Delivered
+ case MessageAck.REDELIVERED_ACK_TYPE => Delivered
+ case MessageAck.UNMATCHED_ACK_TYPE => Consumed
}
if( messageAck.getAckType == MessageAck.INDIVIDUAL_ACK_TYPE) {
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Thu Aug 25 17:51:02 2011
@@ -39,6 +39,7 @@ import org.apache.activemq.apollo.transp
import java.security.cert.X509Certificate
import collection.mutable.{ListBuffer, HashMap}
import java.io.IOException
+import collection.immutable.List._
case class RichBuffer(self:Buffer) extends Proxy {
@@ -190,7 +191,7 @@ class StompProtocolHandler extends Proto
def track(delivery:Delivery) = {
if( delivery.ack!=null ) {
- delivery.ack(Delivered, null)
+ delivery.ack(Consumed, null)
}
ack_source.merge((delivery.size, 1))
}
@@ -336,6 +337,11 @@ class StompProtocolHandler extends Proto
if( subscription_id != None ) {
frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
}
+ if( config.add_redeliveries_header!=null && delivery.redeliveries > 0) {
+ val header = encode_header(config.add_redeliveries_header)
+ val value = ascii(delivery.redeliveries.toString())
+ frame = frame.append_headers((header, value)::Nil)
+ }
frame
}, Delivery)
@@ -774,7 +780,7 @@ class StompProtocolHandler extends Proto
connected_headers +=
SERVER->encode_header("apache-apollo/"+Broker.version)
- session_id = encode_header("%s-%x".format(this.host.config.id,
this.host.session_counter.incrementAndGet))
+ session_id = encode_header("%s-%x-".format(this.host.config.id,
this.host.session_counter.incrementAndGet))
connected_headers += SESSION->session_id
val outbound_heart_beat_header =
ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
@@ -1160,11 +1166,11 @@ class StompProtocolHandler extends Proto
}
def on_stomp_ack(frame:StompFrame):Unit = {
- on_stomp_ack(frame.headers, Delivered)
+ on_stomp_ack(frame.headers, Consumed)
}
def on_stomp_nack(frame:StompFrame):Unit = {
- on_stomp_ack(frame.headers, Undelivered)
+ on_stomp_ack(frame.headers, Delivered)
}
def on_stomp_ack(headers:HeaderMap, consumed:DeliveryResult):Unit = {
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
(original)
+++
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
Thu Aug 25 17:51:02 2011
@@ -43,11 +43,19 @@ public class StompDTO extends ProtocolDT
/**
* If set, it will add the configured header name with the value
- * set the a timestamp of when the message is recieved.
+ * set the a timestamp of when the message is received.
*/
@XmlAttribute(name="add_timestamp_header")
public String add_timestamp_header;
+ /**
+ * If set, the configured header will be added to message
+ * sent to consumer if the message is a redelivery. It will be
+ * set to the number of re-deliveries that have occurred.
+ */
+ @XmlAttribute(name="add_redeliveries_header")
+ public String add_redeliveries_header;
+
@XmlAttribute(name="max_header_length")
public Integer max_header_length;
Modified:
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL:
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
---
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
(original)
+++
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Thu Aug 25 17:51:02 2011
@@ -973,6 +973,10 @@ following configuration attributes:
message received. The value of the header will be set to the time the
message
was received. The time will be represented as the number of milliseconds
elapsed
since the UNIX epoch in GMT. Not set by default.
+* `add_redeliveries_header` : Name of the header which will be added to
messages
+ sent to consumers if the messages has been redelivered. The value of the
header
+ will be set to the number of times the message has been redeliverd. Not set
+ by default.
* `max_header_length` : The maximum allowed length of a STOMP header. Defaults
to 10240 (10k).
* `max_headers` : The maximum number of allowed headers in a frame. Defaults