Author: robbie Date: Wed Feb 1 15:19:32 2012 New Revision: 1239166 URL: http://svn.apache.org/viewvc?rev=1239166&view=rev Log: QPID-3790: Add a method AMQSession.getQueueDepth(AMQDestionation, boolean) to sync session (if specified) before sending QueueQuery command
Applied patch from Andrew MacBean <[email protected]> and Oleksandr Rudyy<[email protected]>. Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1239166&r1=1239165&r2=1239166&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Feb 1 15:19:32 2012 @@ -2752,18 +2752,38 @@ public abstract class AMQSession<C exten public long getQueueDepth(final AMQDestination amqd) throws AMQException { - return new FailoverNoopSupport<Long, AMQException>( - new FailoverProtectedOperation<Long, AMQException>() - { - public Long execute() throws AMQException, FailoverException - { - return requestQueueDepth(amqd); - } - }, _connection).execute(); + return getQueueDepth(amqd, false); + } + /** + * Returns the number of messages currently queued by the given + * destination. Syncs session before receiving the queue depth if sync is + * set to true. + * + * @param amqd AMQ destination to get the depth value + * @param sync flag to sync session before receiving the queue depth + * @return queue depth + * @throws AMQException + */ + public long getQueueDepth(final AMQDestination amqd, final boolean sync) throws AMQException + { + return new FailoverNoopSupport<Long, AMQException>(new FailoverProtectedOperation<Long, AMQException>() + { + public Long execute() throws AMQException, FailoverException + { + try + { + return requestQueueDepth(amqd, sync); + } + catch (TransportException e) + { + throw new AMQException(AMQConstant.getConstant(getErrorCode(e)), e.getMessage(), e); + } + } + }, _connection).execute(); } - protected abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException; + protected abstract Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException; /** * Declares the named exchange and type of exchange. Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1239166&r1=1239165&r2=1239166&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Wed Feb 1 15:19:32 2012 @@ -951,9 +951,13 @@ public class AMQSession_0_10 extends AMQ }, getAMQConnection()).execute(); } - protected Long requestQueueDepth(AMQDestination amqd) + protected Long requestQueueDepth(AMQDestination amqd, boolean sync) { flushAcknowledgments(); + if (sync) + { + getQpidSession().sync(); + } return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount(); } Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1239166&r1=1239165&r2=1239166&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Wed Feb 1 15:19:32 2012 @@ -587,7 +587,7 @@ public class AMQSession_0_8 extends AMQS } } - protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException + protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException { AMQFrame queueDeclare = getMethodRegistry().createQueueDeclareBody(getTicket(), Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1239166&r1=1239165&r2=1239166&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java (original) +++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Wed Feb 1 15:19:32 2012 @@ -18,9 +18,8 @@ */ package org.apache.qpid.client; -import junit.framework.TestCase; - import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.transport.*; import org.apache.qpid.transport.Connection.SessionFactory; import org.apache.qpid.transport.Connection.State; @@ -39,7 +38,7 @@ import java.util.List; * {@link SessionException} is not thrown from methods of * {@link AMQSession_0_10}. */ -public class AMQSession_0_10Test extends TestCase +public class AMQSession_0_10Test extends QpidTestCase { public void testExceptionOnCommit() @@ -460,6 +459,28 @@ public class AMQSession_0_10Test extends assertNotNull("ExchangeDeclare event was not sent", event); } + public void testGetQueueDepthWithSync() + { + // slow down a flush thread + setTestSystemProperty("qpid.session.max_ack_delay", "10000"); + AMQSession_0_10 session = createAMQSession_0_10(false, javax.jms.Session.DUPS_OK_ACKNOWLEDGE); + try + { + session.acknowledgeMessage(-1, false); + session.getQueueDepth(createDestination(), true); + } + catch (Exception e) + { + fail("Unexpected exception is cought:" + e.getMessage()); + } + ProtocolEvent command = findSentProtocolEventOfClass(session, MessageAccept.class, false); + assertNotNull("MessageAccept command was not sent", command); + command = findSentProtocolEventOfClass(session, ExecutionSync.class, false); + assertNotNull("ExecutionSync command was not sent", command); + command = findSentProtocolEventOfClass(session, QueueQuery.class, false); + assertNotNull("QueueQuery command was not sent", command); + } + private AMQAnyDestination createDestination() { AMQAnyDestination destination = null; --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:[email protected]
