Author: jlim
Date: Thu Jun 8 22:18:11 2006
New Revision: 412933
URL: http://svn.apache.org/viewvc?rev=412933&view=rev
Log:
systest to check for memory leak
Added:
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/pom.xml
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/readme.txt
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/maven/
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/maven/MemtestMojo.java
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/JMSMemtest.java
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemConsumer.java
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemMessageIdList.java
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemProducer.java
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemoryMonitoringTool.java
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/ReportGenerator.java
Added: incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/pom.xml
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/pom.xml?rev=412933&view=auto
==============================================================================
--- incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/pom.xml
(added)
+++ incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/pom.xml Thu
Jun 8 22:18:11 2006
@@ -0,0 +1,66 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>incubator-activemq</groupId>
+ <artifactId>activemq-parent</artifactId>
+ <version>4.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+
+ <groupId>incubator-activemq</groupId>
+ <artifactId>maven-activemq-memtest-plugin</artifactId>
+ <packaging>maven-plugin</packaging>
+ <name>ActiveMQ :: Memory Usage Test Plugin</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-plugin-api</artifactId>
+ <version>2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>incubator-activemq</groupId>
+ <artifactId>activemq-core</artifactId>
+ <version>4.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>incubator-activemq</groupId>
+ <artifactId>activemq-console</artifactId>
+ <version>4.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>incubator-activemq</groupId>
+ <artifactId>activeio-core</artifactId>
+ <version>3.0-beta2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>10.1.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derbynet</artifactId>
+ <version>10.1.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>backport-util-concurrent</groupId>
+ <artifactId>backport-util-concurrent</artifactId>
+ <version>2.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ <version>1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-j2ee-management_1.0_spec</artifactId>
+ <version>1.0</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
Added: incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/readme.txt
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/readme.txt?rev=412933&view=auto
==============================================================================
--- incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/readme.txt
(added)
+++ incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/readme.txt
Thu Jun 8 22:18:11 2006
@@ -0,0 +1,99 @@
+####################################################################################################
+# Running Maven 2 Memory usage Test
+####################################################################################################
+
+Goal | Description
+--------------------------|----------------------------------------------------------
+ activemq-memtest:memtest | Starts the broker, producer, consumer and the
memory monitoring thread all in the same VM and
+ | generate the heap and non-heap memory usage of the
jvm.
+ | The plugin is included by default in the
\activemq-perf module.
+ |
+ | Parameters :
+ |
+ | 1. -DmessageCount - specifies number of
messages to send/receive
+ | - default value : 100000
+ |
+ | 2. -Dtopic - specifies domain type. Valid
value is true or false
+ | - default value : true
+ |
+ | 3. -Ddurable - specifies delivery mode: Valid
value is true or false
+ | - default value : false
+ |
+ | 4. -DconnectionCheckpointSize - specifies size
of messages sent in KB before we close and
+ | start the
producer/consumer to see if there is a memory
+ | leak using
different connections.
+ | - a value of -1
indicates that no checkpoint is set and will
+ | send/consume
messages using one producer/consumer conneciton
+ | - default value :
-1
+ |
+ | 5. -DmessageSize - specifies the message size
in bytes
+ | - default value : 10240 (10KB)
+ |
+ | 6. -DcheckpointInterval - specifies the
interval in seconds on which the monitoring tool
+ | will get the memory
usage of test run.
+ | - default value : 2
(seconds)
+ |
+ | 7. -DprefetchSize - specifies the prefetch size
to be used
+ | - a value of -1 will
indicates that test will use the default prefetch
+ | size (32000)
+ | - default value : -1
+ |
+ | 8. -Durl - species the broker url to use if not
going to be using the embedded broker
+ | - default value : null
+ |
+ | 9. -DreportName - specifies the name of the
output xml file.
+ | - default value :
activemq-memory-usage-report
+ |
+ | 10. -DreportDirectory - specifies the directory
of the output file
+ | - default value :
${project.build.directory}/test-memtest
+ |
+ | 11. -DproducerCount - specifies the number of
producers
+ | - default value : 1
+ |
+ | 12. -DconsumerCount - specifies the number of
consumers
+ | - default value : 1
+
+-----------------------------------------------------------------------------------------------
+|Memory Usage Test sample output
+|-----------------------------------------------------------------------------------------------
+|<test-report>
+| <test-information>
+| <os-name>Windows XP</os-name>
+| <java-version>1.5.0_05</java-version>
+| <jvm_memory_settings>
+| <heap_memory>
+| <committed>9502720</committed>
+| <max>66650112</max>
+| </heap_memory>
+| <non_heap_memory>
+| <committed>30736384</committed>
+| <max>121634816</max>
+| </non_heap_memory>
+| </jvm_memory_settings>
+| <test-settings>
+| <durable>non-durable</durable>
+| <message_size>10240</message_size>
+| <destination_name>FOO.BAR</destination_name>
+| <connection_checkpoint_size>-1</connection_checkpoint_size>
+| <consumer_count>1</consumer_count>
+| <report_name>activemq-memory-usage-report</report_name>
+| <prefetchSize>-1</prefetchSize>
+| <domain>topic</domain>
+| <producer_count>1</producer_count>
+| <connection_checkpoint_size_kb>-1</connection_checkpoint_size_kb>
+| <message_count>100000</message_count>
+|
<report_directory>C:\Projects\logicblaze\activemq\activemq-perftest\target/test-memtest</report_directory>
+| </test-settings>
+| </test-information>
+| <test-result checkpoint_interval_in_sec=5 >
+| <memory_usage index=0 non_heap_mb=21 non_heap_bytes=22963904 heap_mb=6
heap_bytes=7275808/>
+| <memory_usage index=1 non_heap_mb=23 non_heap_bytes=24598560 heap_mb=11
heap_bytes=12474400/>
+| ....
+| ....
+| </test-result>
+|</test-report>
+|
+-------------------------------------------------------------------------------------------------
+
+
+
Added:
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/maven/MemtestMojo.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/maven/MemtestMojo.java?rev=412933&view=auto
==============================================================================
---
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/maven/MemtestMojo.java
(added)
+++
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/maven/MemtestMojo.java
Thu Jun 8 22:18:11 2006
@@ -0,0 +1,145 @@
+package org.apache.activemq.maven;
+
+/*
+ * Copyright 2001-2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.maven.plugin.AbstractMojo;
+import org.apache.maven.plugin.MojoExecutionException;
+import org.apache.activemq.tool.JMSMemtest;
+
+import javax.jms.JMSException;
+
+
+/**
+ * Goal which does a memory usage test to check for any memory leak
+ *
+ * @goal memtest
+ * @phase process-sources
+ */
+public class MemtestMojo
+ extends AbstractMojo {
+
+ /**
+ * @parameter expression="${url}
+ *
+ */
+ private String url;
+
+ /**
+ * @parameter expression="${topic}" default-value="true"
+ * @required
+ */
+ private String topic;
+
+ /**
+ * @parameter expression="${connectionCheckpointSize}" default-value="-1"
+ * @required
+ */
+ private String connectionCheckpointSize;
+
+ /**
+ * @parameter expression="${durable}" default-value="false"
+ * @required
+ */
+ private String durable;
+
+ /**
+ * @parameter expression="${producerCount}" default-value="1"
+ * @required
+ */
+ private String producerCount;
+
+ /**
+ * @parameter expression="${prefetchSize}" default-value="-1"
+ * @required
+ */
+ private String prefetchSize;
+
+
+ /**
+ * @parameter expression="${consumerCount}" default-value="1"
+ * @required
+ */
+ private String consumerCount;
+
+ /**
+ * @parameter expression="${messageCount}" default-value="100000"
+ * @required
+ */
+ private String messageCount;
+
+ /**
+ * @parameter expression="${messageSize}" default-value="10240"
+ * @required
+ */
+ private String messageSize;
+
+ /**
+ * @parameter expression="${checkpointInterval}" default-value="2"
+ * @required
+ */
+ private String checkpointInterval;
+
+ /**
+ * @parameter expression="${destinationName}" default-value="FOO.BAR"
+ * @required
+ */
+ private String destinationName;
+
+ /**
+ * @parameter expression="${reportName}"
default-value="activemq-memory-usage-report"
+ * @required
+ */
+ private String reportName;
+
+ /**
+ * @parameter expression="${reportDirectory}"
default-value="${project.build.directory}/test-memtest"
+ * @required
+ */
+ private String reportDirectory;
+
+
+
+ public void execute()
+ throws MojoExecutionException {
+
+ JMSMemtest.main(createArgument());
+ }
+
+
+
+ public String[] createArgument() {
+
+
+ String[] options = {
+ "url=" + url,
+ "topic=" + topic,
+ "durable=" + durable,
+ "connectionCheckpointSize=" + connectionCheckpointSize,
+ "producerCount=" + producerCount,
+ "consumerCount=" + consumerCount,
+ "messageCount=" + messageCount,
+ "messageSize=" + messageSize,
+ "checkpointInterval=" + checkpointInterval,
+ "destinationName=" + destinationName,
+ "reportName=" + reportName,
+ "prefetchSize=" + prefetchSize,
+ "reportDirectory=" + reportDirectory,
+ };
+ return options;
+ }
+}
Added:
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/JMSMemtest.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/JMSMemtest.java?rev=412933&view=auto
==============================================================================
---
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/JMSMemtest.java
(added)
+++
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/JMSMemtest.java
Thu Jun 8 22:18:11 2006
@@ -0,0 +1,307 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.tool;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.tool.MemProducer;
+import org.apache.activemq.tool.MemConsumer;
+import org.apache.activemq.tool.MemoryMonitoringTool;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.*;
+
+
+import java.util.Properties;
+
+
+public class JMSMemtest {
+
+ private static final Log log = LogFactory.getLog(JMSMemtest.class);
+ private static final int DEFAULT_MESSAGECOUNT = 5000;
+ protected BrokerService broker;
+ protected boolean topic = true;
+ protected boolean durable = false;
+
+ protected long messageCount = 0;
+
+ // how large the message in kb before we close/start the
producer/consumer with a new connection. -1 means no connectionCheckpointSize
+ protected int connectionCheckpointSize;
+ protected long connectionInterval;
+
+
+ protected int consumerCount;
+ protected int producerCount;
+ protected int checkpointInterval;
+ protected int prefetchSize;
+ //set 10 kb of payload as default
+ protected int messageSize;
+
+ protected String reportDirectory;
+ protected String reportName;
+
+
+ protected String url = "";
+ protected MemProducer[] producers;
+ protected MemConsumer[] consumers;
+ protected String destinationName;
+ protected boolean allMessagesConsumed = true;
+ protected MemConsumer allMessagesList = new MemConsumer();
+
+ protected Message payload;
+
+ protected ActiveMQConnectionFactory connectionFactory;
+ protected Connection connection;
+ protected Destination destination;
+
+
+ protected boolean createConnectionPerClient = true;
+
+ protected boolean transacted = false;
+ protected boolean useEmbeddedBroker = true;
+ protected MemoryMonitoringTool memoryMonitoringTool;
+
+
+ public static void main(String[] args) {
+
+
+ Properties sysSettings = new Properties();
+
+ for (int i = 0; i < args.length; i++) {
+
+ int index = args[i].indexOf("=");
+ String key = args[i].substring(0, index);
+ String val = args[i].substring(index + 1);
+ sysSettings.setProperty(key, val);
+
+ }
+
+
+ JMSMemtest memtest = new JMSMemtest(sysSettings);
+ try {
+ memtest.start();
+ } catch (Exception e) {
+
+ e.printStackTrace();
+ }
+
+ }
+
+
+ public JMSMemtest(Properties settings) {
+ url = settings.getProperty("url");
+ topic = new Boolean(settings.getProperty("topic")).booleanValue();
+ durable = new Boolean(settings.getProperty("durable")).booleanValue();
+ connectionCheckpointSize = new
Integer(settings.getProperty("connectionCheckpointSize")).intValue();
+ producerCount = new
Integer(settings.getProperty("producerCount")).intValue();
+ consumerCount = new
Integer(settings.getProperty("consumerCount")).intValue();
+ messageCount = new
Integer(settings.getProperty("messageCount")).intValue();
+ messageSize = new
Integer(settings.getProperty("messageSize")).intValue();
+ prefetchSize = new
Integer(settings.getProperty("prefetchSize")).intValue();
+ checkpointInterval = new
Integer(settings.getProperty("checkpointInterval")).intValue() * 1000;
+ producerCount = new
Integer(settings.getProperty("producerCount")).intValue();
+ reportName = settings.getProperty("reportName");
+ destinationName = settings.getProperty("destinationName");
+ reportDirectory = settings.getProperty("reportDirectory");
+ connectionInterval = connectionCheckpointSize * 1024;
+ }
+
+ protected void start() throws Exception {
+ log.info("Starting Monitor");
+ memoryMonitoringTool = new MemoryMonitoringTool();
+ memoryMonitoringTool.setTestSettings(getSysTestSettings());
+ Thread monitorThread = memoryMonitoringTool.startMonitor();
+
+ if (messageCount == 0) {
+ messageCount = DEFAULT_MESSAGECOUNT;
+ }
+
+
+ if (useEmbeddedBroker) {
+ if (broker == null) {
+ broker = createBroker();
+ }
+ }
+
+
+ connectionFactory = (ActiveMQConnectionFactory)
createConnectionFactory();
+ if (prefetchSize > 0) {
+
connectionFactory.getPrefetchPolicy().setTopicPrefetch(prefetchSize);
+
connectionFactory.getPrefetchPolicy().setQueuePrefetch(prefetchSize);
+ }
+
+ connection = connectionFactory.createConnection();
+ Session session = connection.createSession(transacted,
Session.AUTO_ACKNOWLEDGE);
+
+ if (topic) {
+ destination = session.createTopic(destinationName);
+ } else {
+ destination = session.createQueue(destinationName);
+ }
+
+ createPayload(session);
+
+ publishAndConsume();
+
+ log.info("Closing resources");
+ this.close();
+
+ monitorThread.join();
+
+
+ }
+
+
+ protected boolean resetConnection(int counter) {
+ if (connectionInterval > 0) {
+ long totalMsgSizeConsumed = counter * 1024;
+ if (connectionInterval < totalMsgSizeConsumed) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ protected void publishAndConsume() throws Exception {
+
+ createConsumers();
+ createProducers();
+ int counter = 0;
+ boolean resetCon = false;
+ log.info("Start sending messages ");
+ for (int i = 0; i < messageCount; i++) {
+ if (resetCon == true) {
+ closeConsumers();
+ closeProducers();
+ createConsumers();
+ createProducers();
+ resetCon = false;
+ }
+
+ for (int k = 0; k < producers.length; k++) {
+ producers[k].sendMessage(payload, "counter", counter);
+ counter++;
+ if (resetConnection(counter)) {
+ resetCon = true;
+ break;
+ }
+ }
+ }
+ }
+
+
+ protected void close() throws Exception {
+ connection.close();
+ broker.stop();
+
+ memoryMonitoringTool.stopMonitor();
+ }
+
+ protected void createPayload(Session session) throws JMSException {
+
+ byte[] array = new byte[messageSize];
+ for (int i = 0; i < array.length; i++) {
+ array[i] = (byte) i;
+ }
+
+ BytesMessage bystePayload = session.createBytesMessage();
+ bystePayload.writeBytes(array);
+ payload = (Message) bystePayload;
+ }
+
+
+ protected void createProducers() throws JMSException {
+ producers = new MemProducer[producerCount];
+ for (int i = 0; i < producerCount; i++) {
+ producers[i] = new MemProducer(connectionFactory, destination);
+ if (durable) {
+ producers[i].setDeliveryMode(DeliveryMode.PERSISTENT);
+ } else {
+ producers[i].setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ }
+ producers[i].start();
+ }
+
+ }
+
+ protected void createConsumers() throws JMSException {
+ consumers = new MemConsumer[consumerCount];
+ for (int i = 0; i < consumerCount; i++) {
+ consumers[i] = new MemConsumer(connectionFactory, destination);
+ consumers[i].setParent(allMessagesList);
+ consumers[i].start();
+
+
+ }
+ }
+
+ protected void closeProducers() throws JMSException {
+ for (int i = 0; i < producerCount; i++) {
+ producers[i].shutDown();
+ }
+
+ }
+
+ protected void closeConsumers() throws JMSException {
+ for (int i = 0; i < consumerCount; i++) {
+ consumers[i].shutDown();
+ }
+ }
+
+ protected ConnectionFactory createConnectionFactory() throws JMSException {
+
+ if (url == null || url.trim().equals("") || url.trim().equals("null"))
{
+ return new ActiveMQConnectionFactory("vm://localhost");
+ } else {
+ return new ActiveMQConnectionFactory(url);
+ }
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService broker = new BrokerService();
+ configureBroker(broker);
+ broker.start();
+ return broker;
+ }
+
+ protected void configureBroker(BrokerService broker) throws Exception {
+ broker.addConnector("vm://localhost");
+ broker.setDeleteAllMessagesOnStartup(true);
+ }
+
+ protected Properties getSysTestSettings() {
+ Properties settings = new Properties();
+ settings.setProperty("domain", topic == true ? "topic" : "queue");
+ settings.setProperty("durable", durable == true ? "durable" :
"non-durable");
+ settings.setProperty("connection_checkpoint_size_kb", new
Integer(connectionCheckpointSize).toString());
+ settings.setProperty("producer_count", new
Integer(producerCount).toString());
+ settings.setProperty("consumer_count", new
Integer(consumerCount).toString());
+ settings.setProperty("message_count", new
Long(messageCount).toString());
+ settings.setProperty("message_size", new
Integer(messageSize).toString());
+ settings.setProperty("prefetchSize", new
Integer(prefetchSize).toString());
+ settings.setProperty("checkpoint_interval", new
Integer(checkpointInterval).toString());
+ settings.setProperty("destination_name", destinationName);
+ settings.setProperty("report_name", reportName);
+ settings.setProperty("report_directory", reportDirectory);
+ settings.setProperty("connection_checkpoint_size", new
Integer(connectionCheckpointSize).toString());
+ return settings;
+ }
+
+
+}
Added:
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemConsumer.java?rev=412933&view=auto
==============================================================================
---
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemConsumer.java
(added)
+++
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemConsumer.java
Thu Jun 8 22:18:11 2006
@@ -0,0 +1,108 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tool;
+
+import org.apache.activemq.tool.MemMessageIdList;
+
+import javax.jms.*;
+
+/**
+ * @version $Revision: 1.3 $
+ */
+public class MemConsumer extends MemMessageIdList implements MessageListener {
+ protected Connection connection;
+ protected MessageConsumer consumer;
+ protected long counter = 0;
+ protected boolean isParent = false;
+ protected boolean inOrder = true;
+
+
+ public MemConsumer() {
+ super();
+ }
+
+ public MemConsumer(ConnectionFactory fac, Destination dest, String
consumerName) throws JMSException {
+ connection = fac.createConnection();
+ Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ if (dest instanceof Topic && consumerName != null &&
consumerName.length() > 0) {
+ consumer = s.createDurableSubscriber((Topic) dest, consumerName);
+ } else {
+ consumer = s.createConsumer(dest);
+ }
+ consumer.setMessageListener(this);
+ }
+
+ public MemConsumer(ConnectionFactory fac, Destination dest) throws
JMSException {
+ this(fac, dest, null);
+ }
+
+ public void start() throws JMSException {
+ connection.start();
+ }
+
+ public void stop() throws JMSException {
+ connection.stop();
+ }
+
+ public void shutDown() throws JMSException {
+ connection.close();
+ }
+
+
+ public Message receive() throws JMSException {
+ return consumer.receive();
+ }
+
+ public Message receive(long wait) throws JMSException {
+ return consumer.receive(wait);
+ }
+
+ static long ctr = 0;
+
+ public void onMessage(Message msg) {
+ super.onMessage(msg);
+
+ if (isParent) {
+ try {
+ long ctr = msg.getLongProperty("counter");
+ if (counter != ctr) {
+ inOrder = false;
+ }
+ counter++;
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+
+ public boolean isInOrder() {
+ return inOrder;
+ }
+
+
+ public void setAsParent(boolean isParent) {
+ this.isParent = isParent;
+ }
+
+ public boolean isParent() {
+ return this.isParent;
+ }
+
+
+}
\ No newline at end of file
Added:
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemMessageIdList.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemMessageIdList.java?rev=412933&view=auto
==============================================================================
---
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemMessageIdList.java
(added)
+++
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemMessageIdList.java
Thu Jun 8 22:18:11 2006
@@ -0,0 +1,177 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tool;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A simple container of messages for performing testing and rendezvous style
+ * code. You can use this class a [EMAIL PROTECTED] MessageListener} and then
make
+ * assertions about how many messages it has received allowing a certain
maximum
+ * amount of time to ensure that the test does not hang forever.
+ * <p/>
+ * Also you can chain these instances together with the
+ * [EMAIL PROTECTED] #setParent(MessageListener)} method so that you can
aggregate the
+ * total number of messages consumed across a number of consumers.
+ *
+ * @version $Revision: 1.6 $
+ */
+public class MemMessageIdList implements MessageListener {
+
+ protected static final Log log = LogFactory.getLog(MemMessageIdList.class);
+
+ private List messageIds = new ArrayList();
+ private Object semaphore;
+ private boolean verbose;
+ private MessageListener parent;
+ private long maximumDuration = 15000L;
+
+ public MemMessageIdList() {
+ this(new Object());
+ }
+
+ public MemMessageIdList(Object semaphore) {
+ this.semaphore = semaphore;
+ }
+
+ public boolean equals(Object that) {
+ if (that instanceof MemMessageIdList) {
+ MemMessageIdList thatListMem = (MemMessageIdList) that;
+ return getMessageIds().equals(thatListMem.getMessageIds());
+ }
+ return false;
+ }
+
+ public int hashCode() {
+ synchronized (semaphore) {
+ return messageIds.hashCode() + 1;
+ }
+ }
+
+ public String toString() {
+ synchronized (semaphore) {
+ return messageIds.toString();
+ }
+ }
+
+ /**
+ * @return all the messages on the list so far, clearing the buffer
+ */
+ public List flushMessages() {
+ synchronized (semaphore) {
+ List answer = new ArrayList(messageIds);
+ messageIds.clear();
+ return answer;
+ }
+ }
+
+ public synchronized List getMessageIds() {
+ synchronized (semaphore) {
+ return new ArrayList(messageIds);
+ }
+ }
+
+ public void onMessage(Message message) {
+ String id = null;
+ try {
+ id = message.getJMSMessageID();
+ synchronized (semaphore) {
+ messageIds.add(id);
+ semaphore.notifyAll();
+ }
+ if (verbose) {
+ log.info("Received message: " + message);
+ }
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ if (parent != null) {
+ parent.onMessage(message);
+ }
+ }
+
+ public int getMessageCount() {
+ synchronized (semaphore) {
+ return messageIds.size();
+ }
+ }
+
+ public void waitForMessagesToArrive(int messageCount) {
+ log.info("Waiting for " + messageCount + " message(s) to arrive");
+
+ long start = System.currentTimeMillis();
+
+ for (int i = 0; i < messageCount; i++) {
+ try {
+ if (hasReceivedMessages(messageCount)) {
+ break;
+ }
+ long duration = System.currentTimeMillis() - start;
+ if (duration >= maximumDuration) {
+ break;
+ }
+ synchronized (semaphore) {
+ semaphore.wait(maximumDuration - duration);
+ }
+ } catch (InterruptedException e) {
+ log.info("Caught: " + e);
+ }
+ }
+ long end = System.currentTimeMillis() - start;
+
+ log.info("End of wait for " + end + " millis and received: " +
getMessageCount() + " messages");
+ }
+
+
+ public boolean hasReceivedMessage() {
+ return getMessageCount() == 0;
+ }
+
+ public boolean hasReceivedMessages(int messageCount) {
+ return getMessageCount() >= messageCount;
+ }
+
+ public boolean isVerbose() {
+ return verbose;
+ }
+
+ public void setVerbose(boolean verbose) {
+ this.verbose = verbose;
+ }
+
+ public MessageListener getParent() {
+ return parent;
+ }
+
+ /**
+ * Allows a parent listener to be specified such as to aggregate messages
+ * consumed across consumers
+ */
+ public void setParent(MessageListener parent) {
+ this.parent = parent;
+ }
+
+}
Added:
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemProducer.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemProducer.java?rev=412933&view=auto
==============================================================================
---
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemProducer.java
(added)
+++
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemProducer.java
Thu Jun 8 22:18:11 2006
@@ -0,0 +1,74 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tool;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * @version $Revision: 1.3 $
+ */
+public class MemProducer {
+ protected Connection connection;
+ protected MessageProducer producer;
+
+ public MemProducer(ConnectionFactory fac, Destination dest) throws
JMSException {
+ connection = fac.createConnection();
+ Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = s.createProducer(dest);
+ }
+
+ public void setDeliveryMode(int mode) throws JMSException {
+ producer.setDeliveryMode(mode);
+ }
+
+ public void start() throws JMSException {
+ connection.start();
+ }
+
+ public void stop() throws JMSException {
+ connection.stop();
+ }
+
+ public void shutDown() throws JMSException {
+ connection.close();
+ }
+
+ public void sendMessage(Message msg) throws JMSException {
+ sendMessage(msg, null, 0);
+ }
+
+ /*
+ * allow producer to attach message counter on its header. This will be
used to verify message order
+ *
+ */
+ public void sendMessage(Message msg, String headerName, long headerValue)
throws JMSException {
+ if (headerName != null) {
+ msg.setLongProperty(headerName, headerValue);
+ }
+
+ producer.send(msg);
+
+ }
+
+}
\ No newline at end of file
Added:
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemoryMonitoringTool.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemoryMonitoringTool.java?rev=412933&view=auto
==============================================================================
---
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemoryMonitoringTool.java
(added)
+++
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/MemoryMonitoringTool.java
Thu Jun 8 22:18:11 2006
@@ -0,0 +1,148 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tool;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+
+import java.io.DataOutputStream;
+import java.util.Properties;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.ManagementFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+public class MemoryMonitoringTool implements Runnable {
+
+ private long checkpointInterval = 5000; // 5 sec sample
checkpointInterval
+ private long resultIndex = 0;
+
+ private AtomicBoolean isRunning = new AtomicBoolean(false);
+ private DataOutputStream dataDoutputStream = null;
+
+ protected Properties testSettings = new Properties();
+ protected ReportGenerator reportGenerator = new ReportGenerator();
+ private MemoryMXBean memoryBean;
+
+ public Properties getTestSettings() {
+ return testSettings;
+ }
+
+ public void setTestSettings(Properties sysTestSettings) {
+ this.testSettings = sysTestSettings;
+ }
+
+ public DataOutputStream getDataOutputStream() {
+ return dataDoutputStream;
+ }
+
+ public void setDataOutputStream(DataOutputStream dataDoutputStream) {
+ this.dataDoutputStream = dataDoutputStream;
+ }
+
+
+ public void stopMonitor() {
+ isRunning.set(false);
+ }
+
+
+ public long getCheckpointInterval() {
+ return checkpointInterval;
+ }
+
+ public void setCheckpointInterval(long checkpointInterval) {
+ this.checkpointInterval = checkpointInterval;
+ }
+
+
+ public Thread startMonitor() {
+
+ String intervalStr =
this.getTestSettings().getProperty("checkpoint_interval");
+ checkpointInterval = new Integer(intervalStr).intValue();
+ this.getTestSettings().remove("checkpoint_interval");
+
+ memoryBean = ManagementFactory.getMemoryMXBean();
+ reportGenerator.setTestSettings(getTestSettings());
+ addTestInformation();
+
+ Thread t = new Thread(this);
+ t.setName("Memory monitoring tool");
+ isRunning.set(true);
+ t.start();
+
+ return t;
+
+ }
+
+
+ public void addTestInformation() {
+
reportGenerator.setReportName(this.getTestSettings().getProperty("report_name"));
+
reportGenerator.setReportDirectory(this.getTestSettings().getProperty("report_directory"));
+ reportGenerator.startGenerateReport();
+
+ reportGenerator.addTestInformation();
+ reportGenerator.writeWithIndent(4, "<jvm_memory_settings>");
+ reportGenerator.writeWithIndent(6, "<heap_memory>");
+ reportGenerator.writeWithIndent(8, "<committed>" +
memoryBean.getHeapMemoryUsage().getCommitted() + "</committed>");
+ reportGenerator.writeWithIndent(8, "<max>" +
memoryBean.getHeapMemoryUsage().getMax() + "</max>");
+ reportGenerator.writeWithIndent(6, "</heap_memory>");
+ reportGenerator.writeWithIndent(6, "<non_heap_memory>");
+ reportGenerator.writeWithIndent(8, "<committed>" +
memoryBean.getNonHeapMemoryUsage().getCommitted() + "</committed>");
+ reportGenerator.writeWithIndent(8, "<max>" +
memoryBean.getNonHeapMemoryUsage().getMax() + "</max>");
+ reportGenerator.writeWithIndent(6, "</non_heap_memory>");
+ reportGenerator.writeWithIndent(4, "</jvm_memory_settings>");
+
+ reportGenerator.addClientSettings();
+ reportGenerator.endTestInformation();
+ }
+
+
+ public void run() {
+
+ long nonHeapMB = 0;
+ long heapMB = 0;
+ long oneMB = 1024 * 1024;
+
+ reportGenerator.startTestResult(getCheckpointInterval());
+ while (isRunning.get()) {
+
+ try {
+ //wait every check point before getting the next memory usage
+ Thread.sleep(checkpointInterval);
+
+ nonHeapMB = memoryBean.getNonHeapMemoryUsage().getUsed() /
oneMB;
+ heapMB = memoryBean.getHeapMemoryUsage().getUsed() / oneMB;
+
+ reportGenerator.writeWithIndent(6, "<memory_usage index=" +
resultIndex + " non_heap_mb=" + nonHeapMB + " non_heap_bytes=" +
memoryBean.getNonHeapMemoryUsage().getUsed() + " heap_mb=" + heapMB + "
heap_bytes=" + memoryBean.getHeapMemoryUsage().getUsed() + "/>");
+
+ resultIndex++;
+
+ } catch (Exception e) {
+ e.printStackTrace();
+
+ }
+
+
+ }
+ reportGenerator.endTestResult();
+ reportGenerator.stopGenerateReport();
+
+ }
+
+
+}
Added:
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/ReportGenerator.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/ReportGenerator.java?rev=412933&view=auto
==============================================================================
---
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/ReportGenerator.java
(added)
+++
incubator/activemq/trunk/tooling/maven-activemq-memtest-plugin/src/main/java/org/apache/activemq/tool/ReportGenerator.java
Thu Jun 8 22:18:11 2006
@@ -0,0 +1,159 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.tool;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Enumeration;
+import java.util.Properties;
+
+public class ReportGenerator {
+ private static final Log log = LogFactory.getLog(ReportGenerator.class);
+ private String reportDirectory = null;
+ private String reportName = null;
+ private PrintWriter writer = null;
+ private File reportFile = null;
+ private Properties testSettings;
+
+ public ReportGenerator() {
+ }
+
+ public ReportGenerator(String reportDirectory, String reportName) {
+ this.setReportDirectory(reportDirectory);
+ this.setReportName(reportName);
+ }
+
+ public void startGenerateReport() {
+
+
+ File reportDir = new File(getReportDirectory());
+
+ // Create output directory if it doesn't exist.
+ if (!reportDir.exists()) {
+ reportDir.mkdirs();
+ }
+
+
+ if (reportDir != null) {
+ reportFile = new File(this.getReportDirectory() + File.separator +
this.getReportName() + ".xml");
+ }
+
+ try {
+ this.writer = new PrintWriter(new FileOutputStream(reportFile));
+ } catch (IOException e1) {
+ e1.printStackTrace(); //To change body of catch statement use
File | Settings | File Templates.
+ }
+ }
+
+ public void stopGenerateReport() {
+ writeWithIndent(0, "</test-report>");
+ this.getWriter().flush();
+ this.getWriter().close();
+ log.info(" TEST REPORT OUTPUT : " + reportFile.getAbsolutePath());
+
+
+ }
+
+ protected void addTestInformation() {
+
+ writeWithIndent(0, "<test-report>");
+ writeWithIndent(2, "<test-information>");
+
+ writeWithIndent(4, "<os-name>" + System.getProperty("os.name") +
"</os-name>");
+ writeWithIndent(4, "<java-version>" +
System.getProperty("java.version") + "</java-version>");
+
+ }
+
+
+ protected void addClientSettings() {
+ if (this.getTestSettings() != null) {
+ Enumeration keys = getTestSettings().propertyNames();
+
+ writeWithIndent(4, "<test-settings>");
+
+ String key;
+ while (keys.hasMoreElements()) {
+ key = (String) keys.nextElement();
+ writeWithIndent(6, "<" + key + ">" +
getTestSettings().get(key) + "</" + key + ">");
+ }
+
+ writeWithIndent(4, "</test-settings>");
+ }
+ }
+
+ protected void endTestInformation() {
+ writeWithIndent(2, "</test-information>");
+
+ }
+
+ protected void startTestResult(long checkpointInterval) {
+ long intervalInSec = checkpointInterval / 1000;
+ writeWithIndent(2, "<test-result checkpoint_interval_in_sec=" +
intervalInSec + " >");
+ }
+
+ protected void endTestResult() {
+ writeWithIndent(2, "</test-result>");
+ }
+
+
+ protected void writeWithIndent(int indent, String result) {
+ StringBuffer buffer = new StringBuffer();
+
+ for (int i = 0; i < indent; ++i) {
+ buffer.append(" ");
+ }
+
+ buffer.append(result);
+ writer.println(buffer.toString());
+ }
+
+ public PrintWriter getWriter() {
+ return this.writer;
+ }
+
+
+ public String getReportDirectory() {
+ return reportDirectory;
+ }
+
+ public void setReportDirectory(String reportDirectory) {
+ this.reportDirectory = reportDirectory;
+ }
+
+ public String getReportName() {
+ return reportName;
+ }
+
+
+ public void setReportName(String reportName) {
+ this.reportName = reportName;
+ }
+
+ public Properties getTestSettings() {
+ return testSettings;
+ }
+
+ public void setTestSettings(Properties testSettings) {
+ this.testSettings = testSettings;
+ }
+}