Author: kwall
Date: Tue Jun 26 14:05:04 2012
New Revision: 1354023

URL: http://svn.apache.org/viewvc?rev=1354023&view=rev
Log:
NO-JIRA: Expose producer flow blocked through Session model object

Wire up the producerFlowBlocked attribute to the JMX Connection MBean.  Bolster 
ManagedConnectionMBeanTest to test the
correct behaviour of the attribute as a producing session cycles through not 
blocked -> blocked -> not blocked.

This attribute is temporary; eventually we need attribute(s) that expose the 
'blocked' or 'credit' status of both
consuming and producing sessions.

Modified:
    
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java
    
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBeanTest.java
    
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Session.java
    
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
    
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
    
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
    
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java

Modified: 
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java?rev=1354023&r1=1354022&r2=1354023&view=diff
==============================================================================
--- 
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java
 (original)
+++ 
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java
 Tue Jun 26 14:05:04 2012
@@ -125,16 +125,14 @@ public class ConnectionMBean extends Abs
         {
             Statistics statistics = session.getStatistics();
             Long txnBegins = (Long) 
statistics.getStatistic(Session.LOCAL_TRANSACTION_BEGINS);
-
-            boolean isTransactional = (txnBegins>0l);
-
+            Integer channelId = (Integer) 
session.getAttribute(Session.CHANNEL_ID);
             int unacknowledgedSize = ((Number) 
statistics.getStatistic(Session.UNACKNOWLEDGED_MESSAGES)).intValue();
-
-            boolean blocked = false; // TODO - implement query as to whether 
session is blocked
+            boolean blocked = (Boolean) 
session.getAttribute(Session.PRODUCER_FLOW_BLOCKED);
+            boolean isTransactional = (txnBegins>0l);
 
             Object[] itemValues =
                     {
-                            (Integer) session.getAttribute(Session.CHANNEL_ID),
+                            channelId,
                             isTransactional,
                             null, // TODO - default queue (which is 
meaningless)
                             unacknowledgedSize,
@@ -164,7 +162,7 @@ public class ConnectionMBean extends Abs
         getConfiguredObject().delete();
     }
 
-    public synchronized boolean isStatisticsEnabled()
+    public boolean isStatisticsEnabled()
     {
         return true;
     }

Modified: 
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBeanTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBeanTest.java?rev=1354023&r1=1354022&r2=1354023&view=diff
==============================================================================
--- 
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBeanTest.java
 (original)
+++ 
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBeanTest.java
 Tue Jun 26 14:05:04 2012
@@ -102,8 +102,9 @@ public class ConnectionMBeanTest extends
         int unacknowledgedMessages = 2;
         long localTransactionBegins = 1;
         boolean transactional = true;
+        boolean blocked = false;
 
-        Session mockSession = createMockedSession(channelId, 
unacknowledgedMessages, localTransactionBegins);
+        Session mockSession = createMockedSession(channelId, 
unacknowledgedMessages, localTransactionBegins, blocked);
 
         
when(_mockConnection.getSessions()).thenReturn(Collections.singletonList(mockSession));
 
@@ -111,7 +112,7 @@ public class ConnectionMBeanTest extends
         assertEquals("Unexpected number of rows in table", 1, table.size());
 
         final CompositeData row = table.get(new Integer[] {channelId} );
-        assertRow(row, channelId, unacknowledgedMessages, transactional);
+        assertChannelRow(row, channelId, unacknowledgedMessages, 
transactional, blocked);
     }
 
     public void testChannelsWithSingleNonTransactionalSession() throws 
Exception
@@ -120,8 +121,9 @@ public class ConnectionMBeanTest extends
         int unacknowledgedMessages = 2;
         long localTransactionBegins = 0;
         boolean transactional  = false;
+        boolean blocked = false;
 
-        Session mockSession = createMockedSession(channelId, 
unacknowledgedMessages, localTransactionBegins);
+        Session mockSession = createMockedSession(channelId, 
unacknowledgedMessages, localTransactionBegins, blocked);
 
         
when(_mockConnection.getSessions()).thenReturn(Collections.singletonList(mockSession));
 
@@ -129,7 +131,26 @@ public class ConnectionMBeanTest extends
         assertEquals("Unexpected number of rows in table", 1, table.size());
 
         final CompositeData row = table.get(new Integer[] {channelId} );
