Author: rgodfrey
Date: Sat Mar 17 11:15:44 2012
New Revision: 1301919
URL: http://svn.apache.org/viewvc?rev=1301919&view=rev
Log:
NO-JIRA : [Java Config] updated connection support to allow JMX Mbean to work
Modified:
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Session.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Statistics.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/NoStatistics.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/StatisticsAdapter.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
Modified:
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java?rev=1301919&r1=1301918&r2=1301919&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java
Sat Mar 17 11:15:44 2012
@@ -22,16 +22,49 @@
package org.apache.qpid.server.jmx.mbeans;
import java.io.IOException;
+import java.util.Collection;
import java.util.Date;
import javax.management.JMException;
import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+
import org.apache.qpid.management.common.mbeans.ManagedConnection;
import org.apache.qpid.server.jmx.ManagedObject;
import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.model.Statistics;
public class ConnectionMBean extends
AbstractStatisticsGatheringMBean<Connection> implements ManagedConnection
{
+ private static final OpenType[] CHANNEL_ATTRIBUTE_TYPES =
+ { SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING,
SimpleType.INTEGER, SimpleType.BOOLEAN };
+ private static final CompositeType CHANNEL_TYPE;
+ private static final TabularType CHANNELS_TYPE;
+
+ static
+ {
+ try
+ {
+ CHANNEL_TYPE = new CompositeType("Channel", "Channel Details",
COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]),
+
COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]),
+ CHANNEL_ATTRIBUTE_TYPES);
+ CHANNELS_TYPE = new TabularType("Channels", "Channels",
CHANNEL_TYPE, (String[]) TABULAR_UNIQUE_INDEX.toArray(new
String[TABULAR_UNIQUE_INDEX.size()]));
+ }
+ catch (JMException ex)
+ {
+ // This is not expected to ever occur.
+ throw new RuntimeException("Got JMException in static
initializer.", ex);
+ }
+ }
+
+
private final VirtualHostMBean _virtualHostMBean;
public ConnectionMBean(Connection conn, VirtualHostMBean virtualHostMBean)
throws JMException
@@ -74,7 +107,7 @@ public class ConnectionMBean extends Abs
public Date getLastIoTime()
{
- return null; // TODO - Implement
+ return null; // TODO - Implement connection getLastIoTime
}
public Long getMaximumNumberOfChannels()
@@ -84,28 +117,56 @@ public class ConnectionMBean extends Abs
public TabularData channels() throws IOException, JMException
{
- return null; // TODO - Implement
+ TabularDataSupport sessionTable = new
TabularDataSupport(CHANNELS_TYPE);
+ Collection<Session> list = getConfiguredObject().getSessions();
+
+ for (Session session : list)
+ {
+ Statistics statistics = session.getStatistics();
+ Long txnBegins = (Long)
statistics.getStatistic(Session.LOCAL_TRANSACTION_BEGINS);
+
+ boolean isTransactional = (txnBegins>0l);
+
+ int unacknowledgedSize = ((Number)
statistics.getStatistic(Session.UNACKNOWLEDGED_MESSAGES)).intValue();
+
+ boolean blocked = false; // TODO - implement query as to whether
session is blocked
+
+ Object[] itemValues =
+ {
+ (Integer) session.getAttribute(Session.CHANNEL_ID),
+ isTransactional,
+ null, // TODO - default queue (which is
meaningless)
+ unacknowledgedSize,
+ blocked
+ };
+
+ CompositeData sessionData = new CompositeDataSupport(CHANNEL_TYPE,
+
COMPOSITE_ITEM_NAMES_DESC.toArray(new
String[COMPOSITE_ITEM_NAMES_DESC.size()]), itemValues);
+ sessionTable.put(sessionData);
+ }
+
+ return sessionTable;
}
public void commitTransactions(int channelId) throws JMException
{
- // TODO - Implement
+ // TODO - Commiting transaction on a channel makes *no* sense
}
public void rollbackTransactions(int channelId) throws JMException
{
- // TODO - Implement
+ // TODO - rollin back a transaction on a channel makes *no* sense
}
public void closeConnection() throws Exception
{
- // TODO - Implement
+ // TODO - Implement close connection
}
public void setStatisticsEnabled(boolean enabled)
{
- // TODO - Implement
+ // TODO - Implement setStatisticsEnables
updateStats();
}
}
Modified:
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java?rev=1301919&r1=1301918&r2=1301919&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker-plugins/jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
Sat Mar 17 11:15:44 2012
@@ -140,7 +140,7 @@ public class QueueMBean extends AMQManag
private Number getStatisticValue(String name)
{
- final Number statistic = _queue.getStatistics().getStatistic(name);
+ final Number statistic = (Number)
_queue.getStatistics().getStatistic(name);
return statistic == null ? Integer.valueOf(0) : statistic;
}
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1301919&r1=1301918&r2=1301919&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Sat Mar 17 11:15:44 2012
@@ -1396,6 +1396,11 @@ public class AMQChannel implements Sessi
return false;
}
+ public int getUnacknowledgedMessageCount()
+ {
+ return getUnacknowledgedMessageMap().size();
+ }
+
private void flow(boolean flow)
{
MethodRegistry methodRegistry = _session.getMethodRegistry();
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Session.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Session.java?rev=1301919&r1=1301918&r2=1301919&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Session.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Session.java
Sat Mar 17 11:15:44 2012
@@ -20,10 +20,43 @@
*/
package org.apache.qpid.server.model;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
public interface Session extends ConfiguredObject
{
+ // Statistics
+
+ public static final String BYTES_IN = "bytesIn";
+ public static final String BYTES_OUT = "bytesOut";
+ public static final String CONSUMER_COUNT =
"consumerCount";
+ public static final String LOCAL_TRANSACTION_BEGINS =
"localTransactionBegins";
+ public static final String LOCAL_TRANSACTION_OPEN =
"localTransactionOpen";
+ public static final String LOCAL_TRANSACTION_ROLLBACKS =
"localTransactionRollbacks";
+ public static final String STATE_CHANGED =
"stateChanged";
+ public static final String UNACKNOWLEDGED_BYTES =
"unacknowledgedBytes";
+ public static final String UNACKNOWLEDGED_MESSAGES =
"unacknowledgedMessages";
+ public static final String XA_TRANSACTION_BRANCH_ENDS =
"xaTransactionBranchEnds";
+ public static final String XA_TRANSACTION_BRANCH_STARTS =
"xaTransactionBranchStarts";
+ public static final String XA_TRANSACTION_BRANCH_SUSPENDS =
"xaTransactionBranchSuspends";
+
+ public static final Collection<String> AVAILABLE_STATISTICS =
+ Collections.unmodifiableCollection(Arrays.asList(BYTES_IN,
BYTES_OUT, CONSUMER_COUNT,
+
LOCAL_TRANSACTION_BEGINS,
+
LOCAL_TRANSACTION_OPEN,
+
LOCAL_TRANSACTION_ROLLBACKS, STATE_CHANGED,
+
UNACKNOWLEDGED_BYTES, UNACKNOWLEDGED_MESSAGES,
+
XA_TRANSACTION_BRANCH_ENDS, XA_TRANSACTION_BRANCH_STARTS,
+
XA_TRANSACTION_BRANCH_SUSPENDS));
+
+
+ public static final String CHANNEL_ID = "channelId";
+
+ public static final Collection<String> AVAILABLE_ATTRIBUTES =
+ Collections.unmodifiableCollection(Arrays.asList(CHANNEL_ID));
+
+
Collection<Subscription> getSubscriptions();
Collection<Publisher> getPublishers();
}
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Statistics.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Statistics.java?rev=1301919&r1=1301918&r2=1301919&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Statistics.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Statistics.java
Sat Mar 17 11:15:44 2012
@@ -21,5 +21,5 @@ import java.util.Collection;
public interface Statistics
{
Collection<String> getStatisticNames();
- public Number getStatistic(String name);
+ public Object getStatistic(String name);
}
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/NoStatistics.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/NoStatistics.java?rev=1301919&r1=1301918&r2=1301919&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/NoStatistics.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/NoStatistics.java
Sat Mar 17 11:15:44 2012
@@ -34,7 +34,7 @@ public class NoStatistics implements Sta
return Collections.emptyList();
}
- public Number getStatistic(String name)
+ public Object getStatistic(String name)
{
return null;
}
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java?rev=1301919&r1=1301918&r2=1301919&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
Sat Mar 17 11:15:44 2012
@@ -378,7 +378,7 @@ final class QueueAdapter extends Abstrac
return Queue.AVAILABLE_STATISTICS;
}
- public Number getStatistic(String name)
+ public Object getStatistic(String name)
{
if(BINDING_COUNT.equals(name))
{
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1301919&r1=1301918&r2=1301919&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
Sat Mar 17 11:15:44 2012
@@ -21,7 +21,11 @@
package org.apache.qpid.server.model.adapter;
import java.security.AccessControlException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Publisher;
import org.apache.qpid.server.model.Session;
@@ -32,11 +36,16 @@ import org.apache.qpid.server.protocol.A
final class SessionAdapter extends AbstractAdapter implements Session
{
+ // Attributes
+
+
private AMQSessionModel _session;
+ private SessionStatistics _statistics;
public SessionAdapter(final AMQSessionModel session)
{
_session = session;
+ _statistics = new SessionStatistics();
}
public Collection<Subscription> getSubscriptions()
@@ -51,7 +60,7 @@ final class SessionAdapter extends Abstr
public String getName()
{
- return String.valueOf(_session.getID());
+ return String.valueOf(_session.getChannelId());
}
public String setName(final String currentName, final String desiredName)
@@ -98,8 +107,96 @@ final class SessionAdapter extends Abstr
return 0; //TODO
}
+ @Override
+ public Collection<String> getAttributeNames()
+ {
+ Collection<String> names = new
HashSet<String>(super.getAttributeNames());
+ names.addAll(AVAILABLE_ATTRIBUTES);
+
+ return Collections.unmodifiableCollection(names);
+ }
+
+ @Override
+ public Object getAttribute(String name)
+ {
+ if(name.equals(CHANNEL_ID))
+ {
+ return _session.getChannelId();
+ }
+ return super.getAttribute(name); //TODO - Implement
+ }
+
+ @Override
+ public Object setAttribute(String name, Object expected, Object desired)
+ throws IllegalStateException, AccessControlException,
IllegalArgumentException
+ {
+ return super.setAttribute(name, expected, desired); //TODO -
Implement
+ }
+
public Statistics getStatistics()
{
- return NoStatistics.getInstance();
+ return _statistics;
+ }
+
+ private class SessionStatistics implements Statistics
+ {
+
+ public SessionStatistics()
+ {
+ }
+
+ public Collection<String> getStatisticNames()
+ {
+ return AVAILABLE_STATISTICS;
+ }
+
+ public Object getStatistic(String name)
+ {
+ if(name.equals(BYTES_IN))
+ {
+ }
+ else if(name.equals(BYTES_OUT))
+ {
+ }
+ else if(name.equals(CONSUMER_COUNT))
+ {
+ return getSubscriptions().size();
+ }
+ else if(name.equals(LOCAL_TRANSACTION_BEGINS))
+ {
+ return _session.getTxnCount();
+ }
+ else if(name.equals(LOCAL_TRANSACTION_OPEN))
+ {
+ long open = _session.getTxnCount() - (_session.getTxnCommits()
+ _session.getTxnRejects());
+ return (Boolean) (open > 0l);
+ }
+ else if(name.equals(LOCAL_TRANSACTION_ROLLBACKS))
+ {
+ return _session.getTxnCommits();
+ }
+ else if(name.equals(STATE_CHANGED))
+ {
+ }
+ else if(name.equals(UNACKNOWLEDGED_BYTES))
+ {
+ }
+ else if(name.equals(UNACKNOWLEDGED_MESSAGES))
+ {
+ return _session.getUnacknowledgedMessageCount();
+ }
+ else if(name.equals(XA_TRANSACTION_BRANCH_ENDS))
+ {
+ }
+ else if(name.equals(XA_TRANSACTION_BRANCH_STARTS))
+ {
+ }
+ else if(name.equals(XA_TRANSACTION_BRANCH_SUSPENDS))
+ {
+
+ }
+
+ return null; // TODO - Implement
+ }
}
}
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/StatisticsAdapter.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/StatisticsAdapter.java?rev=1301919&r1=1301918&r2=1301919&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/StatisticsAdapter.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/StatisticsAdapter.java
Sat Mar 17 11:15:44 2012
@@ -56,7 +56,7 @@ class StatisticsAdapter implements Stati
return STATISTIC_NAMES;
}
- public Number getStatistic(String name)
+ public Object getStatistic(String name)
{
StatisticsCounter counter = _statistics.get(name);
return counter == null ? null : counter.getTotal();
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1301919&r1=1301918&r2=1301919&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
Sat Mar 17 11:15:44 2012
@@ -68,4 +68,12 @@ public interface AMQSessionModel extends
boolean onSameConnection(InboundMessage inbound);
+
+ int getUnacknowledgedMessageCount();
+
+ Long getTxnCount();
+ Long getTxnCommits();
+ Long getTxnRejects();
+
+ int getChannelId();
}
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1301919&r1=1301918&r2=1301919&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
Sat Mar 17 11:15:44 2012
@@ -620,6 +620,11 @@ public class ServerSession extends Sessi
return _txnRejects.get();
}
+ public int getChannelId()
+ {
+ return getChannel();
+ }
+
public Long getTxnCount()
{
return _txnCount.get();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]