Repository: qpid-jms Updated Branches: refs/heads/master 6573d8df8 -> 9f6ecd3b4
https://issues.apache.org/jira/browse/QPIDJMS-92 Makes the pull consumer actually pull from the remote peer and clears any granted credit if the pull did not succeed in getting any messages. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/9f6ecd3b Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/9f6ecd3b Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/9f6ecd3b Branch: refs/heads/master Commit: 9f6ecd3b407880e4283e403ebbe6aff6665c3813 Parents: 6573d8d Author: Timothy Bish <[email protected]> Authored: Fri Aug 21 10:43:31 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Fri Aug 21 10:43:31 2015 -0400 ---------------------------------------------------------------------- .../org/apache/qpid/jms/JmsMessageConsumer.java | 118 +++++++++++++-- .../org/apache/qpid/jms/provider/Provider.java | 6 + .../qpid/jms/provider/amqp/AmqpConnection.java | 21 +++ .../qpid/jms/provider/amqp/AmqpConsumer.java | 127 ++++++++++++++-- .../jms/provider/amqp/AmqpFixedProducer.java | 10 +- .../qpid/jms/provider/amqp/AmqpProvider.java | 10 +- .../qpid/jms/provider/amqp/AmqpSession.java | 21 +++ .../provider/amqp/AmqpTemporaryDestination.java | 4 +- .../qpid/jms/consumer/JmsZeroPrefetchTest.java | 145 ++++++++++++++++++- .../src/test/resources/log4j.properties | 2 +- 10 files changed, 414 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java index dff2c83..e8a7066 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java @@ -40,12 +40,16 @@ import org.apache.qpid.jms.provider.ProviderFuture; import org.apache.qpid.jms.util.FifoMessageQueue; import org.apache.qpid.jms.util.MessageQueue; import org.apache.qpid.jms.util.PriorityMessageQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * implementation of a JMS Message Consumer */ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableConsumer, JmsMessageDispatcher { + private static final Logger LOG = LoggerFactory.getLogger(JmsMessageConsumer.class); + protected final JmsSession session; protected final JmsConnection connection; protected JmsConsumerInfo consumerInfo; @@ -215,18 +219,22 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC public Message receive(long timeout) throws JMSException { checkClosed(); checkMessageListener(); - sendPullCommand(timeout); - long wait = timeout; + // Configure for infinite wait when timeout is zero (JMS Spec) if (timeout == 0) { - wait = -1; + timeout = -1; } - try { - return copy(ackFromReceive(this.messageQueue.dequeue(wait))); - } catch (InterruptedException e) { - throw JmsExceptionSupport.create(e); + sendPullCommand(timeout); + + JmsInboundMessageDispatch envelope = null; + if (isPullConsumer()) { + envelope = dequeue(-1); // Let server tell us if empty. + } else { + envelope = dequeue(timeout); // Check local prefetch only. } + + return copy(ackFromReceive(envelope)); } /** @@ -238,9 +246,82 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC public Message receiveNoWait() throws JMSException { checkClosed(); checkMessageListener(); - sendPullCommand(-1); + sendPullCommand(0); + + JmsInboundMessageDispatch envelope = null; + if (isPullConsumer()) { + envelope = dequeue(-1); // Let server tell us if empty. + } else { + envelope = dequeue(0); // Check local prefetch only. + } - return copy(ackFromReceive(this.messageQueue.dequeueNoWait())); + return copy(ackFromReceive(envelope)); + } + + /** + * Used to get an enqueued message from the unconsumedMessages list. The + * amount of time this method blocks is based on the timeout value. + * + * timeout < 0 then it blocks until a message is received. + * timeout = 0 then it returns a message or null if none available + * timeout > 0 then it blocks up to timeout amount of time. + * + * This method may consume messages that are expired or exceed a configured + * delivery count value but will continue to wait for the configured timeout. + * + * @throws JMSException + * @return null if we timeout or if the consumer is closed. + */ + private JmsInboundMessageDispatch dequeue(long timeout) throws JMSException { + + try { + long deadline = 0; + if (timeout > 0) { + deadline = System.currentTimeMillis() + timeout; + } + + while (true) { + JmsInboundMessageDispatch envelope = messageQueue.dequeue(timeout); + if (envelope == null) { + if (timeout > 0 && !messageQueue.isClosed()) { + timeout = Math.max(deadline - System.currentTimeMillis(), 0); + } else { + if (failureCause != null && !messageQueue.isClosed()) { + LOG.debug("{} receive failed: {}", getConsumerId(), failureCause.getMessage()); + throw JmsExceptionSupport.create(failureCause); + } else { + return null; + } + } + } else if (envelope.getMessage() == null) { + LOG.trace("{} no message was available for this consumer: {}", getConsumerId()); + return null; + } else if (redeliveryExceeded(envelope)) { + LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), envelope); + // TODO - Future + // Reject this delivery as not deliverable here + if (timeout > 0) { + timeout = Math.max(deadline - System.currentTimeMillis(), 0); + } + sendPullCommand(timeout); + } else { + if (LOG.isTraceEnabled()) { + LOG.trace(getConsumerId() + " received message: " + envelope); + } + return envelope; + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw JmsExceptionSupport.create(e); + } + } + + protected boolean redeliveryExceeded(JmsInboundMessageDispatch envelope) { + // TODO - Future + // Check for message that have been redelivered to see if they exceed + // some set maximum redelivery count + return false; } protected void checkClosed() throws IllegalStateException { @@ -500,6 +581,10 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC return false; } + public boolean isPullConsumer() { + return getPrefetchSize() == 0; + } + @Override public void setAvailableListener(JmsMessageAvailableListener availableListener) { this.availableListener = availableListener; @@ -511,6 +596,9 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC } protected void onConnectionInterrupted() { + // TODO - We might want to wake all blocking receive calls + // to interrupt pull consumers, although that also + // wakes infinite wait receivers so how to deal with that? messageQueue.clear(); } @@ -530,21 +618,19 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC } /** - * Triggers a pull request from the connected Provider. An attempt is made to set - * a timeout on the pull request however some providers will not honor this value - * and the pull will remain active until a message is dispatched. + * Triggers a pull request from the connected Provider with the given timeout value. * <p> * The timeout value can be one of: * <br> - * {@literal < 0} to indicate that the request should expire immediately if no message.<br> - * {@literal = 0} to indicate that the request should never time out.<br> - * {@literal > 1} to indicate that the request should expire after the given time in milliseconds. + * {@literal < 0} to indicate that the request should never time out.<br> + * {@literal = 0} to indicate that the request should expire immediately if no message.<br> + * {@literal > 0} to indicate that the request should expire after the given time in milliseconds. * * @param timeout * The amount of time the pull request should remain valid. */ protected void sendPullCommand(long timeout) throws JMSException { - if (messageQueue.isEmpty() && (getPrefetchSize() == 0 || isBrowser())) { + if (!messageQueue.isClosed() && messageQueue.isEmpty() && (getPrefetchSize() == 0 || isBrowser())) { connection.pull(getConsumerId(), timeout); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java index d56e3b1..e2f4f56 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java @@ -283,6 +283,12 @@ public interface Provider { * consumer. If the consumer has a set prefetch that's greater than zero this method * should just return without performing and action. * + * timeout < 0 then it should remain open until a message is received. + * timeout = 0 then it returns a message or null if none available + * timeout > 0 then it should remain open for timeout amount of time. + * + * The timeout value when positive is given in milliseconds. + * * @param timeout * the amount of time to tell the remote peer to keep this pull request valid. * @param request http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java index af2b431..ebec59a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java @@ -22,6 +22,8 @@ import java.net.URI; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import javax.jms.JMSException; import javax.jms.JMSSecurityException; @@ -330,6 +332,25 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn return properties; } + /** + * Allows a connection resource to schedule a task for future execution. + * + * @param task + * The Runnable task to be executed after the given delay. + * @param delay + * The delay in milliseconds to schedule the given task for execution. + * + * @return a ScheduledFuture instance that can be used to cancel the task. + */ + public ScheduledFuture<?> schedule(final Runnable task, long delay) { + if (task == null) { + LOG.trace("Resource attempted to schedule a null task."); + return null; + } + + return getProvider().getScheduler().schedule(task, delay, TimeUnit.MILLISECONDS); + } + @Override public String toString() { return "AmqpConnection { " + getConnectionInfo().getConnectionId() + " }"; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index a5405ac..efbc9cb 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.ListIterator; import java.util.Map; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicLong; import javax.jms.InvalidDestinationException; @@ -79,8 +80,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver private static final Modified MODIFIED_FAILED = new Modified(); private static final Modified MODIFIED_UNDELIVERABLE = new Modified(); - static - { + + static { MODIFIED_FAILED.setDeliveryFailed(true); MODIFIED_UNDELIVERABLE.setDeliveryFailed(true); @@ -96,6 +97,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver private final AtomicLong incomingSequence = new AtomicLong(0); private AsyncResult stopRequest; + private AsyncResult pullRequest; public AmqpConsumer(AmqpSession session, JmsConsumerInfo info) { super(info); @@ -127,7 +129,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver stopRequest = request; } } else { - // TODO: We dont actually want the additional messages that could be sent while + // TODO: We don't actually want the additional messages that could be sent while // draining. We could explicitly reduce credit first, or possibly use 'echo' instead // of drain if it was supported. We would first need to understand what happens // if we reduce credit below the number of messages already in-flight before @@ -149,12 +151,26 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } } + if (pullRequest != null) { + Receiver receiver = getEndpoint(); + if (receiver.getRemoteCredit() <= 0) { + if (receiver.getQueued() <= 0) { + pullRequest.onFailure(null); + } else { + pullRequest.onSuccess(); + } + pullRequest = null; + } + } + + LOG.trace("Consumer {} flow updated, remote credit = {}", getConsumerId(), getEndpoint().getRemoteCredit()); + super.processFlowUpdates(provider); } @Override protected void doOpen() { - JmsDestination destination = resource.getDestination(); + JmsDestination destination = resource.getDestination(); String subscription = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, session.getConnection()); Source source = new Source(); @@ -188,8 +204,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver @Override protected void doOpenCompletion() { // Verify the attach response contained a non-null Source - org.apache.qpid.proton.amqp.transport.Source s = getEndpoint().getRemoteSource(); - if (s != null) { + org.apache.qpid.proton.amqp.transport.Source source = getEndpoint().getRemoteSource(); + if (source != null) { super.doOpenCompletion(); } else { // No link terminus was created, the peer will now detach/close us. @@ -199,8 +215,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver @Override protected Exception getOpenAbortException() { // Verify the attach response contained a non-null Source - org.apache.qpid.proton.amqp.transport.Source s = getEndpoint().getRemoteSource(); - if (s != null) { + org.apache.qpid.proton.amqp.transport.Source source = getEndpoint().getRemoteSource(); + if (source != null) { return super.getOpenAbortException(); } else { // No link terminus was created, the peer has detach/closed us, create IDE. @@ -377,15 +393,41 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } /** - * For a consumer whose prefetch value is set to zero this method will attempt to solicite - * a new message dispatch from the broker. + * Request a remote peer send a Message to this client. + * + * timeout < 0 then it should remain open until a message is received. + * timeout = 0 then it returns a message or null if none available + * timeout > 0 then it should remain open for timeout amount of time. + * + * The timeout value when positive is given in milliseconds. * * @param timeout + * the amount of time to tell the remote peer to keep this pull request valid. */ - public void pull(long timeout) { + public void pull(final long timeout) { + LOG.trace("Pull called on consumer {} with timouet = {}", getConsumerId(), timeout); if (resource.getPrefetchSize() == 0 && getEndpoint().getCredit() == 0) { - // expand the credit window by one. - getEndpoint().flow(1); + if (timeout < 0) { + getEndpoint().flow(1); + } else if (timeout == 0) { + pullRequest = new DrainingPullRequest(); + getEndpoint().drain(1); + } else if (timeout > 0) { + // We need to turn off the credit and signal the consumer + // that there was no message. + final ScheduledFuture<?> future = getSession().schedule(new Runnable() { + + @Override + public void run() { + if (getEndpoint().getRemoteCredit() != 0) { + getEndpoint().drain(0); + } + } + }, timeout); + + getEndpoint().flow(1); + pullRequest = new TimedPullRequest(future); + } } } @@ -397,6 +439,12 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver if (incoming != null) { if (incoming.isReadable() && !incoming.isPartial()) { LOG.trace("{} has incoming Message(s).", this); + + if (pullRequest != null) { + pullRequest.onSuccess(); + pullRequest = null; + } + try { processDelivery(incoming); } catch (Exception e) { @@ -407,11 +455,20 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver incoming = null; } } else { + LOG.info("Incoming null delivery"); + // We have exhausted the locally queued messages on this link. // Check if we tried to stop and have now run out of credit. - if (stopRequest != null && getEndpoint().getRemoteCredit() <= 0) { - stopRequest.onSuccess(); - stopRequest = null; + if (getEndpoint().getRemoteCredit() <= 0) { + if (stopRequest != null) { + stopRequest.onSuccess(); + stopRequest = null; + } + + if (pullRequest != null) { + pullRequest.onFailure(null); + pullRequest = null; + } } } } while (incoming != null); @@ -583,4 +640,42 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver */ public void postRollback() throws Exception { } + + private class DrainingPullRequest implements AsyncResult { + + @Override + public void onFailure(Throwable result) { + JmsInboundMessageDispatch pullDone = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber()); + pullDone.setConsumerId(getConsumerId()); + try { + deliver(pullDone); + } catch (Exception e) { + getSession().reportError(IOExceptionSupport.create(e)); + } + } + + @Override + public void onSuccess() { + // Nothing to do here. + } + + @Override + public boolean isComplete() { + return false; + } + } + + private class TimedPullRequest extends DrainingPullRequest { + + private final ScheduledFuture<?> completionTask; + + public TimedPullRequest(ScheduledFuture<?> completionTask) { + this.completionTask = completionTask; + } + + @Override + public void onSuccess() { + completionTask.cancel(false); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index 6c17587..8625e26 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -258,7 +258,7 @@ public class AmqpFixedProducer extends AmqpProducer { Target target = new Target(); target.setAddress(targetAddress); Symbol typeCapability = AmqpDestinationHelper.INSTANCE.toTypeCapability(destination); - if(typeCapability != null) { + if (typeCapability != null) { target.setCapabilities(typeCapability); } @@ -282,8 +282,8 @@ public class AmqpFixedProducer extends AmqpProducer { @Override protected void doOpenCompletion() { // Verify the attach response contained a non-null target - org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget(); - if (t != null) { + org.apache.qpid.proton.amqp.transport.Target target = getEndpoint().getRemoteTarget(); + if (target != null) { super.doOpenCompletion(); } else { // No link terminus was created, the peer will now detach/close us. @@ -293,8 +293,8 @@ public class AmqpFixedProducer extends AmqpProducer { @Override protected Exception getOpenAbortException() { // Verify the attach response contained a non-null target - org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget(); - if (t != null) { + org.apache.qpid.proton.amqp.transport.Target target = getEndpoint().getRemoteTarget(); + if (target != null) { return super.getOpenAbortException(); } else { // No link terminus was created, the peer has detach/closed us, create IDE. http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 5db3472..15c3cc2 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -1061,6 +1061,10 @@ public class AmqpProvider implements Provider, TransportListener { return remoteURI; } + ScheduledExecutorService getScheduler() { + return this.serializer; + } + private final class IdleTimeoutCheck implements Runnable { @Override public void run() { @@ -1089,7 +1093,7 @@ public class AmqpProvider implements Provider, TransportListener { LOG.trace("IdleTimeoutCheck skipping check, connection is not active."); } - if(!checkScheduled) { + if (!checkScheduled) { nextIdleTimeoutCheck = null; LOG.trace("IdleTimeoutCheck exiting"); } @@ -1097,7 +1101,7 @@ public class AmqpProvider implements Provider, TransportListener { } Principal getLocalPrincipal() { - if(transport instanceof SSLTransport) { + if (transport instanceof SSLTransport) { return ((SSLTransport) transport).getLocalPrincipal(); } @@ -1105,7 +1109,7 @@ public class AmqpProvider implements Provider, TransportListener { } private static void setHostname(Sasl sasl, String hostname) { - //TODO: this is a hack until Proton 0.10+ is available with sasl#setHostname method. + // TODO: this is a hack until Proton 0.10+ is available with sasl#setHostname method. try { Field field = sasl.getClass().getDeclaredField("_hostname"); field.setAccessible(true); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java index 694ce2c..2dd72fb 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java @@ -18,6 +18,8 @@ package org.apache.qpid.jms.provider.amqp; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import javax.jms.IllegalStateException; @@ -213,6 +215,25 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> { getTransactionContext().rollback(request); } + /** + * Allows a session resource to schedule a task for future execution. + * + * @param task + * The Runnable task to be executed after the given delay. + * @param delay + * The delay in milliseconds to schedule the given task for execution. + * + * @return a ScheduledFuture instance that can be used to cancel the task. + */ + public ScheduledFuture<?> schedule(final Runnable task, long delay) { + if (task == null) { + LOG.trace("Resource attempted to schedule a null task."); + return null; + } + + return getProvider().getScheduler().schedule(task, delay, TimeUnit.MILLISECONDS); + } + void addResource(AmqpConsumer consumer) { consumers.put(consumer.getConsumerId(), consumer); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java index 131e656..2759e7a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java @@ -125,8 +125,8 @@ public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsTemporaryD @Override protected void doOpenCompletion() { // Verify the attach response contained a non-null target - org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget(); - if (t != null) { + org.apache.qpid.proton.amqp.transport.Target target = getEndpoint().getRemoteTarget(); + if (target != null) { super.doOpenCompletion(); } else { // No link terminus was created, the peer will now detach/close us. http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java index 084ae13..bfcdf87 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java @@ -37,7 +37,7 @@ import org.apache.qpid.jms.support.Wait; import org.junit.Test; /** - * + * Test for MessageConsumer that has a prefetch value of zero. */ public class JmsZeroPrefetchTest extends AmqpTestSupport { @@ -47,7 +47,7 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport { ((JmsConnection)connection).getPrefetchPolicy().setAll(0); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(name.getMethodName()); + Queue queue = session.createQueue(getDestinationName()); MessageConsumer consumer = session.createConsumer(queue); MessageListener listener = new MessageListener() { @@ -61,13 +61,117 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport { } @Test(timeout = 60000) - public void testPullConsumerWorks() throws Exception { + public void testBlockingReceivesUnBlocksOnMessageSend() throws Exception { + connection = createAmqpConnection(); + ((JmsConnection)connection).getPrefetchPolicy().setAll(0); + connection.start(); + + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + + final MessageProducer producer = session.createProducer(queue); + + Thread producerThread = new Thread(new Runnable() { + + @Override + public void run() { + try { + Thread.sleep(1500); + producer.send(session.createTextMessage("Hello World! 1")); + } catch (Exception e) { + } + } + }); + producerThread.start(); + + MessageConsumer consumer = session.createConsumer(queue); + Message answer = consumer.receive(); + assertNotNull("Should have received a message!", answer); + + final QueueViewMBean queueView = getProxyToQueue(getDestinationName()); + + // Assert that we only pulled one message and that we didn't cause + // the other message to be dispatched. + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return queueView.getQueueSize() == 0; + } + })); + } + + @Test(timeout = 60000) + public void testReceiveTimesOutAndRemovesCredit() throws Exception { + connection = createAmqpConnection(); + ((JmsConnection)connection).getPrefetchPolicy().setAll(0); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + MessageConsumer consumer = session.createConsumer(queue); + Message answer = consumer.receive(100); + assertNull("Should have not received a message!", answer); + + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello World! 1")); + + final QueueViewMBean queueView = getProxyToQueue(getDestinationName()); + + // Assert that we only pulled one message and that we didn't cause + // the other message to be dispatched. + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return queueView.getQueueSize() == 1; + } + })); + + assertEquals(0, queueView.getInFlightCount()); + } + + @Test(timeout = 60000) + public void testReceiveNoWaitWaitForSever() throws Exception { connection = createAmqpConnection(); ((JmsConnection)connection).getPrefetchPolicy().setAll(0); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(name.getMethodName()); + Queue queue = session.createQueue(getDestinationName()); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello World! 1")); + + MessageConsumer consumer = session.createConsumer(queue); + Message answer = consumer.receiveNoWait(); + assertNotNull("Should have received a message!", answer); + + // Send another, it should not get dispatched. + producer.send(session.createTextMessage("Hello World! 2")); + + final QueueViewMBean queueView = getProxyToQueue(getDestinationName()); + + // Assert that we only pulled one message and that we didn't cause + // the other message to be dispatched. + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return queueView.getQueueSize() == 1; + } + })); + + assertEquals(0, queueView.getInFlightCount()); + } + + @Test(timeout = 60000) + public void testRepeatedPullAttempts() throws Exception { + connection = createAmqpConnection(); + ((JmsConnection)connection).getPrefetchPolicy().setAll(0); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); MessageProducer producer = session.createProducer(queue); producer.send(session.createTextMessage("Hello World!")); @@ -75,6 +179,7 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport { MessageConsumer consumer = session.createConsumer(queue); Message answer = consumer.receive(5000); assertNotNull("Should have received a message!", answer); + // check if method will return at all and will return a null answer = consumer.receive(1); assertNull("Should have not received a message!", answer); @@ -89,12 +194,12 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport { connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(name.getMethodName()); + Queue queue = session.createQueue(getDestinationName()); MessageProducer producer = session.createProducer(queue); producer.send(session.createTextMessage("Hello World! 1")); producer.send(session.createTextMessage("Hello World! 2")); - final QueueViewMBean queueView = getProxyToQueue(name.getMethodName()); + final QueueViewMBean queueView = getProxyToQueue(getDestinationName()); // Check initial Queue State assertEquals(2, queueView.getQueueSize()); @@ -125,7 +230,7 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport { connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(name.getMethodName()); + Queue queue = session.createQueue(getDestinationName()); MessageProducer producer = session.createProducer(queue); producer.send(session.createTextMessage("Msg1")); @@ -144,4 +249,30 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport { answer = (TextMessage)consumer2.receiveNoWait(); assertNull("Should have not received a message!", answer); } + + @Test(timeout = 60000) + public void testConsumerWithNoMessageDoesNotHogMessages() throws Exception { + connection = createAmqpConnection(); + ((JmsConnection)connection).getPrefetchPolicy().setAll(0); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + + // Try and receive one message which will fail + MessageConsumer consumer1 = session.createConsumer(queue); + assertNull(consumer1.receive(10)); + + // Now Producer a message + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Msg1")); + + // now lets receive it with the second consumer, the first should + // not be accepting messages now and the broker should give it to + // consumer 2. + MessageConsumer consumer2 = session.createConsumer(queue); + TextMessage answer = (TextMessage)consumer2.receive(3000); + assertNotNull(answer); + assertEquals("Should have received a message!", answer.getText(), "Msg1"); + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9f6ecd3b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties index 2b107ef..ce9b95c 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties @@ -21,7 +21,7 @@ log4j.rootLogger=INFO, out, stdout log4j.logger.org.apache.qpid.jms=INFO -log4j.logger.org.apache.qpid.jms.provider=DEBUG +log4j.logger.org.apache.qpid.jms.provider=INFO # Tune the ActiveMQ and it's AMQP transport as needed for debugging. log4j.logger.org.apache.activemq=INFO --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