-        assertRow(row, channelId, unacknowledgedMessages, transactional);
+        assertChannelRow(row, channelId, unacknowledgedMessages, 
transactional, blocked);
+    }
+
+    public void testChannelsWithSessionBlocked() throws Exception
+    {
+        int channelId = 10;
+        int unacknowledgedMessages = 2;
+        long localTransactionBegins = 0;
+        boolean transactional  = false;
+        boolean blocked = true;
+
+        Session mockSession = createMockedSession(channelId, 
unacknowledgedMessages, localTransactionBegins, blocked);
+
+        
when(_mockConnection.getSessions()).thenReturn(Collections.singletonList(mockSession));
+
+        TabularData table = _connectionMBean.channels();
+        assertEquals("Unexpected number of rows in table", 1, table.size());
+
+        final CompositeData row = table.get(new Integer[] {channelId} );
+        assertChannelRow(row, channelId, unacknowledgedMessages, 
transactional, blocked);
     }
 
     public void testParentObjectIsVirtualHost()
@@ -190,15 +211,16 @@ public class ConnectionMBeanTest extends
         assertEquals("Unexpected " + jmxAttributeName, expectedValue, 
actualValue);
     }
 
-    private void assertRow(final CompositeData row, int channelId, int 
unacknowledgedMessages, boolean isTransactional)
+    private void assertChannelRow(final CompositeData row, int channelId, int 
unacknowledgedMessages, boolean isTransactional, boolean flowBlocked)
     {
         assertNotNull("No row for channel id " + channelId, row);
         assertEquals("Unexpected channel id", channelId, 
row.get(ManagedConnection.CHAN_ID));
         assertEquals("Unexpected transactional flag", isTransactional, 
row.get(ManagedConnection.TRANSACTIONAL));
         assertEquals("Unexpected unacknowledged message count", 
unacknowledgedMessages, row.get(ManagedConnection.UNACKED_COUNT));
+        assertEquals("Unexpected flow blocked", flowBlocked, 
row.get(ManagedConnection.FLOW_BLOCKED));
     }
 
-    private Session createMockedSession(int channelId, int 
unacknowledgedMessages, long localTransactionBegins)
+    private Session createMockedSession(int channelId, int 
unacknowledgedMessages, long localTransactionBegins, boolean blocked)
     {
         Session mockSession = mock(Session.class);
         Statistics mockSessionStatistics = mock(Statistics.class);
@@ -207,6 +229,7 @@ public class ConnectionMBeanTest extends
 
         when(mockSession.getStatistics()).thenReturn(mockSessionStatistics);
         
when(mockSession.getAttribute(Session.CHANNEL_ID)).thenReturn(channelId);
+        
when(mockSession.getAttribute(Session.PRODUCER_FLOW_BLOCKED)).thenReturn(blocked);
         return mockSession;
     }
 }

Modified: 
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1354023&r1=1354022&r2=1354023&view=diff
==============================================================================
--- 
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Tue Jun 26 14:05:04 2012
@@ -140,7 +140,7 @@ public class AMQChannel implements Sessi
 
     private UnacknowledgedMessageMap _unacknowledgedMessageMap = new 
UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
 
-    // Set of messages being acknoweledged in the current transaction
+    // Set of messages being acknowledged in the current transaction
     private SortedSet<QueueEntry> _acknowledgedMessages = new 
TreeSet<QueueEntry>();
 
     private final AtomicBoolean _suspended = new AtomicBoolean(false);
@@ -445,7 +445,7 @@ public class AMQChannel implements Sessi
      * @param acks      Are acks enabled for this subscriber
      * @param filters   Filters to apply to this subscriber
      *
-     * @param noLocal   Flag stopping own messages being receivied.
+     * @param noLocal   Flag stopping own messages being received.
      * @param exclusive Flag requesting exclusive access to the queue
      * @return the consumer tag. This is returned to the subscriber and used 
in subsequent unsubscribe requests
      *
@@ -1434,6 +1434,7 @@ public class AMQChannel implements Sessi
         _session.writeFrame(responseBody.generateFrame(_channelId));
     }
 
+    @Override
     public boolean getBlocking()
     {
         return _blocking.get();

Modified: 
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Session.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Session.java?rev=1354023&r1=1354022&r2=1354023&view=diff
==============================================================================
--- 
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Session.java
 (original)
+++ 
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Session.java
 Tue Jun 26 14:05:04 2012
@@ -61,6 +61,9 @@ public interface Session extends Configu
     public static final String UPDATED = "updated";
 
     public static final String CHANNEL_ID = "channelId";
+    // PRODUCER_FLOW_BLOCKED is exposed as an interim step.  We will eventually expose 
attribute(s) that report the
+    // available credit of both producer and consumer sides.
+    public static final String PRODUCER_FLOW_BLOCKED = "producerFlowBlocked";
 
     public static final Collection<String> AVAILABLE_ATTRIBUTES =
             Collections.unmodifiableCollection(Arrays.asList(ID,
@@ -71,8 +74,8 @@ public interface Session extends Configu
                                                              TIME_TO_LIVE,
                                                              CREATED,
                                                              UPDATED,
-                                                             CHANNEL_ID));
-
+                                                             CHANNEL_ID,
+                                                             
PRODUCER_FLOW_BLOCKED));
 
     Collection<Consumer> getSubscriptions();
     Collection<Publisher> getPublishers();

