http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/test/TestSupport.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/test/TestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/test/TestSupport.java deleted file mode 100644 index 6de9021..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/test/TestSupport.java +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.test; - -import javax.jms.Topic; -import java.io.File; - -import org.apache.hedwig.JmsTestBase; -import org.apache.hedwig.jms.SessionImpl; -import java.lang.reflect.Array; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.TextMessage; - -import junit.framework.TestCase; - -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import org.apache.hedwig.jms.message.MessageImpl; - - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Useful base class for unit test cases - */ -public abstract class TestSupport extends JmsTestBase { - private static final Logger LOG = LoggerFactory.getLogger(TestSupport.class); - - protected HedwigConnectionFactoryImpl connectionFactory; - protected boolean topic = true; - - public TestSupport() { - super(); - } - - public TestSupport(String name) { - super(name); - } - - /** - * Creates an MessageImpl. - * - * @return MessageImpl - */ - protected MessageImpl createMessage() { - return new MessageImpl(null); - } - - /** - * Creates a destination. - * - * @param subject - topic or queue name. - * @return Destination - either an Topic or ActiveMQQUeue. - */ - protected Destination createDestination(String subject) { - if (topic) { - return SessionImpl.asTopic(subject); - } else { - return SessionImpl.asTopic(subject); - } - } - - /** - * Tests if firstSet and secondSet are equal. - * - * @param messsage - string to be displayed when the assertion fails. - * @param firstSet[] - set of messages to be compared with its counterpart - * in the secondset. - * @param secondSet[] - set of messages to be compared with its counterpart - * in the firstset. - * @throws JMSException - */ - protected void assertTextMessagesEqual(Message[] firstSet, Message[] secondSet) throws JMSException { - assertTextMessagesEqual("", firstSet, secondSet); - } - - /** - * Tests if firstSet and secondSet are equal. - * - * @param messsage - string to be displayed when the assertion fails. - * @param firstSet[] - set of messages to be compared with its counterpart - * in the secondset. - * @param secondSet[] - set of messages to be compared with its counterpart - * in the firstset. - */ - protected void assertTextMessagesEqual(String messsage, Message[] firstSet, - Message[] secondSet) throws JMSException { - assertEquals("Message count does not match: " + messsage, firstSet.length, secondSet.length); - - for (int i = 0; i < secondSet.length; i++) { - TextMessage m1 = (TextMessage)firstSet[i]; - TextMessage m2 = (TextMessage)secondSet[i]; - assertTextMessageEqual("Message " + (i + 1) + " did not match : ", m1, m2); - } - } - - /** - * Tests if m1 and m2 are equal. - * - * @param m1 - message to be compared with m2. - * @param m2 - message to be compared with m1. - * @throws JMSException - */ - protected void assertEquals(TextMessage m1, TextMessage m2) throws JMSException { - assertEquals("", m1, m2); - } - - /** - * Tests if m1 and m2 are equal. - * - * @param message - string to be displayed when the assertion fails. - * @param m1 - message to be compared with m2. - * @param m2 - message to be compared with m1. - */ - protected void assertTextMessageEqual(String message, TextMessage m1, TextMessage m2) throws JMSException { - assertFalse(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1 == null ^ m2 == null); - - if (m1 == null) { - return; - } - - assertEquals(message, m1.getText(), m2.getText()); - } - - /** - * Tests if m1 and m2 are equal. - * - * @param m1 - message to be compared with m2. - * @param m2 - message to be compared with m1. - * @throws JMSException - */ - protected void assertEquals(Message m1, Message m2) throws JMSException { - assertEquals("", m1, m2); - } - - /** - * Tests if m1 and m2 are equal. - * - * @param message - error message. - * @param m1 - message to be compared with m2. - * @param m2 -- message to be compared with m1. - */ - protected void assertEquals(String message, Message m1, Message m2) throws JMSException { - assertFalse(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1 == null ^ m2 == null); - - if (m1 == null) { - return; - } - - assertTrue(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1.getClass() == m2.getClass()); - - if (m1 instanceof TextMessage) { - assertTextMessageEqual(message, (TextMessage)m1, (TextMessage)m2); - } else { - assertEquals(message, m1, m2); - } - } - - /** - * Test if base directory contains spaces - */ - protected void assertBaseDirectoryContainsSpaces() { - assertFalse("Base directory cannot contain spaces.", - new File(System.getProperty("basedir", ".")).getAbsoluteFile().toString().contains(" ")); - } - - /** - * Creates an HedwigConnectionFactoryImpl. - * - * @return HedwigConnectionFactoryImpl - * @throws Exception - */ - protected HedwigConnectionFactoryImpl createConnectionFactory() throws Exception { - return new HedwigConnectionFactoryImpl(); - } - - /** - * Factory method to create a new connection. - * - * @return connection - * @throws Exception - */ - protected Connection createConnection() throws Exception { - return getConnectionFactory().createConnection(); - } - - /** - * Creates an ActiveMQ connection factory. - * - * @return connectionFactory - * @throws Exception - */ - public HedwigConnectionFactoryImpl getConnectionFactory() throws Exception { - if (connectionFactory == null) { - connectionFactory = createConnectionFactory(); - assertTrue("Should have created a connection factory!", connectionFactory != null); - } - - return connectionFactory; - } - - /** - * Returns the consumer subject. - * - * @return String - */ - protected String getConsumerSubject() { - return getSubject(); - } - - /** - * Returns the producer subject. - * - * @return String - */ - protected String getProducerSubject() { - return getSubject(); - } - - /** - * Returns the subject. - * - * @return String - */ - protected String getSubject() { - return getClass().getName() + "." + getName(); - } - - protected void assertArrayEqual(String message, Object[] expected, Object[] actual) { - assertEquals(message + ". Array length", expected.length, actual.length); - for (int i = 0; i < expected.length; i++) { - assertEquals(message + ". element: " + i, expected[i], actual[i]); - } - } - - protected void assertPrimitiveArrayEqual(String message, Object expected, Object actual) { - int length = Array.getLength(expected); - assertEquals(message + ". Array length", length, Array.getLength(actual)); - for (int i = 0; i < length; i++) { - assertEquals(message + ". element: " + i, Array.get(expected, i), Array.get(actual, i)); - } - } -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java b/hedwig-client-jms/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java deleted file mode 100644 index 0bc8a8e..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.test.rollback; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DelegatingTransactionalMessageListener implements MessageListener { - private static final transient Logger LOG = LoggerFactory.getLogger(DelegatingTransactionalMessageListener.class); - - private final MessageListener underlyingListener; - private boolean transacted = true; - private int ackMode = Session.AUTO_ACKNOWLEDGE; - private Session session; - - public DelegatingTransactionalMessageListener(MessageListener underlyingListener, - Connection connection, Destination destination) { - this.underlyingListener = underlyingListener; - - try { - session = connection.createSession(transacted, ackMode); - MessageConsumer consumer = session.createConsumer(destination); - consumer.setMessageListener(this); - } catch (JMSException e) { - throw new IllegalStateException("Could not listen to " + destination, e); - } - } - - public void onMessage(Message message) { - try { - underlyingListener.onMessage(message); - session.commit(); - } catch (Throwable e) { - rollback(); - } - } - - private void rollback() { - try { - session.rollback(); - } catch (JMSException e) { - LOG.error("Failed to rollback: " + e, e); - } - } - - public Session getSession() { - return session; - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSentMessageTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSentMessageTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSentMessageTest.java deleted file mode 100644 index 5bc1a07..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSentMessageTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.usecases; - -import java.util.HashMap; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Session; - -import org.apache.activemq.test.TestSupport; - -public class ChangeSentMessageTest extends TestSupport { - private static final int COUNT = 200; - private static final String VALUE_NAME = "value"; - - /** - * test Object messages can be changed after sending with no side-affects - * - * @throws Exception - */ - public void testDoChangeSentMessage() throws Exception { - Destination destination = createDestination("test-" + ChangeSentMessageTest.class.getName()); - Connection connection = createConnection(); - connection.start(); - Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = consumerSession.createConsumer(destination); - Session publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = publisherSession.createProducer(destination); - HashMap<String, Integer> map = new HashMap<String, Integer>(); - ObjectMessage message = publisherSession.createObjectMessage(); - for (int i = 0; i < COUNT; i++) { - map.put(VALUE_NAME, Integer.valueOf(i)); - message.setObject(map); - producer.send(message); - assertTrue(message.getObject() == map); - } - for (int i = 0; i < COUNT; i++) { - ObjectMessage msg = (ObjectMessage)consumer.receive(); - HashMap receivedMap = (HashMap)msg.getObject(); - Integer intValue = (Integer)receivedMap.get(VALUE_NAME); - assertTrue(intValue.intValue() == i); - } - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSessionDeliveryModeTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSessionDeliveryModeTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSessionDeliveryModeTest.java deleted file mode 100644 index b4da7a7..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ChangeSessionDeliveryModeTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.usecases; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; - -import org.apache.activemq.test.TestSupport; - -public class ChangeSessionDeliveryModeTest extends TestSupport implements MessageListener { - - /** - * test following condition- which are defined by JMS Spec 1.1: - * MessageConsumers cannot use a MessageListener and receive() from the same - * session - * - * @throws Exception - */ - public void testDoChangeSessionDeliveryMode() throws Exception { - Destination destination = createDestination("foo.bar"); - Connection connection = createConnection(); - connection.start(); - Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - /* - MessageConsumer consumer1 = consumerSession.createConsumer(destination); - consumer1.setMessageListener(this); - JMSException jmsEx = null; - MessageConsumer consumer2 = consumerSession.createConsumer(destination); - - try { - consumer2.receive(10); - fail("Did not receive expected exception."); - } catch (JMSException e) { - assertTrue(e instanceof IllegalStateException); - } - */ - MessageConsumer consumer1 = consumerSession.createConsumer(destination); - consumer1.setMessageListener(this); - - try { - consumer1.receive(10); - fail("Did not receive expected exception."); - } catch (JMSException e) { - assertTrue(e instanceof IllegalStateException); - } - } - - public void onMessage(Message msg) { - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositeConsumeTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositeConsumeTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositeConsumeTest.java deleted file mode 100644 index 7ea8cc0..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositeConsumeTest.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.usecases; - -import javax.jms.Destination; -import javax.jms.Message; - - -import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest; - -import org.apache.hedwig.jms.SessionImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class CompositeConsumeTest extends JmsTopicSendReceiveWithTwoConnectionsTest { - private static final Logger LOG = LoggerFactory.getLogger(CompositeConsumeTest.class); - - public void testSendReceive() throws Exception { - messages.clear(); - - Destination[] destinations = getDestinations(); - int destIdx = 0; - - for (int i = 0; i < data.length; i++) { - Message message = session.createTextMessage(data[i]); - - if (verbose) { - LOG.info("About to send a message: " + message + " with text: " + data[i]); - } - - producer.send(destinations[destIdx], message); - - if (++destIdx >= destinations.length) { - destIdx = 0; - } - } - - assertMessagesAreReceived(); - } - - /** - * Returns the subscription subject - */ - protected String getSubject() { - // return getPrefix() + "FOO.BAR," + getPrefix() + "FOO.X.Y," + getPrefix() + "BAR.>"; - return getPrefix() + "FOO.BAR"; - } - - /** - * Returns the destinations on which we publish - */ - protected Destination[] getDestinations() { - return new Destination[]{SessionImpl.asTopic(getSubject())}; - } - - protected String getPrefix() { - return super.getSubject() + "."; - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java deleted file mode 100644 index 7833e01..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/CompositePublishTest.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.usecases; - -import java.util.List; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; - -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; - -import org.apache.activemq.test.JmsSendReceiveTestSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class CompositePublishTest extends JmsSendReceiveTestSupport { - private static final Logger LOG = LoggerFactory.getLogger(CompositePublishTest.class); - - protected Connection sendConnection; - protected Connection receiveConnection; - protected Session receiveSession; - protected MessageConsumer[] consumers; - protected List[] messageLists; - - @SuppressWarnings("unchecked") - protected void setUp() throws Exception { - super.setUp(); - - connectionFactory = createConnectionFactory(); - - sendConnection = createConnection(false); - sendConnection.start(); - - receiveConnection = createConnection(false); - receiveConnection.start(); - - LOG.info("Created sendConnection: " + sendConnection); - LOG.info("Created receiveConnection: " + receiveConnection); - - session = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - receiveSession = receiveConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - LOG.info("Created sendSession: " + session); - LOG.info("Created receiveSession: " + receiveSession); - - producer = session.createProducer(null); - - LOG.info("Created producer: " + producer); - - consumerDestination = session.createTopic(getConsumerSubject()); - producerDestination = session.createTopic(getProducerSubject()); - - LOG.info("Created consumer destination: " + consumerDestination - + " of type: " + consumerDestination.getClass()); - LOG.info("Created producer destination: " + producerDestination - + " of type: " + producerDestination.getClass()); - - Destination[] destinations = getDestinations(); - consumers = new MessageConsumer[destinations.length]; - messageLists = new List[destinations.length]; - for (int i = 0; i < destinations.length; i++) { - Destination dest = destinations[i]; - messageLists[i] = createConcurrentList(); - consumers[i] = receiveSession.createConsumer(dest); - consumers[i].setMessageListener(createMessageListener(i, messageLists[i])); - } - - LOG.info("Started connections"); - } - - protected MessageListener createMessageListener(int i, final List<Message> messageList) { - return new MessageListener() { - public void onMessage(Message message) { - consumeMessage(message, messageList); - } - }; - } - - /** - * Returns the subject on which we publish - */ - protected String getSubject() { - // return getPrefix() + "FOO.BAR," + getPrefix() + "FOO.X.Y"; - return getPrefix() + "FOO.BAR"; - } - - /** - * Returns the destinations to which we consume - */ - protected Destination[] getDestinations() { - // return new Destination[] {SessionImpl.asTopic(getPrefix() + "FOO.BAR"), - // SessionImpl.asTopic(getPrefix() + "FOO.*"), SessionImpl.asTopic(getPrefix() + "FOO.X.Y")}; - return new Destination[] {SessionImpl.asTopic(getPrefix() + "FOO.BAR")}; - } - - protected String getPrefix() { - return super.getSubject() + "."; - } - - @SuppressWarnings("unchecked") - protected void assertMessagesAreReceived() throws JMSException { - waitForMessagesToBeDelivered(); - int size = messageLists.length; - for (int i = 0; i < size; i++) { - LOG.info("Message list: " + i + " contains: " + messageLists[i].size() + " message(s)"); - } - size = messageLists.length; - for (int i = 0; i < size; i++) { - assertMessagesReceivedAreValid(messageLists[i]); - } - } - - protected HedwigConnectionFactoryImpl createConnectionFactory() { - return new HedwigConnectionFactoryImpl(); - } - - protected void tearDown() throws Exception { - session.close(); - receiveSession.close(); - - sendConnection.close(); - receiveConnection.close(); - super.tearDown(); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java deleted file mode 100644 index 087c5ef..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java +++ /dev/null @@ -1,413 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.usecases; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.TopicSubscriber; -import junit.framework.Test; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import org.apache.activemq.TestSupport; - -import org.apache.activemq.util.MessageIdList; -import org.apache.activemq.util.Wait; -//import org.apache.commons.dbcp.BasicDataSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ConcurrentProducerDurableConsumerTest extends TestSupport { - private static final Logger LOG = LoggerFactory.getLogger(ConcurrentProducerDurableConsumerTest.class); - private int consumerCount = 5; - protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>()); - protected Map<MessageConsumer, TimedMessageListener> consumers - = new HashMap<MessageConsumer, TimedMessageListener>(); - protected MessageIdList allMessagesList = new MessageIdList(); - private int messageSize = 1024; - - public void testSendRateWithActivatingConsumers() throws Exception { - final Destination destination = createDestination(); - final ConnectionFactory factory = createConnectionFactory(); - startInactiveConsumers(factory, destination); - - Connection connection = factory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = createMessageProducer(session, destination); - - // preload the durable consumers - double[] inactiveConsumerStats = produceMessages(destination, 500, 10, session, producer, null); - LOG.info("With inactive consumers: ave: " + inactiveConsumerStats[1] - + ", max: " + inactiveConsumerStats[0] + ", multiplier: " - + (inactiveConsumerStats[0]/inactiveConsumerStats[1])); - - // periodically start a durable sub that has a backlog - final int consumersToActivate = 5; - final Object addConsumerSignal = new Object(); - Executors.newCachedThreadPool(new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "ActivateConsumer" + this); - } - }).execute(new Runnable() { - @Override - public void run() { - try { - MessageConsumer consumer = null; - for (int i = 0; i < consumersToActivate; i++) { - LOG.info("Waiting for add signal from producer..."); - synchronized (addConsumerSignal) { - addConsumerSignal.wait(30 * 60 * 1000); - } - TimedMessageListener listener = new TimedMessageListener(); - consumer = createDurableSubscriber(factory.createConnection(), - destination, "consumer" + (i + 1)); - LOG.info("Created consumer " + consumer); - consumer.setMessageListener(listener); - consumers.put(consumer, listener); - } - } catch (Exception e) { - LOG.error("failed to start consumer", e); - } - } - }); - - - double[] statsWithActive = produceMessages(destination, 500, 10, session, producer, addConsumerSignal); - - LOG.info(" with concurrent activate, ave: " + statsWithActive[1] - + ", max: " + statsWithActive[0] + ", multiplier: " + (statsWithActive[0]/ statsWithActive[1])); - - while(consumers.size() < consumersToActivate) { - TimeUnit.SECONDS.sleep(2); - } - - long timeToFirstAccumulator = 0; - for (TimedMessageListener listener : consumers.values()) { - long time = listener.getFirstReceipt(); - timeToFirstAccumulator += time; - LOG.info("Time to first " + time); - } - LOG.info("Ave time to first message =" + timeToFirstAccumulator/consumers.size()); - - for (TimedMessageListener listener : consumers.values()) { - LOG.info("Ave batch receipt time: " + listener.waitForReceivedLimit(10000) - + " max receipt: " + listener.maxReceiptTime); - } - - //assertTrue("max (" + statsWithActive[0] + ") within reasonable - // multiplier of ave (" + statsWithActive[1] + ")", - // statsWithActive[0] < 5 * statsWithActive[1]); - - // compare no active to active - LOG.info("Ave send time with active: " + statsWithActive[1] - + " as multiplier of ave with none active: " + inactiveConsumerStats[1] - + ", multiplier=" + (statsWithActive[1]/inactiveConsumerStats[1])); - - assertTrue("Ave send time with active: " + statsWithActive[1] - + " within reasonable multpler of ave with none active: " + inactiveConsumerStats[1] - + ", multiplier " + (statsWithActive[1]/inactiveConsumerStats[1]), - statsWithActive[1] < 15 * inactiveConsumerStats[1]); - } - - - public void x_testSendWithInactiveAndActiveConsumers() throws Exception { - Destination destination = createDestination(); - ConnectionFactory factory = createConnectionFactory(); - startInactiveConsumers(factory, destination); - - Connection connection = factory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - - final int toSend = 100; - final int numIterations = 5; - - double[] noConsumerStats = produceMessages(destination, toSend, numIterations, session, producer, null); - - startConsumers(factory, destination); - LOG.info("Activated consumer"); - - double[] withConsumerStats = produceMessages(destination, toSend, numIterations, session, producer, null); - - LOG.info("With consumer: " + withConsumerStats[1] + " , with noConsumer: " + noConsumerStats[1] - + ", multiplier: " + (withConsumerStats[1]/noConsumerStats[1])); - final int reasonableMultiplier = 15; // not so reasonable but improving - assertTrue("max X times as slow with consumer: " + withConsumerStats[1] + ", with no Consumer: " - + noConsumerStats[1] + ", multiplier: " + (withConsumerStats[1]/noConsumerStats[1]), - withConsumerStats[1] < noConsumerStats[1] * reasonableMultiplier); - - final int toReceive = toSend * numIterations * consumerCount * 2; - Wait.waitFor(new Wait.Condition() { - public boolean isSatisified() throws Exception { - LOG.info("count: " + allMessagesList.getMessageCount()); - return toReceive == allMessagesList.getMessageCount(); - } - }, 60 * 1000); - - assertEquals("got all messages", toReceive, allMessagesList.getMessageCount()); - } - - - private MessageProducer createMessageProducer(Session session, Destination destination) throws JMSException { - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - return producer; - } - - - private void startInactiveConsumers(ConnectionFactory factory, Destination destination) throws Exception { - // create off line consumers - startConsumers(factory, destination); - for (Connection connection: connections) { - connection.close(); - } - connections.clear(); - consumers.clear(); - } - - - protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception { - MessageConsumer consumer; - for (int i = 0; i < consumerCount; i++) { - TimedMessageListener list = new TimedMessageListener(); - consumer = createDurableSubscriber(factory.createConnection(), dest, "consumer" + (i + 1)); - consumer.setMessageListener(list); - consumers.put(consumer, list); - } - } - - protected TopicSubscriber createDurableSubscriber(Connection conn, - Destination dest, String name) throws Exception { - conn.setClientID(name); - connections.add(conn); - conn.start(); - - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - final TopicSubscriber consumer = sess.createDurableSubscriber((javax.jms.Topic)dest, name); - - return consumer; - } - - /** - * @return max and ave send time - * @throws Exception - */ - private double[] produceMessages(Destination destination, - final int toSend, - final int numIterations, - Session session, - MessageProducer producer, - Object addConsumerSignal) throws Exception { - long start; - long count = 0; - double batchMax = 0, max = 0, sum = 0; - for (int i=0; i<numIterations; i++) { - start = System.currentTimeMillis(); - for (int j=0; j < toSend; j++) { - long singleSendstart = System.currentTimeMillis(); - TextMessage msg = createTextMessage(session, "" + j); - // rotate - int priority = ((int)count%10); - producer.send(msg, DeliveryMode.PERSISTENT, priority, 0); - max = Math.max(max, (System.currentTimeMillis() - singleSendstart)); - if (++count % 500 == 0) { - if (addConsumerSignal != null) { - synchronized (addConsumerSignal) { - addConsumerSignal.notifyAll(); - LOG.info("Signalled add consumer"); - } - } - } - ; - if (count % 5000 == 0) { - LOG.info("Sent " + count + ", singleSendMax:" + max); - } - - } - long duration = System.currentTimeMillis() - start; - batchMax = Math.max(batchMax, duration); - sum += duration; - LOG.info("Iteration " + i + ", sent " + toSend + ", time: " - + duration + ", batchMax:" + batchMax + ", singleSendMax:" + max); - } - - LOG.info("Sent: " + toSend * numIterations + ", batchMax: " + batchMax + " singleSendMax: " + max); - return new double[]{batchMax, sum/numIterations}; - } - - protected TextMessage createTextMessage(Session session, String initText) throws Exception { - TextMessage msg = session.createTextMessage(); - - // Pad message text - if (initText.length() < messageSize) { - char[] data = new char[messageSize - initText.length()]; - Arrays.fill(data, '*'); - String str = new String(data); - msg.setText(initText + str); - - // Do not pad message text - } else { - msg.setText(initText); - } - - return msg; - } - - @Override - protected void setUp() throws Exception { - topic = true; - super.setUp(); - } - - @Override - protected void tearDown() throws Exception { - for (Iterator<Connection> iter = connections.iterator(); iter.hasNext();) { - Connection conn = iter.next(); - try { - conn.close(); - } catch (Throwable e) { - } - } - allMessagesList.flushMessages(); - consumers.clear(); - super.tearDown(); - } - - - protected HedwigConnectionFactoryImpl createConnectionFactory() throws Exception { - HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl(); - return factory; - } - - public static Test suite() { - return suite(ConcurrentProducerDurableConsumerTest.class); - } - - class TimedMessageListener implements MessageListener { - final int batchSize = 1000; - CountDownLatch firstReceiptLatch = new CountDownLatch(1); - long mark = System.currentTimeMillis(); - long firstReceipt = 0l; - long receiptAccumulator = 0; - long batchReceiptAccumulator = 0; - long maxReceiptTime = 0; - AtomicLong count = new AtomicLong(0); - Map<Integer, MessageIdList> messageLists - = new ConcurrentHashMap<Integer, MessageIdList>(new HashMap<Integer, MessageIdList>()); - - @Override - public void onMessage(Message message) { - final long current = System.currentTimeMillis(); - final long duration = current - mark; - receiptAccumulator += duration; - int priority = 0; - try { - priority = message.getJMSPriority(); - } catch (JMSException ignored) {} - if (!messageLists.containsKey(priority)) { - messageLists.put(priority, new MessageIdList()); - } - messageLists.get(priority).onMessage(message); - if (count.incrementAndGet() == 1) { - firstReceipt = duration; - firstReceiptLatch.countDown(); - LOG.info("First receipt in " + firstReceipt + "ms"); - } else if (count.get() % batchSize == 0) { - LOG.info("Consumed " + count.get() + " in " - + batchReceiptAccumulator + "ms" + ", priority:" + priority); - batchReceiptAccumulator=0; - } - maxReceiptTime = Math.max(maxReceiptTime, duration); - receiptAccumulator += duration; - batchReceiptAccumulator += duration; - mark = current; - } - - long getMessageCount() { - return count.get(); - } - - long getFirstReceipt() throws Exception { - firstReceiptLatch.await(30, TimeUnit.SECONDS); - return firstReceipt; - } - - public long waitForReceivedLimit(long limit) throws Exception { - final long expiry = System.currentTimeMillis() + 30*60*1000; - while (count.get() < limit) { - if (System.currentTimeMillis() > expiry) { - throw new RuntimeException("Expired waiting for X messages, " + limit); - } - TimeUnit.SECONDS.sleep(2); - String missing = findFirstMissingMessage(); - if (missing != null) { - LOG.info("first missing = " + missing); - throw new RuntimeException("We have a missing message. " + missing); - } - - } - return receiptAccumulator/(limit/batchSize); - } - - private String findFirstMissingMessage() { - /* - MessageId current = new MessageId(); - for (MessageIdList priorityList : messageLists.values()) { - MessageId previous = null; - for (String id : priorityList.getMessageIds()) { - current.setValue(id); - if (previous == null) { - previous = current.copy(); - } else { - if (current.getProducerSequenceId() - 1 != previous.getProducerSequenceId() && - current.getProducerSequenceId() - 10 != previous.getProducerSequenceId()) { - return "Missing next after: " + previous + ", got: " + current; - } else { - previous = current.copy(); - } - } - } - } - return null; - */ - return null; - } - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java deleted file mode 100644 index b14ef71..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.java +++ /dev/null @@ -1,322 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.usecases; - - -import javax.jms.Topic; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import org.apache.activemq.JmsConnectionStartStopTest; - - - - -/** - * Test case intended to demonstrate delivery interruption to queue consumers when - * a JMS selector leaves some messages on the queue (due to use of a JMS Selector) - * - * testNonDiscriminatingConsumer() demonstrates proper functionality for consumers that don't use - * a selector to qualify their input. - * - * testDiscriminatingConsumer() demonstrates the failure condition in which delivery to the consumer - * eventually halts. - * - * The expected behavior is for the delivery to the client to be maintained regardless of the depth - * of the queue, particularly when the messages in the queue do not meet the selector criteria of the - * client. - * - * https://issues.apache.org/activemq/browse/AMQ-2217 - * - */ -public class DiscriminatingConsumerLoadTest extends TestSupport { - - private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory - .getLog(DiscriminatingConsumerLoadTest.class); - - private Connection producerConnection; - private Connection consumerConnection; - private int counterSent = 0; - private int counterReceived = 0; - - public static final String JMSTYPE_EATME = "DiscriminatingLoadClient.EatMe"; - public static final String JMSTYPE_IGNOREME = "DiscriminatingLoadClient.IgnoreMe"; - - private int testSize = 5000; // setting this to a small number will pass all tests - - - protected void setUp() throws Exception { - super.setUp(); - this.producerConnection = this.createConnection(); - this.consumerConnection = this.createConnection(); - } - - /** - * @see junit.framework.TestCase#tearDown() - */ - protected void tearDown() throws Exception { - if (producerConnection != null) { - producerConnection.close(); - producerConnection = null; - } - if (consumerConnection != null) { - consumerConnection.close(); - consumerConnection = null; - } - super.tearDown(); - } - - /** - * Test to check if a single consumer with no JMS selector will receive all intended messages - * - * @throws java.lang.Exception - */ - public void testNonDiscriminatingConsumer() throws Exception { - consumerConnection = createConnection(); - consumerConnection.start(); - LOG.info("consumerConnection = " +consumerConnection); - - try {Thread.sleep(1000); } catch (Exception e) {} - - // here we pass in null for the JMS selector - Consumer consumer = new Consumer(consumerConnection, null); - Thread consumerThread = new Thread(consumer); - - consumerThread.start(); - - producerConnection = createConnection(); - producerConnection.start(); - LOG.info("producerConnection = " +producerConnection); - - try {Thread.sleep(3000); } catch (Exception e) {} - - Producer producer = new Producer(producerConnection); - Thread producerThread = new Thread(producer); - producerThread.start(); - - // now that everything is running, let's wait for the consumer thread to finish ... - consumerThread.join(); - producer.stop = true; - - if (consumer.getCount() == testSize ) - LOG.info("test complete .... all messsages consumed!!"); - else - LOG.info("test failed .... Sent " + (testSize / 1) + - " messages intended to be consumed ( " + testSize - + " total), but only consumed " + consumer.getCount()); - - - assertTrue("Sent " + testSize + " messages intended to be consumed, but only consumed " + consumer.getCount(), - (consumer.getCount() == testSize )); - assertFalse("Delivery of messages to consumer was halted during this test", consumer.deliveryHalted()); - - - } - - /** - * Test to check if a single consumer with a JMS selector will receive all intended messages - * - * @throws java.lang.Exception - */ - public void testDiscriminatingConsumer() throws Exception { - - consumerConnection = createConnection(); - consumerConnection.start(); - LOG.info("consumerConnection = " +consumerConnection); - - try {Thread.sleep(1000); } catch (Exception e) {} - - // here we pass the JMS selector we intend to consume - Consumer consumer = new Consumer(consumerConnection, JMSTYPE_EATME); - Thread consumerThread = new Thread(consumer); - - consumerThread.start(); - - producerConnection = createConnection(); - producerConnection.start(); - LOG.info("producerConnection = " +producerConnection); - - try {Thread.sleep(3000); } catch (Exception e) {} - - Producer producer = new Producer(producerConnection); - Thread producerThread = new Thread(producer); - producerThread.start(); - - // now that everything is running, let's wait for the consumer thread to finish ... - consumerThread.join(); - producer.stop = true; - - if (consumer.getCount() == (testSize / 2)) - { - LOG.info("test complete .... all messsages consumed!!"); - } - else - { - LOG.info("test failed .... Sent " + testSize - + " original messages, only half of which (" + (testSize / 2) + - ") were intended to be consumed: consumer paused at: " + consumer.getCount()); - //System.out.println("test failed .... Sent " + testSize - //+ " original messages, only half of which (" + (testSize / 2) + - // ") were intended to be consumed: consumer paused at: " + consumer.getCount()); - } - assertTrue("Sent " + testSize + " original messages, only half of which (" + (testSize / 2) + - ") were intended to be consumed: consumer paused at: " + consumer.getCount(), - (consumer.getCount() == (testSize / 2))); - assertTrue("Delivery of messages to consumer was halted during this test as it only wants half", - consumer.deliveryHalted()); - } - - /** - * Helper class that will publish 2 * testSize messages. The messages will be distributed evenly - * between the following two JMS types: - * - * @see JMSTYPE_INTENDED_FOR_CONSUMPTION - * @see JMSTYPE_NOT_INTENDED_FOR_CONSUMPTION - * - */ - private class Producer extends Thread - { - private int counterSent = 0; - private Connection connection = null; - public boolean stop = false; - - public Producer(Connection connection) - { - this.connection = connection; - } - - public void run() { - try { - final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Topic queue = session.createTopic("test"); - - // wait for 10 seconds to allow consumer.receive to be run - // first - Thread.sleep(10000); - MessageProducer producer = session.createProducer(queue); - - while (!stop && (counterSent < testSize)) - { - // first send a message intended to be consumed .... - TextMessage message = session.createTextMessage("*** Ill ....... Ini ***"); // alma mater ... - message.setJMSType(JMSTYPE_EATME); - //LOG.info("sending .... JMSType = " + message.getJMSType()); - producer.send(message,DeliveryMode.NON_PERSISTENT,0,1800000); - - counterSent++; - - // now send a message intended to be consumed by some other consumer in the the future - // ... we expect these messages to accrue in the queue - message = session.createTextMessage("*** Ill ....... Ini ***"); // alma mater ... - message.setJMSType(JMSTYPE_IGNOREME); - //LOG.info("sending .... JMSType = " + message.getJMSType()); - producer.send(message,DeliveryMode.NON_PERSISTENT,0,1800000); - - counterSent++; - } - - session.close(); - - } catch (Exception e) { - e.printStackTrace(); - } - LOG.info("producer thread complete ... " + counterSent + " messages sent to the queue"); - } - - public int getCount() - { - return this.counterSent; - } - } - - /** - * Helper class that will consume messages from the queue based on the supplied JMS selector. - * Thread will stop after the first receive(..) timeout, or once all expected messages have - * been received (see testSize). If the thread stops due to a timeout, it is experiencing the - * delivery pause that is symptomatic of a bug in the broker. - */ - private class Consumer extends Thread - { - protected int counterReceived = 0; - private Connection connection = null; - private String jmsSelector = null; - private boolean deliveryHalted = false; - - public Consumer(Connection connection, String jmsSelector) - { - this.connection = connection; - this.jmsSelector = jmsSelector; - } - - public void run() { - boolean testComplete = false; - try { - Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Topic queue = session.createTopic("test"); - MessageConsumer consumer = null; - if (null != this.jmsSelector) - { - consumer = session.createConsumer(queue, "JMSType='" + this.jmsSelector + "'"); - } - else - { - consumer = session.createConsumer(queue); - } - - while (!deliveryHalted && (counterReceived < testSize)) - { - TextMessage result = (TextMessage) consumer.receive(30000); - if (result != null) { - counterReceived++; - //System.out.println("consuming .... JMSType = " + result.getJMSType() - // + " received = " + counterReceived); - LOG.info("consuming .... JMSType = " + result.getJMSType() - + " received = " + counterReceived); - } else - { - LOG.info("consuming .... timeout while waiting for a message ." - + ".. broker must have stopped delivery ... received = " + counterReceived); - deliveryHalted = true; - } - } - session.close(); - } catch (Exception e) { - e.printStackTrace(); - } - - } - - public int getCount() - { - return this.counterReceived; - } - - public boolean deliveryHalted() - { - return this.deliveryHalted; - } - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java deleted file mode 100644 index fa79d69..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java +++ /dev/null @@ -1,214 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.usecases; - -import javax.jms.Topic; -import java.util.concurrent.CountDownLatch; - -import org.apache.hedwig.JmsTestBase; -import org.apache.hedwig.jms.SessionImpl; -import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import junit.framework.TestCase; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; - - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DispatchMultipleConsumersTest extends JmsTestBase { - private final static Logger logger = LoggerFactory.getLogger(DispatchMultipleConsumersTest.class); - Destination dest; - String destinationName = "TEST.Q"; - String msgStr = "Test text message"; - int messagesPerThread = 20; - int producerThreads = 50; - int consumerCount = 2; - AtomicInteger sentCount; - AtomicInteger consumedCount; - CountDownLatch producerLatch; - CountDownLatch consumerLatch; - String userName = ""; - String password = ""; - - @Override - protected void setUp() throws Exception { - super.setUp(); - dest = SessionImpl.asTopic(destinationName); - } - - private void resetCounters() { - sentCount = new AtomicInteger(0); - consumedCount = new AtomicInteger(0); - producerLatch = new CountDownLatch(producerThreads); - consumerLatch = new CountDownLatch(consumerCount); - } - - public void testDispatch1() { - for (int i = 1; i <= 5; i++) { - resetCounters(); - dispatch(); - assertEquals("Incorrect messages in Iteration " + i, sentCount.get() * consumerCount, consumedCount.get()); - } - } - - private void dispatch() { - startConsumers(); - startProducers(); - try { - producerLatch.await(); - consumerLatch.await(); - } catch (InterruptedException e) { - fail("test interrupted!"); - } - } - - private void startConsumers() { - HedwigConnectionFactoryImpl connFactory = new HedwigConnectionFactoryImpl(); - Connection conn; - try { - conn = connFactory.createConnection(userName, password); - conn.start(); - for (int i = 0; i < consumerCount; i++) { - ConsumerThread th = new ConsumerThread(conn, "ConsumerThread"+i); - th.start(); - } - } catch (JMSException e) { - logger.error("Failed to start consumers", e); - } - } - - private void startProducers() { - HedwigConnectionFactoryImpl connFactory = new HedwigConnectionFactoryImpl(); - for (int i = 0; i < producerThreads; i++) { - Thread th = new ProducerThread(connFactory, messagesPerThread, "ProducerThread"+i); - th.start(); - } - } - - private class ConsumerThread extends Thread { - private Session session; - private MessageConsumer consumer; - - public ConsumerThread(Connection conn, String name) { - super(); - this.setName(name); - logger.trace("Created new consumer thread:" + name); - try { - session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumer = session.createConsumer(dest); - } catch (JMSException e) { - logger.error("Failed to start consumer thread:" + name, e); - } - } - - @Override - public void run() { - int msgCount = 0; - int nullCount = 0; - while (true) { - try { - Message msg = consumer.receive(200); - if (msg == null) { - if (producerLatch.getCount() > 0) { - continue; - } - nullCount++; - if (nullCount > 10) { - //assume that we are not getting any more messages - break; - } else { - continue; - } - } else { - nullCount = 0; - } - // Thread.sleep(100); - if (logger.isTraceEnabled()) { - logger.trace("Message received:" + msg.getJMSMessageID()); - } - msgCount++; - } catch (JMSException e) { - logger.error("Failed to consume:", e); - /* - } catch (InterruptedException e) { - logger.error("Interrupted!", e); - */ - } - } - try { - consumer.close(); - } catch (JMSException e) { - logger.error("Failed to close consumer " + getName(), e); - } - consumedCount.addAndGet(msgCount); - consumerLatch.countDown(); - logger.trace("Consumed " + msgCount + " messages using thread " + getName()); - } - } - - private class ProducerThread extends Thread { - private int count; - private Connection conn; - private Session session; - private MessageProducer producer; - - public ProducerThread(HedwigConnectionFactoryImpl connFactory, int count, String name) { - super(); - this.count = count; - this.setName(name); - logger.trace("Created new producer thread:" + name); - try { - conn = connFactory.createConnection(); - conn.start(); - session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer = session.createProducer(dest); - } catch (JMSException e) { - logger.error("Failed to start producer thread:" + name, e); - } - } - - @Override - public void run() { - int i = 0; - try { - for (; i < count; i++) { - producer.send(session.createTextMessage(msgStr)); - // Thread.sleep(500); - } - conn.close(); - } catch (JMSException e) { - logger.error(e.getMessage(), e); - /* - } catch (InterruptedException e) { - logger.error("Interrupted!", e); - */ - } - sentCount.addAndGet(i); - producerLatch.countDown(); - if (logger.isTraceEnabled()) { - logger.trace("Sent " + i + " messages from thread " + getName()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java deleted file mode 100644 index c4fa74d..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java +++ /dev/null @@ -1,192 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.usecases; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; - -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import org.apache.activemq.test.TestSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DurableConsumerCloseAndReconnectTest extends TestSupport { - protected static final long RECEIVE_TIMEOUT = 5000L; - private static final Logger LOG = LoggerFactory.getLogger(DurableConsumerCloseAndReconnectTest.class); - - protected Connection connection; - private Session session; - private MessageConsumer consumer; - private MessageProducer producer; - private Destination destination; - private int messageCount; - - @Override - protected void setUp() throws Exception { - super.setUp(); - deleteAllMessages(); - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - deleteAllMessages(); - } - - private void deleteAllMessages() throws Exception { - HedwigConnectionFactoryImpl fac = new HedwigConnectionFactoryImpl(); - Connection dummyConnection = fac.createConnection(); - dummyConnection.start(); - dummyConnection.close(); - } - - protected HedwigConnectionFactoryImpl createConnectionFactory() throws Exception { - return new HedwigConnectionFactoryImpl(); - } - - public void testCreateDurableConsumerCloseThenReconnect() throws Exception { - // force the server to stay up across both connection tests - Connection dummyConnection = createConnection(); - dummyConnection.start(); - - consumeMessagesDeliveredWhileConsumerClosed(); - - dummyConnection.close(); - - // now lets try again without one connection open - consumeMessagesDeliveredWhileConsumerClosed(); - } - - protected void consumeMessagesDeliveredWhileConsumerClosed() throws Exception { - makeConsumer(); - closeConsumer(); - - publish(); - - // wait a few moments for the close to really occur - Thread.sleep(1000); - - makeConsumer(); - - Message message = consumer.receive(RECEIVE_TIMEOUT); - assertTrue("Should have received a message!", message != null); - - closeConsumer(); - - LOG.info("Now lets create the consumer again and because we didn't ack, we should get it again"); - makeConsumer(); - - message = consumer.receive(RECEIVE_TIMEOUT); - assertTrue("Should have received a message!", message != null); - message.acknowledge(); - - closeConsumer(); - - LOG.info("Now lets create the consumer again and because we did ack, we should not get it again"); - makeConsumer(); - - message = consumer.receive(2000); - assertTrue("Should have no more messages left!", message == null); - - closeConsumer(); - - LOG.info("Lets publish one more message now"); - publish(); - - makeConsumer(); - message = consumer.receive(RECEIVE_TIMEOUT); - assertTrue("Should have received a message!", message != null); - message.acknowledge(); - - closeConsumer(); - } - - protected void publish() throws Exception { - connection = createConnection(); - connection.start(); - - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - destination = createDestination(); - - producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - TextMessage msg = session.createTextMessage("This is a test: " + messageCount++); - producer.send(msg); - - producer.close(); - producer = null; - closeSession(); - } - - protected Destination createDestination() throws JMSException { - if (isTopic()) { - return session.createTopic(getSubject()); - } else { - return session.createTopic(getSubject()); - } - } - - protected boolean isTopic() { - return true; - } - - protected void closeConsumer() throws JMSException { - consumer.close(); - consumer = null; - closeSession(); - } - - protected void closeSession() throws JMSException { - session.close(); - session = null; - connection.close(); - connection = null; - } - - protected void makeConsumer() throws Exception { - String durableName = getName(); - String clientID = getSubject(); - LOG.info("Creating a durable subscribe for clientID: " + clientID + " and durable name: " + durableName); - createSession(clientID); - consumer = createConsumer(durableName); - } - - private MessageConsumer createConsumer(String durableName) throws JMSException { - if (destination instanceof Topic) { - return session.createDurableSubscriber((Topic)destination, durableName); - } else { - return session.createConsumer(destination); - } - } - - protected void createSession(String clientID) throws Exception { - connection = createConnection(); - connection.setClientID(clientID); - connection.start(); - - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - destination = createDestination(); - } -}