Author: jstrachan
Date: Mon Dec 19 04:22:09 2005
New Revision: 357683
URL: http://svn.apache.org/viewcvs?rev=357683&view=rev
Log:
added enqueue/dequeue statistics at the Connection/Connector level in JMX
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/ConnectionStatistics.java
(with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/ConnectorStatistics.java
(with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/AbstractConnection.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/Connection.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/Connector.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/TransportConnection.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/TransportConnector.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectionView.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectionViewMBean.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectorView.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectorViewMBean.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/DestinationStatistics.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Queue.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/BaseCommand.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/Command.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/KeepAliveInfo.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/MessageAck.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/WireFormatInfo.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/jndi/ActiveMQInitialContextFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/management/JMSConnectionStatsImpl.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/AbstractConnection.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/AbstractConnection.java?rev=357683&r1=357682&r2=357683&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/AbstractConnection.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/AbstractConnection.java
Mon Dec 19 04:22:09 2005
@@ -26,6 +26,7 @@
import java.util.List;
import org.activemq.Service;
+import org.activemq.broker.region.ConnectionStatistics;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.BrokerInfo;
import org.activemq.command.Command;
@@ -78,11 +79,13 @@
protected final TaskRunner taskRunner;
protected final Connector connector;
protected boolean demandForwardingBridge;
+ private ConnectionStatistics statistics = new ConnectionStatistics();
protected final ConcurrentHashMap connectionStates = new
ConcurrentHashMap();
private WireFormatInfo wireFormatInfo;
protected boolean disposed=false;
+
static class ConnectionState extends org.activemq.state.ConnectionState {
private final ConnectionContext context;
@@ -108,11 +111,15 @@
this.connector = connector;
this.broker = broker;
+ this.statistics.setParent(connector.getStatistics());
- if( taskRunnerFactory != null )
+ if( taskRunnerFactory != null ) {
taskRunner = taskRunnerFactory.createTaskRunner( this );
- else
- taskRunner = null;
+ }
+ else {
+ taskRunner = null;
+ }
+
}
/**
@@ -560,5 +567,12 @@
abstract protected void dispatch(Command command);
+
+ /**
+ * Returns the statistics for this connection
+ */
+ public ConnectionStatistics getStatistics() {
+ return statistics;
+ }
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/Connection.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/Connection.java?rev=357683&r1=357682&r2=357683&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/Connection.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/Connection.java
Mon Dec 19 04:22:09 2005
@@ -19,6 +19,7 @@
package org.activemq.broker;
import org.activemq.Service;
+import org.activemq.broker.region.ConnectionStatistics;
import org.activemq.command.Command;
import org.activemq.command.Response;
@@ -86,4 +87,10 @@
* Returns the number of messages to be dispatched to this connection
*/
public int getDispatchQueueSize();
+
+ /**
+ * Returns the statistics for this connection
+ */
+ public ConnectionStatistics getStatistics();
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/Connector.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/Connector.java?rev=357683&r1=357682&r2=357683&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/Connector.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/Connector.java
Mon Dec 19 04:22:09 2005
@@ -1,37 +1,42 @@
/**
-* <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
-*
-* Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*
-**/
+ * <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
+ *
+ * Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
package org.activemq.broker;
import org.activemq.Service;
+import org.activemq.broker.region.ConnectorStatistics;
import org.activemq.command.BrokerInfo;
/**
- * A connector creates and manages client connections that talk to the Broker.
+ * A connector creates and manages client connections that talk to the Broker.
*
* @version $Revision: 1.3 $
*/
public interface Connector extends Service {
-
+
/**
*
* @return
*/
public BrokerInfo getBrokerInfo();
-
+
+ /**
+ * @return the statistics for this connector
+ */
+ public ConnectorStatistics getStatistics();
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/TransportConnection.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/TransportConnection.java?rev=357683&r1=357682&r2=357683&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/TransportConnection.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/TransportConnection.java
Mon Dec 19 04:22:09 2005
@@ -66,7 +66,7 @@
serviceTransportException(exception);
}
});
- connected =true;
+ connected = true;
}
public void start() throws Exception {
@@ -81,6 +81,7 @@
super.stop();
}
+
/**
* @return Returns the blockedCandidate.
*/
@@ -186,6 +187,7 @@
try{
setMarkedCandidate(true);
transport.oneway(command);
+ getStatistics().onCommand(command);
}catch(IOException e){
serviceException(e);
}finally{
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/TransportConnector.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/TransportConnector.java?rev=357683&r1=357682&r2=357683&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/TransportConnector.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/TransportConnector.java
Mon Dec 19 04:22:09 2005
@@ -27,6 +27,7 @@
import javax.management.ObjectName;
import org.activemq.broker.jmx.ManagedTransportConnector;
+import org.activemq.broker.region.ConnectorStatistics;
import org.activemq.command.BrokerInfo;
import org.activemq.command.ConnectionInfo;
import org.activemq.thread.TaskRunnerFactory;
@@ -60,8 +61,8 @@
protected CopyOnWriteArrayList connections = new CopyOnWriteArrayList();
protected TransportStatusDetector statusDector;
private DiscoveryAgent discoveryAgent;
+ private ConnectorStatistics statistics = new ConnectorStatistics();
private URI discoveryUri;
-
private URI connectUri;
/**
@@ -162,6 +163,13 @@
public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
this.taskRunnerFactory = taskRunnerFactory;
+ }
+
+ /**
+ * @return the statistics for this connector
+ */
+ public ConnectorStatistics getStatistics() {
+ return statistics;
}
public void start() throws Exception {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectionView.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectionView.java?rev=357683&r1=357682&r2=357683&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectionView.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectionView.java
Mon Dec 19 04:22:09 2005
@@ -72,4 +72,31 @@
public int getDispatchQueueSize() {
return connection.getDispatchQueueSize();
}
+
+ /**
+ * Resets the statistics
+ */
+ public void resetStatistics() {
+ connection.getStatistics().reset();
+ }
+
+ /**
+ * Returns the number of messages enqueued on this connection
+ *
+ * @return the number of messages enqueued on this connection
+ */
+ public long getEnqueueCount() {
+ return connection.getStatistics().getEnqueues().getCount();
+
+ }
+
+ /**
+ * Returns the number of messages dequeued on this connection
+ *
+ * @return the number of messages dequeued on this connection
+ */
+ public long getDequeueCount() {
+ return connection.getStatistics().getDequeues().getCount();
+ }
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectionViewMBean.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectionViewMBean.java?rev=357683&r1=357682&r2=357683&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectionViewMBean.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectionViewMBean.java
Mon Dec 19 04:22:09 2005
@@ -45,4 +45,24 @@
* Returns the number of messages to be dispatched to this connection
*/
public int getDispatchQueueSize();
+
+ /**
+ * Resets the statistics
+ */
+ public void resetStatistics();
+
+ /**
+ * Returns the number of messages enqueued on this connection
+ *
+ * @return the number of messages enqueued on this connection
+ */
+ public long getEnqueueCount();
+
+ /**
+ * Returns the number of messages dequeued on this connection
+ *
+ * @return the number of messages dequeued on this connection
+ */
+ public long getDequeueCount();
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectorView.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectorView.java?rev=357683&r1=357682&r2=357683&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectorView.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectorView.java
Mon Dec 19 04:22:09 2005
@@ -90,5 +90,31 @@
}
return redeliveryPolicy;
}
+
+ /**
+ * Resets the statistics
+ */
+ public void resetStatistics() {
+ connector.getStatistics().reset();
+ }
+
+ /**
+ * Returns the number of messages enqueued on this connector
+ *
+ * @return the number of messages enqueued on this connector
+ */
+ public long getEnqueueCount() {
+ return connector.getStatistics().getEnqueues().getCount();
+
+ }
+
+ /**
+ * Returns the number of messages dequeued on this connector
+ *
+ * @return the number of messages dequeued on this connector
+ */
+ public long getDequeueCount() {
+ return connector.getStatistics().getDequeues().getCount();
+ }
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectorViewMBean.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectorViewMBean.java?rev=357683&r1=357682&r2=357683&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectorViewMBean.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/jmx/ConnectorViewMBean.java
Mon Dec 19 04:22:09 2005
@@ -19,8 +19,6 @@
package org.activemq.broker.jmx;
import org.activemq.Service;
-import org.activemq.command.BrokerInfo;
-import org.activemq.command.RedeliveryPolicy;
public interface ConnectorViewMBean extends Service {
@@ -39,5 +37,24 @@
public void setMaximumRedeliveries(int maximumRedeliveries);
public void setUseExponentialBackOff(boolean useExponentialBackOff);
+
+ /**
+ * Resets the statistics
+ */
+ public void resetStatistics();
+
+ /**
+ * Returns the number of messages enqueued on this connector
+ *
+ * @return the number of messages enqueued on this connector
+ */
+ public long getEnqueueCount();
+
+ /**
+ * Returns the number of messages dequeued on this connector
+ *
+ * @return the number of messages dequeued on this connector
+ */
+ public long getDequeueCount();
}
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/ConnectionStatistics.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/ConnectionStatistics.java?rev=357683&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/ConnectionStatistics.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/ConnectionStatistics.java
Mon Dec 19 04:22:09 2005
@@ -0,0 +1,83 @@
+/**
+ * <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
+ *
+ * Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+
+package org.activemq.broker.region;
+
+import org.activemq.command.Command;
+import org.activemq.command.Message;
+import org.activemq.management.CountStatisticImpl;
+import org.activemq.management.StatsImpl;
+
+/**
+ * The J2EE Statistics for the Connection.
+ *
+ * @version $Revision$
+ */
+public class ConnectionStatistics extends StatsImpl {
+
+ private CountStatisticImpl enqueues;
+ private CountStatisticImpl dequeues;
+
+ public ConnectionStatistics() {
+
+ enqueues = new CountStatisticImpl("enqueues", "The number of messages
that have been sent to the connection");
+ dequeues = new CountStatisticImpl("dequeues", "The number of messages
that have been dispatched from the connection");
+
+ addStatistic("enqueues", enqueues);
+ addStatistic("dequeues", dequeues);
+ }
+
+ public CountStatisticImpl getEnqueues() {
+ return enqueues;
+ }
+
+ public CountStatisticImpl getDequeues() {
+ return dequeues;
+ }
+
+ public void reset() {
+ super.reset();
+ enqueues.reset();
+ dequeues.reset();
+ }
+
+ public void setParent(ConnectorStatistics parent) {
+ if (parent != null) {
+ enqueues.setParent(parent.getEnqueues());
+ dequeues.setParent(parent.getDequeues());
+ }
+ else {
+ enqueues.setParent(null);
+ dequeues.setParent(null);
+ }
+ }
+
+ /**
+ * Updates the statistics as a command is dispatched into the connection
+ */
+ public void onCommand(Command command) {
+ if (command.isMessageDispatch()) {
+ enqueues.increment();
+ }
+ }
+
+ public void onMessageDequeue(Message message) {
+ dequeues.increment();
+ }
+}
\ No newline at end of file
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/ConnectionStatistics.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/ConnectionStatistics.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/ConnectionStatistics.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/ConnectorStatistics.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/ConnectorStatistics.java?rev=357683&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/ConnectorStatistics.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/ConnectorStatistics.java
Mon Dec 19 04:22:09 2005
@@ -0,0 +1,94 @@
+/**
+* <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
+*
+* Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+**/
+
+package org.activemq.broker.region;
+import org.activemq.management.CountStatisticImpl;
+import org.activemq.management.PollCountStatisticImpl;
+import org.activemq.management.StatsImpl;
+
+/**
+ * The J2EE Statistics for the a Destination.
+ *
+ * @version $Revision$
+ */
+public class ConnectorStatistics extends StatsImpl {
+
+ protected CountStatisticImpl enqueues;
+ protected CountStatisticImpl dequeues;
+ protected CountStatisticImpl consumers;
+ protected CountStatisticImpl messages;
+ protected PollCountStatisticImpl messagesCached;
+
+ public ConnectorStatistics() {
+
+ enqueues = new CountStatisticImpl("enqueues", "The number of messages
that have been sent to the destination");
+ dequeues = new CountStatisticImpl("dequeues", "The number of messages
that have been dispatched from the destination");
+ consumers = new CountStatisticImpl("consumers", "The number of
consumers that that are subscribing to messages from the destination");
+ messages = new CountStatisticImpl("messages", "The number of messages
that that are being held by the destination");
+ messagesCached = new PollCountStatisticImpl("messagesCached", "The
number of messages that are held in the destination's memory cache");
+
+ addStatistic("enqueues", enqueues);
+ addStatistic("dequeues", dequeues);
+ addStatistic("consumers", consumers);
+ addStatistic("messages", messages);
+ addStatistic("messagesCached", messagesCached);
+ }
+
+ public CountStatisticImpl getEnqueues() {
+ return enqueues;
+ }
+ public CountStatisticImpl getDequeues() {
+ return dequeues;
+ }
+ public CountStatisticImpl getConsumers() {
+ return consumers;
+ }
+ public PollCountStatisticImpl getMessagesCached() {
+ return messagesCached;
+ }
+ public CountStatisticImpl getMessages() {
+ return messages;
+ }
+
+ public void reset() {
+ super.reset();
+ enqueues.reset();
+ dequeues.reset();
+ }
+
+ public void setParent(ConnectorStatistics parent) {
+ if( parent!=null ) {
+ enqueues.setParent(parent.enqueues);
+ dequeues.setParent(parent.dequeues);
+ consumers.setParent(parent.consumers);
+ messagesCached.setParent(parent.messagesCached);
+ messages.setParent(parent.messages);
+ } else {
+ enqueues.setParent(null);
+ dequeues.setParent(null);
+ consumers.setParent(null);
+ messagesCached.setParent(null);
+ messages.setParent(null);
+ }
+ }
+
+ public void setMessagesCached(PollCountStatisticImpl messagesCached) {
+ this.messagesCached = messagesCached;
+ }
+}
\ No newline at end of file
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/ConnectorStatistics.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/ConnectorStatistics.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/ConnectorStatistics.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/DestinationStatistics.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/DestinationStatistics.java?rev=357683&r1=357682&r2=357683&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/DestinationStatistics.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/DestinationStatistics.java
Mon Dec 19 04:22:09 2005
@@ -1,23 +1,25 @@
/**
-* <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
-*
-* Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*
-**/
+ * <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
+ *
+ * Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
package org.activemq.broker.region;
+
+import org.activemq.command.Message;
import org.activemq.management.CountStatisticImpl;
import org.activemq.management.PollCountStatisticImpl;
import org.activemq.management.StatsImpl;
@@ -28,7 +30,7 @@
* @version $Revision$
*/
public class DestinationStatistics extends StatsImpl {
-
+
protected CountStatisticImpl enqueues;
protected CountStatisticImpl dequeues;
protected CountStatisticImpl consumers;
@@ -36,32 +38,36 @@
protected PollCountStatisticImpl messagesCached;
public DestinationStatistics() {
-
+
enqueues = new CountStatisticImpl("enqueues", "The number of messages
that have been sent to the destination");
dequeues = new CountStatisticImpl("dequeues", "The number of messages
that have been dispatched from the destination");
consumers = new CountStatisticImpl("consumers", "The number of
consumers that that are subscribing to messages from the destination");
messages = new CountStatisticImpl("messages", "The number of messages
that that are being held by the destination");
messagesCached = new PollCountStatisticImpl("messagesCached", "The
number of messages that are held in the destination's memory cache");
-
+
addStatistic("enqueues", enqueues);
addStatistic("dequeues", dequeues);
addStatistic("consumers", consumers);
addStatistic("messages", messages);
addStatistic("messagesCached", messagesCached);
}
-
+
public CountStatisticImpl getEnqueues() {
return enqueues;
}
+
public CountStatisticImpl getDequeues() {
return dequeues;
}
+
public CountStatisticImpl getConsumers() {
return consumers;
}
+
public PollCountStatisticImpl getMessagesCached() {
return messagesCached;
}
+
public CountStatisticImpl getMessages() {
return messages;
}
@@ -71,15 +77,16 @@
enqueues.reset();
dequeues.reset();
}
-
+
public void setParent(DestinationStatistics parent) {
- if( parent!=null ) {
+ if (parent != null) {
enqueues.setParent(parent.enqueues);
dequeues.setParent(parent.dequeues);
consumers.setParent(parent.consumers);
messagesCached.setParent(parent.messagesCached);
messages.setParent(parent.messages);
- } else {
+ }
+ else {
enqueues.setParent(null);
dequeues.setParent(null);
consumers.setParent(null);
@@ -90,5 +97,17 @@
public void setMessagesCached(PollCountStatisticImpl messagesCached) {
this.messagesCached = messagesCached;
+ }
+
+ /**
+ * Called when a message is enqueued to update the statistics.
+ */
+ public void onMessageEnqueue(Message message) {
+ getEnqueues().increment();
+ getMessages().increment();
+ }
+
+ public void onMessageDequeue(Message message) {
+ getDequeues().increment();
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/PrefetchSubscription.java?rev=357683&r1=357682&r2=357683&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/PrefetchSubscription.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/PrefetchSubscription.java
Mon Dec 19 04:22:09 2005
@@ -277,7 +277,8 @@
node.decrementReferenceCount();
if( node.getRegionDestination() !=null ) {
-
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
+
node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
+ context.getConnection().getStatistics().onMessageDequeue(message);
if( wasFull && !isFull() ) {
try {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Queue.java?rev=357683&r1=357682&r2=357683&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Queue.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Queue.java
Mon Dec 19 04:22:09 2005
@@ -344,8 +344,7 @@
dispatchValve.increment();
MessageEvaluationContext msgContext =
context.getMessageEvaluationContext();
try {
- destinationStatistics.getEnqueues().increment();
- destinationStatistics.getMessages().increment();
+ destinationStatistics.onMessageEnqueue(message);
synchronized (messages) {
messages.add(node);
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/BaseCommand.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/BaseCommand.java?rev=357683&r1=357682&r2=357683&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/BaseCommand.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/BaseCommand.java
Mon Dec 19 04:22:09 2005
@@ -60,6 +60,10 @@
public boolean isMarshallAware() {
return false;
}
+
+ public boolean isMessageAck() {
+ return false;
+ }
/**
* @openwire:property version=1
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/Command.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/Command.java?rev=357683&r1=357682&r2=357683&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/Command.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/Command.java
Mon Dec 19 04:22:09 2005
@@ -43,6 +43,7 @@
boolean isBrokerInfo();
boolean isWireFormatInfo();
boolean isMessage();
+ boolean isMessageAck();
Response visit( CommandVisitor visitor) throws Throwable;
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/KeepAliveInfo.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/KeepAliveInfo.java?rev=357683&r1=357682&r2=357683&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/KeepAliveInfo.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/KeepAliveInfo.java
Mon Dec 19 04:22:09 2005
@@ -58,6 +58,10 @@
return false;
}
+ public boolean isMessageAck() {
+ return false;
+ }
+
public boolean isBrokerInfo() {
return false;
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/MessageAck.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/MessageAck.java?rev=357683&r1=357682&r2=357683&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/MessageAck.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/MessageAck.java
Mon Dec 19 04:22:09 2005
@@ -84,6 +84,10 @@
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
+
+ public boolean isMessageAck() {
+ return true;
+ }
public boolean isPoisonAck() {
return ackType==POSION_ACK_TYPE;
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/WireFormatInfo.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/WireFormatInfo.java?rev=357683&r1=357682&r2=357683&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/WireFormatInfo.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/WireFormatInfo.java
Mon Dec 19 04:22:09 2005
@@ -1,50 +1,50 @@
/**
-* <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
-*
-* Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*
-**/
+ * <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
+ *
+ * Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
package org.activemq.command;
-import java.util.Arrays;
-
import org.activemq.state.CommandVisitor;
+import java.util.Arrays;
+
/**
*
* @openwire:marshaller
* @version $Revision$
*/
public class WireFormatInfo implements Command {
-
- public static final byte DATA_STRUCTURE_TYPE=CommandTypes.WIREFORMAT_INFO;
- static final private byte MAGIC[] = new
byte[]{'A','c','t','i','v','e','M','Q'};
-
- static final public long STACK_TRACE_MASK = 0x00000001;
- static final public long TCP_NO_DELAY_MASK = 0x00000002;
- static final public long CACHE_MASK = 0x00000004;
- static final public long COMPRESSION_MASK = 0x00000008;
+
+ public static final byte DATA_STRUCTURE_TYPE =
CommandTypes.WIREFORMAT_INFO;
+ static final private byte MAGIC[] = new byte[] { 'A', 'c', 't', 'i', 'v',
'e', 'M', 'Q' };
+
+ static final public long STACK_TRACE_MASK = 0x00000001;
+ static final public long TCP_NO_DELAY_MASK = 0x00000002;
+ static final public long CACHE_MASK = 0x00000004;
+ static final public long COMPRESSION_MASK = 0x00000008;
protected int version;
protected byte magic[] = MAGIC;
protected int options;
-
+
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
-
+
public boolean isWireFormatInfo() {
return true;
}
@@ -55,6 +55,7 @@
public byte[] getMagic() {
return magic;
}
+
public void setMagic(byte[] magic) {
this.magic = magic;
}
@@ -71,7 +72,7 @@
}
public boolean isValid() {
- return magic!=null && Arrays.equals(magic, MAGIC);
+ return magic != null && Arrays.equals(magic, MAGIC);
}
public void setCommandId(short value) {
@@ -92,20 +93,24 @@
public boolean isBrokerInfo() {
return false;
}
-
+
public boolean isMessageDispatch() {
return false;
}
+
public boolean isMessage() {
return false;
}
+ public boolean isMessageAck() {
+ return false;
+ }
public void setResponseRequired(boolean responseRequired) {
}
-
+
public String toString() {
- return "WireFormatInfo {version="+version+"}";
+ return "WireFormatInfo {version=" + version + "}";
}
/**
@@ -118,47 +123,52 @@
public void setOptions(int options) {
this.options = options;
}
-
-
+
public boolean isStackTraceEnabled() {
- return (options & STACK_TRACE_MASK)!=0;
- }
+ return (options & STACK_TRACE_MASK) != 0;
+ }
+
public void setStackTraceEnabled(boolean enable) {
- if( enable ) {
+ if (enable) {
options |= STACK_TRACE_MASK;
- } else {
+ }
+ else {
options &= ~STACK_TRACE_MASK;
}
}
-
+
public boolean isTcpNoDelayEnabled() {
- return (options & TCP_NO_DELAY_MASK)!=0;
- }
+ return (options & TCP_NO_DELAY_MASK) != 0;
+ }
+
public void setTcpNoDelayEnabled(boolean enable) {
- if( enable ) {
+ if (enable) {
options |= TCP_NO_DELAY_MASK;
- } else {
+ }
+ else {
options &= ~TCP_NO_DELAY_MASK;
}
}
-
+
public boolean isCacheEnabled() {
- return (options & CACHE_MASK)!=0;
- }
+ return (options & CACHE_MASK) != 0;
+ }
+
public void setCacheEnabled(boolean enable) {
- if( enable ) {
+ if (enable) {
options |= CACHE_MASK;
- } else {
+ }
+ else {
options &= ~CACHE_MASK;
}
}
public Response visit(CommandVisitor visitor) throws Throwable {
- return visitor.processWireFormat( this );
+ return visitor.processWireFormat(this);
}
public boolean isMarshallAware() {
return false;
}
-
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/jndi/ActiveMQInitialContextFactory.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/jndi/ActiveMQInitialContextFactory.java?rev=357683&r1=357682&r2=357683&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/jndi/ActiveMQInitialContextFactory.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/jndi/ActiveMQInitialContextFactory.java
Mon Dec 19 04:22:09 2005
@@ -62,8 +62,6 @@
public Context getInitialContext(Hashtable environment) throws
NamingException {
// lets create a factory
Map data = new ConcurrentHashMap();
- Broker broker = null;
-
String[] names = getConnectionFactoryNames(environment);
for (int i = 0; i < names.length; i++) {
ActiveMQConnectionFactory factory =null;
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/management/JMSConnectionStatsImpl.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/management/JMSConnectionStatsImpl.java?rev=357683&r1=357682&r2=357683&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/management/JMSConnectionStatsImpl.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/management/JMSConnectionStatsImpl.java
Mon Dec 19 04:22:09 2005
@@ -18,11 +18,10 @@
**/
package org.activemq.management;
-import java.util.List;
-
import org.activemq.ActiveMQSession;
import org.activemq.util.IndentPrinter;
-import javax.management.j2ee.statistics.Statistic;
+
+import java.util.List;
/**
* Statistics for a JMS connection
*