Modified: 
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1354023&r1=1354022&r2=1354023&view=diff
==============================================================================
--- 
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
 (original)
+++ 
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
 Tue Jun 26 14:05:04 2012
@@ -132,6 +132,10 @@ final class SessionAdapter extends Abstr
         {
             return _session.getChannelId();
         }
+        else if(name.equals(PRODUCER_FLOW_BLOCKED))
+        {
+            return _session.getBlocking();
+        }
         return super.getAttribute(name);    //TODO - Implement
     }
 

Modified: 
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1354023&r1=1354022&r2=1354023&view=diff
==============================================================================
--- 
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
 (original)
+++ 
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
 Tue Jun 26 14:05:04 2012
@@ -72,6 +72,7 @@ public interface AMQSessionModel extends
 
     void unblock();
 
+    boolean getBlocking();
 
     boolean onSameConnection(InboundMessage inbound);
 

Modified: 
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1354023&r1=1354022&r2=1354023&view=diff
==============================================================================
--- 
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
 (original)
+++ 
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
 Tue Jun 26 14:05:04 2012
@@ -1,5 +1,3 @@
-package org.apache.qpid.server.subscription;
-
 /*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
@@ -21,10 +19,11 @@ package org.apache.qpid.server.subscript
 *
 */
 
+package org.apache.qpid.server.subscription;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.message.InboundMessage;
@@ -83,21 +82,6 @@ public class MockSubscription implements
         _state = State.CLOSED;
     }
 
-    public boolean filtersMessages()
-    {
-        return false;
-    }
-
-    public AMQChannel getChannel()
-    {
-        return null;
-    }
-
-    public AMQShortString getConsumerTag()
-    {
-        return tag;
-    }
-
     public String getConsumerName()
     {
         return tag == null ? null : tag.asString();
@@ -191,11 +175,6 @@ public class MockSubscription implements
         return _isActive ;
     }
 
-    public void confirmAutoClose()
-    {
-
-    }
-
     public void set(String key, Object value)
     {
     }
@@ -210,11 +189,6 @@ public class MockSubscription implements
         return false;
     }
 
-    public boolean isBrowser()
-    {
-        return false;
-    }
-
     public boolean isClosed()
     {
         return _closed;
@@ -244,10 +218,6 @@ public class MockSubscription implements
         _stateChangeLock.unlock();
     }
 
-    public void resend(QueueEntry entry) throws AMQException
-    {
-    }
-
     public void onDequeue(QueueEntry queueEntry)
     {
     }
@@ -387,6 +357,12 @@ public class MockSubscription implements
         }
 
         @Override
