QPID-8098: [Broker-J] [AMQP 0-10] No longer unconditionally increment
deliveryCount when the acquire mode is MessageAcquireMode.NOT_ACQUIRED


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a6408e15
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a6408e15
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a6408e15

Branch: refs/heads/master
Commit: a6408e156a717e8e91aa95e3220ed300c71c52bd
Parents: 7851928
Author: Keith Wall <kw...@apache.org>
Authored: Wed Feb 14 15:50:42 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Feb 14 15:50:42 2018 +0000

----------------------------------------------------------------------
 .../protocol/v0_10/ConsumerTarget_0_10.java     | 128 ++++++++++++++++---
 ...ExplicitAcceptDispositionChangeListener.java |  73 -----------
 ...ImplicitAcceptDispositionChangeListener.java |  80 ------------
 .../server/protocol/v0_10/ServerSession.java    |   6 +-
 4 files changed, 110 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a6408e15/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 21156de..8d42d2b 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -305,6 +305,13 @@ public class ConsumerTarget_0_10 extends 
AbstractConsumerTarget<ConsumerTarget_0
 
 
         _postIdSettingAction.setXfr(xfr);
+        _postIdSettingAction.setAction(null);
+
+        if (_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
+        {
+            entry.incrementDeliveryCount();
+        }
+
         if(_acceptMode == MessageAcceptMode.EXPLICIT)
         {
             _postIdSettingAction.setAction(new 
ExplicitAcceptDispositionChangeListener(entry, this, consumer));
@@ -313,11 +320,6 @@ public class ConsumerTarget_0_10 extends 
AbstractConsumerTarget<ConsumerTarget_0
         {
             _postIdSettingAction.setAction(new 
ImplicitAcceptDispositionChangeListener(entry, this, consumer));
         }
-        else
-        {
-            _postIdSettingAction.setAction(null);
-        }
-
 
         _session.sendMessage(xfr, _postIdSettingAction);
         xfr.dispose();
@@ -328,7 +330,6 @@ public class ConsumerTarget_0_10 extends 
AbstractConsumerTarget<ConsumerTarget_0
         _postIdSettingAction.setAction(null);
         _postIdSettingAction.setXfr(null);
 
-        entry.incrementDeliveryCount();
         if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == 
MessageAcquireMode.PRE_ACQUIRED)
         {
             forceDequeue(entry, false);
@@ -403,27 +404,14 @@ public class ConsumerTarget_0_10 extends 
AbstractConsumerTarget<ConsumerTarget_0
 
     void reject(final MessageInstanceConsumer consumer, final MessageInstance 
entry)
     {
-        entry.setRedelivered();
         if (entry.makeAcquisitionUnstealable(consumer))
         {
             entry.routeToAlternate(null, null);
         }
     }
 
-    void release(final MessageInstanceConsumer consumer,
-                 final MessageInstance entry,
-                 final boolean setRedelivered)
+    void release(final MessageInstanceConsumer consumer, final MessageInstance 
entry)
     {
-        if (setRedelivered)
-        {
-            entry.setRedelivered();
-        }
-
-        if (getSession().isClosing() || !setRedelivered)
-        {
-            entry.decrementDeliveryCount();
-        }
-
         if (isMaxDeliveryLimitReached(entry))
         {
             sendToDLQOrDiscard(consumer, entry);
@@ -434,7 +422,7 @@ public class ConsumerTarget_0_10 extends 
AbstractConsumerTarget<ConsumerTarget_0
         }
     }
 
-    protected void sendToDLQOrDiscard(final MessageInstanceConsumer consumer, 
MessageInstance entry)
+    private void sendToDLQOrDiscard(final MessageInstanceConsumer consumer, 
MessageInstance entry)
     {
         final ServerMessage msg = entry.getMessage();
 
@@ -625,4 +613,102 @@ public class ConsumerTarget_0_10 extends 
AbstractConsumerTarget<ConsumerTarget_0
         }
     }
 
+    static abstract class AbstractDispositionChangeListener implements 
ServerSession.MessageDispositionChangeListener
+    {
+        final MessageInstance _entry;
+        final ConsumerTarget_0_10 _target;
+        final MessageInstanceConsumer _consumer;
+
+        AbstractDispositionChangeListener(final MessageInstance entry,
+                                          final ConsumerTarget_0_10 target,
+                                          final MessageInstanceConsumer 
consumer)
+        {
+            _entry = entry;
+            _target = target;
+            _consumer = consumer;
+        }
+
+        @Override
+        public final void onRelease(boolean setRedelivered, final boolean 
closing)
+        {
+            _target.release(_consumer, _entry);
+
+            if (setRedelivered)
+            {
+                _entry.setRedelivered();
+            }
+
+            if (closing || !setRedelivered)
+            {
+                _entry.decrementDeliveryCount();
+            }
+        }
+
+        @Override
+        public final void onReject()
+        {
+            _entry.setRedelivered();
+            _target.reject(_consumer, _entry);
+        }
+    }
+
+    static class ImplicitAcceptDispositionChangeListener extends 
AbstractDispositionChangeListener
+    {
+
+        private static final Logger LOGGER = 
LoggerFactory.getLogger(ImplicitAcceptDispositionChangeListener.class);
+
+
+        ImplicitAcceptDispositionChangeListener(final MessageInstance entry,
+                                                final ConsumerTarget_0_10 
target,
+                                                final MessageInstanceConsumer 
consumer)
+        {
+            super(entry, target, consumer);
+        }
+
+        @Override
+        public void onAccept()
+        {
+            LOGGER.warn("MessageAccept received for message which is using 
NONE as the accept mode (likely client error)");
+        }
+
+        @Override
+        public boolean acquire()
+        {
+            boolean acquired = _entry.acquire(_consumer);
+            if(acquired)
+            {
+                _entry.incrementDeliveryCount();
+                _target.addUnacknowledgedMessage(_entry);
+            }
+            return acquired;
+        }
+    }
+
+    static class ExplicitAcceptDispositionChangeListener extends 
AbstractDispositionChangeListener
+    {
+
+        ExplicitAcceptDispositionChangeListener(MessageInstance entry,
+                                                ConsumerTarget_0_10 target,
+                                                final MessageInstanceConsumer 
consumer)
+        {
+            super(entry, target, consumer);
+        }
+
+        @Override
+        public void onAccept()
+        {
+            _target.acknowledge(_consumer, _entry);
+        }
+        @Override
+        public boolean acquire()
+        {
+            final boolean acquired = _entry.acquire(_consumer);
+            if (acquired)
+            {
+                _entry.incrementDeliveryCount();
+            }
+            return acquired;
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a6408e15/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
deleted file mode 100755
index 427355b..0000000
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageInstanceConsumer;
-
-
-class ExplicitAcceptDispositionChangeListener implements 
ServerSession.MessageDispositionChangeListener
-{
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ExplicitAcceptDispositionChangeListener.class);
-
-
-    private final MessageInstance _entry;
-    private final ConsumerTarget_0_10 _target;
-    private final MessageInstanceConsumer _consumer;
-
-    public ExplicitAcceptDispositionChangeListener(MessageInstance entry,
-                                                   ConsumerTarget_0_10 target,
-                                                   final 
MessageInstanceConsumer consumer)
-    {
-        _entry = entry;
-        _target = target;
-        _consumer = consumer;
-    }
-
-    @Override
-    public void onAccept()
-    {
-        _target.acknowledge(_consumer, _entry);
-    }
-
-    @Override
-    public void onRelease(boolean setRedelivered)
-    {
-        _target.release(_consumer, _entry, setRedelivered);
-    }
-
-    @Override
-    public void onReject()
-    {
-        _target.reject(_consumer, _entry);
-    }
-
-    @Override
-    public boolean acquire()
-    {
-        return _entry.acquire(_consumer);
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a6408e15/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
deleted file mode 100755
index bcfa205..0000000
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageInstanceConsumer;
-
-class ImplicitAcceptDispositionChangeListener implements 
ServerSession.MessageDispositionChangeListener
-{
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ImplicitAcceptDispositionChangeListener.class);
-
-
-    private final MessageInstance _entry;
-    private final ConsumerTarget_0_10 _target;
-    private final MessageInstanceConsumer _consumer;
-
-    public ImplicitAcceptDispositionChangeListener(MessageInstance entry,
-                                                   ConsumerTarget_0_10 target,
-                                                   final 
MessageInstanceConsumer consumer)
-    {
-        _entry = entry;
-        _target = target;
-        _consumer = consumer;
-    }
-
-    @Override
-    public void onAccept()
-    {
-        LOGGER.warn("MessageAccept received for message which is using NONE as 
the accept mode (likely client error)");
-    }
-
-    @Override
-    public void onRelease(boolean setRedelivered)
-    {
-        _target.release(_consumer, _entry, setRedelivered);
-
-    }
-
-    @Override
-    public void onReject()
-    {
-        _target.reject(_consumer, _entry);
-    }
-
-    @Override
-    public boolean acquire()
-    {
-        boolean acquired = _entry.acquire(_consumer);
-        if(acquired)
-        {
-            _target.addUnacknowledgedMessage(_entry);
-        }
-        return acquired;
-
-    }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a6408e15/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 32a2474..22d539b 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -860,7 +860,7 @@ public class ServerSession extends SessionInvoker
     {
         void onAccept();
 
-        void onRelease(boolean setRedelivered);
+        void onRelease(boolean setRedelivered, final boolean closing);
 
         void onReject();
 
@@ -1012,7 +1012,7 @@ public class ServerSession extends SessionInvoker
                                           @Override
                                           public void 
performAction(MessageDispositionChangeListener listener)
                                           {
-                                              
listener.onRelease(setRedelivered);
+                                              
listener.onRelease(setRedelivered, false);
                                           }
                                       });
     }
@@ -1158,7 +1158,7 @@ public class ServerSession extends SessionInvoker
 
         for(MessageDispositionChangeListener listener : 
_messageDispositionListenerMap.values())
         {
-            listener.onRelease(true);
+            listener.onRelease(true, true);
         }
         _messageDispositionListenerMap.clear();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to