Repository: activemq Updated Branches: refs/heads/master 47f5c0857 -> b3bf8e74f
https://issues.apache.org/jira/browse/AMQ-5723 Ensure that we settle the delivery state of incoming deliveries that are already remotely settled so that the resources associated are freed. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b3bf8e74 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b3bf8e74 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b3bf8e74 Branch: refs/heads/master Commit: b3bf8e74f29bf30930da20c503ce11b3b780deaf Parents: 47f5c08 Author: Timothy Bish <[email protected]> Authored: Tue Apr 14 11:18:32 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue Apr 14 11:18:32 2015 -0400 ---------------------------------------------------------------------- .../transport/amqp/protocol/AmqpReceiver.java | 6 ++-- .../transport/amqp/client/AmqpSender.java | 9 +++--- .../transport/amqp/client/AmqpSession.java | 19 +++++++++++- .../transport/amqp/interop/AmqpSenderTest.java | 32 ++++++++++++++++++++ 4 files changed, 58 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/b3bf8e74/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java index 9ab7ebe..6051cde 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java @@ -219,7 +219,6 @@ public class AmqpReceiver extends AmqpAbstractReceiver { rejected.setError(condition); delivery.disposition(rejected); } else { - if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .2)) { LOG.trace("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId()); getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit()); @@ -234,10 +233,9 @@ public class AmqpReceiver extends AmqpAbstractReceiver { } else { delivery.disposition(Accepted.getInstance()); } - - delivery.settle(); } + delivery.settle(); session.pumpProtonToSocket(); } }); @@ -247,6 +245,8 @@ public class AmqpReceiver extends AmqpAbstractReceiver { getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit()); session.pumpProtonToSocket(); } + + delivery.settle(); sendToActiveMQ(message); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/b3bf8e74/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index c8829e0..e8e5c8b 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -377,13 +377,12 @@ public class AmqpSender extends AmqpAbstractResource<Sender> { outcome = (Outcome) state; } else { LOG.warn("Message send updated with unsupported state: {}", state); - continue; + outcome = null; } AsyncResult request = (AsyncResult) delivery.getContext(); if (outcome instanceof Accepted) { - toRemove.add(delivery); LOG.trace("Outcome of delivery was accepted: {}", delivery); tagGenerator.returnTag(delivery.getTag()); if (request != null && !request.isComplete()) { @@ -391,7 +390,6 @@ public class AmqpSender extends AmqpAbstractResource<Sender> { } } else if (outcome instanceof Rejected) { Exception remoteError = getRemoteError(); - toRemove.add(delivery); LOG.trace("Outcome of delivery was rejected: {}", delivery); tagGenerator.returnTag(delivery.getTag()); if (request != null && !request.isComplete()) { @@ -399,9 +397,12 @@ public class AmqpSender extends AmqpAbstractResource<Sender> { } else { connection.fireClientException(getRemoteError()); } - } else { + } else if (outcome != null) { LOG.warn("Message send updated with unsupported outcome: {}", outcome); } + + delivery.settle(); + toRemove.add(delivery); } pending.removeAll(toRemove); http://git-wip-us.apache.org/repos/asf/activemq/blob/b3bf8e74/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 8af362b..b7ebeec 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -54,16 +54,33 @@ public class AmqpSession extends AmqpAbstractResource<Session> { * Create a sender instance using the given address * * @param address - * the address to which the sender will produce its messages. + * the address to which the sender will produce its messages. * * @return a newly created sender that is ready for use. * * @throws Exception if an error occurs while creating the sender. */ public AmqpSender createSender(final String address) throws Exception { + return createSender(address, false); + } + + /** + * Create a sender instance using the given address + * + * @param address + * the address to which the sender will produce its messages. + * @param presettle + * controls if the created sender produces message that have already been marked settled. + * + * @return a newly created sender that is ready for use. + * + * @throws Exception if an error occurs while creating the sender. + */ + public AmqpSender createSender(final String address, boolean presettle) throws Exception { checkClosed(); final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId()); + sender.setPresettle(presettle); final ClientFuture request = new ClientFuture(); connection.getScheduler().execute(new Runnable() { http://git-wip-us.apache.org/repos/asf/activemq/blob/b3bf8e74/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java index 3f6a454..886a42e 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java @@ -18,14 +18,17 @@ package org.apache.activemq.transport.amqp.interop; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.jmx.TopicViewMBean; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.util.Wait; import org.junit.Test; /** @@ -91,4 +94,33 @@ public class AmqpSenderTest extends AmqpClientTestSupport { sender.close(); connection.close(); } + + @Test(timeout = 60000) + public void testPresettledSender() throws Exception { + final int MSG_COUNT = 1000; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("topic://" + getTestName(), true); + + for (int i = 0; i < MSG_COUNT; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message: " + i); + sender.send(message); + } + + final TopicViewMBean topic = getProxyToTopic(getTestName()); + assertTrue("All messages should arrive", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return topic.getEnqueueCount() == MSG_COUNT; + } + })); + + sender.close(); + connection.close(); + } } \ No newline at end of file
