Repository: activemq Updated Branches: refs/heads/trunk 4e3499e41 -> 0ca376d54
https://issues.apache.org/jira/browse/AMQ-5379 - amqp prefetch size and redelivery header problem Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0ca376d5 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0ca376d5 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0ca376d5 Branch: refs/heads/trunk Commit: 0ca376d540e1f73da92494ed1ce34d4b64746480 Parents: 4e3499e Author: Dejan Bosanac <[email protected]> Authored: Thu Nov 27 14:40:56 2014 +0100 Committer: Dejan Bosanac <[email protected]> Committed: Thu Nov 27 14:42:39 2014 +0100 ---------------------------------------------------------------------- .../amqp/AMQPProtocolDiscriminator.java | 4 +- .../transport/amqp/AmqpProtocolConverter.java | 66 +++++++++++--------- .../activemq/transport/amqp/JMSClientTest.java | 40 ++++++++++-- 3 files changed, 74 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/0ca376d5/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java index a7607af..b484500 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java @@ -28,12 +28,12 @@ import org.apache.activemq.command.Command; */ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { - private static final int DEFAULT_PREFETCH = 100; + public static final int DEFAULT_PREFETCH = 1000; private final AmqpTransport transport; private final BrokerService brokerService; - private int prefetch = DEFAULT_PREFETCH; + private int prefetch = 0; private int producerCredit = DEFAULT_PREFETCH; interface Discriminator { http://git-wip-us.apache.org/repos/asf/activemq/blob/0ca376d5/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 9a95725..444cdb5 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -69,14 +69,7 @@ import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.messaging.Accepted; -import org.apache.qpid.proton.amqp.messaging.AmqpValue; -import org.apache.qpid.proton.amqp.messaging.Modified; -import org.apache.qpid.proton.amqp.messaging.Outcome; -import org.apache.qpid.proton.amqp.messaging.Rejected; -import org.apache.qpid.proton.amqp.messaging.Released; -import org.apache.qpid.proton.amqp.messaging.Target; -import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.apache.qpid.proton.amqp.messaging.*; import org.apache.qpid.proton.amqp.transaction.Coordinator; import org.apache.qpid.proton.amqp.transaction.Declare; import org.apache.qpid.proton.amqp.transaction.Declared; @@ -322,10 +315,11 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { protected void processLinkFlow(Link link) throws Exception { Object context = link.getContext(); int credit = link.getRemoteCredit(); - if (context != null && context instanceof ConsumerContext) { + if (context instanceof ConsumerContext) { ConsumerContext consumerContext = (ConsumerContext)context; - // change ActiveMQ consumer prefetch if needed - if (consumerContext.credit == 0 && consumerContext.consumerPrefetch != credit && credit > 0) { + // change consumer prefetch if it's not been already set using + // transport connector property or consumer preference + if (consumerContext.consumerPrefetch == 0 && credit > 0) { ConsumerControl control = new ConsumerControl(); control.setConsumerId(consumerContext.consumerId); control.setDestination(consumerContext.destination); @@ -612,6 +606,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { private final ActiveMQDestination destination; private boolean closed; private final boolean anonymous; + private MessageId lastDispatched; public ProducerContext(ProducerId producerId, ActiveMQDestination destination, boolean anonymous) { this.producerId = producerId; @@ -688,9 +683,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { rejected.setError(condition); delivery.disposition(rejected); } else { - if (receiver.getCredit() <= (prefetch * .2)) { - LOG.trace("Sending more credit ({}) to producer: {}", prefetch - receiver.getCredit(), producerId); - receiver.flow(prefetch - receiver.getCredit()); + if (receiver.getCredit() <= (producerCredit * .2)) { + LOG.trace("Sending more credit ({}) to producer: {}", producerCredit - receiver.getCredit(), producerId); + receiver.flow(producerCredit - receiver.getCredit()); } if (remoteState != null && remoteState instanceof TransactionalState) { @@ -710,9 +705,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } }); } else { - if (receiver.getCredit() <= (prefetch * .2)) { - LOG.trace("Sending more credit ({}) to producer: {}", prefetch - receiver.getCredit(), producerId); - receiver.flow(prefetch - receiver.getCredit()); + if (receiver.getCredit() <= (producerCredit * .2)) { + LOG.trace("Sending more credit ({}) to producer: {}", producerCredit - receiver.getCredit(), producerId); + receiver.flow(producerCredit - receiver.getCredit()); pumpProtonToSocket(); } sendToActiveMQ(message, null); @@ -838,10 +833,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { // Client is producing to this receiver object org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget(); int flow = producerCredit; - // use client's preference if set - if (receiver.getRemoteCredit() != 0) { - flow = receiver.getRemoteCredit(); - } try { if (remoteTarget instanceof Coordinator) { pumpProtonToSocket(); @@ -934,7 +925,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { private boolean endOfBrowse = false; public ActiveMQDestination destination; public int credit; - public int consumerPrefetch; + public int consumerPrefetch = 0; + private long lastDeliveredSequenceId; protected LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>(); @@ -978,8 +970,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { if (session != null) { session.consumers.remove(info.getConsumerId()); } - - sendToActiveMQ(new RemoveInfo(consumerId), null); + RemoveInfo removeCommand = new RemoveInfo(consumerId); + removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); + sendToActiveMQ(removeCommand, null); } } @@ -1003,7 +996,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { public void pumpOutbound() throws Exception { while (!closed) { - while (currentBuffer != null) { int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length); if (sent > 0) { @@ -1089,6 +1081,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { onMessageDispatch((MessageDispatch) delivery.getContext()); } else { MessageDispatch md = (MessageDispatch) delivery.getContext(); + lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId(); MessageAck ack = new MessageAck(); ack.setConsumerId(consumerId); ack.setFirstMessageId(md.getMessage().getMessageId()); @@ -1110,6 +1103,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { dispatchedInTx.addFirst(md); } + LOG.trace("Sending Ack to ActiveMQ: {}", ack); sendToActiveMQ(ack, new ResponseHandler() { @@ -1335,9 +1329,25 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { consumerInfo.setNoRangeAcks(true); consumerInfo.setDestination(dest); consumerContext.destination = dest; - consumerInfo.setPrefetchSize(sender.getRemoteCredit()); - consumerContext.credit = sender.getRemoteCredit(); - consumerContext.consumerPrefetch = consumerInfo.getPrefetchSize(); + int senderCredit = sender.getRemoteCredit(); + if (prefetch != 0) { + // use the value configured on the transport connector + // this value will not be changed to the consumer's preference + consumerInfo.setPrefetchSize(prefetch); + consumerContext.consumerPrefetch = prefetch; + } else { + if (senderCredit != 0) { + // set the prefetch to the value of the remote credit + // and ignore the later changes + consumerInfo.setPrefetchSize(senderCredit); + consumerContext.consumerPrefetch = senderCredit; + } else { + // set default value for now and change to the consumer's preference + // on the first flow packet + consumerInfo.setPrefetchSize(AMQPProtocolDiscriminator.DEFAULT_PREFETCH); + } + } + consumerContext.credit = senderCredit; consumerInfo.setDispatchAsync(true); if (source.getDistributionMode() == COPY && dest.isQueue()) { consumerInfo.setBrowser(true); http://git-wip-us.apache.org/repos/asf/activemq/blob/0ca376d5/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java index 64a4f3c..0380a87 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java @@ -16,12 +16,6 @@ */ package org.apache.activemq.transport.amqp; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.util.ArrayList; import java.util.Enumeration; import java.util.concurrent.CountDownLatch; @@ -55,6 +49,8 @@ import org.objectweb.jtests.jms.framework.TestConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.junit.Assert.*; + public class JMSClientTest extends JMSClientTestSupport { protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class); @@ -952,4 +948,36 @@ public class JMSClientTest extends JMSClientTestSupport { } catch (JMSException ex) { } } + + @Test(timeout=30000) + public void testRedeliveredHeader() throws Exception { + connection = createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + connection.start(); + + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + for (int i = 1; i < 100; i++) { + Message m = session.createTextMessage(i + ". Sample text"); + producer.send(m); + } + + MessageConsumer consumer = session.createConsumer(queue); + receiveMessages(consumer); + consumer.close(); + + consumer = session.createConsumer(queue); + receiveMessages(consumer); + consumer.close(); + } + + protected void receiveMessages(MessageConsumer consumer) throws Exception { + for (int i = 0; i < 10; i++) { + Message message = consumer.receive(1000); + assertNotNull(message); + assertFalse(message.getJMSRedelivered()); + } + } }
