Repository: activemq
Updated Branches:
  refs/heads/trunk 7948d6905 -> dbb1d8b83


https://issues.apache.org/jira/browse/AMQ-5513 - a lastDeliveredSequenceId of -1
now indicates that nothing was received; this value is respected in queue
subscription removal and durable subscription deactivation. A value of 0 still
means no information is available, so old clients see the same behaviour as before.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/dbb1d8b8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/dbb1d8b8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/dbb1d8b8

Branch: refs/heads/trunk
Commit: dbb1d8b83d2ee5c5a38c55834a70cd21a2dd2ad0
Parents: 05f6cd6
Author: gtully <gary.tu...@gmail.com>
Authored: Fri Jan 9 13:28:58 2015 +0000
Committer: gtully <gary.tu...@gmail.com>
Committed: Fri Jan 9 13:30:59 2015 +0000

----------------------------------------------------------------------
 .../broker/region/DurableTopicSubscription.java | 14 +++---
 .../apache/activemq/broker/region/Queue.java    |  4 +-
 .../apache/activemq/broker/region/Topic.java    |  2 +-
 .../activemq/broker/region/TopicRegion.java     |  4 +-
 .../activemq/ActiveMQMessageConsumer.java       |  2 +-
 .../org/apache/activemq/JmsRedeliveredTest.java | 47 ++++++++++++++++++++
 6 files changed, 61 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/dbb1d8b8/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index 4c19c62..8df2819 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -184,7 +184,7 @@ public class DurableTopicSubscription extends 
PrefetchSubscription implements Us
         }
     }
 
-    public void deactivate(boolean keepDurableSubsActive) throws Exception {
+    public void deactivate(boolean keepDurableSubsActive, long 
lastDeliveredSequenceId) throws Exception {
         LOG.debug("Deactivating keepActive={}, {}", keepDurableSubsActive, 
this);
         active.set(false);
         offlineTimestamp.set(System.currentTimeMillis());
@@ -214,11 +214,13 @@ public class DurableTopicSubscription extends 
PrefetchSubscription implements Us
 
                 for (final MessageReference node : dispatched) {
                     // Mark the dispatched messages as redelivered for next 
time.
-                    Integer count = 
redeliveredMessages.get(node.getMessageId());
-                    if (count != null) {
-                        redeliveredMessages.put(node.getMessageId(), 
Integer.valueOf(count.intValue() + 1));
-                    } else {
-                        redeliveredMessages.put(node.getMessageId(), 
Integer.valueOf(1));
+                    if (lastDeliveredSequenceId == 0 || 
(lastDeliveredSequenceId > 0 && node.getMessageId().getBrokerSequenceId() <= 
lastDeliveredSequenceId)) {
+                        Integer count = 
redeliveredMessages.get(node.getMessageId());
+                        if (count != null) {
+                            redeliveredMessages.put(node.getMessageId(), 
Integer.valueOf(count.intValue() + 1));
+                        } else {
+                            redeliveredMessages.put(node.getMessageId(), 
Integer.valueOf(1));
+                        }
                     }
                     if (keepDurableSubsActive && pending.isTransient()) {
                         pending.addMessageFirst(node);

http://git-wip-us.apache.org/repos/asf/activemq/blob/dbb1d8b8/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index f5f2efe..89b9081 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -547,12 +547,12 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
                 List<MessageReference> unAckedMessages = sub.remove(context, 
this);
 
                 // locate last redelivered in unconsumed list (list in 
delivery rather than seq order)
-                if (lastDeiveredSequenceId != 0) {
+                if (lastDeiveredSequenceId > 0) {
                     for (MessageReference ref : unAckedMessages) {
                         if (ref.getMessageId().getBrokerSequenceId() == 
lastDeiveredSequenceId) {
                             lastDeliveredRef = ref;
                             markAsRedelivered = true;
-                            LOG.debug("found lastDeliveredSeqID: {}, message 
reference: {}", lastDeiveredSequenceId, ref.getMessageId());
+                            LOG.error("found lastDeliveredSeqID: {}, message 
reference: {}", lastDeiveredSequenceId, ref.getMessageId());
                             break;
                         }
                     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/dbb1d8b8/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index cd144c3..eff9619 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -203,7 +203,7 @@ public class Topic extends BaseDestination implements Task {
             if (removed != null) {
                 destinationStatistics.getConsumers().decrement();
                 // deactivate and remove
-                removed.deactivate(false);
+                removed.deactivate(false, 0l);
                 consumers.remove(removed);
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/dbb1d8b8/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
index cea5bb7..383f240 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
@@ -155,7 +155,7 @@ public class TopicRegion extends AbstractRegion {
                     if ((sub.context != context) || (sub.info != info)) {
                         sub.info = info;
                         sub.context = context;
-                        sub.deactivate(keepDurableSubsActive);
+                        sub.deactivate(keepDurableSubsActive, 
info.getLastDeliveredSequenceId());
                     }
                     subscriptions.put(info.getConsumerId(), sub);
                 }
@@ -185,7 +185,7 @@ public class TopicRegion extends AbstractRegion {
                 // as what is in the sub. otherwise, during linksteal
                 // sub will get new context, but will be removed here
                 if (sub.getContext() == context)
-                    sub.deactivate(keepDurableSubsActive);
+                    sub.deactivate(keepDurableSubsActive, 
info.getLastDeliveredSequenceId());
             }
         } else {
             super.removeConsumer(context, info);

http://git-wip-us.apache.org/repos/asf/activemq/blob/dbb1d8b8/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index f43b56d..0808ead 100755
--- 
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -147,7 +147,7 @@ public class ActiveMQMessageConsumer implements 
MessageAvailableConsumer, StatsC
     AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0);
 
     private MessageAck pendingAck;
-    private long lastDeliveredSequenceId;
+    private long lastDeliveredSequenceId = -1;
 
     private IOException failureError;
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/dbb1d8b8/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
index 63c5911..72a1a28 100755
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq;
 
+import java.util.concurrent.TimeUnit;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
@@ -400,6 +401,52 @@ public class JmsRedeliveredTest extends TestCase {
         session.close();
     }
 
+    public void testNoReceiveConsumerDoesNotIncrementRedelivery() throws 
Exception {
+        connection.setClientID(getName());
+        connection.start();
+
+        Session session = connection.createSession(true, 
Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue("queue-" + getName());
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        MessageProducer producer = createProducer(session, queue);
+        producer.send(createTextMessage(session));
+        session.commit();
+
+        TimeUnit.SECONDS.sleep(1);
+        consumer.close();
+
+        consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+
+        assertFalse("Message should not be redelivered.", 
msg.getJMSRedelivered());
+        session.close();
+    }
+
+    public void testNoReceiveDurableConsumerDoesNotIncrementRedelivery() 
throws Exception {
+        connection.setClientID(getName());
+        connection.start();
+
+        Session session = connection.createSession(true, 
Session.CLIENT_ACKNOWLEDGE);
+        Topic topic = session.createTopic("topic-" + getName());
+        MessageConsumer consumer = session.createDurableSubscriber(topic, 
"sub");
+
+        MessageProducer producer = createProducer(session, topic);
+        producer.send(createTextMessage(session));
+        session.commit();
+
+        TimeUnit.SECONDS.sleep(1);
+        consumer.close();
+
+        consumer = session.createDurableSubscriber(topic, "sub");
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+
+        assertFalse("Message should not be redelivered.", 
msg.getJMSRedelivered());
+        session.close();
+    }
+
     /**
      * Creates a text message.
      * 

Reply via email to