http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java deleted file mode 100644 index a927279..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java +++ /dev/null @@ -1,628 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.usecases; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.junit.Test; - -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.HashSet; -import java.util.Vector; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CopyOnWriteArrayList; - -// see https://issues.apache.org/activemq/browse/AMQ-2985 -// this demonstrated receiving old messages eventually along with validating order receipt -public class DurableSubProcessTest extends org.apache.activemq.TestSupport { - private static final Logger LOG = LoggerFactory.getLogger(DurableSubProcessTest.class); - // public static final long RUNTIME = 4 * 60 * 1000; - public static final long RUNTIME = 60 * 1000; - - public static final int SERVER_SLEEP = 2 * 1000; // max - public static final int CARGO_SIZE = 10; // max - - public static final int MAX_CLIENTS = 7; - public static final Random CLIENT_LIFETIME = new Random(30 * 1000, 2 * 60 * 1000); - public static final Random CLIENT_ONLINE = new Random(2 * 1000, 15 * 1000); - public static final Random CLIENT_OFFLINE = new Random(1 * 1000, 20 * 1000); - - public static final boolean ALLOW_SUBSCRIPTION_ABANDONMENT = true; - - - private Topic topic; - - private ClientManager clientManager; - private Server server; - private HouseKeeper houseKeeper; - - static final Vector<Throwable> exceptions = new Vector<Throwable>(); - - public void testProcess() { - try { - server.start(); - clientManager.start(); - - if (ALLOW_SUBSCRIPTION_ABANDONMENT) - houseKeeper.start(); - - Thread.sleep(RUNTIME); - assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); - } - catch (Throwable e) { - exit("DurableSubProcessTest.testProcess failed.", e); - } - LOG.info("DONE."); - } - - /** - * Creates batch of messages in a transaction periodically. - * The last message in the transaction is always a special - * message what contains info about the whole transaction. - * <p>Notifies the clients about the created messages also. - */ - final class Server extends Thread { - - final ConnectionFactory cf = new HedwigConnectionFactoryImpl(); - - final Object sendMutex = new Object(); - final String[] cargos = new String[500]; - - int transRover = 0; - int messageRover = 0; - - public Server() { - super("Server"); - setDaemon(true); - } - - @Override - public void run() { - try { - while (true) { - DurableSubProcessTest.sleepRandom(SERVER_SLEEP); - send(); - } - } - catch (Throwable e) { - exit("Server.run failed", e); - } - } - - public void send() throws JMSException { - // do not create new clients now - // ToDo: Test this case later. - synchronized (sendMutex) { - int trans = ++transRover; - boolean relevantTrans = random(2) > 1; - ClientType clientType = relevantTrans ? ClientType.randomClientType() : null; // sends this types - int count = random(200); - - LOG.info("Sending Trans[id=" + trans + ", count=" + count + ", clientType=" + clientType + "]"); - - Connection con = cf.createConnection(); - Session sess = con.createSession(true, Session.AUTO_ACKNOWLEDGE); - MessageProducer prod = sess.createProducer(null); - - for (int i = 0; i < count; i++) { - Message message = sess.createMessage(); - message.setIntProperty("ID", ++messageRover); - String type = clientType != null ? clientType.randomMessageType() : - ClientType.randomNonRelevantMessageType(); - message.setStringProperty("TYPE", type); - - if (CARGO_SIZE > 0) - message.setStringProperty("CARGO", getCargo(CARGO_SIZE)); - - prod.send(topic, message); - clientManager.onServerMessage(message); - } - - Message message = sess.createMessage(); - message.setIntProperty("ID", ++messageRover); - message.setIntProperty("TRANS", trans); - message.setBooleanProperty("COMMIT", true); - message.setBooleanProperty("RELEVANT", relevantTrans); - prod.send(topic, message); - clientManager.onServerMessage(message); - - sess.commit(); - sess.close(); - con.close(); - } - } - - private String getCargo(int length) { - if (length == 0) - return null; - - if (length < cargos.length) { - String result = cargos[length]; - if (result == null) { - result = getCargoImpl(length); - cargos[length] = result; - } - return result; - } - return getCargoImpl(length); - } - - private String getCargoImpl(int length) { - StringBuilder sb = new StringBuilder(length); - for (int i = length; --i >=0; ) { - sb.append('a'); - } - return sb.toString(); - } - } - - /** - * Clients listen on different messages in the topic. - * The 'TYPE' property helps the client to select the - * proper messages. - */ - private enum ClientType { - A ("a", "b", "c"), - B ("c", "d", "e"), - C ("d", "e", "f"), - D ("g", "h"); - - public final String[] messageTypes; - public final HashSet<String> messageTypeSet; - public final String selector; - - ClientType(String... messageTypes) { - this.messageTypes = messageTypes; - messageTypeSet = new HashSet<String>(Arrays.asList(messageTypes)); - - StringBuilder sb = new StringBuilder("TYPE in ("); - for (int i = 0; i < messageTypes.length; i++) { - if (i > 0) - sb.append(", "); - sb.append('\'').append(messageTypes[i]).append('\''); - } - sb.append(')'); - selector = sb.toString(); - } - - public static ClientType randomClientType() { - return values()[DurableSubProcessTest.random(values().length - 1)]; - } - - public final String randomMessageType() { - return messageTypes[DurableSubProcessTest.random(messageTypes.length - 1)]; - } - - public static String randomNonRelevantMessageType() { - return Integer.toString(DurableSubProcessTest.random(20)); - } - - public final boolean isRelevant(String messageType) { - return messageTypeSet.contains(messageType); - } - - @Override - public final String toString() { - return this.name() /*+ '[' + selector + ']'*/; - } - } - - /** - * Creates new cliens. - */ - private final class ClientManager extends Thread { - - private int clientRover = 0; - - private final CopyOnWriteArrayList<Client> clients = new CopyOnWriteArrayList<Client>(); - - public ClientManager() { - super("ClientManager"); - setDaemon(true); - } - - @Override - public void run() { - try { - while (true) { - if (clients.size() < MAX_CLIENTS) - createNewClient(); - - int size = clients.size(); - sleepRandom(size * 3 * 1000, size * 6 * 1000); - } - } - catch (Throwable e) { - exit("ClientManager.run failed.", e); - } - } - - private void createNewClient() throws JMSException { - ClientType type = ClientType.randomClientType(); - - Client client; - synchronized (server.sendMutex) { - client = new Client(++clientRover, type, CLIENT_LIFETIME, CLIENT_ONLINE, CLIENT_OFFLINE); - clients.add(client); - } - client.start(); - - LOG.info(client.toString() + " created. " + this); - } - - public void removeClient(Client client) { - clients.remove(client); - } - - public void onServerMessage(Message message) throws JMSException { - for (Client client: clients) { - client.onServerMessage(message); - } - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder("ClientManager[count="); - sb.append(clients.size()); - sb.append(", clients="); - boolean sep = false; - for (Client client: clients) { - if (sep) sb.append(", "); - else sep = true; - sb.append(client.toString()); - } - sb.append(']'); - return sb.toString(); - } - } - - /** - * Consumes massages from a durable subscription. - * Goes online/offline periodically. Checks the incoming messages - * against the sent messages of the server. - */ - private final class Client extends Thread { - - final ConnectionFactory cf = new HedwigConnectionFactoryImpl(); - - public static final String SUBSCRIPTION_NAME = "subscription"; - - private final int id; - private final String conClientId; - - private final Random lifetime; - private final Random online; - private final Random offline; - private int numIter = 0; - - private final ClientType clientType; - private final String selector; - - private final ConcurrentLinkedQueue<Message> waitingList = new ConcurrentLinkedQueue<Message>(); - - public Client(int id, ClientType clientType, Random lifetime, Random online, Random offline) - throws JMSException { - super("Client" + id); - setDaemon(true); - - this.id = id; - conClientId = "cli" + id; - this.clientType = clientType; - selector = "(COMMIT = true and RELEVANT = true) or " + clientType.selector; - - this.lifetime = lifetime; - this.online = online; - this.offline = offline; - - subscribe(); - } - - @Override - public void run() { - long end = System.currentTimeMillis() + lifetime.next(); - try { - boolean sleep = false; - while (true) { - long max = end - System.currentTimeMillis(); - if (max <= 0) - break; - - if (sleep) offline.sleepRandom(); - else sleep = true; - - numIter ++; - process(online.next()); - } - - if (!ALLOW_SUBSCRIPTION_ABANDONMENT || random(1) > 0) - unsubscribe(); - else { - LOG.info("Client abandon the subscription. " + this); - - // housekeeper should sweep these abandoned subscriptions - houseKeeper.abandonedSubscriptions.add(conClientId); - } - } - catch (Throwable e) { - exit(toString() + " failed.", e); - } - - clientManager.removeClient(this); - LOG.info(toString() + " DONE."); - } - - private void process(long millis) throws JMSException { - long end = System.currentTimeMillis() + millis; - long hardEnd = end + 2000; // wait to finish the transaction. - boolean inTransaction = false; - int transCount = 0; - - LOG.info(toString() + " ONLINE."); - Connection con = openConnection(); - Session sess = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageConsumer consumer = sess.createDurableSubscriber(topic, SUBSCRIPTION_NAME, selector, false); - try { - do { - long max = end - System.currentTimeMillis(); - if (max <= 0) { - if (!inTransaction) - break; - - max = hardEnd - System.currentTimeMillis(); - if (max <= 0) - exit("" + this + " failed: Transaction is not finished."); - } - - Message message = consumer.receive(max); - if (message == null) - continue; - - onClientMessage(message); - - if (message.propertyExists("COMMIT")) { - message.acknowledge(); - - LOG.info("Received Trans[id=" + message.getIntProperty("TRANS") - + ", count=" + transCount + "] in " + this + "."); - - inTransaction = false; - transCount = 0; - } - else { - inTransaction = true; - transCount++; - } - } while (true); - } - finally { - sess.close(); - con.close(); - - LOG.info(toString() + " OFFLINE."); - - // Check if the messages are in the waiting - // list for long time. - Message topMessage = waitingList.peek(); - if (topMessage != null) - checkDeliveryTime(topMessage); - } - } - - public void onServerMessage(Message message) throws JMSException { - if (Boolean.TRUE.equals(message.getObjectProperty("COMMIT"))) { - if (Boolean.TRUE.equals(message.getObjectProperty("RELEVANT"))) - waitingList.add(message); - } - else { - String messageType = message.getStringProperty("TYPE"); - if (clientType.isRelevant(messageType)) - waitingList.add(message); - } - } - - public void onClientMessage(Message message) { - Message serverMessage = waitingList.poll(); - try { - if (serverMessage == null) - exit("" + this + " failed: There is no next server message, but received: " + message); - - Integer receivedId = (Integer) message.getObjectProperty("ID"); - Integer serverId = (Integer) serverMessage.getObjectProperty("ID"); - if (receivedId == null || serverId == null) - exit("" + this + " failed: message ID not found.\r\n" + - " received: " + message + "\r\n" + - " server: " + serverMessage); - - if (!serverId.equals(receivedId)) - exit("" + this + " failed: Received wrong message.\r\n" + - " received: " + message + "\r\n" + - " server: " + serverMessage); - - checkDeliveryTime(message); - } - catch (Throwable e) { - exit("" + this + ".onClientMessage failed.\r\n" + - " received: " + message + "\r\n" + - " server: " + serverMessage, e); - } - } - - /** - * Checks if the message was not delivered fast enough. - */ - public void checkDeliveryTime(Message message) throws JMSException { - long creation = message.getJMSTimestamp(); - // + 1000 for the various additional setup times that activemq supports (if I am not wrong !) - long min = System.currentTimeMillis() - numIter * ((offline.max + online.min) + 1000); - - if (min > creation) { - SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss.SSS"); - exit("" + this + ".checkDeliveryTime failed. Message time: " + df.format(new Date(creation)) - + ", min: " + df.format(new Date(min)) + "\r\n" + message); - } - } - - private Connection openConnection() throws JMSException { - Connection con = cf.createConnection(); - con.setClientID(conClientId); - con.start(); - return con; - } - - private void subscribe() throws JMSException { - Connection con = openConnection(); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.createDurableSubscriber(topic, SUBSCRIPTION_NAME, selector, true); - session.close(); - con.close(); - } - - private void unsubscribe() throws JMSException { - try { - Connection con = openConnection(); - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - session.unsubscribe(SUBSCRIPTION_NAME); - session.close(); - con.close(); - } catch (JMSException jsEx){ // ignore - } - } - - @Override - public String toString() { - return "Client[id=" + id + ", type=" + clientType + "]"; - } - } - - /** - * Sweeps out not-used durable subscriptions. - */ - private final class HouseKeeper extends Thread { - - private HouseKeeper() { - super("HouseKeeper"); - setDaemon(true); - } - - public final CopyOnWriteArrayList<String> abandonedSubscriptions = new CopyOnWriteArrayList<String>(); - - @Override - public void run() { - while (true) { - try { - Thread.sleep(60 * 1000); - sweep(); - } - catch (InterruptedException ex) { - break; - } - catch (Throwable e) { - Exception log = new Exception("HouseKeeper failed.", e); - log.printStackTrace(); - } - } - } - - private void sweep() throws Exception { - LOG.info("Housekeeper sweeping."); - - int closed = 0; - ArrayList<String> sweeped = new ArrayList<String>(); - try { - for (String clientId: abandonedSubscriptions) { - sweeped.add(clientId); - LOG.info("Sweeping out subscription of " + clientId + "."); - closed++; - } - } - finally { - abandonedSubscriptions.removeAll(sweeped); - } - - LOG.info("Housekeeper sweeped out " + closed + " subscriptions."); - } - } - - public static int random(int max) { - return (int) (Math.random() * (max + 1)); - } - - public static int random(int min, int max) { - return random(max - min) + min; - } - - public static void sleepRandom(int maxMillis) throws InterruptedException { - Thread.sleep(random(maxMillis)); - } - - public static void sleepRandom(int minMillis, int maxMillis) throws InterruptedException { - Thread.sleep(random(minMillis, maxMillis)); - } - - public static final class Random { - - final int min; - final int max; - - Random(int min, int max) { - this.min = min; - this.max = max; - } - - public int next() { - return random(min, max); - } - - public void sleepRandom() throws InterruptedException { - DurableSubProcessTest.sleepRandom(min, max); - } - } - - public static void exit(String message) { - exit(message, null); - } - - public static void exit(String message, Throwable e) { - Throwable log = new RuntimeException(message, e); - log.printStackTrace(); - LOG.error(message, e); - exceptions.add(e); - fail(message); - } - - protected void setUp() throws Exception { - topic = (Topic) createDestination(); - - clientManager = new ClientManager(); - server = new Server(); - houseKeeper = new HouseKeeper(); - - super.setUp(); - } -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayTest.java deleted file mode 100644 index b8d12da..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubSelectorDelayTest.java +++ /dev/null @@ -1,256 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR ONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.usecases; - -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; - -import org.apache.hedwig.JmsTestBase; -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DurableSubSelectorDelayTest extends JmsTestBase { - - private static final Logger LOG = LoggerFactory.getLogger(DurableSubSelectorDelayTest.class); - - public static final long RUNTIME = 3 * 60 * 1000; - - private Topic topic; - - public void testProcess() throws Exception { - - DurableSubscriber subscribers[] = new DurableSubscriber[10]; - - for (int i = 0; i < subscribers.length; i++) { - subscribers[i] = new DurableSubscriber(i); - } - - MsgProducer msgProducer = new MsgProducer(); - msgProducer.start(); - - for (int i = 0; i < subscribers.length; i++) { - subscribers[i].process(); - } - - - // wait for server to finish - msgProducer.join(); - - for (int j = 0; j < subscribers.length; j++) { - LOG.info("Unsubscribing subscriber " + subscribers[j]); - subscribers[j].unsubscribe(); - } - - // allow the clean up thread time to run - TimeUnit.MINUTES.sleep(2); - LOG.info("DONE."); - } - - /** - * Message Producer - */ - final class MsgProducer extends Thread { - - final ConnectionFactory cf = new HedwigConnectionFactoryImpl(); - - int transRover = 0; - int messageRover = 0; - - public MsgProducer() { - super("MsgProducer"); - setDaemon(true); - } - - @Override - public void run() { - long endTime = RUNTIME + System.currentTimeMillis(); - - try { - while (endTime > System.currentTimeMillis()) { - Thread.sleep(400); - send(); - } - } catch (Throwable e) { - e.printStackTrace(System.out); - throw new RuntimeException(e); - } - } - - public void send() throws JMSException { - - int trans = ++transRover; - boolean relevantTrans = true; - int count = 40; - - LOG.info("Sending Trans[id=" + trans + ", count=" - + count + "]"); - - Connection con = cf.createConnection(); - - Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer prod = sess.createProducer(null); - - for (int i = 0; i < count; i++) { - Message message = sess.createMessage(); - message.setIntProperty("ID", ++messageRover); - message.setIntProperty("TRANS", trans); - message.setBooleanProperty("RELEVANT", false); - prod.send(topic, message); - } - - Message message = sess.createMessage(); - message.setIntProperty("ID", ++messageRover); - message.setIntProperty("TRANS", trans); - message.setBooleanProperty("COMMIT", true); - message.setBooleanProperty("RELEVANT", relevantTrans); - prod.send(topic, message); - - LOG.info("Committed Trans[id=" + trans + ", count=" - + count + "], ID=" + messageRover); - - sess.close(); - con.close(); - } - } - - /** - * Consumes massages from a durable subscription. Goes online/offline - * periodically. Checks the incoming messages against the sent messages of - * the server. - */ - private final class DurableSubscriber { - - final ConnectionFactory cf = new HedwigConnectionFactoryImpl(); - - private final String subName ; - - private final int id; - private final String conClientId; - private final String selector; - - private final Session sess; - private final MessageConsumer consumer; - private final Connection con; - - public DurableSubscriber(int id) throws JMSException { - this.id = id; - conClientId = "cli" + id; - subName = "subscription"+ id; - selector ="RELEVANT = true"; - con = openConnection(); - sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumer = sess.createDurableSubscriber(topic, subName, selector, false); - - } - - private void process() throws JMSException { - long end = System.currentTimeMillis() + 20000; - int transCount = 0; - - LOG.info(toString() + " ONLINE."); - - try { - - do { - long max = end - System.currentTimeMillis(); - - if (max <= 0) { - break; - } - - Message message = consumer.receive(max); - if (message == null) { - continue; - } - - LOG.info("Received Trans[id=" - + message.getIntProperty("TRANS") + ", count=" - + transCount + "] in " + this + "."); - - } while (true); - - } finally { - sess.close(); - con.close(); - - LOG.info(toString() + " OFFLINE."); - } - } - - private Connection openConnection() throws JMSException { - Connection con = cf.createConnection(); - con.setClientID(conClientId); - con.start(); - return con; - } - - private void unsubscribe() throws JMSException { - Connection con = openConnection(); - Session session = con - .createSession(false, Session.AUTO_ACKNOWLEDGE); - // Call a dummp createDurableSubscriber (o.a.h.jms.package-info.html for more on why). - session.createDurableSubscriber(topic, subName, selector, false); - session.unsubscribe(subName); - session.close(); - con.close(); - } - - @Override - public String toString() { - return "DurableSubscriber[id=" + id + "]"; - } - } - - public void setUp() throws Exception { - super.setUp(); - topic = SessionImpl.asTopic("TopicT"); - } - - public String getName() { - return "DurableSubSelectorDelayTest"; - } - - private static boolean delete(File path) { - if (path == null) - return true; - - if (path.isDirectory()) { - for (File file : path.listFiles()) { - delete(file); - } - } - return path.delete(); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java deleted file mode 100644 index 0a9968b..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableSubscriptionHangTestCase.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.usecases; - -import java.util.concurrent.TimeUnit; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicConnection; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; - -import org.apache.hedwig.JmsTestBase; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; - - - -import org.apache.commons.lang.RandomStringUtils; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -import static org.junit.Assert.assertNotNull; - -public class DurableSubscriptionHangTestCase extends JmsTestBase { - private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionHangTestCase.class); - final static String clientID = "myId"; - private static final String topicName = "myTopic"; - private static final String durableSubName = "mySub"; - - public void testHanging() throws Exception - { - registerDurableSubscription(); - produceExpiredAndOneNonExpiredMessages(1000); - TimeUnit.SECONDS.sleep(10); // make sure messages are expired - int numMessages = collectMessagesFromDurableSubscriptionForOneMinute(); - assert 1 == numMessages : "Expected " + 1 + ", received " + numMessages; - } - - private void produceExpiredAndOneNonExpiredMessages(final int messageCount) throws JMSException { - HedwigConnectionFactoryImpl connectionFactory = new HedwigConnectionFactoryImpl(); - TopicConnection connection = connectionFactory.createTopicConnection(); - TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic(topicName); - MessageProducer producer = session.createProducer(topic); - producer.setTimeToLive(TimeUnit.SECONDS.toMillis(1)); - for(int i=0; i<messageCount; i++) { - sendRandomMessage(session, producer); - } - producer.setTimeToLive(TimeUnit.DAYS.toMillis(1)); - sendRandomMessage(session, producer); - connection.close(); - LOG.info("produceExpiredAndOneNonExpiredMessages done"); - } - - private void registerDurableSubscription() throws JMSException - { - HedwigConnectionFactoryImpl connectionFactory = new HedwigConnectionFactoryImpl(); - TopicConnection connection = connectionFactory.createTopicConnection(); - connection.setClientID(clientID); - TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = topicSession.createTopic(topicName); - TopicSubscriber durableSubscriber = topicSession.createDurableSubscriber(topic, durableSubName); - connection.start(); - durableSubscriber.close(); - connection.close(); - LOG.info("Durable Sub Registered"); - } - - private int collectMessagesFromDurableSubscriptionForOneMinute() throws Exception - { - HedwigConnectionFactoryImpl connectionFactory = new HedwigConnectionFactoryImpl(); - TopicConnection connection = connectionFactory.createTopicConnection(); - - connection.setClientID(clientID); - TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = topicSession.createTopic(topicName); - connection.start(); - TopicSubscriber subscriber = topicSession.createDurableSubscriber(topic, durableSubName); - LOG.info("About to receive messages"); - int retval = 0; - while (true){ - Message message = subscriber.receive(2000); - if (null == message) { - break; - } - retval ++; - } - subscriber.close(); - connection.close(); - LOG.info("collectMessagesFromDurableSubscriptionForOneMinute done"); - - return retval; - } - - private void sendRandomMessage(TopicSession session, MessageProducer producer) throws JMSException { - TextMessage textMessage = session.createTextMessage(); - textMessage.setText(RandomStringUtils.random(500, "abcdefghijklmnopqrstuvwxyz")); - producer.send(textMessage); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/MyObject.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/MyObject.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/MyObject.java deleted file mode 100644 index edf180b..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/MyObject.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.usecases; - -import java.io.Serializable; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.IOException; -import java.io.ObjectStreamException; -import java.util.concurrent.atomic.AtomicInteger; - -public class MyObject implements Serializable { - - private String message; - private AtomicInteger writeObjectCalled = new AtomicInteger(0); - private AtomicInteger readObjectCalled = new AtomicInteger(0); - private AtomicInteger readObjectNoDataCalled = new AtomicInteger(0); - - public MyObject(String message) { - this.setMessage(message); - } - - public void setMessage(String message) { - this.message = message; - } - - public String getMessage() { - return message; - } - - private void writeObject(java.io.ObjectOutputStream out) throws IOException { - writeObjectCalled.incrementAndGet(); - out.defaultWriteObject(); - } - - private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - readObjectCalled.incrementAndGet(); - } - - private void readObjectNoData() throws ObjectStreamException { - readObjectNoDataCalled.incrementAndGet(); - } - - public int getWriteObjectCalled() { - return writeObjectCalled.get(); - } - - public int getReadObjectCalled() { - return readObjectCalled.get(); - } - - public int getReadObjectNoDataCalled() { - return readObjectNoDataCalled.get(); - } -} - - http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java deleted file mode 100644 index adfb47d..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java +++ /dev/null @@ -1,340 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.usecases; - - -import javax.jms.TextMessage; -import javax.jms.Topic; -import static org.junit.Assert.*; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.hedwig.JmsTestBase; -import org.apache.hedwig.jms.spi.HedwigConnectionImpl; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; - -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class NonBlockingConsumerRedeliveryTest extends JmsTestBase { - private static final Logger LOG = LoggerFactory.getLogger(NonBlockingConsumerRedeliveryTest.class); - - private final String destinationName = "Destination"; - // private final int MSG_COUNT = 100; - - private HedwigConnectionFactoryImpl connectionFactory; - - public void testMessageDeleiveredWhenNonBlockingEnabled() throws Exception { - - final List<String> received = new ArrayList<String>(16); - final List<String> beforeRollback = new ArrayList<String>(16); - final List<String> afterRollback = new ArrayList<String>(16); - - HedwigConnectionImpl connection = connectionFactory.createConnection(); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createTopic(destinationName); - MessageConsumer consumer = session.createConsumer(destination); - - consumer.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - try { - received.add(((TextMessage) message).getText()); - } catch (JMSException e) { - // should not happen ... - e.printStackTrace(); - } - } - }); - - final int MSG_COUNT = connection.getHedwigClientConfig().getMaximumOutstandingMessages() - 1; - - sendMessages(MSG_COUNT); - - session.commit(); - connection.start(); - - - Wait.waitFor(new Wait.Condition() { - public boolean isSatisified() throws Exception { - LOG.info("Consumer has received " + received.size() + " messages."); - return received.size() == MSG_COUNT; - } - }); - - assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages, got " - + received.size() + ".",MSG_COUNT == received.size()); - - beforeRollback.addAll(received); - received.clear(); - session.rollback(); - - assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.", - Wait.waitFor(new Wait.Condition(){ - public boolean isSatisified() throws Exception { - LOG.info("Consumer has received " + received.size() + " messages since rollback."); - return received.size() == MSG_COUNT; - } - } - )); - - afterRollback.addAll(received); - received.clear(); - - assertEquals(beforeRollback.size(), afterRollback.size()); - assertEquals(beforeRollback, afterRollback); - session.commit(); - } - - public void testMessageDeleiveryDoesntStop() throws Exception { - - final List<String> received = Collections.synchronizedList(new ArrayList<String>(16)); - final List<String> beforeRollback = new ArrayList<String>(16); - final List<String> afterRollback = new ArrayList<String>(16); - - HedwigConnectionImpl connection = connectionFactory.createConnection(); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createTopic(destinationName); - MessageConsumer consumer = session.createConsumer(destination); - - consumer.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - try { - received.add(((TextMessage) message).getText()); - } catch (JMSException e) { - // should not happen - e.printStackTrace(); - } - } - }); - - final int MSG_COUNT = connection.getHedwigClientConfig().getMaximumOutstandingMessages() / 2 - 1; - - sendMessages(MSG_COUNT); - connection.start(); - - assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.", - Wait.waitFor(new Wait.Condition(){ - public boolean isSatisified() throws Exception { - LOG.info("Consumer has received " + received.size() + " messages."); - return received.size() == MSG_COUNT; - } - } - )); - - beforeRollback.addAll(received); - received.clear(); - session.rollback(); - - sendMessages(MSG_COUNT); - - { - boolean messagesReceived = Wait.waitFor(new Wait.Condition(){ - public boolean isSatisified() throws Exception { - LOG.info("Consumer has received " + received.size() + " messages since rollback."); - return received.size() == MSG_COUNT * 2; - } - }); - assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages, received " - + received.size() + " messages.", messagesReceived); - } - - afterRollback.addAll(received); - received.clear(); - - assertEquals(beforeRollback.size() * 2, afterRollback.size()); - - session.commit(); - } - - public void testNonBlockingMessageDeleiveryIsDelayed() throws Exception { - final List<String> received = new ArrayList<String>(16); - - HedwigConnectionImpl connection = (HedwigConnectionImpl) connectionFactory.createConnection(); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createTopic(destinationName); - MessageConsumer consumer = session.createConsumer(destination); - - consumer.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - try { - received.add(((TextMessage) message).getText()); - } catch (JMSException e) { - // should not happen - e.printStackTrace(); - } - } - }); - - final int MSG_COUNT = connection.getHedwigClientConfig().getMaximumOutstandingMessages() - 1; - - sendMessages(MSG_COUNT); - connection.start(); - - assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.", - Wait.waitFor(new Wait.Condition(){ - public boolean isSatisified() throws Exception { - LOG.info("Consumer has received " + received.size() + " messages."); - return received.size() == MSG_COUNT; - } - } - )); - - received.clear(); - session.rollback(); - - { - boolean condition = Wait.waitFor(new Wait.Condition() { - public boolean isSatisified() throws Exception { - return received.size() > 0; - } - }, TimeUnit.SECONDS.toMillis(4) - ); - // We do not have any notion of delaying rederlivery - so we immediately get the message (unlike activemq's - // connection.getRedeliveryPolicy().setInitialRedeliveryDelay(TimeUnit.SECONDS.toMillis(6)); - // assertFalse("Delayed redelivery test not expecting any messages yet. got " - // + received.size() + " messages", condition); - assertTrue("Rollback expects to receive: " + MSG_COUNT + " messages.", - Wait.waitFor(new Wait.Condition(){ - public boolean isSatisified() throws Exception { - LOG.info("Consumer has received " + received.size() + " messages."); - return received.size() == MSG_COUNT; - } - } - )); - } - - session.commit(); - session.close(); - } - - public void testNonBlockingMessageDeleiveryWithRollbacks() throws Exception { - final List<String> received = new ArrayList<String>(16); - - HedwigConnectionImpl connection = (HedwigConnectionImpl) connectionFactory.createConnection(); - final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - final Destination destination = session.createTopic(destinationName); - final MessageConsumer consumer = session.createConsumer(destination); - - consumer.setMessageListener(new MessageListener() { - @Override - public void onMessage(Message message) { - try { - received.add(((TextMessage) message).getText()); - } catch (JMSException e) { - // should not happen - e.printStackTrace(); - } - } - }); - - final int MSG_COUNT = connection.getHedwigClientConfig().getMaximumOutstandingMessages() - 1; - - sendMessages(MSG_COUNT); - connection.start(); - - assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.", - Wait.waitFor(new Wait.Condition(){ - public boolean isSatisified() throws Exception { - LOG.info("Consumer has received " + received.size() + " messages."); - return received.size() == MSG_COUNT; - } - } - )); - - received.clear(); - - consumer.setMessageListener(new MessageListener() { - - int count = 0; - - @Override - public void onMessage(Message message) { - - if (++count > 10) { - try { - session.rollback(); - LOG.info("Rolling back session."); - count = 0; - } catch (JMSException e) { - LOG.warn("Caught an unexcepted exception: " + e.getMessage()); - } - } else { - try { - received.add(((TextMessage) message).getText()); - } catch (JMSException e) { - // should not happen - e.printStackTrace(); - } - try { - session.commit(); - } catch (JMSException e) { - LOG.warn("Caught an unexcepted exception: " + e.getMessage()); - } - } - } - }); - - session.rollback(); - - assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.", - Wait.waitFor(new Wait.Condition(){ - public boolean isSatisified() throws Exception { - LOG.info("Consumer has received " + received.size() + " messages since rollback."); - return received.size() == MSG_COUNT; - } - } - )); - - assertEquals(MSG_COUNT, received.size()); - session.commit(); - } - - private void sendMessages(final int MSG_COUNT) throws Exception { - Connection connection = connectionFactory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createTopic(destinationName); - MessageProducer producer = session.createProducer(destination); - for(int i = 0; i < MSG_COUNT; ++i) { - producer.send(session.createTextMessage("" + i)); - } - } - - public void setUp() throws Exception { - super.setUp(); - connectionFactory = new HedwigConnectionFactoryImpl(); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ObjectMessageNotSerializableTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ObjectMessageNotSerializableTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ObjectMessageNotSerializableTest.java deleted file mode 100644 index fa7b0ea..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ObjectMessageNotSerializableTest.java +++ /dev/null @@ -1,241 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.usecases; - -import javax.jms.Topic; -import java.util.Vector; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Session; - -import junit.framework.Test; - -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import org.apache.activemq.CombinationTestSupport; - -import javax.jms.Destination; - - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class ObjectMessageNotSerializableTest extends CombinationTestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(ObjectMessageNotSerializableTest.class); - - AtomicInteger numReceived = new AtomicInteger(0); - final Vector<Throwable> exceptions = new Vector<Throwable>(); - - public static Test suite() { - return suite(ObjectMessageNotSerializableTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - - protected void setUp() throws Exception { - super.setUp(); - exceptions.clear(); - } - - public void testSendNotSerializeableObjectMessage() throws Exception { - - final Destination destination = SessionImpl.asTopic("testT"); - final MyObject obj = new MyObject("A message"); - - final CountDownLatch consumerStarted = new CountDownLatch(1); - - Thread vmConsumerThread = new Thread("Consumer Thread") { - public void run() { - try { - HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl(); - - Connection connection = factory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(destination); - connection.start(); - consumerStarted.countDown(); - ObjectMessage message = (ObjectMessage) consumer.receive(30000); - if ( message != null ) { - MyObject object = (MyObject)message.getObject(); - LOG.info("Got message " + object.getMessage()); - numReceived.incrementAndGet(); - } - consumer.close(); - } catch (Throwable ex) { - exceptions.add(ex); - } - } - }; - vmConsumerThread.start(); - - Thread producingThread = new Thread("Producing Thread") { - public void run() { - try { - HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl(); - - Connection connection = factory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - ObjectMessage message = session.createObjectMessage(); - message.setObject(obj); - producer.send(message); - producer.close(); - } catch (Throwable ex) { - exceptions.add(ex); - } - } - }; - - assertTrue("consumers started", consumerStarted.await(10, TimeUnit.SECONDS)); - producingThread.start(); - - vmConsumerThread.join(); - producingThread.join(); - - assert obj.getWriteObjectCalled() > 0 : "writeObject not called"; - assert 0 == obj.getReadObjectCalled() : "readObject called"; - assert 0 == obj.getReadObjectNoDataCalled() : "readObjectNoData called ?"; - - assertEquals("Got expected messages", 1, numReceived.get()); - assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty()); - } - - public void testSendNotSerializeableObjectMessageOverTcp() throws Exception { - final Destination destination = SessionImpl.asTopic("testTopic"); - final MyObject obj = new MyObject("A message"); - - final CountDownLatch consumerStarted = new CountDownLatch(3); - final Vector<Throwable> exceptions = new Vector<Throwable>(); - Thread vmConsumerThread = new Thread("Consumer Thread") { - public void run() { - try { - HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl(); - - Connection connection = factory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(destination); - connection.start(); - consumerStarted.countDown(); - ObjectMessage message = (ObjectMessage)consumer.receive(30000); - if ( message != null ) { - MyObject object = (MyObject)message.getObject(); - LOG.info("Got message " + object.getMessage()); - numReceived.incrementAndGet(); - } - consumer.close(); - } catch (Throwable ex) { - exceptions.add(ex); - } - } - }; - vmConsumerThread.start(); - - Thread tcpConsumerThread = new Thread("Consumer Thread") { - public void run() { - try { - - HedwigConnectionFactoryImpl factory = - new HedwigConnectionFactoryImpl(); - - Connection connection = factory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(destination); - connection.start(); - consumerStarted.countDown(); - ObjectMessage message = (ObjectMessage)consumer.receive(30000); - if ( message != null ) { - MyObject object = (MyObject)message.getObject(); - LOG.info("Got message " + object.getMessage()); - numReceived.incrementAndGet(); - assert object.getReadObjectCalled() > 0 : "readObject called"; - } - consumer.close(); - } catch (Throwable ex) { - exceptions.add(ex); - } - } - }; - tcpConsumerThread.start(); - - - Thread notherVmConsumerThread = new Thread("Consumer Thread") { - public void run() { - try { - HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl(); - - Connection connection = factory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(destination); - connection.start(); - consumerStarted.countDown(); - ObjectMessage message = (ObjectMessage)consumer.receive(30000); - if ( message != null ) { - MyObject object = (MyObject)message.getObject(); - LOG.info("Got message " + object.getMessage()); - numReceived.incrementAndGet(); - } - consumer.close(); - } catch (Throwable ex) { - exceptions.add(ex); - } - } - }; - notherVmConsumerThread.start(); - - Thread producingThread = new Thread("Producing Thread") { - public void run() { - try { - HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl(); - - Connection connection = factory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - ObjectMessage message = (ObjectMessage)session.createObjectMessage(); - message.setObject(obj); - producer.send(message); - producer.close(); - } catch (Throwable ex) { - exceptions.add(ex); - } - } - }; - - assertTrue("consumers started", consumerStarted.await(10, TimeUnit.SECONDS)); - producingThread.start(); - - vmConsumerThread.join(); - tcpConsumerThread.join(); - notherVmConsumerThread.join(); - producingThread.join(); - - assertEquals("writeObject called", 1, obj.getWriteObjectCalled()); - assertEquals("readObject called", 0, obj.getReadObjectCalled()); - assertEquals("readObjectNoData called", 0, obj.getReadObjectNoDataCalled()); - - assertEquals("Got expected messages", 3, numReceived.get()); - assertTrue("no unexpected exceptions: " + exceptions, exceptions.isEmpty()); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ProducerConsumerTestSupport.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ProducerConsumerTestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ProducerConsumerTestSupport.java deleted file mode 100644 index 3f570df..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ProducerConsumerTestSupport.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.usecases; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - - -/** - * Base class for simple test cases using a single connection, session - * producer and consumer - */ -public class ProducerConsumerTestSupport extends TestSupport { - protected Connection connection; - protected Session session; - protected MessageProducer producer; - protected MessageConsumer consumer; - protected Destination destination; - - protected void setUp() throws Exception { - super.setUp(); - connection = createConnection(); - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - destination = this.createDestination(getSubject()); - producer = session.createProducer(destination); - consumer = session.createConsumer(destination); - connection.start(); - } - - protected void tearDown() throws Exception { - consumer.close(); - producer.close(); - session.close(); - connection.close(); - super.tearDown(); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/PublishOnDurableTopicConsumedMessageTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/PublishOnDurableTopicConsumedMessageTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/PublishOnDurableTopicConsumedMessageTest.java deleted file mode 100644 index 6fd3ef5..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/PublishOnDurableTopicConsumedMessageTest.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.usecases; - -import javax.jms.Topic; - -public class PublishOnDurableTopicConsumedMessageTest extends PublishOnTopicConsumedMessageTest { - - protected void setUp() throws Exception { - this.durable = true; - super.setUp(); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/PublishOnTopicConsumedMessageTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/PublishOnTopicConsumedMessageTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/PublishOnTopicConsumedMessageTest.java deleted file mode 100644 index 31d0219..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/PublishOnTopicConsumedMessageTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.usecases; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageProducer; - -import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest; -import org.apache.hedwig.jms.message.MessageImpl; -import org.apache.hedwig.jms.message.MessageUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PublishOnTopicConsumedMessageTest extends JmsTopicSendReceiveWithTwoConnectionsTest { - - private static final Logger LOG = LoggerFactory.getLogger(PublishOnTopicConsumedMessageTest.class); - - private MessageProducer replyProducer; - - public synchronized void onMessage(Message message) { - - // lets resend the message somewhere else - try { - Message msgCopy = MessageUtil.createMessageCopy(null, message); - replyProducer.send(msgCopy); - - // log.info("Sending reply: " + message); - super.onMessage(message); - } catch (JMSException e) { - LOG.info("Failed to send message: " + e); - e.printStackTrace(); - } - } - - protected void setUp() throws Exception { - super.setUp(); - - Destination replyDestination = null; - - if (topic) { - replyDestination = receiveSession.createTopic("REPLY." + getSubject()); - } else { - replyDestination = receiveSession.createTopic("REPLY." + getSubject()); - } - - replyProducer = receiveSession.createProducer(replyDestination); - LOG.info("Created replyProducer: " + replyProducer); - - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/SubscribeClosePublishThenConsumeTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/SubscribeClosePublishThenConsumeTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/SubscribeClosePublishThenConsumeTest.java deleted file mode 100644 index 0327f66..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/SubscribeClosePublishThenConsumeTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.usecases; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; - -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import org.apache.activemq.test.TestSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SubscribeClosePublishThenConsumeTest extends TestSupport { - private static final Logger LOG = LoggerFactory.getLogger(SubscribeClosePublishThenConsumeTest.class); - - public void testDurableTopic() throws Exception { - HedwigConnectionFactoryImpl connectionFactory = new HedwigConnectionFactoryImpl(); - - String topicName = "TestTopic"; - String clientID = getName(); - String subscriberName = "MySubscriber:" + System.currentTimeMillis(); - - Connection connection = connectionFactory.createConnection(); - connection.setClientID(clientID); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic(topicName); - - // this should register a durable subscriber, we then close it to - // test that we get messages from the producer later on - TopicSubscriber subscriber = session.createDurableSubscriber(topic, subscriberName); - connection.start(); - - topic = null; - subscriber.close(); - subscriber = null; - session.close(); - session = null; - - // Create the new connection before closing to avoid the broker shutting - // down. - // now create a new Connection, Session & Producer, send some messages & - // then close - Connection t = connectionFactory.createConnection(); - connection.close(); - connection = t; - - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - topic = session.createTopic(topicName); - MessageProducer producer = session.createProducer(topic); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - TextMessage textMessage = session.createTextMessage("Hello World"); - producer.send(textMessage); - textMessage = null; - - topic = null; - session.close(); - session = null; - - // Now (re)register the Durable subscriber, setup a listener and wait - // for messages that should - // have been published by the previous producer - t = connectionFactory.createConnection(); - connection.close(); - connection = t; - - connection.setClientID(clientID); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - topic = session.createTopic(topicName); - - subscriber = session.createDurableSubscriber(topic, subscriberName); - connection.start(); - - LOG.info("Started connection - now about to try receive the textMessage"); - - long time = System.currentTimeMillis(); - Message message = subscriber.receive(15000L); - long elapsed = System.currentTimeMillis() - time; - - LOG.info("Waited for: " + elapsed + " millis"); - - assertNotNull("Should have received the message we published by now", message); - assertTrue("should be text textMessage", message instanceof TextMessage); - textMessage = (TextMessage)message; - assertEquals("Hello World", textMessage.getText()); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TestSupport.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TestSupport.java deleted file mode 100644 index c2902cf..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/TestSupport.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.usecases; - -import javax.jms.Topic; -import javax.jms.Connection; - -import org.apache.hedwig.JmsTestBase; -import org.apache.hedwig.jms.SessionImpl; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.TextMessage; - -import junit.framework.TestCase; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import org.apache.hedwig.jms.message.MessageImpl; - - -import org.junit.Ignore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Useful base class for unit test cases - */ -// Does not contain any testcases. -@Ignore -public class TestSupport extends JmsTestBase { - private static final Logger LOG = LoggerFactory.getLogger(TestSupport.class); - - protected HedwigConnectionFactoryImpl connectionFactory; - protected boolean topic = true; - - public TestSupport() { - super(); - } - - public TestSupport(String name) { - super(name); - } - - protected MessageImpl createMessage() { - return new MessageImpl(null); - } - - protected Destination createDestination(String subject) { - if (topic) { - return SessionImpl.asTopic(subject); - } else { - return SessionImpl.asTopic(subject); - } - } - - protected void assertTextMessagesEqual(Message[] firstSet, Message[] secondSet) throws JMSException { - assertTextMessagesEqual("", firstSet, secondSet); - } - - /** - * @param messsage - * @param firstSet - * @param secondSet - */ - protected void assertTextMessagesEqual(String messsage, Message[] firstSet, - Message[] secondSet) throws JMSException { - assertEquals("Message count does not match: " + messsage, firstSet.length, secondSet.length); - for (int i = 0; i < secondSet.length; i++) { - TextMessage m1 = (TextMessage)firstSet[i]; - TextMessage m2 = (TextMessage)secondSet[i]; - assertTextMessageEqual("Message " + (i + 1) + " did not match : ", m1, m2); - } - } - - protected void assertEquals(TextMessage m1, TextMessage m2) throws JMSException { - assertEquals("", m1, m2); - } - - /** - * @param message - * @param firstSet - * @param secondSet - */ - protected void assertTextMessageEqual(String message, TextMessage m1, TextMessage m2) throws JMSException { - assertFalse(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1 == null ^ m2 == null); - if (m1 == null) { - return; - } - assertEquals(message, m1.getText(), m2.getText()); - } - - protected void assertEquals(Message m1, Message m2) throws JMSException { - assertEquals("", m1, m2); - } - - /** - * @param message - * @param firstSet - * @param secondSet - */ - protected void assertEquals(String message, Message m1, Message m2) throws JMSException { - assertFalse(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1 == null ^ m2 == null); - if (m1 == null) { - return; - } - assertTrue(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1.getClass() == m2.getClass()); - if (m1 instanceof TextMessage) { - assertTextMessageEqual(message, (TextMessage)m1, (TextMessage)m2); - } else { - assertEquals(message, m1, m2); - } - } - - protected HedwigConnectionFactoryImpl createConnectionFactory() throws Exception { - return new HedwigConnectionFactoryImpl(); - } - - /** - * Factory method to create a new connection - */ - protected Connection createConnection() throws Exception { - return getConnectionFactory().createConnection(); - } - - public HedwigConnectionFactoryImpl getConnectionFactory() throws Exception { - if (connectionFactory == null) { - connectionFactory = createConnectionFactory(); - assertTrue("Should have created a connection factory!", connectionFactory != null); - } - return connectionFactory; - } - - protected String getConsumerSubject() { - return getSubject(); - } - - protected String getProducerSubject() { - return getSubject(); - } - - protected String getSubject() { - return getClass().getName() + "." + getName(); - } -}
