Repository: activemq Updated Branches: refs/heads/master 72839b78a -> 6a6ef45ee
https://issues.apache.org/jira/browse/AMQ-5661 Always honor the link credit as true prefetch value for the subscription. Enables previously failing test to verify. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6a6ef45e Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6a6ef45e Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6a6ef45e Branch: refs/heads/master Commit: 6a6ef45ee04d332ed3905f79ea87527fa6264d94 Parents: 72839b7 Author: Timothy Bish <[email protected]> Authored: Fri Mar 13 18:20:26 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Fri Mar 13 18:20:26 2015 -0400 ---------------------------------------------------------------------- .../amqp/AMQPProtocolDiscriminator.java | 7 --- .../transport/amqp/AmqpProtocolConverter.java | 46 +++++--------------- .../transport/amqp/AmqpTransportFilter.java | 5 ++- .../transport/amqp/IAmqpProtocolConverter.java | 2 - .../amqp/interop/AmqpReceiverTest.java | 1 - 5 files changed, 16 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/6a6ef45e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java index b484500..f5b457b 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java @@ -33,7 +33,6 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { private final AmqpTransport transport; private final BrokerService brokerService; - private int prefetch = 0; private int producerCredit = DEFAULT_PREFETCH; interface Discriminator { @@ -90,7 +89,6 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { } IAmqpProtocolConverter next = match.create(transport, brokerService); - next.setPrefetch(prefetch); next.setProducerCredit(producerCredit); transport.setProtocolConverter(next); for (Command send : pendingCommands) { @@ -117,11 +115,6 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter { } @Override - public void setPrefetch(int prefetch) { - this.prefetch = prefetch; - } - - @Override public void setProducerCredit(int producerCredit) { this.producerCredit = producerCredit; } http://git-wip-us.apache.org/repos/asf/activemq/blob/6a6ef45e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 39c8c2b..3661f3d 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -155,7 +155,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { private final BrokerService brokerService; private AuthenticationBroker authenticator; - protected int prefetch; protected int producerCredit; protected Transport protonTransport = Proton.transport(); protected Connection protonConnection = Proton.connection(); @@ -410,17 +409,15 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { int credit = link.getCredit(); if (context instanceof ConsumerContext) { ConsumerContext consumerContext = (ConsumerContext)context; - // change consumer prefetch if it's not been already set using - // transport connector property or consumer preference - if (consumerContext.consumerPrefetch == 0 && credit > 0) { + + if (credit != consumerContext.credit) { + consumerContext.credit = credit >= 0 ? credit : 0; ConsumerControl control = new ConsumerControl(); control.setConsumerId(consumerContext.consumerId); control.setDestination(consumerContext.destination); - control.setPrefetch(credit); - consumerContext.consumerPrefetch = credit; + control.setPrefetch(consumerContext.credit); sendToActiveMQ(control, null); } - consumerContext.credit = credit; } ((AmqpDeliveryListener) link.getContext()).drainCheck(); } @@ -1061,7 +1058,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { public ConsumerInfo info; private boolean endOfBrowse = false; public int credit; - public int consumerPrefetch = 0; private long lastDeliveredSequenceId; protected LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>(); @@ -1481,33 +1477,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { destination = createDestination(source); } + int senderCredit = sender.getRemoteCredit(); + subscriptionsByConsumerId.put(id, consumerContext); ConsumerInfo consumerInfo = new ConsumerInfo(id); - consumerContext.info = consumerInfo; consumerInfo.setSelector(selector); consumerInfo.setNoRangeAcks(true); consumerInfo.setDestination(destination); - consumerContext.setDestination(destination); - int senderCredit = sender.getRemoteCredit(); - if (prefetch != 0) { - // use the value configured on the transport connector - // this value will not be changed to the consumer's preference - consumerInfo.setPrefetchSize(prefetch); - consumerContext.consumerPrefetch = prefetch; - } else { - if (senderCredit != 0) { - // set the prefetch to the value of the remote credit - // and ignore the later changes - consumerInfo.setPrefetchSize(senderCredit); - consumerContext.consumerPrefetch = senderCredit; - } else { - // set zero value for now and change to the consumer's preference - // on the first flow packet - consumerInfo.setPrefetchSize(0); - } - } - consumerContext.credit = senderCredit; + consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0); consumerInfo.setDispatchAsync(true); + if (source.getDistributionMode() == COPY && destination.isQueue()) { consumerInfo.setBrowser(true); } @@ -1521,6 +1500,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { consumerInfo.setNoLocal(true); } + consumerContext.info = consumerInfo; + consumerContext.setDestination(destination); + consumerContext.credit = senderCredit; + sendToActiveMQ(consumerInfo, new ResponseHandler() { @Override public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException { @@ -1657,11 +1640,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } @Override - public void setPrefetch(int prefetch) { - this.prefetch = prefetch; - } - - @Override public void setProducerCredit(int producerCredit) { this.producerCredit = producerCredit; } http://git-wip-us.apache.org/repos/asf/activemq/blob/6a6ef45e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java index fb7542b..5dfdf75 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java @@ -187,8 +187,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor this.protocolConverter = protocolConverter; } + /** + * @deprecated AMQP receiver configures it's prefetch via flow, remove on next release. + */ + @Deprecated public void setPrefetch(int prefetch) { - protocolConverter.setPrefetch(prefetch); } public void setProducerCredit(int producerCredit) { http://git-wip-us.apache.org/repos/asf/activemq/blob/6a6ef45e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java index 3e365ae..8296ef2 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java @@ -32,7 +32,5 @@ public interface IAmqpProtocolConverter { void updateTracer(); - void setPrefetch(int prefetch); - void setProducerCredit(int producerCredit); } http://git-wip-us.apache.org/repos/asf/activemq/blob/6a6ef45e/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java index 1245811..1bc3d66 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java @@ -141,7 +141,6 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { connection.close(); } - @Ignore("Fails due to issues with accept and no credit") @Test(timeout = 60000) public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception { int MSG_COUNT = 4;
