Repository: qpid-jms Updated Branches: refs/heads/master abc75a35e -> 24d03437f
QPIDJMS-420 Use local tracking in AmqpConsumer for prefetch refill Track the number of message that are currently in the prefetch buffer using local counters that mesh well with the already added delivered message counts to reduce access to synchronized code in the message queue. Reduce code paths where a message settlement might be missed or credit not replenished on error. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/24d03437 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/24d03437 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/24d03437 Branch: refs/heads/master Commit: 24d03437fb1695b2096989c3561007312d4169ee Parents: abc75a3 Author: Timothy Bish <[email protected]> Authored: Tue Oct 30 15:48:26 2018 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue Oct 30 15:48:26 2018 -0400 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsConnection.java | 2 +- .../org/apache/qpid/jms/JmsMessageConsumer.java | 2 +- .../apache/qpid/jms/meta/JmsConsumerInfo.java | 11 +- .../qpid/jms/provider/amqp/AmqpConsumer.java | 156 +++++++++++-------- .../qpid/jms/meta/JmsConsumerInfoTest.java | 32 ++-- .../jms/meta/JmsDefaultResourceVisitorTest.java | 2 +- .../amqp/AmqpSubscriptionTrackerTest.java | 2 +- .../provider/amqp/message/AmqpCodecTest.java | 2 +- .../message/AmqpJmsMessageTypesTestCase.java | 2 +- .../failover/FailoverProviderTestSupport.java | 2 +- 10 files changed, 113 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index b4e2682..da5f704 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -471,7 +471,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection messageQueue = new FifoMessageQueue(configuredPrefetch); } - JmsConsumerInfo consumerInfo = new JmsConsumerInfo(getNextConnectionConsumerId(), messageQueue, null); + JmsConsumerInfo consumerInfo = new JmsConsumerInfo(getNextConnectionConsumerId(), null); consumerInfo.setExplicitClientID(isExplicitClientID()); consumerInfo.setSelector(messageSelector); consumerInfo.setDurable(durable); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java index 5662b2e..d616faf 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java @@ -100,7 +100,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe this.messageQueue = new FifoMessageQueue(configuredPrefetch); } - consumerInfo = new JmsConsumerInfo(consumerId, messageQueue, this); + consumerInfo = new JmsConsumerInfo(consumerId, this); consumerInfo.setExplicitClientID(connection.isExplicitClientID()); consumerInfo.setSelector(selector); consumerInfo.setDurable(isDurableSubscription()); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java index 7addbb7..edb928a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java @@ -22,7 +22,6 @@ import org.apache.qpid.jms.policy.JmsDefaultDeserializationPolicy; import org.apache.qpid.jms.policy.JmsDefaultRedeliveryPolicy; import org.apache.qpid.jms.policy.JmsDeserializationPolicy; import org.apache.qpid.jms.policy.JmsRedeliveryPolicy; -import org.apache.qpid.jms.util.MessageQueue; public final class JmsConsumerInfo extends JmsAbstractResource implements Comparable<JmsConsumerInfo> { @@ -42,7 +41,6 @@ public final class JmsConsumerInfo extends JmsAbstractResource implements Compar private boolean connectionConsumer; private int maxMessages; private volatile boolean listener; - private final MessageQueue messageQueue; private JmsRedeliveryPolicy redeliveryPolicy; private JmsDeserializationPolicy deserializationPolicy; @@ -52,17 +50,16 @@ public final class JmsConsumerInfo extends JmsAbstractResource implements Compar private final JmsMessageDispatcher dispatcher; - public JmsConsumerInfo(JmsConsumerId consumerId, MessageQueue messageQueue, JmsMessageDispatcher dispatcher) { + public JmsConsumerInfo(JmsConsumerId consumerId, JmsMessageDispatcher dispatcher) { if (consumerId == null) { throw new IllegalArgumentException("Consumer ID cannot be null"); } this.consumerId = consumerId; - this.messageQueue = messageQueue; this.dispatcher = dispatcher; } public JmsConsumerInfo copy() { - JmsConsumerInfo info = new JmsConsumerInfo(consumerId, messageQueue, dispatcher); + JmsConsumerInfo info = new JmsConsumerInfo(consumerId, dispatcher); copy(info); return info; } @@ -86,10 +83,6 @@ public final class JmsConsumerInfo extends JmsAbstractResource implements Compar info.maxMessages = maxMessages; } - public int getPrefetchedMessageCount() { - return messageQueue.size(); - } - @Override public JmsConsumerId getId() { return consumerId; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index d2d7c39..7edbaa7 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -58,7 +58,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver protected AsyncResult stopRequest; protected AsyncResult pullRequest; protected long incomingSequence; - protected long deliveredCount; + protected int deliveredCount; + protected int dispatchedCount; protected boolean deferredClose; public AmqpConsumer(AmqpSession session, JmsConsumerInfo info, Receiver receiver) { @@ -206,28 +207,29 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext(); if (envelope.isDelivered()) { + final DeliveryState disposition; + switch (ackType) { case ACCEPTED: - current.disposition(Accepted.getInstance()); + disposition = Accepted.getInstance(); break; case RELEASED: - current.disposition(Released.getInstance()); + disposition = Released.getInstance(); break; case REJECTED: - current.disposition(REJECTED); + disposition = REJECTED; break; case MODIFIED_FAILED: - current.disposition(MODIFIED_FAILED); + disposition = MODIFIED_FAILED; break; case MODIFIED_FAILED_UNDELIVERABLE: - current.disposition(MODIFIED_FAILED_UNDELIVERABLE); + disposition = MODIFIED_FAILED_UNDELIVERABLE; break; default: throw new IllegalArgumentException("Invalid acknowledgement type specified: " + ackType); } - current.settle(); - deliveredCount--; + handleDisposition(envelope, current, disposition); } } @@ -252,60 +254,78 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver return; } - if (ackType.equals(ACK_TYPE.DELIVERED)) { - LOG.debug("Delivered Ack of message: {}", envelope); - deliveredCount++; - envelope.setDelivered(true); - delivery.setDefaultDeliveryState(MODIFIED_FAILED); - sendFlowIfNeeded(); - return; - } else if (ackType.equals(ACK_TYPE.ACCEPTED)) { - // A Consumer may not always send a DELIVERED ack so we need to - // check to ensure we don't add too much credit to the link. - if (!envelope.isDelivered()) { - sendFlowIfNeeded(); - } - LOG.debug("Accepted Ack of message: {}", envelope); - if (!delivery.remotelySettled()) { - if (session.isTransacted() && !getResourceInfo().isBrowser()) { + switch (ackType) { + case DELIVERED: + handleDelivered(envelope, delivery); + break; + case ACCEPTED: + handleAccepted(envelope, delivery); + break; + case REJECTED: + handleDisposition(envelope, delivery, REJECTED); + break; + case RELEASED: + handleDisposition(envelope, delivery, Released.getInstance()); + break; + case MODIFIED_FAILED: + handleDisposition(envelope, delivery, MODIFIED_FAILED); + break; + case MODIFIED_FAILED_UNDELIVERABLE: + handleDisposition(envelope, delivery, MODIFIED_FAILED_UNDELIVERABLE); + break; + default: + LOG.warn("Unsupported Ack Type for message: {}", envelope); + throw new IllegalArgumentException("Unknown Acknowledgement type"); + } - if (session.isTransactionFailed()) { - LOG.trace("Skipping ack of message {} in failed transaction.", envelope); - return; - } + sendFlowIfNeeded(); + tryCompleteDeferredClose(); + } - Binary txnId = session.getTransactionContext().getAmqpTransactionId(); - if (txnId != null) { - delivery.disposition(session.getTransactionContext().getTxnAcceptState()); - delivery.settle(); - session.getTransactionContext().registerTxConsumer(this); - } - } else { - delivery.disposition(Accepted.getInstance()); + private void handleDelivered(JmsInboundMessageDispatch envelope, Delivery delivery) { + LOG.debug("Delivered Ack of message: {}", envelope); + deliveredCount++; + envelope.setDelivered(true); + delivery.setDefaultDeliveryState(MODIFIED_FAILED); + } + + private void handleAccepted(JmsInboundMessageDispatch envelope, Delivery delivery) { + LOG.debug("Accepted Ack of message: {}", envelope); + if (!delivery.remotelySettled()) { + if (session.isTransacted() && !getResourceInfo().isBrowser()) { + + if (session.isTransactionFailed()) { + LOG.trace("Skipping ack of message {} in failed transaction.", envelope); + return; + } + + Binary txnId = session.getTransactionContext().getAmqpTransactionId(); + if (txnId != null) { + delivery.disposition(session.getTransactionContext().getTxnAcceptState()); delivery.settle(); + session.getTransactionContext().registerTxConsumer(this); } } else { + delivery.disposition(Accepted.getInstance()); delivery.settle(); } - } else if (ackType.equals(ACK_TYPE.MODIFIED_FAILED)) { - settleDelivery(delivery, MODIFIED_FAILED); - } else if (ackType.equals(ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE)) { - settleDelivery(delivery, MODIFIED_FAILED_UNDELIVERABLE); - } else if (ackType.equals(ACK_TYPE.REJECTED)) { - settleDelivery(delivery, REJECTED); - } else if (ackType.equals(ACK_TYPE.RELEASED)) { - delivery.disposition(Released.getInstance()); - delivery.settle(); } else { - LOG.warn("Unsupported Ack Type for message: {}", envelope); - return; + delivery.settle(); } if (envelope.isDelivered()) { deliveredCount--; } + dispatchedCount--; + } - tryCompleteDeferredClose(); + private void handleDisposition(JmsInboundMessageDispatch envelope, Delivery delivery, DeliveryState outcome) { + delivery.disposition(outcome); + delivery.settle(); + if (envelope.isDelivered()) { + deliveredCount--; + } + dispatchedCount--; } /** @@ -325,11 +345,10 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver int currentCredit = getEndpoint().getCredit(); if (currentCredit <= prefetchSize * 0.5) { - int prefetchedMessageCount = getResourceInfo().getPrefetchedMessageCount(); + int potentialPrefetch = currentCredit + (dispatchedCount - deliveredCount); - int potentialPrefetch = currentCredit + prefetchedMessageCount; if (potentialPrefetch <= prefetchSize * 0.7) { - int additionalCredit = prefetchSize - currentCredit - prefetchedMessageCount; + int additionalCredit = prefetchSize - potentialPrefetch; LOG.trace("Consumer {} granting additional credit: {}", getConsumerId(), additionalCredit); getEndpoint().flow(additionalCredit); @@ -377,6 +396,12 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } } + // Previously delivered messages should be tagged as dispatched messages again so we + // can properly compute the next credit refresh, so subtract them from both the delivered + // and dispatched counts and then dispatch them again as a new message. + deliveredCount -= redispatchList.size(); + dispatchedCount -= redispatchList.size(); + ListIterator<JmsInboundMessageDispatch> reverseIterator = redispatchList.listIterator(redispatchList.size()); while (reverseIterator.hasPrevious()) { deliver(reverseIterator.previous()); @@ -481,7 +506,11 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver // In the future once the JMS mapping is complete we should be // able to convert everything to some message even if its just // a bytes messages as a fall back. - settleDelivery(incoming, MODIFIED_FAILED_UNDELIVERABLE); + incoming.disposition(MODIFIED_FAILED_UNDELIVERABLE); + incoming.settle(); + // TODO: this flows credit, which we might not want, e.g if + // a drain was issued to stop the link. + sendFlowIfNeeded(); return false; } @@ -552,19 +581,12 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver return "AmqpConsumer { " + getResourceInfo().getId() + " }"; } - protected void settleDelivery(Delivery incoming, DeliveryState state) { - incoming.disposition(state); - incoming.settle(); - // TODO: this flows credit, which we might not want, e.g if - // a drain was issued to stop the link. - sendFlowIfNeeded(); - } - protected void deliver(JmsInboundMessageDispatch envelope) throws Exception { if (!deferredClose) { ProviderListener listener = session.getProvider().getProviderListener(); if (listener != null) { LOG.debug("Dispatching received message: {}", envelope); + dispatchedCount++; listener.onInboundMessage(envelope); } else { LOG.error("Provider listener is not set, message will be dropped: {}", envelope); @@ -641,15 +663,13 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver Delivery current = delivery; delivery = delivery.next(); - if (!(current.getContext() instanceof JmsInboundMessageDispatch)) { + if (current.getContext() instanceof JmsInboundMessageDispatch) { + JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext(); + if (!envelope.isDelivered()) { + handleDisposition(envelope, current, Released.getInstance()); + } + } else { LOG.debug("{} Found incomplete delivery with no context during release processing", AmqpConsumer.this); - continue; - } - - JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) current.getContext(); - if (!envelope.isDelivered()) { - current.disposition(Released.getInstance()); - current.settle(); } } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java index 0a2b62d..d7bc544 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java @@ -58,12 +58,12 @@ public class JmsConsumerInfoTest { @Test(expected=IllegalArgumentException.class) public void testExceptionWhenCreatedWithNullConnectionId() { - new JmsConsumerInfo(null, null, null); + new JmsConsumerInfo(null, null); } @Test public void testCreateFromConsumerId() { - JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null); + JmsConsumerInfo info = new JmsConsumerInfo(firstId, null); assertSame(firstId, info.getId()); assertSame(firstId.getParentId(), info.getParentId()); assertNotNull(info.toString()); @@ -71,7 +71,7 @@ public class JmsConsumerInfoTest { @Test public void testCopy() { - JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null); + JmsConsumerInfo info = new JmsConsumerInfo(firstId, null); info.setAcknowledgementMode(1); info.setBrowser(true); @@ -108,7 +108,7 @@ public class JmsConsumerInfoTest { @Test public void testIsDurable() { - JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null); + JmsConsumerInfo info = new JmsConsumerInfo(firstId, null); assertFalse(info.isDurable()); info.setDurable(true); assertTrue(info.isDurable()); @@ -116,7 +116,7 @@ public class JmsConsumerInfoTest { @Test public void testIsExplicitClientID() { - JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null); + JmsConsumerInfo info = new JmsConsumerInfo(firstId, null); assertFalse(info.isExplicitClientID()); info.setExplicitClientID(true); assertTrue(info.isExplicitClientID()); @@ -124,7 +124,7 @@ public class JmsConsumerInfoTest { @Test public void testIsShared() { - JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null); + JmsConsumerInfo info = new JmsConsumerInfo(firstId, null); assertFalse(info.isShared()); info.setShared(true); assertTrue(info.isShared()); @@ -134,7 +134,7 @@ public class JmsConsumerInfoTest { public void testGetSubscriptionName() { String subName = "name"; - JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null); + JmsConsumerInfo info = new JmsConsumerInfo(firstId, null); assertNull(info.getSubscriptionName()); info.setSubscriptionName(subName); assertEquals(subName, info.getSubscriptionName()); @@ -142,8 +142,8 @@ public class JmsConsumerInfoTest { @Test public void testCompareTo() { - JmsConsumerInfo first = new JmsConsumerInfo(firstId, null, null); - JmsConsumerInfo second = new JmsConsumerInfo(secondId, null, null); + JmsConsumerInfo first = new JmsConsumerInfo(firstId, null); + JmsConsumerInfo second = new JmsConsumerInfo(secondId, null); assertEquals(-1, first.compareTo(second)); assertEquals(0, first.compareTo(first)); @@ -152,8 +152,8 @@ public class JmsConsumerInfoTest { @Test public void testHashCode() { - JmsConsumerInfo first = new JmsConsumerInfo(firstId, null, null); - JmsConsumerInfo second = new JmsConsumerInfo(secondId, null, null); + JmsConsumerInfo first = new JmsConsumerInfo(firstId, null); + JmsConsumerInfo second = new JmsConsumerInfo(secondId, null); assertEquals(first.hashCode(), first.hashCode()); assertEquals(second.hashCode(), second.hashCode()); @@ -164,8 +164,8 @@ public class JmsConsumerInfoTest { @SuppressWarnings("unlikely-arg-type") @Test public void testEqualsCode() { - JmsConsumerInfo first = new JmsConsumerInfo(firstId, null, null); - JmsConsumerInfo second = new JmsConsumerInfo(secondId, null, null); + JmsConsumerInfo first = new JmsConsumerInfo(firstId, null); + JmsConsumerInfo second = new JmsConsumerInfo(secondId, null); assertEquals(first, first); assertEquals(second, second); @@ -179,7 +179,7 @@ public class JmsConsumerInfoTest { @Test public void testVisit() throws Exception { - final JmsConsumerInfo first = new JmsConsumerInfo(firstId, null, null); + final JmsConsumerInfo first = new JmsConsumerInfo(firstId, null); final AtomicBoolean visited = new AtomicBoolean(); @@ -197,7 +197,7 @@ public class JmsConsumerInfoTest { @Test public void testGetRedeliveryPolicyDefaults() { - final JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null); + final JmsConsumerInfo info = new JmsConsumerInfo(firstId, null); assertNotNull(info.getRedeliveryPolicy()); info.setRedeliveryPolicy(null); @@ -207,7 +207,7 @@ public class JmsConsumerInfoTest { @Test public void testIsListener() { - JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null); + JmsConsumerInfo info = new JmsConsumerInfo(firstId, null); assertFalse(info.isListener()); info.setListener(true); assertTrue(info.isListener()); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java index 39f5895..00bf2f4 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java @@ -48,7 +48,7 @@ public class JmsDefaultResourceVisitorTest { JmsDefaultResourceVisitor visitor = new JmsDefaultResourceVisitor(); visitor.processConnectionInfo(new JmsConnectionInfo(connectionId)); visitor.processSessionInfo(new JmsSessionInfo(sessionId)); - visitor.processConsumerInfo(new JmsConsumerInfo(consumerId, null, null)); + visitor.processConsumerInfo(new JmsConsumerInfo(consumerId, null)); visitor.processProducerInfo(new JmsProducerInfo(producerId)); visitor.processDestination(new JmsTemporaryTopic("Test")); visitor.processTransactionInfo(new JmsTransactionInfo(sessionId, transactionId)); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java index 755bd21..f60af09 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java @@ -45,7 +45,7 @@ public class AmqpSubscriptionTrackerTest { JmsConsumerId consumerId = new JmsConsumerId("ID:MOCK:1", 1, consumerIdCounter.incrementAndGet()); JmsTopic topic = new JmsTopic(topicName); - JmsConsumerInfo consumerInfo = new JmsConsumerInfo(consumerId, null, null); + JmsConsumerInfo consumerInfo = new JmsConsumerInfo(consumerId, null); consumerInfo.setSubscriptionName(subscriptionName); consumerInfo.setDestination(topic); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java index 858cc7f..472d756 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java @@ -84,7 +84,7 @@ public class AmqpCodecTest extends QpidJmsTestCase { JmsConsumerId consumerId = new JmsConsumerId("ID:MOCK:1", 1, 1); mockConnection = Mockito.mock(AmqpConnection.class); mockConsumer = Mockito.mock(AmqpConsumer.class); - Mockito.when(mockConsumer.getResourceInfo()).thenReturn(new JmsConsumerInfo(consumerId, null, null)); + Mockito.when(mockConsumer.getResourceInfo()).thenReturn(new JmsConsumerInfo(consumerId, null)); } //----- AmqpHeader encode and decode -------------------------------------// http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java index 1db2744..dede595 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java @@ -131,7 +131,7 @@ public class AmqpJmsMessageTypesTestCase extends QpidJmsTestCase { AmqpConsumer consumer = Mockito.mock(AmqpConsumer.class); Mockito.when(consumer.getConnection()).thenReturn(connection); Mockito.when(consumer.getDestination()).thenReturn(consumerDestination); - Mockito.when(consumer.getResourceInfo()).thenReturn(new JmsConsumerInfo(consumerId, null, null)); + Mockito.when(consumer.getResourceInfo()).thenReturn(new JmsConsumerInfo(consumerId, null)); return consumer; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java index ebce5d7..19f6d3e 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java @@ -78,7 +78,7 @@ public class FailoverProviderTestSupport extends QpidJmsTestCase { protected JmsConsumerInfo createConsumerInfo(JmsSessionInfo session) { JmsConsumerId id = new JmsConsumerId(session.getId(), nextConsumerId.incrementAndGet()); - JmsConsumerInfo consumer = new JmsConsumerInfo(id, null, null); + JmsConsumerInfo consumer = new JmsConsumerInfo(id, null); return consumer; } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
