Author: robbie
Date: Wed Feb  1 15:19:32 2012
New Revision: 1239166

URL: http://svn.apache.org/viewvc?rev=1239166&view=rev
Log:
QPID-3790: Add a method AMQSession.getQueueDepth(AMQDestination, boolean) to
sync the session (if specified) before sending the QueueQuery command

Applied patch from Andrew MacBean <[email protected]> and Oleksandr
Rudyy <[email protected]>.

Modified:
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1239166&r1=1239165&r2=1239166&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Wed Feb  1 15:19:32 2012
@@ -2752,18 +2752,38 @@ public abstract class AMQSession<C exten
     public long getQueueDepth(final AMQDestination amqd)
             throws AMQException
     {
-        return new FailoverNoopSupport<Long, AMQException>(
-                new FailoverProtectedOperation<Long, AMQException>()
-                {
-                    public Long execute() throws AMQException, 
FailoverException
-                    {
-                        return requestQueueDepth(amqd);
-                    }
-                }, _connection).execute();
+        return getQueueDepth(amqd, false);
+    }
 
+    /**
+     * Returns the number of messages currently queued by the given
+     * destination. Syncs session before receiving the queue depth if sync is
+     * set to true.
+     *
+     * @param amqd AMQ destination to get the depth value
+     * @param sync flag to sync session before receiving the queue depth
+     * @return queue depth
+     * @throws AMQException
+     */
+    public long getQueueDepth(final AMQDestination amqd, final boolean sync) 
throws AMQException
+    {
+        return new FailoverNoopSupport<Long, AMQException>(new 
FailoverProtectedOperation<Long, AMQException>()
+        {
+            public Long execute() throws AMQException, FailoverException
+            {
+                try
+                {
+                    return requestQueueDepth(amqd, sync);
+                }
+                catch (TransportException e)
+                {
+                    throw new 
AMQException(AMQConstant.getConstant(getErrorCode(e)), e.getMessage(), e);
+                }
+            }
+        }, _connection).execute();
     }
 
-    protected abstract Long requestQueueDepth(AMQDestination amqd) throws 
AMQException, FailoverException;
+    protected abstract Long requestQueueDepth(AMQDestination amqd, boolean 
sync) throws AMQException, FailoverException;
 
     /**
      * Declares the named exchange and type of exchange.

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1239166&r1=1239165&r2=1239166&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Wed Feb  1 15:19:32 2012
@@ -951,9 +951,13 @@ public class AMQSession_0_10 extends AMQ
                 }, getAMQConnection()).execute();
     }
 
-    protected Long requestQueueDepth(AMQDestination amqd)
+    protected Long requestQueueDepth(AMQDestination amqd, boolean sync)
     {
         flushAcknowledgments();
+        if (sync)
+        {
+            getQpidSession().sync();
+        }
         return 
getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
     }
 

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1239166&r1=1239165&r2=1239166&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
 Wed Feb  1 15:19:32 2012
@@ -587,7 +587,7 @@ public class AMQSession_0_8 extends AMQS
         }
     }
 
-    protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, 
FailoverException
+    protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws 
AMQException, FailoverException
     {
         AMQFrame queueDeclare =
             getMethodRegistry().createQueueDeclareBody(getTicket(),

Modified: 
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1239166&r1=1239165&r2=1239166&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
 Wed Feb  1 15:19:32 2012
@@ -18,9 +18,8 @@
  */
 package org.apache.qpid.client;
 
-import junit.framework.TestCase;
-
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.*;
 import org.apache.qpid.transport.Connection.SessionFactory;
 import org.apache.qpid.transport.Connection.State;
@@ -39,7 +38,7 @@ import java.util.List;
  * {@link SessionException} is not thrown from methods of
  * {@link AMQSession_0_10}.
  */
-public class AMQSession_0_10Test extends TestCase
+public class AMQSession_0_10Test extends QpidTestCase
 {
 
     public void testExceptionOnCommit()
@@ -460,6 +459,28 @@ public class AMQSession_0_10Test extends
         assertNotNull("ExchangeDeclare event was not sent", event);
     }
 
+    public void testGetQueueDepthWithSync()
+    {
+        // slow down a flush thread
+        setTestSystemProperty("qpid.session.max_ack_delay", "10000");
+        AMQSession_0_10 session =  createAMQSession_0_10(false, 
javax.jms.Session.DUPS_OK_ACKNOWLEDGE);
+        try
+        {
+            session.acknowledgeMessage(-1, false);
+            session.getQueueDepth(createDestination(), true);
+        }
+        catch (Exception e)
+        {
+            fail("Unexpected exception is cought:" + e.getMessage());
+        }
+        ProtocolEvent command = findSentProtocolEventOfClass(session, 
MessageAccept.class, false);
+        assertNotNull("MessageAccept command was not sent", command);
+        command = findSentProtocolEventOfClass(session, ExecutionSync.class, 
false);
+        assertNotNull("ExecutionSync command was not sent", command);
+        command = findSentProtocolEventOfClass(session, QueueQuery.class, 
false);
+        assertNotNull("QueueQuery command was not sent", command);
+    }
+
     private AMQAnyDestination createDestination()
     {
         AMQAnyDestination destination = null;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to