+        public boolean getBlocking()
+        {
+            return false;
+        }
+
+        @Override
         public boolean onSameConnection(InboundMessage inbound)
         {
             return false;

Modified: 
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java?rev=1354023&r1=1354022&r2=1354023&view=diff
==============================================================================
--- 
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
 (original)
+++ 
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
 Tue Jun 26 14:05:04 2012
@@ -18,26 +18,29 @@
  */
 package org.apache.qpid.management.jmx;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.qpid.common.QpidProperties;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
-import org.apache.qpid.test.utils.JMXTestUtils;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.management.JMException;
+import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeDataSupport;
 import javax.management.openmbean.TabularData;
 
-import java.io.IOException;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
+import org.apache.commons.lang.StringUtils;
+import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 public class ManagedConnectionMBeanTest extends QpidBrokerTestCase
 {
@@ -56,30 +59,72 @@ public class ManagedConnectionMBeanTest 
 
     public void tearDown() throws Exception
     {
-        if (_jmxUtils != null)
+        try
+        {
+            if (_jmxUtils != null)
+            {
+                _jmxUtils.close();
+            }
+        }
+        finally
         {
-            _jmxUtils.close();
+            super.tearDown();
         }
-        super.tearDown();
     }
 
-    public void testTransactedSession() throws Exception
+    public void 
testNumberOfManagedConnectionsMatchesNumberOfClientConnections() throws 
Exception
     {
+        assertEquals("Expected no managed connections", 0, 
getManagedConnections().size());
+
         _connection = getConnection();
-        final Session session = _connection.createSession(true, 
Session.SESSION_TRANSACTED);
+        assertEquals("Expected one managed connection", 1, 
getManagedConnections().size());
 
-        receiveTwoMessagesWithoutCommit(session);
+        _connection.close();
+        assertEquals("Expected no managed connections after client connection 
closed", 0, getManagedConnections().size());
+    }
+
+    public void testGetAttributes() throws Exception
+    {
+        _connection = getConnection();
+        final ManagedConnection mBean = getConnectionMBean();
+
+        checkAuthorisedId(mBean);
+        checkClientVersion(mBean);
+        checkClientId(mBean);
+    }
+
+    public void testNonTransactedSession() throws Exception
+    {
+        _connection = getConnection();
+
+        boolean transactional = false;
+        boolean flowBlocked = false;
+
+        _connection.createSession(transactional, Session.AUTO_ACKNOWLEDGE);
 
         final ManagedConnection mBean = getConnectionMBean();
+        final CompositeDataSupport row = getTheOneChannelRow(mBean);
+        assertChannelRowData(row, 0, transactional, flowBlocked);
+    }
+
+    public void testTransactedSessionWithUnackMessages() throws Exception
+    {
+        _connection = getConnection();
+        _connection.start();
+
+        boolean transactional = true;
+        int numberOfMessages = 2;
+        final Session session = _connection.createSession(transactional, 
Session.SESSION_TRANSACTED);
+        final Destination destination = 
session.createQueue(getTestQueueName());
+        final MessageConsumer consumer = session.createConsumer(destination);
+
+        sendMessage(session, destination, numberOfMessages);
+        receiveMessagesWithoutCommit(consumer, numberOfMessages);
 
-        // check session details in the row
+        final ManagedConnection mBean = getConnectionMBean();
         final CompositeDataSupport row = getTheOneChannelRow(mBean);
-        final Number unackCount = (Number) 
row.get(ManagedConnection.UNACKED_COUNT);
-        final Boolean transactional = (Boolean) 
row.get(ManagedConnection.TRANSACTIONAL);
-        final Boolean flowBlocked = (Boolean) 
row.get(ManagedConnection.FLOW_BLOCKED);
-        assertEquals("Unexpected number of unacknowledged messages", 2, 
unackCount);
-        assertTrue("Unexpected transaction flag", transactional);
-        assertFalse("Unexpected value of flow blocked flag", flowBlocked);
+        boolean flowBlocked = false;
+        assertChannelRowData(row, numberOfMessages, transactional, 
flowBlocked);
 
         // check that commit advances the lastIoTime
         final Date initialLastIOTime = mBean.getLastIoTime();
@@ -92,25 +137,66 @@ public class ManagedConnectionMBeanTest 
         assertEquals("Unexpected number of unacknowledged messages", 0, 
unackCountAfterCommit);
     }
 
-    public void 
testNumberOfManagedConnectionsMatchesNumberOfClientConnections() throws 
Exception
-    {
-        assertEquals("Expected no managed connections", 0, 
getManagedConnections().size());
 
+    public void testProducerFlowBlocked() throws Exception
+    {
         _connection = getConnection();
-        assertEquals("Expected one managed connection", 1, 
getManagedConnections().size());
+        _connection.start();
 
-        _connection.close();
-        assertEquals("Expected no managed connections after client connection 
closed", 0, getManagedConnections().size());
+        String queueName = getTestQueueName();
+        Session session = _connection.createSession(true, 
Session.SESSION_TRANSACTED);
+        Queue queue = session.createQueue(queueName);
+        createQueueOnBroker(session, queue);
+
+        ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
+        managedQueue.setFlowResumeCapacity(DEFAULT_MESSAGE_SIZE * 2l);
+        managedQueue.setCapacity(DEFAULT_MESSAGE_SIZE * 3l);
+
+        final ManagedConnection managedConnection = getConnectionMBean();
+
+        // Check that producer flow is not blocked before the test
+        final CompositeDataSupport rowBeforeSend = 
getTheOneChannelRow(managedConnection);
+        assertFlowBlocked(rowBeforeSend, false);
+
+
+        // Check that producer flow does not become blocked too soon
+        sendMessage(session, queue, 3);
+        final CompositeDataSupport rowBeforeFull = 
getTheOneChannelRow(managedConnection);
+        assertFlowBlocked(rowBeforeFull, false);
+
+        // Fourth message will over-fill the queue (but as we are not sending 
more messages, client thread won't block)
+        sendMessage(session, queue, 1);
+        final CompositeDataSupport rowAfterFull = 
getTheOneChannelRow(managedConnection);
+        assertFlowBlocked(rowAfterFull, true);
+
+        // Consume two to bring the queue down to the resume capacity
+        MessageConsumer consumer = session.createConsumer(queue);
+        assertNotNull("Could not receive first message", 
consumer.receive(1000));
+        assertNotNull("Could not receive second message", 
consumer.receive(1000));
+        session.commit();
+
+        // Check that producer flow is no longer blocked
+        final CompositeDataSupport rowAfterReceive = 
getTheOneChannelRow(managedConnection);
+        assertFlowBlocked(rowAfterReceive, false);
     }
 
-    public void testGetAttributes() throws Exception
+    private void createQueueOnBroker(Session session, Destination destination) 
throws JMSException
     {
-        _connection = getConnection();
-        final ManagedConnection mBean = getConnectionMBean();
+        session.createConsumer(destination).close(); // Create a consumer only 
to cause queue creation
+    }
 
-        checkAuthorisedId(mBean);
-        checkClientVersion(mBean);
-        checkClientId(mBean);
+    private void assertChannelRowData(final CompositeData row, int 
unacknowledgedMessages, boolean isTransactional, boolean flowBlocked)
+    {
+        assertNotNull(row);
+        assertEquals("Unexpected transactional flag", isTransactional, 
row.get(ManagedConnection.TRANSACTIONAL));
+        assertEquals("Unexpected unacknowledged message count", 
unacknowledgedMessages, row.get(ManagedConnection.UNACKED_COUNT));
+        assertEquals("Unexpected flow blocked", flowBlocked, 
row.get(ManagedConnection.FLOW_BLOCKED));
+    }
+
+    private void assertFlowBlocked(final CompositeData row, boolean 
flowBlocked)
+    {
+        assertNotNull(row);
+        assertEquals("Unexpected flow blocked", flowBlocked, 
row.get(ManagedConnection.FLOW_BLOCKED));
     }
 
     private void checkAuthorisedId(ManagedConnection mBean) throws Exception
@@ -144,9 +230,15 @@ public class ManagedConnectionMBeanTest 
         return mBean;
     }
 
-    private CompositeDataSupport getTheOneChannelRow(final ManagedConnection 
mBean) throws IOException, JMException
+    private List<ManagedConnection> getManagedConnections()
     {
-        TabularData channelsData = mBean.channels();
+        return _jmxUtils.getManagedConnections(VIRTUAL_HOST_NAME);
+    }
+
+    private CompositeDataSupport getTheOneChannelRow(final ManagedConnection 
mBean) throws Exception
+    {
+        TabularData channelsData = getChannelsDataWithRetry(mBean);
+
         assertEquals("Unexpected number of rows in channel table", 1, 
channelsData.size());
 
         @SuppressWarnings("unchecked")
@@ -155,17 +247,8 @@ public class ManagedConnectionMBeanTest 
         return row;
     }
 
-    private void receiveTwoMessagesWithoutCommit(final Session session) throws 
JMSException, Exception
+    private void receiveMessagesWithoutCommit(final MessageConsumer consumer, 
int numberOfMessages) throws Exception
     {
-        final Destination destination = 
session.createQueue(getTestQueueName());
-        final MessageConsumer consumer = session.createConsumer(destination);
-
-        // send two messages ...
-        final int numberOfMessages = 2;
-        sendMessage(session, destination, numberOfMessages);
-        _connection.start();
-
-        // receive both messages but don't commit
         for (int i = 0; i < numberOfMessages; i++)
         {
             final Message m = consumer.receive(1000l);
@@ -173,9 +256,28 @@ public class ManagedConnectionMBeanTest 
         }
     }
 
-    private List<ManagedConnection> getManagedConnections()
+    private TabularData getChannelsDataWithRetry(final ManagedConnection mBean)
+            throws IOException, JMException
     {
-        return _jmxUtils.getManagedConnections(VIRTUAL_HOST_NAME);
+        TabularData channelsData = mBean.channels();
+        int retries = 0;
+        while(channelsData.size() == 0 && retries < 5)
+        {
+            sleep();
+            channelsData = mBean.channels();
+            retries++;
+        }
+        return channelsData;
     }
 
-}
+    private void sleep()
+    {
+        try
+        {
+            Thread.sleep(50);
+        }
+        catch (InterruptedException ie)
+        {
+            Thread.currentThread().interrupt();
+        }
+    }}



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to