Author: chirino
Date: Thu Aug 25 17:51:02 2011
New Revision: 1161668

URL: http://svn.apache.org/viewvc?rev=1161668&view=rev
Log:
Fixes https://issues.apache.org/jira/browse/APLO-75 : Apollo does not set the 
redelivered header on redelivered messages

Modified:
    
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
    
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: 
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
 Thu Aug 25 17:51:02 2011
@@ -140,19 +140,34 @@ object Delivery extends Sizer[Delivery] 
 }
 
 sealed trait DeliveryResult
-/** message was processed, does not need redelivery */
+
+/**
+ * message was delivered and processed, does not need redelivery
+ */
+object Consumed extends DeliveryResult
+
+/**
+ * The message was delivered but not consumed, it should be redelivered to 
another consumer ASAP.
+ * The redelivery counter should increment.
+ */
 object Delivered extends DeliveryResult
-/** message expired before it could be processed, does not need redelivery */
+
+/**
+ * The message was not delivered so it should be redelivered to another 
consumer but not affect
+ * its redelivery counter.
+ */
+object Undelivered extends DeliveryResult
+
+/**
+ * message expired before it could be processed, does not need redelivery
+ */
 object Expired extends DeliveryResult
+
 /**
-  * The receiver thinks the message was poison message, it was not successfully
-  * processed and it should not get redelivered..
-  */
+ * The receiver thinks the message was a poison message, it was not successfully
+ * processed and it should not get redelivered.
+ */
 object Poisoned extends DeliveryResult
