http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JMSUsecaseTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSUsecaseTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JMSUsecaseTest.java deleted file mode 100644 index cadb071..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/JMSUsecaseTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import junit.framework.Test; - -import javax.jms.Destination; - -import org.apache.hedwig.jms.MessagingSessionFacade; -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.jms.message.MessageImpl; - -public class JMSUsecaseTest extends JmsTestSupport { - - public Destination destination; - public int deliveryMode; - public int prefetch; - public MessagingSessionFacade.DestinationType destinationType; - public boolean durableConsumer; - - public static Test suite() { - return suite(JMSUsecaseTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - - public void initCombosForTestSendReceive() { - addCombinationValues("deliveryMode", new Object[] { - Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC}); - } - - public void testSendReceive() throws Exception { - // Send a message to the broker. - connection.start(); - SessionImpl session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationType); - MessageProducer producer = session.createProducer(destination); - MessageConsumer consumer = session.createConsumer(destination); - MessageImpl message = new MessageImpl(session); - producer.send(message); - - // Make sure only 1 message was delivered. - assertNotNull(consumer.receive(1000)); - assertNull(consumer.receiveNoWait()); - } - - public void initCombosForTestSendReceiveTransacted() { - addCombinationValues("deliveryMode", new Object[] { - Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC}); - } - - public void testSendReceiveTransacted() throws Exception { - // Send a message to the broker. - connection.start(); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - destination = createDestination(session, destinationType); - MessageProducer producer = session.createProducer(destination); - MessageConsumer consumer = session.createConsumer(destination); - producer.send(session.createTextMessage("test")); - - // Message should not be delivered until commit. - assertNull(consumer.receiveNoWait()); - session.commit(); - - // Make sure only 1 message was delivered. - Message message = consumer.receive(1000); - assertNotNull(message); - assertFalse(message.getJMSRedelivered()); - assertNull(consumer.receiveNoWait()); - - // Message should be redelivered is rollback is used. - session.rollback(); - - // Make sure only 1 message was delivered. - message = consumer.receive(2000); - assertNotNull(message); - assertTrue(message.getJMSRedelivered()); - assertNull(consumer.receiveNoWait()); - - // If we commit now, the message should not be redelivered. - session.commit(); - assertNull(consumer.receiveNoWait()); - } - -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java deleted file mode 100644 index ec5243f..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckListenerTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import javax.jms.Topic; -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -public class JmsAutoAckListenerTest extends TestSupport implements MessageListener { - - private Connection connection; - - protected void setUp() throws Exception { - super.setUp(); - connection = createConnection(); - } - - /** - * @see junit.framework.TestCase#tearDown() - */ - protected void tearDown() throws Exception { - if (connection != null) { - connection.close(); - connection = null; - } - super.tearDown(); - } - - /** - * Tests if acknowleged messages are being consumed. - * - * @throws javax.jms.JMSException - */ - public void testAckedMessageAreConsumed() throws Exception { - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic queue = session.createTopic("test"); - MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id1"); - consumer.setMessageListener(this); - producer.send(session.createTextMessage("Hello")); - - // Consume the message... - - Thread.sleep(10000); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - // Attempt to Consume the message...check if message was acknowledge - consumer = session.createDurableSubscriber(queue, "subscriber-id1"); - Message msg = consumer.receive(1000); - assertNull(msg); - - session.close(); - } - - public void onMessage(Message message) { - assertNotNull(message); - - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckTest.java deleted file mode 100644 index 13eaa27..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsAutoAckTest.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import javax.jms.Topic; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -public class JmsAutoAckTest extends TestSupport { - - private Connection connection; - - protected void setUp() throws Exception { - super.setUp(); - connection = createConnection(); - } - - /** - * @see junit.framework.TestCase#tearDown() - */ - protected void tearDown() throws Exception { - if (connection != null) { - connection.close(); - connection = null; - } - super.tearDown(); - } - - /** - * Tests if acknowleged messages are being consumed. - * - * @throws javax.jms.JMSException - */ - public void testAckedMessageAreConsumed() throws JMSException { - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic queue = session.createTopic("test"); - MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id1"); - producer.send(session.createTextMessage("Hello")); - - // Consume the message... - Message msg = consumer.receive(1000); - assertNotNull(msg); - - // Reset the session. - session.close(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - // Attempt to Consume the message... - consumer = session.createDurableSubscriber(queue, "subscriber-id1"); - msg = consumer.receive(1000); - assertNull(msg); - - session.close(); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsBenchmark.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsBenchmark.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsBenchmark.java deleted file mode 100644 index dd914b5..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsBenchmark.java +++ /dev/null @@ -1,204 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import javax.jms.Topic; -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import junit.framework.Test; - - - -import javax.jms.Destination; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Benchmarks the broker by starting many consumer and producers against the - * same destination. Make sure you run with jvm option -server (makes a big - * difference). The tests simulate storing 1000 1k jms messages to see the rate - * of processing msg/sec. - */ -public class JmsBenchmark extends JmsTestSupport { - private static final transient Logger LOG = LoggerFactory.getLogger(JmsBenchmark.class); - - private static final long SAMPLE_DELAY = Integer.parseInt(System.getProperty("SAMPLE_DELAY", "" + 1000 * 5)); - private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "10")); - private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", - "" + 1000 * 60)); - private static final int PRODUCER_COUNT = Integer.parseInt(System.getProperty("PRODUCER_COUNT", "10")); - private static final int CONSUMER_COUNT = Integer.parseInt(System.getProperty("CONSUMER_COUNT", "10")); - - public Destination destination; - - public static Test suite() { - return suite(JmsBenchmark.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(JmsBenchmark.class); - } - - public void initCombos() { - addCombinationValues("destination", new Object[] {SessionImpl.asTopic("TEST")}); - } - - protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException { - return new HedwigConnectionFactoryImpl(); - } - - /** - * @throws Throwable - */ - public void testConcurrentSendReceive() throws Throwable { - - final Semaphore connectionsEstablished = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT)); - final Semaphore workerDone = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT)); - final CountDownLatch sampleTimeDone = new CountDownLatch(1); - - final AtomicInteger producedMessages = new AtomicInteger(0); - final AtomicInteger receivedMessages = new AtomicInteger(0); - - final Callable producer = new Callable() { - public Object call() throws JMSException, InterruptedException { - Connection connection = factory.createConnection(); - connections.add(connection); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - BytesMessage message = session.createBytesMessage(); - message.writeBytes(new byte[1024]); - connection.start(); - connectionsEstablished.release(); - - while (!sampleTimeDone.await(0, TimeUnit.MILLISECONDS)) { - producer.send(message); - producedMessages.incrementAndGet(); - } - - connection.close(); - workerDone.release(); - return null; - } - }; - - final Callable consumer = new Callable() { - public Object call() throws JMSException, InterruptedException { - Connection connection = factory.createConnection(); - connections.add(connection); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(destination); - - consumer.setMessageListener(new MessageListener() { - public void onMessage(Message msg) { - receivedMessages.incrementAndGet(); - } - }); - connection.start(); - - connectionsEstablished.release(); - sampleTimeDone.await(); - - connection.close(); - workerDone.release(); - return null; - } - }; - - final Throwable workerError[] = new Throwable[1]; - for (int i = 0; i < PRODUCER_COUNT; i++) { - new Thread("Producer:" + i) { - public void run() { - try { - producer.call(); - } catch (Throwable e) { - e.printStackTrace(); - workerError[0] = e; - } - } - }.start(); - } - - for (int i = 0; i < CONSUMER_COUNT; i++) { - new Thread("Consumer:" + i) { - public void run() { - try { - consumer.call(); - } catch (Throwable e) { - e.printStackTrace(); - workerError[0] = e; - } - } - }.start(); - } - - LOG.info(getName() + ": Waiting for Producers and Consumers to startup."); - connectionsEstablished.acquire(); - LOG.info("Producers and Consumers are now running. Waiting for system to reach steady state: " - + (SAMPLE_DELAY / 1000.0f) + " seconds"); - Thread.sleep(1000 * 10); - - LOG.info("Starting sample: " + SAMPLES + " each lasting " + (SAMPLE_DURATION / 1000.0f) + " seconds"); - - for (int i = 0; i < SAMPLES; i++) { - - long start = System.currentTimeMillis(); - producedMessages.set(0); - receivedMessages.set(0); - - Thread.sleep(SAMPLE_DURATION); - - long end = System.currentTimeMillis(); - int r = receivedMessages.get(); - int p = producedMessages.get(); - - LOG.info("published: " + p + " msgs at " + (p * 1000f / (end - start)) + " msgs/sec, " - + "consumed: " + r + " msgs at " + (r * 1000f / (end - start)) + " msgs/sec"); - } - - LOG.info("Sample done."); - sampleTimeDone.countDown(); - - workerDone.acquire(); - if (workerError[0] != null) { - throw workerError[0]; - } - - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java deleted file mode 100644 index 78d7fb3..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckListenerTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import javax.jms.Topic; -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; - -public class JmsClientAckListenerTest extends TestSupport implements MessageListener { - - private Connection connection; - private boolean dontAck; - - protected void setUp() throws Exception { - super.setUp(); - connection = createConnection(); - } - - /** - * @see junit.framework.TestCase#tearDown() - */ - protected void tearDown() throws Exception { - if (connection != null) { - connection.close(); - connection = null; - } - super.tearDown(); - } - - /** - * Tests if acknowleged messages are being consumed. - * - * @throws javax.jms.JMSException - */ - public void testAckedMessageAreConsumed() throws Exception { - connection.start(); - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Topic queue = session.createTopic("test"); - MessageProducer producer = session.createProducer(queue); - producer.send(session.createTextMessage("Hello")); - - // Consume the message... - MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id1"); - consumer.setMessageListener(this); - - Thread.sleep(10000); - - // Reset the session. - session.close(); - - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - // Attempt to Consume the message... - consumer = session.createDurableSubscriber(queue, "subscriber-id1"); - Message msg = consumer.receive(1000); - assertNull(msg); - - session.close(); - } - - /** - * Tests if unacknowleged messages are being redelivered when the consumer - * connects again. - * - * @throws javax.jms.JMSException - */ - public void testUnAckedMessageAreNotConsumedOnSessionClose() throws Exception { - connection.start(); - // don't aknowledge message on onMessage() call - dontAck = true; - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Topic queue = session.createTopic("test"); - MessageProducer producer = session.createProducer(queue); - // Consume the message... - MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id2"); - consumer.setMessageListener(this); - - // Don't ack the message. - producer.send(session.createTextMessage("Hello")); - - // Reset the session. This should cause the Unacked message to be - // redelivered. - session.close(); - - Thread.sleep(10000); - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - // Attempt to Consume the message... - consumer = session.createDurableSubscriber(queue, "subscriber-id2"); - Message msg = consumer.receive(2000); - assertNotNull(msg); - msg.acknowledge(); - - session.close(); - } - - public void onMessage(Message message) { - - assertNotNull(message); - if (!dontAck) { - try { - message.acknowledge(); - } catch (Exception e) { - e.printStackTrace(); - } - - } - - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckTest.java deleted file mode 100644 index c4aa3c6..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsClientAckTest.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import javax.jms.Topic; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -public class JmsClientAckTest extends TestSupport { - - private Connection connection; - - protected void setUp() throws Exception { - super.setUp(); - connection = createConnection(); - } - - /** - * @see junit.framework.TestCase#tearDown() - */ - protected void tearDown() throws Exception { - if (connection != null) { - connection.close(); - connection = null; - } - super.tearDown(); - } - - /** - * Tests if acknowledged messages are being consumed. - * - * @throws JMSException - */ - public void testAckedMessageAreConsumed() throws JMSException { - connection.start(); - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Topic queue = session.createTopic(getQueueName()); - MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id1"); - producer.send(session.createTextMessage("Hello")); - - // Consume the message... - Message msg = consumer.receive(1000); - assertNotNull(msg); - msg.acknowledge(); - - // Reset the session. - session.close(); - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - // Attempt to Consume the message... - consumer = session.createDurableSubscriber(queue, "subscriber-id1"); - msg = consumer.receive(1000); - assertNull(msg); - - session.close(); - } - - /** - * Tests if acknowledged messages are being consumed. - * - * @throws JMSException - */ - public void testLastMessageAcked() throws JMSException { - connection.start(); - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Topic queue = session.createTopic(getQueueName()); - MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id2"); - producer.send(session.createTextMessage("Hello")); - producer.send(session.createTextMessage("Hello2")); - producer.send(session.createTextMessage("Hello3")); - - // Consume the message... - Message msg = consumer.receive(1000); - assertNotNull(msg); - msg = consumer.receive(1000); - assertNotNull(msg); - msg = consumer.receive(1000); - assertNotNull(msg); - msg.acknowledge(); - - // Reset the session. - session.close(); - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - // Attempt to Consume the message... - consumer = session.createDurableSubscriber(queue, "subscriber-id2"); - msg = consumer.receive(1000); - assertNull(msg); - - session.close(); - } - - /** - * Tests if unacknowledged messages are being re-delivered when the consumer connects again. - * - * @throws JMSException - */ - public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException { - connection.start(); - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Topic queue = session.createTopic(getQueueName()); - MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id3"); - producer.send(session.createTextMessage("Hello")); - - // Consume the message... - Message msg = consumer.receive(1000); - assertNotNull(msg); - // Don't ack the message. - - // Reset the session. This should cause the unacknowledged message to be re-delivered. - session.close(); - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - // Attempt to Consume the message... - consumer = session.createDurableSubscriber(queue, "subscriber-id3"); - msg = consumer.receive(2000); - assertNotNull(msg); - msg.acknowledge(); - - session.close(); - } - - protected String getQueueName() { - return getClass().getName() + "." + getName(); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java deleted file mode 100644 index 3649614..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConnectionStartStopTest.java +++ /dev/null @@ -1,169 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import org.apache.hedwig.jms.DebugUtil; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import java.util.Random; -import java.util.Vector; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; - -public class JmsConnectionStartStopTest extends TestSupport { - - private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory - .getLog(JmsConnectionStartStopTest.class); - - private Connection startedConnection; - private Connection stoppedConnection; - - /** - * @see junit.framework.TestCase#setUp() - */ - protected void setUp() throws Exception { - - super.setUp(); - LOG.info(getClass().getClassLoader().getResource("log4j.properties")); - - HedwigConnectionFactoryImpl factory = createConnectionFactory(); - startedConnection = factory.createConnection(); - startedConnection.start(); - stoppedConnection = factory.createConnection(); - } - - /** - * @see junit.framework.TestCase#tearDown() - */ - protected void tearDown() throws Exception { - stoppedConnection.close(); - startedConnection.close(); - super.tearDown(); - } - - /** - * Tests if the consumer receives the messages that were sent before the - * connection was started. - * - * @throws JMSException - */ - public void testStoppedConsumerHoldsMessagesTillStarted() throws JMSException { - Session startedSession = startedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Session stoppedSession = stoppedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - // Setup the consumers. - Topic topic = startedSession.createTopic("test"); - MessageConsumer startedConsumer = startedSession.createConsumer(topic); - MessageConsumer stoppedConsumer = stoppedSession.createConsumer(topic); - - // Send the message. - MessageProducer producer = startedSession.createProducer(topic); - TextMessage message = startedSession.createTextMessage("Hello"); - producer.send(message); - - // Test the assertions. - Message m = startedConsumer.receive(1000); - assertNotNull(m); - - m = stoppedConsumer.receive(1000); - assertNull(m); - - stoppedConnection.start(); - m = stoppedConsumer.receive(5000); - assertNotNull(m); - - startedSession.close(); - stoppedSession.close(); - } - - /** - * Tests if the consumer is able to receive messages eveb when the - * connecction restarts multiple times. - * - * @throws Exception - */ - public void testMultipleConnectionStops() throws Exception { - testStoppedConsumerHoldsMessagesTillStarted(); - stoppedConnection.stop(); - testStoppedConsumerHoldsMessagesTillStarted(); - stoppedConnection.stop(); - testStoppedConsumerHoldsMessagesTillStarted(); - } - - - public void testConcurrentSessionCreateWithStart() throws Exception { - ThreadPoolExecutor executor = new ThreadPoolExecutor(50, Integer.MAX_VALUE, - 60L, TimeUnit.SECONDS, - new SynchronousQueue<Runnable>()); - final Vector<Throwable> exceptions = new Vector<Throwable>(); - final Random rand = new Random(); - final int numIterations = 100; - final CountDownLatch latch = new CountDownLatch(numIterations * 2); - Runnable createSessionTask = new Runnable() { - public void run() { - try { - TimeUnit.MILLISECONDS.sleep(rand.nextInt(10)); - latch.countDown(); - stoppedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } catch (Exception e) { - e.printStackTrace(); - exceptions.add(e); - } - } - }; - - Runnable startStopTask = new Runnable() { - public void run() { - try { - TimeUnit.MILLISECONDS.sleep(rand.nextInt(10)); - latch.countDown(); - stoppedConnection.start(); - stoppedConnection.stop(); - } catch (Exception e) { - e.printStackTrace(); - exceptions.add(e); - } - } - }; - - for (int i=0; i<numIterations; i++) { - executor.execute(createSessionTask); - executor.execute(startStopTask); - } - - executor.shutdown(); - final long remaining; - { - boolean terminated = executor.awaitTermination(30, TimeUnit.SECONDS); - remaining = latch.getCount(); - if (!terminated){ - DebugUtil.dumpAllStacktraces(System.err); - } - assertTrue("executor terminated. remaining : " + remaining, terminated); - } - assertTrue("remaining : " + remaining + ", no exceptions: " + exceptions, exceptions.isEmpty()); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java deleted file mode 100644 index aaf47f2..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsConsumerResetActiveListenerTest.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import javax.jms.Topic; -import java.util.Vector; - -import org.apache.hedwig.JmsTestBase; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import junit.framework.TestCase; - - -public class JmsConsumerResetActiveListenerTest extends JmsTestBase { - - private Connection connection; - private HedwigConnectionFactoryImpl factory; - - protected void setUp() throws Exception { - super.setUp(); - factory = new HedwigConnectionFactoryImpl(); - connection = factory.createConnection(); - } - - protected void tearDown() throws Exception { - if (connection != null) { - connection.close(); - connection = null; - } - super.tearDown(); - } - - /** - * verify the (undefined by spec) behaviour of setting a listener while receiving a message. - * - * @throws Exception - */ - public void testSetListenerFromListener() throws Exception { - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Destination dest = session.createTopic("Queue-" + getName()); - final MessageConsumer consumer = session.createConsumer(dest); - - final CountDownLatch latch = new CountDownLatch(2); - final AtomicBoolean first = new AtomicBoolean(true); - final Vector<Object> results = new Vector<Object>(); - consumer.setMessageListener(new MessageListener() { - - public void onMessage(Message message) { - if (first.compareAndSet(true, false)) { - try { - consumer.setMessageListener(this); - results.add(message); - } catch (JMSException e) { - results.add(e); - } - } else { - results.add(message); - } - latch.countDown(); - } - }); - - connection.start(); - - MessageProducer producer = session.createProducer(dest); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - producer.send(session.createTextMessage("First")); - producer.send(session.createTextMessage("Second")); - - assertTrue("we did not timeout", latch.await(5, TimeUnit.SECONDS)); - - assertEquals("we have a result", 2, results.size()); - Object result = results.get(0); - assertTrue(result instanceof TextMessage); - assertEquals("result is first", "First", ((TextMessage)result).getText()); - result = results.get(1); - assertTrue(result instanceof TextMessage); - assertEquals("result is first", "Second", ((TextMessage)result).getText()); - } - - /** - * and a listener on a new consumer, just in case. - * - * @throws Exception - */ - public void testNewConsumerSetListenerFromListener() throws Exception { - final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - final Destination dest = session.createTopic("Queue-" + getName()); - final MessageConsumer consumer = session.createConsumer(dest); - - final CountDownLatch latch = new CountDownLatch(2); - final AtomicBoolean first = new AtomicBoolean(true); - final Vector<Object> results = new Vector<Object>(); - consumer.setMessageListener(new MessageListener() { - - public void onMessage(Message message) { - if (first.compareAndSet(true, false)) { - try { - MessageConsumer anotherConsumer = session.createConsumer(dest); - anotherConsumer.setMessageListener(this); - results.add(message); - } catch (JMSException e) { - results.add(e); - } - } else { - results.add(message); - } - latch.countDown(); - } - }); - - connection.start(); - - MessageProducer producer = session.createProducer(dest); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - producer.send(session.createTextMessage("First")); - producer.send(session.createTextMessage("Second")); - - assertTrue("we did not timeout", latch.await(5, TimeUnit.SECONDS)); - - assertEquals("we have a result", 2, results.size()); - Object result = results.get(0); - assertTrue(result instanceof TextMessage); - assertEquals("result is first", "First", ((TextMessage)result).getText()); - result = results.get(1); - assertTrue(result instanceof TextMessage); - assertEquals("result is first", "Second", ((TextMessage)result).getText()); - } - } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java deleted file mode 100644 index 218bbe5..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; - -public class JmsCreateConsumerInOnMessageTest extends TestSupport implements MessageListener { - - private Connection connection; - private Session publisherSession; - private Session consumerSession; - private MessageConsumer consumer; - private MessageConsumer testConsumer; - private MessageProducer producer; - private Topic topic; - private Object lock = new Object(); - - /* - * @see junit.framework.TestCase#setUp() - */ - protected void setUp() throws Exception { - super.setUp(); - super.topic = true; - connection = createConnection(false); - connection.setClientID("connection:" + getSubject()); - publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - topic = (Topic)super.createDestination("Test.Topic"); - consumer = consumerSession.createConsumer(topic); - consumer.setMessageListener(this); - producer = publisherSession.createProducer(topic); - connection.start(); - } - - /* - * @see junit.framework.TestCase#tearDown() - */ - protected void tearDown() throws Exception { - connection.close(); - super.tearDown(); - } - - /** - * Tests if a consumer can be created asynchronusly - * - * @throws Exception - */ - public void testCreateConsumer() throws Exception { - Message msg = super.createMessage(); - producer.send(msg); - if (testConsumer == null) { - synchronized (lock) { - lock.wait(3000); - } - } - assertTrue(testConsumer != null); - } - - /** - * Use the asynchronous subscription mechanism - * - * @param message - */ - public void onMessage(Message message) { - try { - testConsumer = consumerSession.createConsumer(topic); - consumerSession.createProducer(topic); - synchronized (lock) { - lock.notify(); - } - } catch (Exception ex) { - ex.printStackTrace(); - assertTrue(false); - } - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java deleted file mode 100644 index 548e7a8..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSelectorTest.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import javax.jms.Topic; - -public class JmsDurableTopicSelectorTest extends JmsTopicSelectorTest { - public void setUp() throws Exception { - durable = true; - super.setUp(); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java deleted file mode 100644 index b2e2ed7..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicSendReceiveTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; - -import org.apache.activemq.test.JmsTopicSendReceiveTest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class JmsDurableTopicSendReceiveTest extends JmsTopicSendReceiveTest { - private static final Logger LOG = LoggerFactory.getLogger(JmsDurableTopicSendReceiveTest.class); - - protected Connection connection2; - protected Session session2; - protected Session consumeSession2; - protected MessageConsumer consumer2; - protected MessageProducer producer2; - protected Destination consumerDestination2; - protected Destination producerDestination2; - - /** - * Set up a durable suscriber test. - * - * @see junit.framework.TestCase#setUp() - */ - protected void setUp() throws Exception { - this.durable = true; - super.setUp(); - } - - /** - * Test if all the messages sent are being received. - * - * @throws Exception - */ - public void testSendWhileClosed() throws Exception { - connection2 = createConnection(false); - if (null == connection.getClientID()) connection2.setClientID(getName() + "test"); - connection2.start(); - session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer2 = session2.createProducer(null); - producer2.setDeliveryMode(deliveryMode); - producerDestination2 = session2.createTopic(getProducerSubject() + "2"); - Thread.sleep(1000); - - consumeSession2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumerDestination2 = session2.createTopic(getConsumerSubject() + "2"); - consumer2 = consumeSession2.createDurableSubscriber((Topic)consumerDestination2, getName()); - Thread.sleep(1000); - consumer2.close(); - TextMessage message = session2.createTextMessage("test"); - message.setStringProperty("test", "test"); - message.setJMSType("test"); - producer2.send(producerDestination2, message); - LOG.info("Creating durable consumer"); - consumer2 = consumeSession2.createDurableSubscriber((Topic)consumerDestination2, getName()); - Message msg = consumer2.receive(1000); - assertNotNull(msg); - assertEquals(((TextMessage)msg).getText(), "test"); - assertEquals(msg.getJMSType(), "test"); - assertEquals(msg.getStringProperty("test"), "test"); - connection2.stop(); - connection2.close(); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java deleted file mode 100644 index c0da42f..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsDurableTopicTransactionTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import javax.jms.Topic; -import javax.jms.DeliveryMode; - -import org.apache.activemq.test.JmsResourceProvider; - -public class JmsDurableTopicTransactionTest extends JmsTopicTransactionTest { - - /** - * @see JmsTransactionTestSupport#getJmsResourceProvider() - */ - protected JmsResourceProvider getJmsResourceProvider() { - JmsResourceProvider provider = new JmsResourceProvider(); - provider.setTopic(true); - provider.setDeliveryMode(DeliveryMode.PERSISTENT); - provider.setClientID(getClass().getName()); - provider.setDurableName(getName()); - return provider; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java deleted file mode 100644 index 3873a0f..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java +++ /dev/null @@ -1,281 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import java.net.URI; -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.TopicSubscriber; - - - -import javax.jms.Destination; - - -import org.apache.activemq.util.MessageIdList; - -/** - * Test case support used to test multiple message comsumers and message - * producers connecting to a single broker. - */ -public class JmsMultipleClientsTestSupport extends CombinationTestSupport { - - protected Map<MessageConsumer, MessageIdList> consumers - = new HashMap<MessageConsumer, MessageIdList>(); // Map of consumer with messages received - protected int consumerCount = 1; - protected int producerCount = 1; - - protected int messageSize = 1024; - - protected boolean useConcurrentSend = true; - protected boolean autoFail = true; - protected boolean durable; - public boolean topic; - protected boolean persistent; - - protected Destination destination; - protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>()); - protected MessageIdList allMessagesList = new MessageIdList(); - - private AtomicInteger producerLock; - - protected void startProducers(Destination dest, int msgCount) throws Exception { - startProducers(createConnectionFactory(), dest, msgCount); - } - - protected void startProducers(final ConnectionFactory factory, - final Destination dest, final int msgCount) throws Exception { - // Use concurrent send - if (useConcurrentSend) { - producerLock = new AtomicInteger(producerCount); - - for (int i = 0; i < producerCount; i++) { - Thread t = new Thread(new Runnable() { - public void run() { - try { - sendMessages(factory.createConnection(), dest, msgCount); - } catch (Exception e) { - e.printStackTrace(); - } - - synchronized (producerLock) { - producerLock.decrementAndGet(); - producerLock.notifyAll(); - } - } - }); - - t.start(); - } - - // Wait for all producers to finish sending - synchronized (producerLock) { - while (producerLock.get() != 0) { - producerLock.wait(2000); - } - } - - // Use serialized send - } else { - for (int i = 0; i < producerCount; i++) { - sendMessages(factory.createConnection(), dest, msgCount); - } - } - } - - protected void sendMessages(Connection connection, Destination destination, int count) throws Exception { - connections.add(connection); - connection.start(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - - for (int i = 0; i < count; i++) { - TextMessage msg = createTextMessage(session, "" + i); - producer.send(msg); - } - - producer.close(); - session.close(); - connection.close(); - } - - protected TextMessage createTextMessage(Session session, String initText) throws Exception { - TextMessage msg = session.createTextMessage(); - - // Pad message text - if (initText.length() < messageSize) { - char[] data = new char[messageSize - initText.length()]; - Arrays.fill(data, '*'); - String str = new String(data); - msg.setText(initText + str); - - // Do not pad message text - } else { - msg.setText(initText); - } - - return msg; - } - - protected void startConsumers(Destination dest) throws Exception { - startConsumers(createConnectionFactory(), dest); - } - - protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception { - MessageConsumer consumer; - for (int i = 0; i < consumerCount; i++) { - if (durable && topic) { - consumer = createDurableSubscriber(factory.createConnection(), dest, "consumer" + (i + 1)); - } else { - consumer = createMessageConsumer(factory.createConnection(), dest); - } - MessageIdList list = new MessageIdList(); - list.setParent(allMessagesList); - consumer.setMessageListener(list); - consumers.put(consumer, list); - } - } - - protected MessageConsumer createMessageConsumer(Connection conn, Destination dest) throws Exception { - connections.add(conn); - - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = sess.createConsumer(dest); - conn.start(); - - return consumer; - } - - protected TopicSubscriber createDurableSubscriber(Connection conn, Destination dest, String name) throws Exception { - conn.setClientID(name); - connections.add(conn); - conn.start(); - - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - final TopicSubscriber consumer = sess.createDurableSubscriber((javax.jms.Topic)dest, name); - - return consumer; - } - - protected void waitForAllMessagesToBeReceived(int messageCount) throws Exception { - allMessagesList.waitForMessagesToArrive(messageCount); - } - - protected Destination createDestination() throws JMSException { - String name = "." + getClass().getName() + "." + getName(); - // ensure not inadvertently composite because of combos - name = name.replace(' ','_'); - name = name.replace(',','&'); - if (topic) { - destination = SessionImpl.asTopic("Topic" + name); - return (Destination)destination; - } else { - destination = SessionImpl.asTopic("Queue" + name); - return (Destination)destination; - } - } - - protected ConnectionFactory createConnectionFactory() throws Exception { - return new HedwigConnectionFactoryImpl(); - } - - protected void setUp() throws Exception { - super.setAutoFail(autoFail); - super.setUp(); - } - - protected void tearDown() throws Exception { - for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) { - Connection conn = iter.next(); - try { - conn.close(); - } catch (Throwable e) { - } - } - allMessagesList.flushMessages(); - consumers.clear(); - super.tearDown(); - } - - /* - * Some helpful assertions for multiple consumers. - */ - protected void assertConsumerReceivedAtLeastXMessages(MessageConsumer consumer, int msgCount) { - MessageIdList messageIdList = consumers.get(consumer); - messageIdList.assertAtLeastMessagesReceived(msgCount); - } - - protected void assertConsumerReceivedAtMostXMessages(MessageConsumer consumer, int msgCount) { - MessageIdList messageIdList = consumers.get(consumer); - messageIdList.assertAtMostMessagesReceived(msgCount); - } - - protected void assertConsumerReceivedXMessages(MessageConsumer consumer, int msgCount) { - MessageIdList messageIdList = consumers.get(consumer); - messageIdList.assertMessagesReceivedNoWait(msgCount); - } - - protected void assertEachConsumerReceivedAtLeastXMessages(int msgCount) { - for (Iterator<MessageConsumer> i = consumers.keySet().iterator(); i.hasNext();) { - assertConsumerReceivedAtLeastXMessages(i.next(), msgCount); - } - } - - protected void assertEachConsumerReceivedAtMostXMessages(int msgCount) { - for (Iterator<MessageConsumer> i = consumers.keySet().iterator(); i.hasNext();) { - assertConsumerReceivedAtMostXMessages(i.next(), msgCount); - } - } - - protected void assertEachConsumerReceivedXMessages(int msgCount) { - for (Iterator<MessageConsumer> i = consumers.keySet().iterator(); i.hasNext();) { - assertConsumerReceivedXMessages(i.next(), msgCount); - } - } - - protected void assertTotalMessagesReceived(int msgCount) { - allMessagesList.assertMessagesReceivedNoWait(msgCount); - - // now lets count the individual messages received - int totalMsg = 0; - for (Iterator<MessageConsumer> i = consumers.keySet().iterator(); i.hasNext();) { - MessageIdList messageIdList = consumers.get(i.next()); - totalMsg += messageIdList.getMessageCount(); - } - assertEquals("Total of consumers message count", msgCount, totalMsg); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsRedeliveredTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsRedeliveredTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsRedeliveredTest.java deleted file mode 100644 index fba7064..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsRedeliveredTest.java +++ /dev/null @@ -1,380 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import javax.jms.Connection; - -import org.apache.hedwig.JmsTestBase; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; - -import junit.framework.Test; -import junit.framework.TestCase; -import junit.framework.TestSuite; - -public class JmsRedeliveredTest extends JmsTestBase { - - private Connection connection; - - /* - * (non-Javadoc) - * - * @see junit.framework.TestCase#setUp() - */ - protected void setUp() throws Exception { - super.setUp(); - connection = createConnection(); - } - - /** - * @see junit.framework.TestCase#tearDown() - */ - protected void tearDown() throws Exception { - if (connection != null) { - connection.close(); - connection = null; - } - super.tearDown(); - } - - /** - * Creates a connection. - * - * @return connection - * @throws Exception - */ - protected Connection createConnection() throws Exception { - HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl(); - return factory.createConnection(); - } - - /** - * Tests if a message unacknowledged message gets to be resent when the - * session is closed and then a new consumer session is created. - * - */ - public void testTopicSessionCloseMarksMessageRedelivered() throws JMSException { - connection.start(); - - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Topic queue = session.createTopic("queue-" + getName()); - MessageProducer producer = createProducer(session, queue); - MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id1"); - producer.send(createTextMessage(session)); - - // Consume the message... - Message msg = consumer.receive(1000); - assertNotNull(msg); - assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); - // Don't ack the message. - - // Reset the session. This should cause the Unacked message to be - // redelivered. - session.close(); - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - // Attempt to Consume the message... - consumer = session.createDurableSubscriber(queue, "subscriber-id1"); - msg = consumer.receive(2000); - assertNotNull(msg); - // Since we only simulate this in provider, we cannot do this across consumers ! - // assertTrue("Message should be redelivered.", msg.getJMSRedelivered()); - msg.acknowledge(); - - session.close(); - } - - - public void testTopicSessionCloseMarksUnAckedMessageRedelivered() throws JMSException { - connection.start(); - - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Topic queue = session.createTopic("queue-" + getName()); - MessageProducer producer = createProducer(session, queue); - MessageConsumer consumer = session.createDurableSubscriber(queue, "subscriber-id2"); - producer.send(createTextMessage(session, "1")); - producer.send(createTextMessage(session, "2")); - - // Consume the message... - Message msg = consumer.receive(1000); - assertNotNull(msg); - assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); - assertEquals("1", ((TextMessage)msg).getText()); - msg.acknowledge(); - - // Don't ack the message. - msg = consumer.receive(1000); - assertNotNull(msg); - assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); - assertEquals("2", ((TextMessage)msg).getText()); - - // Reset the session. This should cause the Unacked message to be - // redelivered. - session.close(); - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - // Attempt to Consume the message... - consumer = session.createDurableSubscriber(queue, "subscriber-id2"); - msg = consumer.receive(2000); - assertNotNull(msg); - assertEquals("2", ((TextMessage)msg).getText()); - // Since we only simulate this in provider, we cannot do this across consumers ! - // assertTrue("Message should be redelivered.", msg.getJMSRedelivered()); - msg.acknowledge(); - - session.close(); - } - - /** - * Tests session recovery and that the redelivered message is marked as - * such. Session uses client acknowledgement, the destination is a queue. - * - * @throws JMSException - */ - public void testTopicRecoverMarksMessageRedelivered() throws Exception { - connection.setClientID(getName()); - connection.start(); - - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Topic queue = session.createTopic("queue-" + getName()); - MessageProducer producer = createProducer(session, queue); - MessageConsumer consumer = session.createDurableSubscriber(queue, getName() + " - subscriber"); - producer.send(createTextMessage(session)); - - // Consume the message... - Message msg = consumer.receive(1000); - assertNotNull(msg); - assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); - // Don't ack the message. - - // We DO NOT support session recovery - // - to unblock this test, I am stopp'ing and start'ing connection : not the same, but ... - // Reset the session. This should cause the Unacked message to be - // redelivered. - // session.recover(); - connection.close(); - connection = createConnection(); - connection.setClientID(getName()); - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - consumer = session.createDurableSubscriber(queue, getName() + " - subscriber"); - connection.start(); - - // Attempt to Consume the message... - msg = consumer.receive(2000); - assertNotNull(msg); - // assertTrue("Message should be redelivered.", msg.getJMSRedelivered()); - msg.acknowledge(); - - session.close(); - } - - /** - * Tests rollback message to be marked as redelivered. Session uses client - * acknowledgement and the destination is a queue. - * - * @throws JMSException - */ - public void testTopicRollbackMarksMessageRedelivered() throws JMSException { - connection.start(); - - Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); - Topic queue = session.createTopic("queue-" + getName()); - MessageProducer producer = createProducer(session, queue); - MessageConsumer consumer = session.createConsumer(queue); - producer.send(createTextMessage(session)); - session.commit(); - - // Get the message... Should not be redelivered. - Message msg = consumer.receive(1000); - assertNotNull(msg); - assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); - - // Rollback.. should cause redelivery. - session.rollback(); - - // Attempt to Consume the message... - msg = consumer.receive(2000); - assertNotNull(msg); - assertTrue("Message should be redelivered.", msg.getJMSRedelivered()); - - session.commit(); - session.close(); - } - - /** - * Tests if the message gets to be re-delivered when the session closes and - * that the re-delivered message is marked as such. Session uses client - * acknowledgment, the destination is a topic and the consumer is a durable - * subscriber. - * - * @throws JMSException - */ - public void testDurableTopicSessionCloseMarksMessageRedelivered() throws JMSException { - connection.setClientID(getName()); - connection.start(); - - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Topic topic = session.createTopic("topic-" + getName()); - MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1"); - - // This case only works with persistent messages since transient - // messages - // are dropped when the consumer goes offline. - MessageProducer producer = session.createProducer(topic); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - producer.send(createTextMessage(session)); - - // Consume the message... - Message msg = consumer.receive(1000); - assertNotNull(msg); - assertFalse("Message should not be re-delivered.", msg.getJMSRedelivered()); - // Don't ack the message. - - // Reset the session. This should cause the Unacked message to be - // re-delivered. - session.close(); - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - // Attempt to Consume the message... - consumer = session.createDurableSubscriber(topic, "sub1"); - msg = consumer.receive(2000); - assertNotNull(msg); - // Since we only simulate this in provider, we cannot do this across consumers ! - // assertTrue("Message should be redelivered.", msg.getJMSRedelivered()); - msg.acknowledge(); - - session.close(); - } - - /** - * Tests rollback message to be marked as redelivered. Session uses client - * acknowledgement and the destination is a topic. - * - * @throws JMSException - */ - public void testDurableTopicRollbackMarksMessageRedelivered() throws JMSException { - if (null == connection.getClientID()) connection.setClientID(getName()); - connection.start(); - - Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); - Topic topic = session.createTopic("topic-" + getName()); - MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1"); - - MessageProducer producer = createProducer(session, topic); - producer.send(createTextMessage(session)); - session.commit(); - - // Get the message... Should not be redelivered. - Message msg = consumer.receive(1000); - assertNotNull(msg); - assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); - - // Rollback.. should cause redelivery. - session.rollback(); - - // Attempt to Consume the message... - msg = consumer.receive(2000); - assertNotNull(msg); - assertTrue("Message should be redelivered.", msg.getJMSRedelivered()); - - session.commit(); - session.close(); - } - - /** - * Creates a text message. - * - * @param session - * @return TextMessage. - * @throws JMSException - */ - private TextMessage createTextMessage(Session session) throws JMSException { - return createTextMessage(session, "Hello"); - } - - private TextMessage createTextMessage(Session session, String txt) throws JMSException { - return session.createTextMessage(txt); - } - - /** - * Creates a producer. - * - * @param session - * @param queue - destination. - * @return MessageProducer - * @throws JMSException - */ - private MessageProducer createProducer(Session session, Destination queue) throws JMSException { - MessageProducer producer = session.createProducer(queue); - producer.setDeliveryMode(getDeliveryMode()); - return producer; - } - - /** - * Returns delivery mode. - * - * @return int - persistent delivery mode. - */ - protected int getDeliveryMode() { - return DeliveryMode.PERSISTENT; - } - - /** - * Run the JmsRedeliverTest with the delivery mode set as persistent. - */ - public static final class PersistentCase extends JmsRedeliveredTest { - - /** - * Returns delivery mode. - * - * @return int - persistent delivery mode. - */ - protected int getDeliveryMode() { - return DeliveryMode.PERSISTENT; - } - } - - /** - * Run the JmsRedeliverTest with the delivery mode set as non-persistent. - */ - public static final class TransientCase extends JmsRedeliveredTest { - - /** - * Returns delivery mode. - * - * @return int - non-persistent delivery mode. - */ - protected int getDeliveryMode() { - return DeliveryMode.NON_PERSISTENT; - } - } - - public static Test suite() { - TestSuite suite = new TestSuite(); - suite.addTestSuite(PersistentCase.class); - suite.addTestSuite(TransientCase.class); - return suite; - } -}