Author: rgodfrey
Date: Mon Nov 16 09:59:25 2015
New Revision: 1714533

URL: http://svn.apache.org/viewvc?rev=1714533&view=rev
Log:
QPID-6853: Ensure a reference is kept to messages held on the consumer
target's _queue, to stop them being removed from the store before they have
been sent down the wire

Modified:
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1714533&r1=1714532&r2=1714533&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
 Mon Nov 16 09:59:25 2015
@@ -207,21 +207,64 @@ public abstract class AbstractConsumerTa
     @Override
     public void sendNextMessage()
     {
-        ConsumerMessageInstancePair consumerMessage = _queue.peek();
+        ConsumerMessageInstancePair consumerMessage = _queue.poll();
         if (consumerMessage != null)
         {
-            _queue.poll();
+            try
+            {
+
+                ConsumerImpl consumer = consumerMessage.getConsumer();
+                MessageInstance entry = consumerMessage.getEntry();
+                boolean batch = consumerMessage.isBatch();
+                doSend(consumer, entry, batch);
+
+                if (consumer.acquires())
+                {
+                    entry.unlockAcquisition();
+                }
+            }
+            finally
+            {
+                consumerMessage.release();
+            }
+        }
+
+    }
 
-            ConsumerImpl consumer = consumerMessage.getConsumer();
-            MessageInstance entry = consumerMessage.getEntry();
-            boolean batch = consumerMessage.isBatch();
-            doSend(consumer, entry, batch);
+    final public boolean close()
+    {
+        boolean closed = false;
+        State state = getState();
 
-            if (consumer.acquires())
+        getSendLock();
+        try
+        {
+            while(!closed && state != State.CLOSED)
             {
-                entry.unlockAcquisition();
+                closed = updateState(state, State.CLOSED);
+                if(!closed)
+                {
+                    state = getState();
+                }
             }
+            ConsumerMessageInstancePair instance;
+            while((instance = _queue.poll()) != null)
+            {
+                instance.release();
+            }
+            doCloseInternal();
         }
+        finally
+        {
+            releaseSendLock();
+        }
+
+        afterCloseInternal();
+        return closed;
 
     }
+
+    protected abstract void afterCloseInternal();
+
+    protected abstract void doCloseInternal();
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java?rev=1714533&r1=1714532&r2=1714533&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
 Mon Nov 16 09:59:25 2015
@@ -20,18 +20,21 @@
 package org.apache.qpid.server.consumer;
 
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageReference;
 
 public class ConsumerMessageInstancePair
 {
     private final ConsumerImpl _consumer;
     private final MessageInstance _entry;
     private final boolean _batch;
+    private final MessageReference _reference;
 
     public ConsumerMessageInstancePair(final ConsumerImpl consumer, final 
MessageInstance entry, final boolean batch)
     {
         _consumer = consumer;
         _entry = entry;
         _batch = batch;
+        _reference = entry.getMessage().newReference();
 
     }
 
@@ -49,4 +52,9 @@ public class ConsumerMessageInstancePair
     {
         return _batch;
     }
+
+    public void release()
+    {
+        _reference.release();
+    }
 }

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1714533&r1=1714532&r2=1714533&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
 Mon Nov 16 09:59:25 2015
@@ -117,35 +117,20 @@ public class ConsumerTarget_0_10 extends
         return getState()!=State.ACTIVE || _deleted.get() || 
_session.isClosing() || _session.getAMQPConnection().isConnectionStopped(); // 
TODO check for Session suspension
     }
 
-    public boolean close()
+    @Override
+    protected void afterCloseInternal()
     {
-        boolean closed = false;
-        State state = getState();
-
-        getSendLock();
-        try
-        {
-            while(!closed && state != State.CLOSED)
-            {
-                closed = updateState(state, State.CLOSED);
-                if(!closed)
-                {
-                    state = getState();
-                }
-            }
-            _creditManager.removeListener(this);
-            }
-        finally
-        {
-            releaseSendLock();
-        }
 
         for (ConsumerImpl consumer : _consumers)
         {
             consumer.close();
         }
-        return closed;
+    }
 
+    @Override
+    protected void doCloseInternal()
+    {
+        _creditManager.removeListener(this);
     }
 
     public void creditStateChanged(boolean hasCredit)

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1714533&r1=1714532&r2=1714533&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
 Mon Nov 16 09:59:25 2015
@@ -389,32 +389,16 @@ public abstract class ConsumerTarget_0_8
     }
 
     @Override
-    public boolean close()
+    protected void afterCloseInternal()
     {
-        boolean closed = false;
-        State state = getState();
 
-        getSendLock();
-
-        try
-        {
-            while(!closed && state != State.CLOSED)
-            {
-                closed = updateState(state, State.CLOSED);
-                if(!closed)
-                {
-                    state = getState();
-                }
-            }
-            _creditManager.removeListener(this);
-            return closed;
-        }
-        finally
-        {
-            releaseSendLock();
-        }
     }
 
+    @Override
+    protected void doCloseInternal()
+    {
+        _creditManager.removeListener(this);
+    }
 
     public boolean allocateCredit(ServerMessage msg)
     {

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1714533&r1=1714532&r2=1714533&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
 Mon Nov 16 09:59:25 2015
@@ -97,28 +97,16 @@ class ConsumerTarget_1_0 extends Abstrac
 
     }
 
-    public boolean close()
+    @Override
+    protected void afterCloseInternal()
+    {
+
+    }
+
+    @Override
+    protected void doCloseInternal()
     {
-        boolean closed = false;
-        State state = getState();
 
-        getSendLock();
-        try
-        {
-            while(!closed && state != State.CLOSED)
-            {
-                closed = updateState(state, State.CLOSED);
-                if(!closed)
-                {
-                    state = getState();
-                }
-            }
-            return closed;
-        }
-        finally
-        {
-            releaseSendLock();
-        }
     }
 
     public void doSend(final ConsumerImpl consumer, final MessageInstance 
entry, boolean batch)



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to