-/**
-  * The message was not consumed, it should be redelivered to another consumer 
ASAP.
-  */
-object Undelivered extends DeliveryResult
 
 class Delivery {
 
@@ -183,6 +198,11 @@ class Delivery {
   var uow:StoreUOW = null
 
   /**
+   * The number of redeliveries that this message has seen.
+   */
+  var redeliveries:Short = 0
+
+  /**
    * Set if the producer requires an ack to be sent back.  Consumer
    * should execute once the message is processed.
    */
@@ -195,6 +215,7 @@ class Delivery {
     message = other.message
     storeKey = other.storeKey
     storeLocator = other.storeLocator
+    redeliveries = other.redeliveries
     this
   }
 

Modified: 
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
 Thu Aug 25 17:51:02 2011
@@ -30,6 +30,7 @@ import security.SecurityContext
 import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO}
 import java.util.concurrent.atomic.{AtomicReference, AtomicLong, AtomicInteger}
 import org.fusesource.hawtbuf.Buffer
+import java.lang.UnsupportedOperationException
 
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
@@ -575,13 +576,19 @@ class Queue(val router: LocalRouter, val
     ack_source.getData.foreach {
       case (entry, consumed, uow) =>
         consumed match {
-          case Delivered   =>
+          case Consumed =>
             entry.ack(uow)
-          case Expired     =>
+          case Expired=>
             entry.entry.queue.expired(entry.entry, false)
             entry.ack(uow)
-          case Poisoned    => entry.nack
-          case Undelivered => entry.nack
+          case Delivered =>
+            entry.entry.redelivered
+            entry.nack
+          case Poisoned    =>
+            entry.entry.redelivered
+            entry.nack
+          case Undelivered =>
+            entry.nack
         }
         if( uow!=null ) {
           uow.release()
@@ -831,7 +838,7 @@ class QueueEntry(val queue:Queue, val se
 
   def init(qer:QueueEntryRecord):QueueEntry = {
     val locator = new 
AtomicReference[Array[Byte]](Option(qer.message_locator).map(_.toByteArray).getOrElse(null))
-    state = new Swapped(qer.message_key, locator, qer.size, qer.expiration)
+    state = new Swapped(qer.message_key, locator, qer.size, qer.expiration, 
qer.redeliveries)
     this
   }
 
@@ -927,6 +934,8 @@ class QueueEntry(val queue:Queue, val se
   def count = state.count
   def size = state.size
   def expiration = state.expiration
+  def redeliveries = state.redeliveries
+  def redelivered = state.redelivered
   def messageKey = state.message_key
   def is_swapped_or_swapping_out = state.is_swapped_or_swapping_out
   def dispatch() = state.dispatch
@@ -966,6 +975,16 @@ class QueueEntry(val queue:Queue, val se
     def expiration = 0L
 
     /**
+     * The number of redeliveries that this entry has seen.
+     */
+    def redeliveries:Short = throw new UnsupportedOperationException()
+
+    /**
+     * Called to increment the redelivery counter
+     */
+    def redelivered:Unit = {}
+
+    /**
      * Gets number of messages that this entry represents
      */
     def count = 0
@@ -1109,6 +1128,9 @@ class QueueEntry(val queue:Queue, val se
     override def expiration = delivery.message.expiration
     override def message_key = delivery.storeKey
     override def message_locator = delivery.storeLocator
+    override def redeliveries = delivery.redeliveries
+
+    override def redelivered = delivery.redeliveries = 
((delivery.redeliveries+1).min(Short.MaxValue)).toShort
 
     var remove_pending = false
 
@@ -1184,7 +1206,7 @@ class QueueEntry(val queue:Queue, val se
         queue.swap_out_size_counter += size
         queue.swap_out_item_counter += 1
 
-        state = new Swapped(delivery.storeKey, delivery.storeLocator, size, 
expiration)
+        state = new Swapped(delivery.storeKey, delivery.storeLocator, size, 
expiration, redeliveries)
         if( can_combine_with_prev ) {
           getPrevious.as_swapped_range.combineNext
         }
@@ -1332,12 +1354,14 @@ class QueueEntry(val queue:Queue, val se
    * entry is persisted, it can move into this state.  This state only holds 
onto the
    * the massage key so that it can reload the message from the store quickly 
when needed.
    */
-  class Swapped(override val message_key:Long, override val 
message_locator:AtomicReference[Array[Byte]], override val size:Int, override 
val expiration:Long) extends EntryState {
+  class Swapped(override val message_key:Long, override val 
message_locator:AtomicReference[Array[Byte]], override val size:Int, override 
val expiration:Long, var _redeliveries:Short) extends EntryState {
 
     queue.individual_swapped_items += 1
 
     var swapping_in = false
 
+    override def redeliveries = _redeliveries
+    override def redelivered = _redeliveries = 
((_redeliveries+1).min(Short.MaxValue)).toShort
 
     override def count = 1
 
@@ -1390,6 +1414,7 @@ class QueueEntry(val queue:Queue, val se
         delivery.size = messageRecord.size
         delivery.storeKey = messageRecord.key
         delivery.storeLocator = messageRecord.locator
+        delivery.redeliveries = redeliveries
 
         queue.swapped_in_size += delivery.size
         queue.swapped_in_items += 1
@@ -1629,6 +1654,7 @@ class Subscription(val queue:Queue, val 
       while( next !=null ) {
         val cur = next;
         next = next.getNext
+        cur.entry.redelivered
         cur.nack // this unlinks the entry.
       }
 

Modified: 
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
 Thu Aug 25 17:51:02 2011
@@ -186,11 +186,11 @@ abstract class DeliveryProducerRoute(rou
       if (delivery.uow != null) {
         val ack = pendingAck
         delivery.uow.on_complete {
-          ack(Delivered, null)
+          ack(Consumed, null)
         }
 
       } else {
-        pendingAck(Delivered, null)
+        pendingAck(Consumed, null)
       }
       pendingAck==null
     }

Modified: 
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
 Thu Aug 25 17:51:02 2011
@@ -983,12 +983,12 @@ class OpenwireProtocolHandler extends Pr
 
         val msgid = messageAck.getLastMessageId
         val consumed = messageAck.getAckType match {
-          case MessageAck.DELIVERED_ACK_TYPE => Delivered
-          case MessageAck.INDIVIDUAL_ACK_TYPE => Delivered
-          case MessageAck.STANDARD_ACK_TYPE => Delivered
+          case MessageAck.DELIVERED_ACK_TYPE => Consumed
+          case MessageAck.INDIVIDUAL_ACK_TYPE => Consumed
+          case MessageAck.STANDARD_ACK_TYPE => Consumed
           case MessageAck.POSION_ACK_TYPE => Poisoned
-          case MessageAck.REDELIVERED_ACK_TYPE => Undelivered
-          case MessageAck.UNMATCHED_ACK_TYPE => Delivered
+          case MessageAck.REDELIVERED_ACK_TYPE => Delivered
+          case MessageAck.UNMATCHED_ACK_TYPE => Consumed
         }
 
         if( messageAck.getAckType == MessageAck.INDIVIDUAL_ACK_TYPE) {

Modified: 
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
 Thu Aug 25 17:51:02 2011
@@ -39,6 +39,7 @@ import org.apache.activemq.apollo.transp
 import java.security.cert.X509Certificate
 import collection.mutable.{ListBuffer, HashMap}
 import java.io.IOException
+import collection.immutable.List._
 
 
 case class RichBuffer(self:Buffer) extends Proxy {
@@ -190,7 +191,7 @@ class StompProtocolHandler extends Proto
 
       def track(delivery:Delivery) = {
         if( delivery.ack!=null ) {
-          delivery.ack(Delivered, null)
+          delivery.ack(Consumed, null)
         }
         ack_source.merge((delivery.size, 1))
       }
@@ -336,6 +337,11 @@ class StompProtocolHandler extends Proto
       if( subscription_id != None ) {
         frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
       }
+      if( config.add_redeliveries_header!=null && delivery.redeliveries > 0) {
+        val header = encode_header(config.add_redeliveries_header)
+        val value = ascii(delivery.redeliveries.toString())
+        frame = frame.append_headers((header, value)::Nil)
+      }
       frame
     }, Delivery)
 
@@ -774,7 +780,7 @@ class StompProtocolHandler extends Proto
 
       connected_headers += 
SERVER->encode_header("apache-apollo/"+Broker.version)
 
-      session_id = encode_header("%s-%x".format(this.host.config.id, 
this.host.session_counter.incrementAndGet))
+      session_id = encode_header("%s-%x-".format(this.host.config.id, 
this.host.session_counter.incrementAndGet))
       connected_headers += SESSION->session_id
 
       val outbound_heart_beat_header = 
ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
@@ -1160,11 +1166,11 @@ class StompProtocolHandler extends Proto
   }
 
   def on_stomp_ack(frame:StompFrame):Unit = {
-    on_stomp_ack(frame.headers, Delivered)
+    on_stomp_ack(frame.headers, Consumed)
   }
 
   def on_stomp_nack(frame:StompFrame):Unit = {
-    on_stomp_ack(frame.headers, Undelivered)
+    on_stomp_ack(frame.headers, Delivered)
   }
 
   def on_stomp_ack(headers:HeaderMap, consumed:DeliveryResult):Unit = {

Modified: 
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
 (original)
+++ 
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
 Thu Aug 25 17:51:02 2011
@@ -43,11 +43,19 @@ public class StompDTO extends ProtocolDT
 
     /**
      * If set, it will add the configured header name with the value
+     * set to a timestamp of when the message is received.
+     * set the a timestamp of when the message is received.
      */
     @XmlAttribute(name="add_timestamp_header")
     public String add_timestamp_header;
 
+    /**
+     * If set, the configured header will be added to message
+     * sent to consumer if the message is a redelivery.  It will be
+     * set to the number of re-deliveries that have occurred.
+     */
+    @XmlAttribute(name="add_redeliveries_header")
+    public String add_redeliveries_header;
+
     @XmlAttribute(name="max_header_length")
     public Integer max_header_length;
 

Modified: 
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: 
http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1161668&r1=1161667&r2=1161668&view=diff
==============================================================================
--- 
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md 
(original)
+++ 
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md 
Thu Aug 25 17:51:02 2011
@@ -973,6 +973,10 @@ following configuration attributes:
   message received.  The value of the header will be set to the time the 
message
   was received.  The time will be represented as the number of milliseconds 
elapsed
   since the UNIX epoch in GMT.  Not set by default.
+* `add_redeliveries_header` :  Name of the header which will be added to 
messages
+  sent to consumers if the messages have been redelivered.  The value of the 
header 
+  will be set to the number of times the message has been redelivered.  Not set 
+  by default.
 * `max_header_length` : The maximum allowed length of a STOMP header. Defaults 
   to 10240 (10k).
 * `max_headers` : The maximum number of allowed headers in a frame.  Defaults 


Reply via email to