Author: rgodfrey
Date: Mon Nov 16 09:59:25 2015
New Revision: 1714533
URL: http://svn.apache.org/viewvc?rev=1714533&view=rev
Log:
QPID-6853 : Ensure a reference is kept to messages on the consumertarget _queue
to stop them being removed from the store before they have been sent down the
wire
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1714533&r1=1714532&r2=1714533&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
Mon Nov 16 09:59:25 2015
@@ -207,21 +207,64 @@ public abstract class AbstractConsumerTa
@Override
public void sendNextMessage()
{
- ConsumerMessageInstancePair consumerMessage = _queue.peek();
+ ConsumerMessageInstancePair consumerMessage = _queue.poll();
if (consumerMessage != null)
{
- _queue.poll();
+ try
+ {
+
+ ConsumerImpl consumer = consumerMessage.getConsumer();
+ MessageInstance entry = consumerMessage.getEntry();
+ boolean batch = consumerMessage.isBatch();
+ doSend(consumer, entry, batch);
+
+ if (consumer.acquires())
+ {
+ entry.unlockAcquisition();
+ }
+ }
+ finally
+ {
+ consumerMessage.release();
+ }
+ }
+
+ }
- ConsumerImpl consumer = consumerMessage.getConsumer();
- MessageInstance entry = consumerMessage.getEntry();
- boolean batch = consumerMessage.isBatch();
- doSend(consumer, entry, batch);
+ final public boolean close()
+ {
+ boolean closed = false;
+ State state = getState();
- if (consumer.acquires())
+ getSendLock();
+ try
+ {
+ while(!closed && state != State.CLOSED)
{
- entry.unlockAcquisition();
+ closed = updateState(state, State.CLOSED);
+ if(!closed)
+ {
+ state = getState();
+ }
}
+ ConsumerMessageInstancePair instance;
+ while((instance = _queue.poll()) != null)
+ {
+ instance.release();
+ }
+ doCloseInternal();
}
+ finally
+ {
+ releaseSendLock();
+ }
+
+ afterCloseInternal();
+ return closed;
}
+
+ protected abstract void afterCloseInternal();
+
+ protected abstract void doCloseInternal();
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java?rev=1714533&r1=1714532&r2=1714533&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
Mon Nov 16 09:59:25 2015
@@ -20,18 +20,21 @@
package org.apache.qpid.server.consumer;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageReference;
public class ConsumerMessageInstancePair
{
private final ConsumerImpl _consumer;
private final MessageInstance _entry;
private final boolean _batch;
+ private final MessageReference _reference;
public ConsumerMessageInstancePair(final ConsumerImpl consumer, final
MessageInstance entry, final boolean batch)
{
_consumer = consumer;
_entry = entry;
_batch = batch;
+ _reference = entry.getMessage().newReference();
}
@@ -49,4 +52,9 @@ public class ConsumerMessageInstancePair
{
return _batch;
}
+
+ public void release()
+ {
+ _reference.release();
+ }
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1714533&r1=1714532&r2=1714533&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
Mon Nov 16 09:59:25 2015
@@ -117,35 +117,20 @@ public class ConsumerTarget_0_10 extends
return getState()!=State.ACTIVE || _deleted.get() ||
_session.isClosing() || _session.getAMQPConnection().isConnectionStopped(); //
TODO check for Session suspension
}
- public boolean close()
+ @Override
+ protected void afterCloseInternal()
{
- boolean closed = false;
- State state = getState();
-
- getSendLock();
- try
- {
- while(!closed && state != State.CLOSED)
- {
- closed = updateState(state, State.CLOSED);
- if(!closed)
- {
- state = getState();
- }
- }
- _creditManager.removeListener(this);
- }
- finally
- {
- releaseSendLock();
- }
for (ConsumerImpl consumer : _consumers)
{
consumer.close();
}
- return closed;
+ }
+ @Override
+ protected void doCloseInternal()
+ {
+ _creditManager.removeListener(this);
}
public void creditStateChanged(boolean hasCredit)
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1714533&r1=1714532&r2=1714533&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
Mon Nov 16 09:59:25 2015
@@ -389,32 +389,16 @@ public abstract class ConsumerTarget_0_8
}
@Override
- public boolean close()
+ protected void afterCloseInternal()
{
- boolean closed = false;
- State state = getState();
- getSendLock();
-
- try
- {
- while(!closed && state != State.CLOSED)
- {
- closed = updateState(state, State.CLOSED);
- if(!closed)
- {
- state = getState();
- }
- }
- _creditManager.removeListener(this);
- return closed;
- }
- finally
- {
- releaseSendLock();
- }
}
+ @Override
+ protected void doCloseInternal()
+ {
+ _creditManager.removeListener(this);
+ }
public boolean allocateCredit(ServerMessage msg)
{
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1714533&r1=1714532&r2=1714533&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
Mon Nov 16 09:59:25 2015
@@ -97,28 +97,16 @@ class ConsumerTarget_1_0 extends Abstrac
}
- public boolean close()
+ @Override
+ protected void afterCloseInternal()
+ {
+
+ }
+
+ @Override
+ protected void doCloseInternal()
{
- boolean closed = false;
- State state = getState();
- getSendLock();
- try
- {
- while(!closed && state != State.CLOSED)
- {
- closed = updateState(state, State.CLOSED);
- if(!closed)
- {
- state = getState();
- }
- }
- return closed;
- }
- finally
- {
- releaseSendLock();
- }
}
public void doSend(final ConsumerImpl consumer, final MessageInstance
entry, boolean batch)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]