http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/normal/pom.xml ---------------------------------------------------------------------- diff --git a/examples/soak/normal/pom.xml b/examples/soak/normal/pom.xml new file mode 100644 index 0000000..37cec41 --- /dev/null +++ b/examples/soak/normal/pom.xml @@ -0,0 +1,195 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <artifactId>hornetq-jms-soak-example</artifactId> + <packaging>jar</packaging> + <name>HornetQ Soak Normal Example</name> + + <parent> + <groupId>org.hornetq.example.soak</groupId> + <artifactId>soak-examples</artifactId> + <version>2.5.0-SNAPSHOT</version> + </parent> + + <dependencies> + <dependency> + <groupId>org.hornetq</groupId> + <artifactId>hornetq-jms-client</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <properties> + <server.dir>${basedir}/server0/</server.dir> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.hornetq</groupId> + <artifactId>hornetq-maven-plugin</artifactId> + <dependencies> + <dependency> + <groupId>org.hornetq.example.soak</groupId> + <artifactId>hornetq-jms-soak-example</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.hornetq</groupId> + <artifactId>hornetq-server</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.hornetq</groupId> + <artifactId>hornetq-jms-server</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + <version>${netty.version}</version> + </dependency> + <dependency> + <groupId>org.jboss.naming</groupId> + <artifactId>jnpserver</artifactId> + <version>5.0.3.GA</version> + </dependency> + </dependencies> + <configuration> + <waitOnStart>false</waitOnStart> + <systemProperties> + <property> + <name>build.directory</name> + <value>${basedir}/target/</value> + </property> + </systemProperties> + </configuration> + </plugin> + </plugins> + </build> + + <profiles> + <profile> + <id>local</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.hornetq</groupId> + <artifactId>hornetq-maven-plugin</artifactId> + <executions> + <execution> + <id>start</id> + <goals> + <goal>start</goal> + </goals> + <configuration> + <fork>true</fork> + <hornetqConfigurationDir>${server.dir}</hornetqConfigurationDir> + </configuration> + </execution> + <execution> + <id>runConsumer</id> + <goals> + <goal>runClient</goal> + </goals> + <configuration> + <clientClass>org.hornetq.jms.soak.example.SoakReceiver</clientClass> + <args> + <param>jnp://localhost:1099</param> + </args> + </configuration> + </execution> + <execution> + <id>runProducer</id> + <goals> + <goal>runClient</goal> + </goals> + <configuration> + <clientClass>org.hornetq.jms.soak.example.SoakSender</clientClass> + <args> + <param>jnp://localhost:1099</param> + </args> + </configuration> + </execution> + <execution> + <id>stop</id> + <goals> + <goal>stop</goal> + </goals> + <configuration> + <hornetqConfigurationDir>${server.dir}</hornetqConfigurationDir> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>remote</id> + <build> + <plugins> + <plugin> + <groupId>org.hornetq</groupId> + <artifactId>hornetq-maven-plugin</artifactId> + <executions> + <execution> + <id>runConsumer</id> + <goals> + <goal>runClient</goal> + </goals> + <configuration> + <clientClass>org.hornetq.jms.soak.example.SoakReceiver</clientClass> + <args> + <param>jnp://localhost:1099</param> + </args> + </configuration> + </execution> + <execution> + <id>runProducer</id> + <goals> + <goal>runClient</goal> + </goals> + <configuration> + <clientClass>org.hornetq.jms.soak.example.SoakSender</clientClass> + <args> + <param>jnp://localhost:1099</param> + </args> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>server</id> + <build> + <plugins> + <plugin> + <groupId>org.hornetq</groupId> + <artifactId>hornetq-maven-plugin</artifactId> + <executions> + <execution> + <id>start</id> + <goals> + <goal>start</goal> + </goals> + <configuration> + <hornetqConfigurationDir>${server.dir}</hornetqConfigurationDir> + <fork>true</fork> + <waitOnStart>true</waitOnStart> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + +</project>
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/normal/server0/hornetq-beans.xml ---------------------------------------------------------------------- diff --git a/examples/soak/normal/server0/hornetq-beans.xml b/examples/soak/normal/server0/hornetq-beans.xml deleted file mode 100644 index cd364f3..0000000 --- a/examples/soak/normal/server0/hornetq-beans.xml +++ /dev/null @@ -1,40 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> - -<deployment xmlns="urn:jboss:bean-deployer:2.0"> - -<bean name="Naming" class="org.jnp.server.NamingBeanImpl"/> - - <!-- JNDI server. Disable this if you don't want JNDI --> - <bean name="JNDIServer" class="org.jnp.server.Main"> - <property name="namingInfo"> - <inject bean="Naming"/> - </property> - <property name="port">1099</property> - <property name="bindAddress">localhost</property> - <property name="rmiPort">1098</property> - <property name="rmiBindAddress">localhost</property> - </bean> - - <!-- The core configuration --> - <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/> - - <!-- The core server --> - <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl"> - <constructor> - <parameter> - <inject bean="Configuration"/> - </parameter> - </constructor> - <start ignored="true"/> - <stop ignored="true"/> - </bean> - - <!-- The JMS server --> - <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl"> - <constructor> - <parameter> - <inject bean="HornetQServer"/> - </parameter> - </constructor> - </bean> -</deployment> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/normal/src/main/java/org/hornetq/jms/soak/example/SoakBase.java ---------------------------------------------------------------------- diff --git a/examples/soak/normal/src/main/java/org/hornetq/jms/soak/example/SoakBase.java b/examples/soak/normal/src/main/java/org/hornetq/jms/soak/example/SoakBase.java new file mode 100644 index 0000000..dd654ac --- /dev/null +++ b/examples/soak/normal/src/main/java/org/hornetq/jms/soak/example/SoakBase.java @@ -0,0 +1,129 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.jms.soak.example; + +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Properties; +import java.util.Random; +import java.util.logging.Logger; + +/** + * + * A SoakBase + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Jeff Mesnil</a> + * + * + */ +public class SoakBase +{ + private static final Logger log = Logger.getLogger(SoakBase.class.getName()); + + private static final String DEFAULT_SOAK_PROPERTIES_FILE_NAME = "soak.properties"; + + public static final int TO_MILLIS = 60 * 1000; // from minute to milliseconds + + public static byte[] randomByteArray(final int length) + { + byte[] bytes = new byte[length]; + + Random random = new Random(); + + for (int i = 0; i < length; i++) + { + bytes[i] = Integer.valueOf(random.nextInt()).byteValue(); + } + + return bytes; + } + + protected static String getPerfFileName() + { + String fileName = System.getProperty("soak.props"); + if (fileName == null) + { + fileName = SoakBase.DEFAULT_SOAK_PROPERTIES_FILE_NAME; + } + return fileName; + } + + protected static SoakParams getParams(final String fileName) throws Exception + { + Properties props = null; + + InputStream is = null; + + try + { + is = new FileInputStream(fileName); + + props = new Properties(); + + props.load(is); + } + finally + { + if (is != null) + { + is.close(); + } + } + + int durationInMinutes = Integer.valueOf(props.getProperty("duration-in-minutes")); + int noOfWarmupMessages = Integer.valueOf(props.getProperty("num-warmup-messages")); + int messageSize = Integer.valueOf(props.getProperty("message-size")); + boolean durable = Boolean.valueOf(props.getProperty("durable")); + boolean transacted = Boolean.valueOf(props.getProperty("transacted")); + int batchSize = Integer.valueOf(props.getProperty("batch-size")); + boolean drainQueue = Boolean.valueOf(props.getProperty("drain-queue")); + String destinationLookup = props.getProperty("destination-lookup"); + String connectionFactoryLookup = props.getProperty("connection-factory-lookup"); + int throttleRate = Integer.valueOf(props.getProperty("throttle-rate")); + boolean dupsOK = Boolean.valueOf(props.getProperty("dups-ok-acknowlege")); + boolean disableMessageID = Boolean.valueOf(props.getProperty("disable-message-id")); + boolean disableTimestamp = Boolean.valueOf(props.getProperty("disable-message-timestamp")); + + SoakBase.log.info("duration-in-minutes: " + durationInMinutes); + SoakBase.log.info("num-warmup-messages: " + noOfWarmupMessages); + SoakBase.log.info("message-size: " + messageSize); + SoakBase.log.info("durable: " + durable); + SoakBase.log.info("transacted: " + transacted); + SoakBase.log.info("batch-size: " + batchSize); + SoakBase.log.info("drain-queue: " + drainQueue); + SoakBase.log.info("throttle-rate: " + throttleRate); + SoakBase.log.info("connection-factory-lookup: " + connectionFactoryLookup); + SoakBase.log.info("destination-lookup: " + destinationLookup); + SoakBase.log.info("disable-message-id: " + disableMessageID); + SoakBase.log.info("disable-message-timestamp: " + disableTimestamp); + SoakBase.log.info("dups-ok-acknowledge: " + dupsOK); + + SoakParams soakParams = new SoakParams(); + soakParams.setDurationInMinutes(durationInMinutes); + soakParams.setNoOfWarmupMessages(noOfWarmupMessages); + soakParams.setMessageSize(messageSize); + soakParams.setDurable(durable); + soakParams.setSessionTransacted(transacted); + soakParams.setBatchSize(batchSize); + soakParams.setDrainQueue(drainQueue); + soakParams.setConnectionFactoryLookup(connectionFactoryLookup); + soakParams.setDestinationLookup(destinationLookup); + soakParams.setThrottleRate(throttleRate); + soakParams.setDisableMessageID(disableMessageID); + soakParams.setDisableTimestamp(disableTimestamp); + soakParams.setDupsOK(dupsOK); + + return soakParams; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/normal/src/main/java/org/hornetq/jms/soak/example/SoakParams.java ---------------------------------------------------------------------- diff --git a/examples/soak/normal/src/main/java/org/hornetq/jms/soak/example/SoakParams.java b/examples/soak/normal/src/main/java/org/hornetq/jms/soak/example/SoakParams.java new file mode 100644 index 0000000..9b94d2b --- /dev/null +++ b/examples/soak/normal/src/main/java/org/hornetq/jms/soak/example/SoakParams.java @@ -0,0 +1,183 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.jms.soak.example; + +import java.io.Serializable; + +/** + * + * Class that holds the parameters used in the performance examples + * + * @author <a href="[email protected]">Andy Taylor</a> + */ +public class SoakParams implements Serializable +{ + private static final long serialVersionUID = -4336539641012356002L; + + private int durationInMinutes = 60; + + private int noOfWarmupMessages; + + private int messageSize = 1024; // in bytes + + private boolean durable = false; + + private boolean isSessionTransacted = false; + + private int batchSize = 5000; + + private boolean drainQueue = true; + + private String connectionFactoryLookup; + + private String destinationLookup; + + private int throttleRate; + + private boolean disableMessageID; + + private boolean disableTimestamp; + + private boolean dupsOK; + + public synchronized int getDurationInMinutes() + { + return durationInMinutes; + } + + public synchronized void setDurationInMinutes(final int durationInMinutes) + { + this.durationInMinutes = durationInMinutes; + } + + public synchronized int getNoOfWarmupMessages() + { + return noOfWarmupMessages; + } + + public synchronized void setNoOfWarmupMessages(final int noOfWarmupMessages) + { + this.noOfWarmupMessages = noOfWarmupMessages; + } + + public synchronized int getMessageSize() + { + return messageSize; + } + + public synchronized void setMessageSize(final int messageSize) + { + this.messageSize = messageSize; + } + + public synchronized boolean isDurable() + { + return durable; + } + + public synchronized void setDurable(final boolean durable) + { + this.durable = durable; + } + + public synchronized boolean isSessionTransacted() + { + return isSessionTransacted; + } + + public synchronized void setSessionTransacted(final boolean isSessionTransacted) + { + this.isSessionTransacted = isSessionTransacted; + } + + public synchronized int getBatchSize() + { + return batchSize; + } + + public synchronized void setBatchSize(final int batchSize) + { + this.batchSize = batchSize; + } + + public synchronized boolean isDrainQueue() + { + return drainQueue; + } + + public synchronized void setDrainQueue(final boolean drainQueue) + { + this.drainQueue = drainQueue; + } + + public synchronized String getConnectionFactoryLookup() + { + return connectionFactoryLookup; + } + + public synchronized void setConnectionFactoryLookup(final String connectionFactoryLookup) + { + this.connectionFactoryLookup = connectionFactoryLookup; + } + + public synchronized String getDestinationLookup() + { + return destinationLookup; + } + + public synchronized void setDestinationLookup(final String destinationLookup) + { + this.destinationLookup = destinationLookup; + } + + public synchronized int getThrottleRate() + { + return throttleRate; + } + + public synchronized void setThrottleRate(final int throttleRate) + { + this.throttleRate = throttleRate; + } + + public synchronized boolean isDisableMessageID() + { + return disableMessageID; + } + + public synchronized void setDisableMessageID(final boolean disableMessageID) + { + this.disableMessageID = disableMessageID; + } + + public synchronized boolean isDisableTimestamp() + { + return disableTimestamp; + } + + public synchronized void setDisableTimestamp(final boolean disableTimestamp) + { + this.disableTimestamp = disableTimestamp; + } + + public synchronized boolean isDupsOK() + { + return dupsOK; + } + + public synchronized void setDupsOK(final boolean dupsOK) + { + this.dupsOK = dupsOK; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/normal/src/main/java/org/hornetq/jms/soak/example/SoakReceiver.java ---------------------------------------------------------------------- diff --git a/examples/soak/normal/src/main/java/org/hornetq/jms/soak/example/SoakReceiver.java b/examples/soak/normal/src/main/java/org/hornetq/jms/soak/example/SoakReceiver.java new file mode 100644 index 0000000..913a2f7 --- /dev/null +++ b/examples/soak/normal/src/main/java/org/hornetq/jms/soak/example/SoakReceiver.java @@ -0,0 +1,244 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.jms.soak.example; + +import java.lang.Override; +import java.lang.Runnable; +import java.util.Hashtable; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +public class SoakReceiver +{ + private static final Logger log = Logger.getLogger(SoakReceiver.class.getName()); + + private static final String EOF = UUID.randomUUID().toString(); + + public static void main(final String[] args) + { + Runnable runnable = new Runnable() + { + @Override + public void run() + { + + String jndiURL = System.getProperty("jndi.address"); + if(jndiURL == null) + { + jndiURL = args.length > 0 ? args[0] : "jnp://localhost:1099"; + } + + System.out.println("Connecting to JNDI at " + jndiURL); + + try + { + String fileName = SoakBase.getPerfFileName(); + + SoakParams params = SoakBase.getParams(fileName); + + Hashtable<String, String> jndiProps = new Hashtable<String, String>(); + jndiProps.put("java.naming.provider.url", jndiURL); + jndiProps.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory"); + jndiProps.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces"); + + final SoakReceiver receiver = new SoakReceiver(jndiProps, params); + + Runtime.getRuntime().addShutdownHook(new Thread() + { + @Override + public void run() + { + receiver.disconnect(); + } + }); + + receiver.run(); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + }; + + Thread t = new Thread(runnable); + t.start(); + } + + private final Hashtable<String, String> jndiProps; + + private final SoakParams perfParams; + + private final ExceptionListener exceptionListener = new ExceptionListener() + { + public void onException(final JMSException e) + { + disconnect(); + connect(); + } + }; + + private final MessageListener listener = new MessageListener() + { + int modulo = 10000; + + private final AtomicLong count = new AtomicLong(0); + + private final long start = System.currentTimeMillis(); + + long moduloStart = start; + + public void onMessage(final Message msg) + { + long totalDuration = System.currentTimeMillis() - start; + + try + { + if (SoakReceiver.EOF.equals(msg.getStringProperty("eof"))) + { + SoakReceiver.log.info(String.format("Received %s messages in %.2f minutes", count, 1.0 * totalDuration / + SoakBase.TO_MILLIS)); + SoakReceiver.log.info("END OF RUN"); + + return; + } + } + catch (JMSException e1) + { + e1.printStackTrace(); + } + if (count.incrementAndGet() % modulo == 0) + { + double duration = (1.0 * System.currentTimeMillis() - moduloStart) / 1000; + moduloStart = System.currentTimeMillis(); + SoakReceiver.log.info(String.format("received %s messages in %2.2fs (total: %.0fs)", + modulo, + duration, + totalDuration / 1000.0)); + } + } + }; + + private Session session; + + private Connection connection; + + private SoakReceiver(final Hashtable<String, String> jndiProps, final SoakParams perfParams) + { + this.jndiProps = jndiProps; + this.perfParams = perfParams; + } + + public void run() throws Exception + { + connect(); + + boolean runInfinitely = perfParams.getDurationInMinutes() == -1; + + if (!runInfinitely) + { + Thread.sleep(perfParams.getDurationInMinutes() * SoakBase.TO_MILLIS); + + // send EOF message + Message eof = session.createMessage(); + eof.setStringProperty("eof", SoakReceiver.EOF); + listener.onMessage(eof); + + if (connection != null) + { + connection.close(); + connection = null; + } + } + else + { + while (true) + { + Thread.sleep(500); + } + } + } + + private void disconnect() + { + if (connection != null) + { + try + { + connection.setExceptionListener(null); + connection.close(); + } + catch (JMSException e) + { + e.printStackTrace(); + } + finally + { + connection = null; + } + } + } + + private void connect() + { + InitialContext ic = null; + try + { + ic = new InitialContext(jndiProps); + + ConnectionFactory factory = (ConnectionFactory)ic.lookup(perfParams.getConnectionFactoryLookup()); + + Destination destination = (Destination)ic.lookup(perfParams.getDestinationLookup()); + + connection = factory.createConnection(); + connection.setExceptionListener(exceptionListener); + + session = connection.createSession(perfParams.isSessionTransacted(), + perfParams.isDupsOK() ? Session.DUPS_OK_ACKNOWLEDGE + : Session.AUTO_ACKNOWLEDGE); + + MessageConsumer messageConsumer = session.createConsumer(destination); + messageConsumer.setMessageListener(listener); + + connection.start(); + } + catch (Exception e) + { + e.printStackTrace(); + } + finally + { + try + { + ic.close(); + } + catch (NamingException e) + { + e.printStackTrace(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/normal/src/main/java/org/hornetq/jms/soak/example/SoakSender.java ---------------------------------------------------------------------- diff --git a/examples/soak/normal/src/main/java/org/hornetq/jms/soak/example/SoakSender.java b/examples/soak/normal/src/main/java/org/hornetq/jms/soak/example/SoakSender.java new file mode 100644 index 0000000..3bc0705 --- /dev/null +++ b/examples/soak/normal/src/main/java/org/hornetq/jms/soak/example/SoakSender.java @@ -0,0 +1,245 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.jms.soak.example; + +import java.util.Hashtable; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import org.hornetq.utils.TokenBucketLimiter; +import org.hornetq.utils.TokenBucketLimiterImpl; + +public class SoakSender +{ + private static final Logger log = Logger.getLogger(SoakSender.class.getName()); + + public static void main(final String[] args) + { + String jndiURL = System.getProperty("jndi.address"); + if(jndiURL == null) + { + jndiURL = args.length > 0 ? args[0] : "jnp://localhost:1099"; + } + + System.out.println("Connecting to JNDI at " + jndiURL); + try + { + String fileName = SoakBase.getPerfFileName(); + + SoakParams params = SoakBase.getParams(fileName); + + Hashtable<String, String> jndiProps = new Hashtable<String, String>(); + jndiProps.put("java.naming.provider.url", jndiURL); + jndiProps.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory"); + jndiProps.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces"); + + final SoakSender sender = new SoakSender(jndiProps, params); + + Runtime.getRuntime().addShutdownHook(new Thread() + { + @Override + public void run() + { + sender.disconnect(); + } + }); + + sender.run(); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + + private final SoakParams perfParams; + + private final Hashtable<String, String> jndiProps; + + private Connection connection; + + private Session session; + + private MessageProducer producer; + + private final ExceptionListener exceptionListener = new ExceptionListener() + { + public void onException(final JMSException e) + { + System.out.println("SoakReconnectableSender.exceptionListener.new ExceptionListener() {...}.onException()"); + disconnect(); + connect(); + } + + }; + + private SoakSender(final Hashtable<String, String> jndiProps, final SoakParams perfParams) + { + this.jndiProps = jndiProps; + this.perfParams = perfParams; + } + + public void run() throws Exception + { + connect(); + + boolean runInfinitely = perfParams.getDurationInMinutes() == -1; + + BytesMessage message = session.createBytesMessage(); + + byte[] payload = SoakBase.randomByteArray(perfParams.getMessageSize()); + + message.writeBytes(payload); + + final int modulo = 10000; + + TokenBucketLimiter tbl = perfParams.getThrottleRate() != -1 ? new TokenBucketLimiterImpl(perfParams.getThrottleRate(), + false) + : null; + + boolean transacted = perfParams.isSessionTransacted(); + int txBatchSize = perfParams.getBatchSize(); + boolean display = true; + + long start = System.currentTimeMillis(); + long moduleStart = start; + AtomicLong count = new AtomicLong(0); + while (true) + { + try + { + producer.send(message); + count.incrementAndGet(); + + if (transacted) + { + if (count.longValue() % txBatchSize == 0) + { + session.commit(); + } + } + + long totalDuration = System.currentTimeMillis() - start; + + if (display && count.longValue() % modulo == 0) + { + double duration = (1.0 * System.currentTimeMillis() - moduleStart) / 1000; + moduleStart = System.currentTimeMillis(); + SoakSender.log.info(String.format("sent %s messages in %2.2fs (time: %.0fs)", + modulo, + duration, + totalDuration / 1000.0)); + } + + if (tbl != null) + { + tbl.limit(); + } + + if (!runInfinitely && totalDuration > perfParams.getDurationInMinutes() * SoakBase.TO_MILLIS) + { + break; + } + } + catch (Exception e) + { + e.printStackTrace(); + } + } + + SoakSender.log.info(String.format("Sent %s messages in %s minutes", count, perfParams.getDurationInMinutes())); + SoakSender.log.info("END OF RUN"); + + if (connection != null) + { + connection.close(); + connection = null; + } + } + + private synchronized void disconnect() + { + if (connection != null) + { + try + { + connection.setExceptionListener(null); + connection.close(); + } + catch (JMSException e) + { + e.printStackTrace(); + } + finally + { + connection = null; + } + } + } + + private void connect() + { + InitialContext ic = null; + try + { + ic = new InitialContext(jndiProps); + + ConnectionFactory factory = (ConnectionFactory)ic.lookup(perfParams.getConnectionFactoryLookup()); + + Destination destination = (Destination)ic.lookup(perfParams.getDestinationLookup()); + + connection = factory.createConnection(); + + session = connection.createSession(perfParams.isSessionTransacted(), + perfParams.isDupsOK() ? Session.DUPS_OK_ACKNOWLEDGE + : Session.AUTO_ACKNOWLEDGE); + + producer = session.createProducer(destination); + + producer.setDeliveryMode(perfParams.isDurable() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + + producer.setDisableMessageID(perfParams.isDisableMessageID()); + + producer.setDisableMessageTimestamp(perfParams.isDisableTimestamp()); + + connection.setExceptionListener(exceptionListener); + } + catch (Exception e) + { + e.printStackTrace(); + } + finally + { + try + { + ic.close(); + } + catch (NamingException e) + { + e.printStackTrace(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/normal/src/org/hornetq/jms/soak/example/SoakBase.java ---------------------------------------------------------------------- diff --git a/examples/soak/normal/src/org/hornetq/jms/soak/example/SoakBase.java b/examples/soak/normal/src/org/hornetq/jms/soak/example/SoakBase.java deleted file mode 100644 index 1a413bd..0000000 --- a/examples/soak/normal/src/org/hornetq/jms/soak/example/SoakBase.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.hornetq.jms.soak.example; - -import java.io.FileInputStream; -import java.io.InputStream; -import java.util.Properties; -import java.util.Random; -import java.util.logging.Logger; - -/** - * - * A SoakBase - * - * @author <a href="mailto:[email protected]">Tim Fox</a> - * @author <a href="mailto:[email protected]">Jeff Mesnil</a> - * - * - */ -public class SoakBase -{ - private static final Logger log = Logger.getLogger(SoakBase.class.getName()); - - private static final String DEFAULT_SOAK_PROPERTIES_FILE_NAME = "soak.properties"; - - public static final int TO_MILLIS = 60 * 1000; // from minute to milliseconds - - public static byte[] randomByteArray(final int length) - { - byte[] bytes = new byte[length]; - - Random random = new Random(); - - for (int i = 0; i < length; i++) - { - bytes[i] = Integer.valueOf(random.nextInt()).byteValue(); - } - - return bytes; - } - - protected static String getPerfFileName(final String[] args) - { - String fileName; - - if (args.length > 1) - { - fileName = args[1]; - } - else - { - fileName = SoakBase.DEFAULT_SOAK_PROPERTIES_FILE_NAME; - } - return fileName; - } - - protected static SoakParams getParams(final String fileName) throws Exception - { - Properties props = null; - - InputStream is = null; - - try - { - is = new FileInputStream(fileName); - - props = new Properties(); - - props.load(is); - } - finally - { - if (is != null) - { - is.close(); - } - } - - int durationInMinutes = Integer.valueOf(props.getProperty("duration-in-minutes")); - int noOfWarmupMessages = Integer.valueOf(props.getProperty("num-warmup-messages")); - int messageSize = Integer.valueOf(props.getProperty("message-size")); - boolean durable = Boolean.valueOf(props.getProperty("durable")); - boolean transacted = Boolean.valueOf(props.getProperty("transacted")); - int batchSize = Integer.valueOf(props.getProperty("batch-size")); - boolean drainQueue = Boolean.valueOf(props.getProperty("drain-queue")); - String destinationLookup = props.getProperty("destination-lookup"); - String connectionFactoryLookup = props.getProperty("connection-factory-lookup"); - int throttleRate = Integer.valueOf(props.getProperty("throttle-rate")); - boolean dupsOK = Boolean.valueOf(props.getProperty("dups-ok-acknowlege")); - boolean disableMessageID = Boolean.valueOf(props.getProperty("disable-message-id")); - boolean disableTimestamp = Boolean.valueOf(props.getProperty("disable-message-timestamp")); - - SoakBase.log.info("duration-in-minutes: " + durationInMinutes); - SoakBase.log.info("num-warmup-messages: " + noOfWarmupMessages); - SoakBase.log.info("message-size: " + messageSize); - SoakBase.log.info("durable: " + durable); - SoakBase.log.info("transacted: " + transacted); - SoakBase.log.info("batch-size: " + batchSize); - SoakBase.log.info("drain-queue: " + drainQueue); - SoakBase.log.info("throttle-rate: " + throttleRate); - SoakBase.log.info("connection-factory-lookup: " + connectionFactoryLookup); - SoakBase.log.info("destination-lookup: " + destinationLookup); - SoakBase.log.info("disable-message-id: " + disableMessageID); - SoakBase.log.info("disable-message-timestamp: " + disableTimestamp); - SoakBase.log.info("dups-ok-acknowledge: " + dupsOK); - - SoakParams soakParams = new SoakParams(); - soakParams.setDurationInMinutes(durationInMinutes); - soakParams.setNoOfWarmupMessages(noOfWarmupMessages); - soakParams.setMessageSize(messageSize); - soakParams.setDurable(durable); - soakParams.setSessionTransacted(transacted); - soakParams.setBatchSize(batchSize); - soakParams.setDrainQueue(drainQueue); - soakParams.setConnectionFactoryLookup(connectionFactoryLookup); - soakParams.setDestinationLookup(destinationLookup); - soakParams.setThrottleRate(throttleRate); - soakParams.setDisableMessageID(disableMessageID); - soakParams.setDisableTimestamp(disableTimestamp); - soakParams.setDupsOK(dupsOK); - - return soakParams; - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/normal/src/org/hornetq/jms/soak/example/SoakParams.java ---------------------------------------------------------------------- diff --git a/examples/soak/normal/src/org/hornetq/jms/soak/example/SoakParams.java b/examples/soak/normal/src/org/hornetq/jms/soak/example/SoakParams.java deleted file mode 100644 index 9b94d2b..0000000 --- a/examples/soak/normal/src/org/hornetq/jms/soak/example/SoakParams.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.hornetq.jms.soak.example; - -import java.io.Serializable; - -/** - * - * Class that holds the parameters used in the performance examples - * - * @author <a href="[email protected]">Andy Taylor</a> - */ -public class SoakParams implements Serializable -{ - private static final long serialVersionUID = -4336539641012356002L; - - private int durationInMinutes = 60; - - private int noOfWarmupMessages; - - private int messageSize = 1024; // in bytes - - private boolean durable = false; - - private boolean isSessionTransacted = false; - - private int batchSize = 5000; - - private boolean drainQueue = true; - - private String connectionFactoryLookup; - - private String destinationLookup; - - private int throttleRate; - - private boolean disableMessageID; - - private boolean disableTimestamp; - - private boolean dupsOK; - - public synchronized int getDurationInMinutes() - { - return durationInMinutes; - } - - public synchronized void setDurationInMinutes(final int durationInMinutes) - { - this.durationInMinutes = durationInMinutes; - } - - public synchronized int getNoOfWarmupMessages() - { - return noOfWarmupMessages; - } - - public synchronized void setNoOfWarmupMessages(final int noOfWarmupMessages) - { - this.noOfWarmupMessages = noOfWarmupMessages; - } - - public synchronized int getMessageSize() - { - return messageSize; - } - - public synchronized void setMessageSize(final int messageSize) - { - this.messageSize = messageSize; - } - - public synchronized boolean isDurable() - { - return durable; - } - - public synchronized void setDurable(final boolean durable) - { - this.durable = durable; - } - - public synchronized boolean isSessionTransacted() - { - return isSessionTransacted; - } - - public synchronized void setSessionTransacted(final boolean isSessionTransacted) - { - this.isSessionTransacted = isSessionTransacted; - } - - public synchronized int getBatchSize() - { - return batchSize; - } - - public synchronized void setBatchSize(final int batchSize) - { - this.batchSize = batchSize; - } - - public synchronized boolean isDrainQueue() - { - return drainQueue; - } - - public synchronized void setDrainQueue(final boolean drainQueue) - { - this.drainQueue = drainQueue; - } - - public synchronized String getConnectionFactoryLookup() - { - return connectionFactoryLookup; - } - - public synchronized void setConnectionFactoryLookup(final String connectionFactoryLookup) - { - this.connectionFactoryLookup = connectionFactoryLookup; - } - - public synchronized String getDestinationLookup() - { - return destinationLookup; - } - - public synchronized void setDestinationLookup(final String destinationLookup) - { - this.destinationLookup = destinationLookup; - } - - public synchronized int getThrottleRate() - { - return throttleRate; - } - - public synchronized void setThrottleRate(final int throttleRate) - { - this.throttleRate = throttleRate; - } - - public synchronized boolean isDisableMessageID() - { - return disableMessageID; - } - - public synchronized void setDisableMessageID(final boolean disableMessageID) - { - this.disableMessageID = disableMessageID; - } - - public synchronized boolean isDisableTimestamp() - { - return disableTimestamp; - } - - public synchronized void setDisableTimestamp(final boolean disableTimestamp) - { - this.disableTimestamp = disableTimestamp; - } - - public synchronized boolean isDupsOK() - { - return dupsOK; - } - - public synchronized void setDupsOK(final boolean dupsOK) - { - this.dupsOK = dupsOK; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/normal/src/org/hornetq/jms/soak/example/SoakReceiver.java ---------------------------------------------------------------------- diff --git a/examples/soak/normal/src/org/hornetq/jms/soak/example/SoakReceiver.java b/examples/soak/normal/src/org/hornetq/jms/soak/example/SoakReceiver.java deleted file mode 100644 index 7ecd710..0000000 --- a/examples/soak/normal/src/org/hornetq/jms/soak/example/SoakReceiver.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.hornetq.jms.soak.example; - -import java.util.Hashtable; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Logger; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; -import javax.naming.InitialContext; -import javax.naming.NamingException; - -public class SoakReceiver -{ - private static final Logger log = Logger.getLogger(SoakReceiver.class.getName()); - - private static final String EOF = UUID.randomUUID().toString(); - - public static void main(final String[] args) - { - for (int i = 0; i < args.length; i++) - { - System.out.println(i + ":" + args[i]); - } - String jndiURL = "jndi://localhost:1099"; - if (args.length > 0) - { - jndiURL = args[0]; - } - - System.out.println("Connecting to JNDI at " + jndiURL); - - try - { - String fileName = SoakBase.getPerfFileName(args); - - SoakParams params = SoakBase.getParams(fileName); - - Hashtable<String, String> jndiProps = new Hashtable<String, String>(); - jndiProps.put("java.naming.provider.url", jndiURL); - jndiProps.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory"); - jndiProps.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces"); - - final SoakReceiver receiver = new SoakReceiver(jndiProps, params); - - Runtime.getRuntime().addShutdownHook(new Thread() - { - @Override - public void run() - { - receiver.disconnect(); - } - }); - - receiver.run(); - } - catch (Exception e) - { - e.printStackTrace(); - } - } - - private final Hashtable<String, String> jndiProps; - - private final SoakParams perfParams; - - private final ExceptionListener exceptionListener = new ExceptionListener() - { - public void onException(final JMSException e) - { - disconnect(); - connect(); - } - }; - - private final MessageListener listener = new MessageListener() - { - int modulo = 10000; - - private final AtomicLong count = new AtomicLong(0); - - private final long start = System.currentTimeMillis(); - - long moduloStart = start; - - public void onMessage(final Message msg) - { - long totalDuration = System.currentTimeMillis() - start; - - try - { - if (SoakReceiver.EOF.equals(msg.getStringProperty("eof"))) - { - SoakReceiver.log.info(String.format("Received %s messages in %.2f minutes", count, 1.0 * totalDuration / - SoakBase.TO_MILLIS)); - SoakReceiver.log.info("END OF RUN"); - - return; - } - } - catch (JMSException e1) - { - e1.printStackTrace(); - } - if (count.incrementAndGet() % modulo == 0) - { - double duration = (1.0 * System.currentTimeMillis() - moduloStart) / 1000; - moduloStart = System.currentTimeMillis(); - SoakReceiver.log.info(String.format("received %s messages in %2.2fs (total: %.0fs)", - modulo, - duration, - totalDuration / 1000.0)); - } - } - }; - - private Session session; - - private Connection connection; - - private SoakReceiver(final Hashtable<String, String> jndiProps, final SoakParams perfParams) - { - this.jndiProps = jndiProps; - this.perfParams = perfParams; - } - - public void run() throws Exception - { - connect(); - - boolean runInfinitely = perfParams.getDurationInMinutes() == -1; - - if (!runInfinitely) - { - Thread.sleep(perfParams.getDurationInMinutes() * SoakBase.TO_MILLIS); - - // send EOF message - Message eof = session.createMessage(); - eof.setStringProperty("eof", SoakReceiver.EOF); - listener.onMessage(eof); - - if (connection != null) - { - connection.close(); - connection = null; - } - } - else - { - while (true) - { - Thread.sleep(500); - } - } - } - - private void disconnect() - { - if (connection != null) - { - try - { - connection.setExceptionListener(null); - connection.close(); - } - catch (JMSException e) - { - e.printStackTrace(); - } - finally - { - connection = null; - } - } - } - - private void connect() - { - InitialContext ic = null; - try - { - ic = new InitialContext(jndiProps); - - ConnectionFactory factory = (ConnectionFactory)ic.lookup(perfParams.getConnectionFactoryLookup()); - - Destination destination = (Destination)ic.lookup(perfParams.getDestinationLookup()); - - connection = factory.createConnection(); - connection.setExceptionListener(exceptionListener); - - session = connection.createSession(perfParams.isSessionTransacted(), - perfParams.isDupsOK() ? Session.DUPS_OK_ACKNOWLEDGE - : Session.AUTO_ACKNOWLEDGE); - - MessageConsumer messageConsumer = session.createConsumer(destination); - messageConsumer.setMessageListener(listener); - - connection.start(); - } - catch (Exception e) - { - e.printStackTrace(); - } - finally - { - try - { - ic.close(); - } - catch (NamingException e) - { - e.printStackTrace(); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/normal/src/org/hornetq/jms/soak/example/SoakSender.java ---------------------------------------------------------------------- diff --git a/examples/soak/normal/src/org/hornetq/jms/soak/example/SoakSender.java b/examples/soak/normal/src/org/hornetq/jms/soak/example/SoakSender.java deleted file mode 100644 index 009b27b..0000000 --- a/examples/soak/normal/src/org/hornetq/jms/soak/example/SoakSender.java +++ /dev/null @@ -1,249 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.hornetq.jms.soak.example; - -import java.util.Hashtable; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Logger; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.naming.InitialContext; -import javax.naming.NamingException; - -import org.hornetq.utils.TokenBucketLimiter; -import org.hornetq.utils.TokenBucketLimiterImpl; - -public class SoakSender -{ - private static final Logger log = Logger.getLogger(SoakSender.class.getName()); - - public static void main(final String[] args) - { - for (int i = 0; i < args.length; i++) - { - System.out.println(i + ":" + args[i]); - } - String jndiURL = "jndi://localhost:1099"; - if (args.length > 0) - { - jndiURL = args[0]; - } - - System.out.println("Connecting to JNDI at " + jndiURL); - try - { - String fileName = SoakBase.getPerfFileName(args); - - SoakParams params = SoakBase.getParams(fileName); - - Hashtable<String, String> jndiProps = new Hashtable<String, String>(); - jndiProps.put("java.naming.provider.url", jndiURL); - jndiProps.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory"); - jndiProps.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces"); - - final SoakSender sender = new SoakSender(jndiProps, params); - - Runtime.getRuntime().addShutdownHook(new Thread() - { - @Override - public void run() - { - sender.disconnect(); - } - }); - - sender.run(); - } - catch (Exception e) - { - e.printStackTrace(); - } - } - - private final SoakParams perfParams; - - private final Hashtable<String, String> jndiProps; - - private Connection connection; - - private Session session; - - private MessageProducer producer; - - private final ExceptionListener exceptionListener = new ExceptionListener() - { - public void onException(final JMSException e) - { - System.out.println("SoakReconnectableSender.exceptionListener.new ExceptionListener() {...}.onException()"); - disconnect(); - connect(); - } - - }; - - private SoakSender(final Hashtable<String, String> jndiProps, final SoakParams perfParams) - { - this.jndiProps = jndiProps; - this.perfParams = perfParams; - } - - public void run() throws Exception - { - connect(); - - boolean runInfinitely = perfParams.getDurationInMinutes() == -1; - - BytesMessage message = session.createBytesMessage(); - - byte[] payload = SoakBase.randomByteArray(perfParams.getMessageSize()); - - message.writeBytes(payload); - - final int modulo = 10000; - - TokenBucketLimiter tbl = perfParams.getThrottleRate() != -1 ? new TokenBucketLimiterImpl(perfParams.getThrottleRate(), - false) - : null; - - boolean transacted = perfParams.isSessionTransacted(); - int txBatchSize = perfParams.getBatchSize(); - boolean display = true; - - long start = System.currentTimeMillis(); - long moduleStart = start; - AtomicLong count = new AtomicLong(0); - while (true) - { - try - { - producer.send(message); - count.incrementAndGet(); - - if (transacted) - { - if (count.longValue() % txBatchSize == 0) - { - session.commit(); - } - } - - long totalDuration = System.currentTimeMillis() - start; - - if (display && count.longValue() % modulo == 0) - { - double duration = (1.0 * System.currentTimeMillis() - moduleStart) / 1000; - moduleStart = System.currentTimeMillis(); - SoakSender.log.info(String.format("sent %s messages in %2.2fs (time: %.0fs)", - modulo, - duration, - totalDuration / 1000.0)); - } - - if (tbl != null) - { - tbl.limit(); - } - - if (!runInfinitely && totalDuration > perfParams.getDurationInMinutes() * SoakBase.TO_MILLIS) - { - break; - } - } - catch (Exception e) - { - e.printStackTrace(); - } - } - - SoakSender.log.info(String.format("Sent %s messages in %s minutes", count, perfParams.getDurationInMinutes())); - SoakSender.log.info("END OF RUN"); - - if (connection != null) - { - connection.close(); - connection = null; - } - } - - private synchronized void disconnect() - { - if (connection != null) - { - try - { - connection.setExceptionListener(null); - connection.close(); - } - catch (JMSException e) - { - e.printStackTrace(); - } - finally - { - connection = null; - } - } - } - - private void connect() - { - InitialContext ic = null; - try - { - ic = new InitialContext(jndiProps); - - ConnectionFactory factory = (ConnectionFactory)ic.lookup(perfParams.getConnectionFactoryLookup()); - - Destination destination = (Destination)ic.lookup(perfParams.getDestinationLookup()); - - connection = factory.createConnection(); - - session = connection.createSession(perfParams.isSessionTransacted(), - perfParams.isDupsOK() ? Session.DUPS_OK_ACKNOWLEDGE - : Session.AUTO_ACKNOWLEDGE); - - producer = session.createProducer(destination); - - producer.setDeliveryMode(perfParams.isDurable() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - - producer.setDisableMessageID(perfParams.isDisableMessageID()); - - producer.setDisableMessageTimestamp(perfParams.isDisableTimestamp()); - - connection.setExceptionListener(exceptionListener); - } - catch (Exception e) - { - e.printStackTrace(); - } - finally - { - try - { - ic.close(); - } - catch (NamingException e) - { - e.printStackTrace(); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/pom.xml ---------------------------------------------------------------------- diff --git a/examples/soak/pom.xml b/examples/soak/pom.xml new file mode 100644 index 0000000..82d1bd7 --- /dev/null +++ b/examples/soak/pom.xml @@ -0,0 +1,29 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.hornetq.examples</groupId> + <artifactId>hornetq-examples</artifactId> + <version>2.5.0-SNAPSHOT</version> + </parent> + + <groupId>org.hornetq.example.soak</groupId> + <artifactId>soak-examples</artifactId> + <packaging>pom</packaging> + <name>HornetQ Soak Examples</name> + + <!-- Properties --> + <properties> + <!-- + Explicitly declaring the source encoding eliminates the following + message: [WARNING] Using platform encoding (UTF-8 actually) to copy + filtered resources, i.e. build is platform dependent! + --> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <modules> + <module>normal</module> + </modules> +</project> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/tx-restarts/README ---------------------------------------------------------------------- diff --git a/examples/soak/tx-restarts/README b/examples/soak/tx-restarts/README deleted file mode 100644 index 42c8655..0000000 --- a/examples/soak/tx-restarts/README +++ /dev/null @@ -1,29 +0,0 @@ -**************************************************** -* Soak Test For TX survival over restarts -**************************************************** - -Run The Test -============== - -To run the test simply use ./build.sh - -It's important that you always clean the data directory before starting the test, as it will validate for sequences generated. - -The test will start and stop a server multiple times. - - -Run the server remotely -======================= - -You can start the server directly if you want, you can just start the server as: - -./run.sh PATH_TO_HORNETQ/examples/soak/tx-restarts/server0 - - -Then you can run the test as: - -./build.sh runRemote - - -And you can now kill and restart the server manually as many times as you want. - http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/tx-restarts/build.bat ---------------------------------------------------------------------- diff --git a/examples/soak/tx-restarts/build.bat b/examples/soak/tx-restarts/build.bat deleted file mode 100644 index 1f414eb..0000000 --- a/examples/soak/tx-restarts/build.bat +++ /dev/null @@ -1,13 +0,0 @@ -@echo off - -set "OVERRIDE_ANT_HOME=..\..\..\tools\ant" - -if exist "..\..\..\src\bin\build.bat" ( - rem running from TRUNK - call ..\..\..\src\bin\build.bat %* -) else ( - rem running from the distro - call ..\..\..\bin\build.bat %* -) - -set "OVERRIDE_ANT_HOME=" http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/tx-restarts/build.sh ---------------------------------------------------------------------- diff --git a/examples/soak/tx-restarts/build.sh b/examples/soak/tx-restarts/build.sh deleted file mode 100644 index 53ffb0d..0000000 --- a/examples/soak/tx-restarts/build.sh +++ /dev/null @@ -1,15 +0,0 @@ -#!/bin/sh - -OVERRIDE_ANT_HOME=../../tools/ant -export OVERRIDE_ANT_HOME - -if [ -f "../../../src/bin/build.sh" ]; then - # running from TRUNK - ../../../src/bin/build.sh "$@" -else - # running from the distro - ../../bin/build.sh "$@" -fi - - - http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/tx-restarts/build.xml ---------------------------------------------------------------------- diff --git a/examples/soak/tx-restarts/build.xml b/examples/soak/tx-restarts/build.xml deleted file mode 100644 index 0be1f02..0000000 --- a/examples/soak/tx-restarts/build.xml +++ /dev/null @@ -1,38 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!DOCTYPE project [ - <!ENTITY libraries SYSTEM "../../../thirdparty/libraries.ent"> - ]> -<!-- - ~ Copyright 2009 Red Hat, Inc. - ~ Red Hat licenses this file to you under the Apache License, version - ~ 2.0 (the "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - ~ implied. See the License for the specific language governing - ~ permissions and limitations under the License. - --> - -<project default="run" name="TX-Restarts soak test"> - - <import file="../../common/build.xml"/> - <property file="ant.properties"/> - <target name="run"> - <antcall target="runExample"> - <param name="example.classname" value="org.hornetq.jms.example.TXRestartSoak"/> - - <param name="java-min-memory" value="1G"/> - <param name="java-max-memory" value="1G"/> - </antcall> - </target> - - <target name="runRemote"> - <antcall target="runExample"> - <param name="example.classname" value="org.hornetq.jms.example.TXRestartSoak"/> - <param name="hornetq.example.runServer" value="false"/> - </antcall> - </target> - -</project> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/tx-restarts/server0/client-jndi.properties ---------------------------------------------------------------------- diff --git a/examples/soak/tx-restarts/server0/client-jndi.properties b/examples/soak/tx-restarts/server0/client-jndi.properties deleted file mode 100644 index 080524f..0000000 --- a/examples/soak/tx-restarts/server0/client-jndi.properties +++ /dev/null @@ -1,3 +0,0 @@ -java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory -java.naming.provider.url=jnp://localhost:1099 -java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/tx-restarts/server0/hornetq-beans.xml ---------------------------------------------------------------------- diff --git a/examples/soak/tx-restarts/server0/hornetq-beans.xml b/examples/soak/tx-restarts/server0/hornetq-beans.xml deleted file mode 100644 index 171d373..0000000 --- a/examples/soak/tx-restarts/server0/hornetq-beans.xml +++ /dev/null @@ -1,59 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> - -<deployment xmlns="urn:jboss:bean-deployer:2.0"> - - <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/> - - <!-- JNDI server. Disable this if you don't want JNDI --> - <bean name="JNDIServer" class="org.jnp.server.Main"> - <property name="namingInfo"> - <inject bean="Naming"/> - </property> - <property name="port">1099</property> - <property name="bindAddress">localhost</property> - <property name="rmiPort">1098</property> - <property name="rmiBindAddress">localhost</property> - </bean> - - <!-- MBean server --> - <bean name="MBeanServer" class="javax.management.MBeanServer"> - <constructor factoryClass="java.lang.management.ManagementFactory" - factoryMethod="getPlatformMBeanServer"/> - </bean> - - <!-- The core configuration --> - <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/> - - <!-- The security manager --> - <bean name="HornetQSecurityManager" class="org.hornetq.spi.core.security.HornetQSecurityManagerImpl"> - <start ignored="true"/> - <stop ignored="true"/> - </bean> - - <!-- The core server --> - <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl"> - <constructor> - <parameter> - <inject bean="Configuration"/> - </parameter> - <parameter> - <inject bean="MBeanServer"/> - </parameter> - <parameter> - <inject bean="HornetQSecurityManager"/> - </parameter> - </constructor> - <start ignored="true"/> - <stop ignored="true"/> - </bean> - - <!-- The JMS server --> - <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl"> - <constructor> - <parameter> - <inject bean="HornetQServer"/> - </parameter> - </constructor> - </bean> - -</deployment> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/tx-restarts/server0/hornetq-configuration.xml ---------------------------------------------------------------------- diff --git a/examples/soak/tx-restarts/server0/hornetq-configuration.xml b/examples/soak/tx-restarts/server0/hornetq-configuration.xml deleted file mode 100644 index 6909bde..0000000 --- a/examples/soak/tx-restarts/server0/hornetq-configuration.xml +++ /dev/null @@ -1,65 +0,0 @@ -<configuration xmlns="urn:hornetq" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd"> - - - <journal-file-size>102400</journal-file-size> - - <!-- Connectors --> - <connectors> - <connector name="netty-connector"> - <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class> - </connector> - </connectors> - - <!-- Acceptors --> - <acceptors> - <acceptor name="netty-acceptor"> - <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class> - </acceptor> - </acceptors> - - <address-settings> - <address-setting match="jms.queue.#"> - <max-delivery-attempts>-1</max-delivery-attempts> - <!-- <max-size-bytes>335544320000</max-size-bytes> --> - <max-size-bytes>33554432</max-size-bytes> - <page-size-bytes>16777216</page-size-bytes> - <address-full-policy>PAGE</address-full-policy> - </address-setting> - - </address-settings> - - - <diverts> - <divert name="div1"> - <address>jms.queue.inputQueue</address> - <forwarding-address>jms.queue.diverted1</forwarding-address> - <exclusive>true</exclusive> - </divert> - - <divert name="div2"> - <address>jms.queue.inputQueue</address> - <forwarding-address>jms.queue.diverted2</forwarding-address> - <exclusive>true</exclusive> - </divert> - </diverts> - - - - - <!-- Other config --> - - <security-settings> - <!--security for example queue--> - <security-setting match="jms.queue.#"> - <permission type="createDurableQueue" roles="guest"/> - <permission type="deleteDurableQueue" roles="guest"/> - <permission type="createNonDurableQueue" roles="guest"/> - <permission type="deleteNonDurableQueue" roles="guest"/> - <permission type="consume" roles="guest"/> - <permission type="send" roles="guest"/> - </security-setting> - </security-settings> - -</configuration> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/tx-restarts/server0/hornetq-jms.xml ---------------------------------------------------------------------- diff --git a/examples/soak/tx-restarts/server0/hornetq-jms.xml b/examples/soak/tx-restarts/server0/hornetq-jms.xml deleted file mode 100644 index 67c67e6..0000000 --- a/examples/soak/tx-restarts/server0/hornetq-jms.xml +++ /dev/null @@ -1,28 +0,0 @@ -<configuration xmlns="urn:hornetq" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd"> - <!--the connection factory used by the example--> - <connection-factory name="ConnectionFactory"> - <xa>true</xa> - <connectors> - <connector-ref connector-name="netty-connector"/> - </connectors> - <min-large-message-size>100240</min-large-message-size> - <entries> - <entry name="ConnectionFactory"/> - </entries> - </connection-factory> - - <!--the queue used by the example--> - <queue name="inputQueue"> - <entry name="/queue/inputQueue"/> - </queue> - - <queue name="diverted1"> - <entry name="/queue/diverted1"/> - </queue> - - <queue name="diverted2"> - <entry name="/queue/diverted2"/> - </queue> -</configuration> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/tx-restarts/server0/hornetq-users.xml ---------------------------------------------------------------------- diff --git a/examples/soak/tx-restarts/server0/hornetq-users.xml b/examples/soak/tx-restarts/server0/hornetq-users.xml deleted file mode 100644 index 934306c..0000000 --- a/examples/soak/tx-restarts/server0/hornetq-users.xml +++ /dev/null @@ -1,7 +0,0 @@ -<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd"> - <!-- the default user. this is used where username is null--> - <defaultuser name="guest" password="guest"> - <role name="guest"/> - </defaultuser> -</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/tx-restarts/server0/jndi.properties ---------------------------------------------------------------------- diff --git a/examples/soak/tx-restarts/server0/jndi.properties b/examples/soak/tx-restarts/server0/jndi.properties deleted file mode 100644 index e2a9832..0000000 --- a/examples/soak/tx-restarts/server0/jndi.properties +++ /dev/null @@ -1,2 +0,0 @@ -java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory -java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/tx-restarts/src/org/hornetq/jms/example/ClientAbstract.java ---------------------------------------------------------------------- diff --git a/examples/soak/tx-restarts/src/org/hornetq/jms/example/ClientAbstract.java b/examples/soak/tx-restarts/src/org/hornetq/jms/example/ClientAbstract.java deleted file mode 100644 index 768a722..0000000 --- a/examples/soak/tx-restarts/src/org/hornetq/jms/example/ClientAbstract.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.hornetq.jms.example; - -import java.io.File; -import java.io.FileInputStream; -import java.util.Properties; -import java.util.logging.Logger; - -import javax.jms.XAConnection; -import javax.jms.XAConnectionFactory; -import javax.jms.XASession; -import javax.naming.InitialContext; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; - -import org.hornetq.core.transaction.impl.XidImpl; -import org.hornetq.utils.UUIDGenerator; - -/** - * WARNING: This is not a sample on how you should handle XA. - * You are supposed to use a TransactionManager. - * This class is doing the job of a TransactionManager that fits for the purpose of this test only, - * however there are many more pitfalls to deal with Transactions. - * - * This is just to stress and soak test Transactions with HornetQ. - * - * And this is dealing with XA directly for the purpose testing only. - * - * @author <a href="mailto:[email protected]">Clebert Suconic</a> - * - * - */ -public abstract class ClientAbstract extends Thread -{ - - // Constants ----------------------------------------------------- - private static final Logger log = Logger.getLogger(ClientAbstract.class.getName()); - - // Attributes ---------------------------------------------------- - - protected InitialContext ctx; - - protected XAConnection conn; - - protected XASession sess; - - protected XAResource activeXAResource; - - protected Xid activeXid; - - protected volatile boolean running = true; - - protected volatile int errors = 0; - - /** - * A commit was called - * case we don't find the Xid, means it was accepted - */ - protected volatile boolean pendingCommit = false; - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - // Public -------------------------------------------------------- - - protected InitialContext getContext(final int serverId) throws Exception - { - String jndiFilename = "server" + serverId + "/client-jndi.properties"; - File jndiFile = new File(jndiFilename); - Properties props = new Properties(); - FileInputStream inStream = null; - try - { - inStream = new FileInputStream(jndiFile); - props.load(inStream); - } - finally - { - if (inStream != null) - { - inStream.close(); - } - } - return new InitialContext(props); - - } - - public XAConnection getConnection() - { - return conn; - } - - public int getErrorsCount() - { - return errors; - } - - public final void connect() - { - while (running) - { - try - { - disconnect(); - - ctx = getContext(0); - - XAConnectionFactory cf = (XAConnectionFactory)ctx.lookup("/ConnectionFactory"); - - conn = cf.createXAConnection(); - - sess = conn.createXASession(); - - activeXAResource = sess.getXAResource(); - - if (activeXid != null) - { - synchronized (ClientAbstract.class) - { - Xid[] xids = activeXAResource.recover(XAResource.TMSTARTRSCAN); - boolean found = false; - for (Xid recXid : xids) - { - if (recXid.equals(activeXid)) - { - // System.out.println("Calling commit after a prepare on " + this); - found = true; - callCommit(); - } - } - - if (!found) - { - if (pendingCommit) - { - System.out.println("Doing a commit based on a pending commit on " + this); - onCommit(); - } - else - { - System.out.println("Doing a rollback on " + this); - onRollback(); - } - - activeXid = null; - pendingCommit = false; - } - } - } - - connectClients(); - - break; - } - catch (Exception e) - { - ClientAbstract.log.warning("Can't connect to server, retrying"); - disconnect(); - try - { - Thread.sleep(1000); - } - catch (InterruptedException ignored) - { - // if an interruption was sent, we will respect it and leave the loop - break; - } - } - } - } - - @Override - public void run() - { - connect(); - } - - protected void callCommit() throws Exception - { - pendingCommit = true; - activeXAResource.commit(activeXid, false); - pendingCommit = false; - activeXid = null; - onCommit(); - } - - protected void callPrepare() throws Exception - { - activeXAResource.prepare(activeXid); - } - - public void beginTX() throws Exception - { - activeXid = newXID(); - - activeXAResource.start(activeXid, XAResource.TMNOFLAGS); - } - - public void endTX() throws Exception - { - activeXAResource.end(activeXid, XAResource.TMSUCCESS); - callPrepare(); - callCommit(); - } - - public void setRunning(final boolean running) - { - this.running = running; - } - - /** - * @return - */ - private XidImpl newXID() - { - return new XidImpl("tst".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes()); - } - - protected abstract void connectClients() throws Exception; - - protected abstract void onCommit(); - - protected abstract void onRollback(); - - public void disconnect() - { - try - { - if (conn != null) - { - conn.close(); - } - } - catch (Exception ignored) - { - ignored.printStackTrace(); - } - - try - { - if (ctx != null) - { - ctx.close(); - } - } - catch (Exception ignored) - { - ignored.printStackTrace(); - } - - ctx = null; - conn = null; - // it's not necessary to close the session as conn.close() will already take care of that - sess = null; - } - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - // Inner classes ------------------------------------------------- - -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java ---------------------------------------------------------------------- diff --git a/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java b/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java deleted file mode 100644 index 5d484f1..0000000 --- a/examples/soak/tx-restarts/src/org/hornetq/jms/example/Receiver.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.hornetq.jms.example; - -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Queue; - -import org.hornetq.utils.ReusableLatch; - -/** - * A Receiver - * - * @author <a href="mailto:[email protected]">Clebert Suconic</a> - * - * - */ -public class Receiver extends ClientAbstract -{ - - // Constants ----------------------------------------------------- - - // Attributes ---------------------------------------------------- - - private Queue queue; - - // We should leave some messages on paging. We don't want to consume all for this test - private final Semaphore minConsume = new Semaphore(0); - - private final ReusableLatch latchMax = new ReusableLatch(0); - - private static final int MAX_DIFF = 10000; - - // The difference between producer and consuming - private final AtomicInteger currentDiff = new AtomicInteger(0); - - private final String queueJNDI; - - protected long msgs = 0; - - protected int pendingMsgs = 0; - - protected int pendingSemaphores = 0; - - protected MessageConsumer cons; - - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - public Receiver(String queueJNDI) - { - super(); - this.queueJNDI = queueJNDI; - } - - // Public -------------------------------------------------------- - - public void run() - { - super.run(); - - while (running) - { - try - { - beginTX(); - - for (int i = 0 ; i < 1000; i++) - { - Message msg = cons.receive(5000); - if (msg == null) - { - break; - } - - if (msg.getLongProperty("count") != msgs + pendingMsgs) - { - errors++; - System.out.println("count should be " + (msgs + pendingMsgs) + " when it was " + msg.getLongProperty("count") + " on " + queueJNDI); - } - - pendingMsgs++; - if (!minConsume.tryAcquire(1, 5, TimeUnit.SECONDS)) - { - break; - } - - } - - endTX(); - } - catch (Exception e) - { - connect(); - } - - - } - } - - /* (non-Javadoc) - * @see org.hornetq.jms.example.ClientAbstract#connectClients() - */ - @Override - protected void connectClients() throws Exception - { - - queue = (Queue)ctx.lookup(queueJNDI); - - cons = sess.createConsumer(queue); - - conn.start(); - } - - /* (non-Javadoc) - * @see org.hornetq.jms.example.ClientAbstract#onCommit() - */ - @Override - protected void onCommit() - { - msgs += pendingMsgs; - this.currentDiff.addAndGet(-pendingMsgs); - latchMax.countDown(pendingMsgs); - pendingMsgs = 0; - System.out.println("Commit on consumer " + queueJNDI + ", msgs=" + msgs + " currentDiff = " + currentDiff); - } - - /* (non-Javadoc) - * @see org.hornetq.jms.example.ClientAbstract#onRollback() - */ - @Override - protected void onRollback() - { - System.out.println("Rollback on consumer " + queueJNDI + ", msgs=" + msgs); - minConsume.release(pendingMsgs); - pendingMsgs = 0; - } - - public String toString() - { - return "Receiver::" + this.queueJNDI + ", msgs=" + msgs + ", pending=" + pendingMsgs; - } - - /** - * @param pendingMsgs2 - */ - public void messageProduced(int producedMessages) - { - minConsume.release(producedMessages); - currentDiff.addAndGet(producedMessages); - System.out.println("Msg produced on " + this.queueJNDI + ", currentDiff = " + currentDiff); - if (currentDiff.get() > MAX_DIFF) - { - System.out.println("Holding producer for 5 seconds"); - latchMax.setCount(currentDiff.get() - MAX_DIFF); - try - { - latchMax.await(5, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - e.printStackTrace(); - } - } - } - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - // Private ------------------------------------------------------- - - // Inner classes ------------------------------------------------- - -}
