Author: kwall
Date: Tue Jun 26 14:05:04 2012
New Revision: 1354023
URL: http://svn.apache.org/viewvc?rev=1354023&view=rev
Log:
NO-JIRA: Expose producer flow blocked through Session model object
Wire-up producerFlowBlocked attribute to the JMX Connection MBean. Bolster the
ManagedConnectionMBeanTests to test the
correct behaviour of the attribute as a producing session cycles through not
blocked -> blocked -> not blocked.
This attribute is temporary; eventually we need attribute(s) that expose the
'blocked' or 'credit' status of both
consuming and producing sessions.
Modified:
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBeanTest.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Session.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
Modified:
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java?rev=1354023&r1=1354022&r2=1354023&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java
Tue Jun 26 14:05:04 2012
@@ -125,16 +125,14 @@ public class ConnectionMBean extends Abs
{
Statistics statistics = session.getStatistics();
Long txnBegins = (Long)
statistics.getStatistic(Session.LOCAL_TRANSACTION_BEGINS);
-
- boolean isTransactional = (txnBegins>0l);
-
+ Integer channelId = (Integer)
session.getAttribute(Session.CHANNEL_ID);
int unacknowledgedSize = ((Number)
statistics.getStatistic(Session.UNACKNOWLEDGED_MESSAGES)).intValue();
-
- boolean blocked = false; // TODO - implement query as to whether
session is blocked
+ boolean blocked = (Boolean)
session.getAttribute(Session.PRODUCER_FLOW_BLOCKED);
+ boolean isTransactional = (txnBegins>0l);
Object[] itemValues =
{
- (Integer) session.getAttribute(Session.CHANNEL_ID),
+ channelId,
isTransactional,
null, // TODO - default queue (which is
meaningless)
unacknowledgedSize,
@@ -164,7 +162,7 @@ public class ConnectionMBean extends Abs
getConfiguredObject().delete();
}
- public synchronized boolean isStatisticsEnabled()
+ public boolean isStatisticsEnabled()
{
return true;
}
Modified:
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBeanTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBeanTest.java?rev=1354023&r1=1354022&r2=1354023&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBeanTest.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBeanTest.java
Tue Jun 26 14:05:04 2012
@@ -102,8 +102,9 @@ public class ConnectionMBeanTest extends
int unacknowledgedMessages = 2;
long localTransactionBegins = 1;
boolean transactional = true;
+ boolean blocked = false;
- Session mockSession = createMockedSession(channelId,
unacknowledgedMessages, localTransactionBegins);
+ Session mockSession = createMockedSession(channelId,
unacknowledgedMessages, localTransactionBegins, blocked);
when(_mockConnection.getSessions()).thenReturn(Collections.singletonList(mockSession));
@@ -111,7 +112,7 @@ public class ConnectionMBeanTest extends
assertEquals("Unexpected number of rows in table", 1, table.size());
final CompositeData row = table.get(new Integer[] {channelId} );
- assertRow(row, channelId, unacknowledgedMessages, transactional);
+ assertChannelRow(row, channelId, unacknowledgedMessages,
transactional, blocked);
}
public void testChannelsWithSingleNonTransactionalSession() throws
Exception
@@ -120,8 +121,9 @@ public class ConnectionMBeanTest extends
int unacknowledgedMessages = 2;
long localTransactionBegins = 0;
boolean transactional = false;
+ boolean blocked = false;
- Session mockSession = createMockedSession(channelId,
unacknowledgedMessages, localTransactionBegins);
+ Session mockSession = createMockedSession(channelId,
unacknowledgedMessages, localTransactionBegins, blocked);
when(_mockConnection.getSessions()).thenReturn(Collections.singletonList(mockSession));
@@ -129,7 +131,26 @@ public class ConnectionMBeanTest extends
assertEquals("Unexpected number of rows in table", 1, table.size());
final CompositeData row = table.get(new Integer[] {channelId} );
- assertRow(row, channelId, unacknowledgedMessages, transactional);
+ assertChannelRow(row, channelId, unacknowledgedMessages,
transactional, blocked);
+ }
+
+ public void testChannelsWithSessionBlocked() throws Exception
+ {
+ int channelId = 10;
+ int unacknowledgedMessages = 2;
+ long localTransactionBegins = 0;
+ boolean transactional = false;
+ boolean blocked = true;
+
+ Session mockSession = createMockedSession(channelId,
unacknowledgedMessages, localTransactionBegins, blocked);
+
+
when(_mockConnection.getSessions()).thenReturn(Collections.singletonList(mockSession));
+
+ TabularData table = _connectionMBean.channels();
+ assertEquals("Unexpected number of rows in table", 1, table.size());
+
+ final CompositeData row = table.get(new Integer[] {channelId} );
+ assertChannelRow(row, channelId, unacknowledgedMessages,
transactional, blocked);
}
public void testParentObjectIsVirtualHost()
@@ -190,15 +211,16 @@ public class ConnectionMBeanTest extends
assertEquals("Unexpected " + jmxAttributeName, expectedValue,
actualValue);
}
- private void assertRow(final CompositeData row, int channelId, int
unacknowledgedMessages, boolean isTransactional)
+ private void assertChannelRow(final CompositeData row, int channelId, int
unacknowledgedMessages, boolean isTransactional, boolean flowBlocked)
{
assertNotNull("No row for channel id " + channelId, row);
assertEquals("Unexpected channel id", channelId,
row.get(ManagedConnection.CHAN_ID));
assertEquals("Unexpected transactional flag", isTransactional,
row.get(ManagedConnection.TRANSACTIONAL));
assertEquals("Unexpected unacknowledged message count",
unacknowledgedMessages, row.get(ManagedConnection.UNACKED_COUNT));
+ assertEquals("Unexpected flow blocked", flowBlocked,
row.get(ManagedConnection.FLOW_BLOCKED));
}
- private Session createMockedSession(int channelId, int
unacknowledgedMessages, long localTransactionBegins)
+ private Session createMockedSession(int channelId, int
unacknowledgedMessages, long localTransactionBegins, boolean blocked)
{
Session mockSession = mock(Session.class);
Statistics mockSessionStatistics = mock(Statistics.class);
@@ -207,6 +229,7 @@ public class ConnectionMBeanTest extends
when(mockSession.getStatistics()).thenReturn(mockSessionStatistics);
when(mockSession.getAttribute(Session.CHANNEL_ID)).thenReturn(channelId);
+
when(mockSession.getAttribute(Session.PRODUCER_FLOW_BLOCKED)).thenReturn(blocked);
return mockSession;
}
}
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1354023&r1=1354022&r2=1354023&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Tue Jun 26 14:05:04 2012
@@ -140,7 +140,7 @@ public class AMQChannel implements Sessi
private UnacknowledgedMessageMap _unacknowledgedMessageMap = new
UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
- // Set of messages being acknoweledged in the current transaction
+ // Set of messages being acknowledged in the current transaction
private SortedSet<QueueEntry> _acknowledgedMessages = new
TreeSet<QueueEntry>();
private final AtomicBoolean _suspended = new AtomicBoolean(false);
@@ -445,7 +445,7 @@ public class AMQChannel implements Sessi
* @param acks Are acks enabled for this subscriber
* @param filters Filters to apply to this subscriber
*
- * @param noLocal Flag stopping own messages being receivied.
+ * @param noLocal Flag stopping own messages being received.
* @param exclusive Flag requesting exclusive access to the queue
* @return the consumer tag. This is returned to the subscriber and used
in subsequent unsubscribe requests
*
@@ -1434,6 +1434,7 @@ public class AMQChannel implements Sessi
_session.writeFrame(responseBody.generateFrame(_channelId));
}
+ @Override
public boolean getBlocking()
{
return _blocking.get();
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Session.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Session.java?rev=1354023&r1=1354022&r2=1354023&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Session.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Session.java
Tue Jun 26 14:05:04 2012
@@ -61,6 +61,9 @@ public interface Session extends Configu
public static final String UPDATED = "updated";
public static final String CHANNEL_ID = "channelId";
+ // PRODUCER_FLOW_BLOCKED is exposed as an interim step. We will expose
attribute(s) that exposing
+ // available credit of both producer and consumer sides.
+ public static final String PRODUCER_FLOW_BLOCKED = "producerFlowBlocked";
public static final Collection<String> AVAILABLE_ATTRIBUTES =
Collections.unmodifiableCollection(Arrays.asList(ID,
@@ -71,8 +74,8 @@ public interface Session extends Configu
TIME_TO_LIVE,
CREATED,
UPDATED,
- CHANNEL_ID));
-
+ CHANNEL_ID,
+
PRODUCER_FLOW_BLOCKED));
Collection<Consumer> getSubscriptions();
Collection<Publisher> getPublishers();
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1354023&r1=1354022&r2=1354023&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
Tue Jun 26 14:05:04 2012
@@ -132,6 +132,10 @@ final class SessionAdapter extends Abstr
{
return _session.getChannelId();
}
+ else if(name.equals(PRODUCER_FLOW_BLOCKED))
+ {
+ return _session.getBlocking();
+ }
return super.getAttribute(name); //TODO - Implement
}
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1354023&r1=1354022&r2=1354023&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
Tue Jun 26 14:05:04 2012
@@ -72,6 +72,7 @@ public interface AMQSessionModel extends
void unblock();
+ boolean getBlocking();
boolean onSameConnection(InboundMessage inbound);
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1354023&r1=1354022&r2=1354023&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
Tue Jun 26 14:05:04 2012
@@ -1,5 +1,3 @@
-package org.apache.qpid.server.subscription;
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -21,10 +19,11 @@ package org.apache.qpid.server.subscript
*
*/
+package org.apache.qpid.server.subscription;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.InboundMessage;
@@ -83,21 +82,6 @@ public class MockSubscription implements
_state = State.CLOSED;
}
- public boolean filtersMessages()
- {
- return false;
- }
-
- public AMQChannel getChannel()
- {
- return null;
- }
-
- public AMQShortString getConsumerTag()
- {
- return tag;
- }
-
public String getConsumerName()
{
return tag == null ? null : tag.asString();
@@ -191,11 +175,6 @@ public class MockSubscription implements
return _isActive ;
}
- public void confirmAutoClose()
- {
-
- }
-
public void set(String key, Object value)
{
}
@@ -210,11 +189,6 @@ public class MockSubscription implements
return false;
}
- public boolean isBrowser()
- {
- return false;
- }
-
public boolean isClosed()
{
return _closed;
@@ -244,10 +218,6 @@ public class MockSubscription implements
_stateChangeLock.unlock();
}
- public void resend(QueueEntry entry) throws AMQException
- {
- }
-
public void onDequeue(QueueEntry queueEntry)
{
}
@@ -387,6 +357,12 @@ public class MockSubscription implements
}
@Override
+ public boolean getBlocking()
+ {
+ return false;
+ }
+
+ @Override
public boolean onSameConnection(InboundMessage inbound)
{
return false;
Modified:
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java?rev=1354023&r1=1354022&r2=1354023&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
Tue Jun 26 14:05:04 2012
@@ -18,26 +18,29 @@
*/
package org.apache.qpid.management.jmx;
-import org.apache.commons.lang.StringUtils;
-import org.apache.qpid.common.QpidProperties;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
-import org.apache.qpid.test.utils.JMXTestUtils;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.management.JMException;
+import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.TabularData;
-import java.io.IOException;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
+import org.apache.commons.lang.StringUtils;
+import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
public class ManagedConnectionMBeanTest extends QpidBrokerTestCase
{
@@ -56,30 +59,72 @@ public class ManagedConnectionMBeanTest
public void tearDown() throws Exception
{
- if (_jmxUtils != null)
+ try
+ {
+ if (_jmxUtils != null)
+ {
+ _jmxUtils.close();
+ }
+ }
+ finally
{
- _jmxUtils.close();
+ super.tearDown();
}
- super.tearDown();
}
- public void testTransactedSession() throws Exception
+ public void
testNumberOfManagedConnectionsMatchesNumberOfClientConnections() throws
Exception
{
+ assertEquals("Expected no managed connections", 0,
getManagedConnections().size());
+
_connection = getConnection();
- final Session session = _connection.createSession(true,
Session.SESSION_TRANSACTED);
+ assertEquals("Expected one managed connection", 1,
getManagedConnections().size());
- receiveTwoMessagesWithoutCommit(session);
+ _connection.close();
+ assertEquals("Expected no managed connections after client connection
closed", 0, getManagedConnections().size());
+ }
+
+ public void testGetAttributes() throws Exception
+ {
+ _connection = getConnection();
+ final ManagedConnection mBean = getConnectionMBean();
+
+ checkAuthorisedId(mBean);
+ checkClientVersion(mBean);
+ checkClientId(mBean);
+ }
+
+ public void testNonTransactedSession() throws Exception
+ {
+ _connection = getConnection();
+
+ boolean transactional = false;
+ boolean flowBlocked = false;
+
+ _connection.createSession(transactional, Session.AUTO_ACKNOWLEDGE);
final ManagedConnection mBean = getConnectionMBean();
+ final CompositeDataSupport row = getTheOneChannelRow(mBean);
+ assertChannelRowData(row, 0, transactional, flowBlocked);
+ }
+
+ public void testTransactedSessionWithUnackMessages() throws Exception
+ {
+ _connection = getConnection();
+ _connection.start();
+
+ boolean transactional = true;
+ int numberOfMessages = 2;
+ final Session session = _connection.createSession(transactional,
Session.SESSION_TRANSACTED);
+ final Destination destination =
session.createQueue(getTestQueueName());
+ final MessageConsumer consumer = session.createConsumer(destination);
+
+ sendMessage(session, destination, numberOfMessages);
+ receiveMessagesWithoutCommit(consumer, numberOfMessages);
- // check session details in the row
+ final ManagedConnection mBean = getConnectionMBean();
final CompositeDataSupport row = getTheOneChannelRow(mBean);
- final Number unackCount = (Number)
row.get(ManagedConnection.UNACKED_COUNT);
- final Boolean transactional = (Boolean)
row.get(ManagedConnection.TRANSACTIONAL);
- final Boolean flowBlocked = (Boolean)
row.get(ManagedConnection.FLOW_BLOCKED);
- assertEquals("Unexpected number of unacknowledged messages", 2,
unackCount);
- assertTrue("Unexpected transaction flag", transactional);
- assertFalse("Unexpected value of flow blocked flag", flowBlocked);
+ boolean flowBlocked = false;
+ assertChannelRowData(row, numberOfMessages, transactional,
flowBlocked);
// check that commit advances the lastIoTime
final Date initialLastIOTime = mBean.getLastIoTime();
@@ -92,25 +137,66 @@ public class ManagedConnectionMBeanTest
assertEquals("Unexpected number of unacknowledged messages", 0,
unackCountAfterCommit);
}
- public void
testNumberOfManagedConnectionsMatchesNumberOfClientConnections() throws
Exception
- {
- assertEquals("Expected no managed connections", 0,
getManagedConnections().size());
+ public void testProducerFlowBlocked() throws Exception
+ {
_connection = getConnection();
- assertEquals("Expected one managed connection", 1,
getManagedConnections().size());
+ _connection.start();
- _connection.close();
- assertEquals("Expected no managed connections after client connection
closed", 0, getManagedConnections().size());
+ String queueName = getTestQueueName();
+ Session session = _connection.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(queueName);
+ createQueueOnBroker(session, queue);
+
+ ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
+ managedQueue.setFlowResumeCapacity(DEFAULT_MESSAGE_SIZE * 2l);
+ managedQueue.setCapacity(DEFAULT_MESSAGE_SIZE * 3l);
+
+ final ManagedConnection managedConnection = getConnectionMBean();
+
+ // Check that producer flow is not block before test
+ final CompositeDataSupport rowBeforeSend =
getTheOneChannelRow(managedConnection);
+ assertFlowBlocked(rowBeforeSend, false);
+
+
+ // Check that producer flow does not become block too soon
+ sendMessage(session, queue, 3);
+ final CompositeDataSupport rowBeforeFull =
getTheOneChannelRow(managedConnection);
+ assertFlowBlocked(rowBeforeFull, false);
+
+ // Fourth message will over-fill the queue (but as we are not sending
more messages, client thread wont't block)
+ sendMessage(session, queue, 1);
+ final CompositeDataSupport rowAfterFull =
getTheOneChannelRow(managedConnection);
+ assertFlowBlocked(rowAfterFull, true);
+
+ // Consume two to bring the queue down to the resume capacity
+ MessageConsumer consumer = session.createConsumer(queue);
+ assertNotNull("Could not receive first message",
consumer.receive(1000));
+ assertNotNull("Could not receive second message",
consumer.receive(1000));
+ session.commit();
+
+ // Check that producer flow is no longer blocked
+ final CompositeDataSupport rowAfterReceive =
getTheOneChannelRow(managedConnection);
+ assertFlowBlocked(rowAfterReceive, false);
}
- public void testGetAttributes() throws Exception
+ private void createQueueOnBroker(Session session, Destination destination)
throws JMSException
{
- _connection = getConnection();
- final ManagedConnection mBean = getConnectionMBean();
+ session.createConsumer(destination).close(); // Create a consumer only
to cause queue creation
+ }
- checkAuthorisedId(mBean);
- checkClientVersion(mBean);
- checkClientId(mBean);
+ private void assertChannelRowData(final CompositeData row, int
unacknowledgedMessages, boolean isTransactional, boolean flowBlocked)
+ {
+ assertNotNull(row);
+ assertEquals("Unexpected transactional flag", isTransactional,
row.get(ManagedConnection.TRANSACTIONAL));
+ assertEquals("Unexpected unacknowledged message count",
unacknowledgedMessages, row.get(ManagedConnection.UNACKED_COUNT));
+ assertEquals("Unexpected flow blocked", flowBlocked,
row.get(ManagedConnection.FLOW_BLOCKED));
+ }
+
+ private void assertFlowBlocked(final CompositeData row, boolean
flowBlocked)
+ {
+ assertNotNull(row);
+ assertEquals("Unexpected flow blocked", flowBlocked,
row.get(ManagedConnection.FLOW_BLOCKED));
}
private void checkAuthorisedId(ManagedConnection mBean) throws Exception
@@ -144,9 +230,15 @@ public class ManagedConnectionMBeanTest
return mBean;
}
- private CompositeDataSupport getTheOneChannelRow(final ManagedConnection
mBean) throws IOException, JMException
+ private List<ManagedConnection> getManagedConnections()
{
- TabularData channelsData = mBean.channels();
+ return _jmxUtils.getManagedConnections(VIRTUAL_HOST_NAME);
+ }
+
+ private CompositeDataSupport getTheOneChannelRow(final ManagedConnection
mBean) throws Exception
+ {
+ TabularData channelsData = getChannelsDataWithRetry(mBean);
+
assertEquals("Unexpected number of rows in channel table", 1,
channelsData.size());
@SuppressWarnings("unchecked")
@@ -155,17 +247,8 @@ public class ManagedConnectionMBeanTest
return row;
}
- private void receiveTwoMessagesWithoutCommit(final Session session) throws
JMSException, Exception
+ private void receiveMessagesWithoutCommit(final MessageConsumer consumer,
int numberOfMessages) throws Exception
{
- final Destination destination =
session.createQueue(getTestQueueName());
- final MessageConsumer consumer = session.createConsumer(destination);
-
- // send two messages ...
- final int numberOfMessages = 2;
- sendMessage(session, destination, numberOfMessages);
- _connection.start();
-
- // receive both messages but don't commit
for (int i = 0; i < numberOfMessages; i++)
{
final Message m = consumer.receive(1000l);
@@ -173,9 +256,28 @@ public class ManagedConnectionMBeanTest
}
}
- private List<ManagedConnection> getManagedConnections()
+ private TabularData getChannelsDataWithRetry(final ManagedConnection mBean)
+ throws IOException, JMException
{
- return _jmxUtils.getManagedConnections(VIRTUAL_HOST_NAME);
+ TabularData channelsData = mBean.channels();
+ int retries = 0;
+ while(channelsData.size() == 0 && retries < 5)
+ {
+ sleep();
+ channelsData = mBean.channels();
+ retries++;
+ }
+ return channelsData;
}
-}
+ private void sleep()
+ {
+ try
+ {
+ Thread.sleep(50);
+ }
+ catch (InterruptedException ie)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]