http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java deleted file mode 100644 index 23a9121..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java +++ /dev/null @@ -1,692 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import java.util.ArrayList; -import java.util.List; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Session; -import javax.jms.TextMessage; - - - -import org.apache.activemq.test.JmsResourceProvider; -import org.apache.activemq.test.TestSupport; -import org.apache.hedwig.jms.spi.HedwigConnectionImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class JmsTransactionTestSupport extends TestSupport implements MessageListener { - - private static final Logger LOG = LoggerFactory.getLogger(JmsTransactionTestSupport.class); - private static final int MESSAGE_COUNT = 5; - private static final String MESSAGE_TEXT = "message"; - - protected ConnectionFactory connectionFactory; - protected Connection connection; - protected Session session; - protected MessageConsumer consumer; - protected MessageProducer producer; - protected JmsResourceProvider resourceProvider; - protected Destination destination; - protected int batchCount = 10; - // protected int batchSize = 20; - - // for message listener test - private List<Message> unackMessages = new ArrayList<Message>(MESSAGE_COUNT); - private List<Message> ackMessages = new ArrayList<Message>(MESSAGE_COUNT); - private boolean resendPhase; - - public JmsTransactionTestSupport() { - super(); - } - - public JmsTransactionTestSupport(String name) { - super(name); - } - - /* - * (non-Javadoc) - * - * @see junit.framework.TestCase#setUp() - */ - protected void setUp() throws Exception { - super.setUp(); - resourceProvider = getJmsResourceProvider(); - topic = resourceProvider.isTopic(); - // We will be using transacted sessions. - setSessionTransacted(); - connectionFactory = newConnectionFactory(); - reconnect(); - } - - protected void setSessionTransacted() { - resourceProvider.setTransacted(true); - } - - protected ConnectionFactory newConnectionFactory() throws Exception { - return resourceProvider.createConnectionFactory(); - } - - protected void beginTx() throws Exception { - //no-op for local tx - } - - protected void commitTx() throws Exception { - session.commit(); - } - - protected void rollbackTx() throws Exception { - session.rollback(); - } - - /* - * (non-Javadoc) - * - * @see junit.framework.TestCase#tearDown() - */ - protected void tearDown() throws Exception { - LOG.info("Closing down connection"); - - session.close(); - session = null; - connection.close(); - connection = null; - LOG.info("Connection closed."); - super.tearDown(); - } - - protected abstract JmsResourceProvider getJmsResourceProvider(); - - /** - * Sends a batch of messages and validates that the messages are received. - * - * @throws Exception - */ - public void testSendReceiveTransactedBatches() throws Exception { - - final int batchSize = ((HedwigConnectionImpl) connection).getHedwigClientConfig() - .getMaximumOutstandingMessages() - 1; - - TextMessage message = session.createTextMessage("Batch Message"); - for (int j = 0; j < batchCount; j++) { - LOG.info("Producing bacth " + j + " of " + batchSize + " messages"); - - beginTx(); - for (int i = 0; i < batchSize; i++) { - producer.send(message); - } - messageSent(); - commitTx(); - LOG.info("Consuming bacth " + j + " of " + batchSize + " messages"); - - beginTx(); - for (int i = 0; i < batchSize; i++) { - message = (TextMessage)consumer.receive(1000 * 5); - assertNotNull("Received only " + i + " messages in batch " + j, message); - assertEquals("Batch Message", message.getText()); - } - - commitTx(); - } - } - - protected void messageSent() throws Exception { - } - - /** - * Sends a batch of messages and validates that the rollbacked message was - * not consumed. - * - * @throws Exception - */ - public void testSendRollback() throws Exception { - Message[] outbound = new Message[] {session.createTextMessage("First Message"), - session.createTextMessage("Second Message")}; - - // sends a message - beginTx(); - producer.send(outbound[0]); - commitTx(); - - // sends a message that gets rollbacked - beginTx(); - producer.send(session.createTextMessage("I'm going to get rolled back.")); - rollbackTx(); - - // sends a message - beginTx(); - producer.send(outbound[1]); - commitTx(); - - // receives the first message - beginTx(); - ArrayList<Message> messages = new ArrayList<Message>(); - LOG.info("About to consume message 1"); - Message message = consumer.receive(1000); - messages.add(message); - LOG.info("Received: " + message); - - // receives the second message - LOG.info("About to consume message 2"); - message = consumer.receive(4000); - messages.add(message); - LOG.info("Received: " + message); - - // validates that the rollbacked was not consumed - commitTx(); - Message inbound[] = new Message[messages.size()]; - messages.toArray(inbound); - assertTextMessagesEqual("Rollback did not work.", outbound, inbound); - } - - /** - * spec section 3.6 acking a message with automation acks has no effect. - * @throws Exception - */ - public void testAckMessageInTx() throws Exception { - Message[] outbound = new Message[] {session.createTextMessage("First Message")}; - - // sends a message - beginTx(); - producer.send(outbound[0]); - outbound[0].acknowledge(); - commitTx(); - outbound[0].acknowledge(); - - // receives the first message - beginTx(); - ArrayList<Message> messages = new ArrayList<Message>(); - LOG.info("About to consume message 1"); - Message message = consumer.receive(1000); - messages.add(message); - LOG.info("Received: " + message); - - // validates that the rollbacked was not consumed - commitTx(); - Message inbound[] = new Message[messages.size()]; - messages.toArray(inbound); - assertTextMessagesEqual("Message not delivered.", outbound, inbound); - } - - /** - * Sends a batch of messages and validates that the message sent before - * session close is not consumed. - * - * This test only works with local transactions, not xa. - * @throws Exception - */ - public void testSendSessionClose() throws Exception { - Message[] outbound = new Message[] {session.createTextMessage("First Message"), - session.createTextMessage("Second Message")}; - - // sends a message - beginTx(); - producer.send(outbound[0]); - commitTx(); - - // sends a message that gets rollbacked - beginTx(); - producer.send(session.createTextMessage("I'm going to get rolled back.")); - consumer.close(); - - reconnectSession(); - - // sends a message - producer.send(outbound[1]); - commitTx(); - - // receives the first message - ArrayList<Message> messages = new ArrayList<Message>(); - LOG.info("About to consume message 1"); - beginTx(); - Message message = consumer.receive(1000); - messages.add(message); - LOG.info("Received: " + message); - - // receives the second message - LOG.info("About to consume message 2"); - message = consumer.receive(4000); - messages.add(message); - LOG.info("Received: " + message); - - // validates that the rollbacked was not consumed - commitTx(); - Message inbound[] = new Message[messages.size()]; - messages.toArray(inbound); - assertTextMessagesEqual("Rollback did not work.", outbound, inbound); - } - - /** - * Sends a batch of messages and validates that the message sent before - * session close is not consumed. - * - * @throws Exception - */ - public void testSendSessionAndConnectionClose() throws Exception { - Message[] outbound = new Message[] {session.createTextMessage("First Message"), - session.createTextMessage("Second Message")}; - - // sends a message - beginTx(); - producer.send(outbound[0]); - commitTx(); - - // sends a message that gets rollbacked - beginTx(); - producer.send(session.createTextMessage("I'm going to get rolled back.")); - consumer.close(); - session.close(); - - reconnect(); - - // sends a message - beginTx(); - producer.send(outbound[1]); - commitTx(); - - // receives the first message - ArrayList<Message> messages = new ArrayList<Message>(); - LOG.info("About to consume message 1"); - beginTx(); - Message message = consumer.receive(1000); - messages.add(message); - LOG.info("Received: " + message); - - // receives the second message - LOG.info("About to consume message 2"); - message = consumer.receive(4000); - messages.add(message); - LOG.info("Received: " + message); - - // validates that the rollbacked was not consumed - commitTx(); - Message inbound[] = new Message[messages.size()]; - messages.toArray(inbound); - assertTextMessagesEqual("Rollback did not work.", outbound, inbound); - } - - /** - * Sends a batch of messages and validates that the rollbacked message was - * redelivered. - * - * @throws Exception - */ - public void testReceiveRollback() throws Exception { - Message[] outbound = new Message[] {session.createTextMessage("First Message"), - session.createTextMessage("Second Message")}; - - // lets consume any outstanding messages from prev test runs - beginTx(); - while (consumer.receive(1000) != null) { - } - commitTx(); - - // sent both messages - beginTx(); - producer.send(outbound[0]); - producer.send(outbound[1]); - commitTx(); - - LOG.info("Sent 0: " + outbound[0]); - LOG.info("Sent 1: " + outbound[1]); - - ArrayList<Message> messages = new ArrayList<Message>(); - beginTx(); - Message message = consumer.receive(1000); - messages.add(message); - assertEquals(outbound[0], message); - commitTx(); - - // rollback so we can get that last message again. - beginTx(); - message = consumer.receive(1000); - assertNotNull(message); - assertEquals(outbound[1], message); - rollbackTx(); - - // Consume again.. the prev message should - // get redelivered. - beginTx(); - message = consumer.receive(5000); - assertNotNull("Should have re-received the message again!", message); - messages.add(message); - commitTx(); - - Message inbound[] = new Message[messages.size()]; - messages.toArray(inbound); - assertTextMessagesEqual("Rollback did not work", outbound, inbound); - } - - /** - * Sends a batch of messages and validates that the rollbacked message was - * redelivered. - * - * @throws Exception - */ - public void testReceiveTwoThenRollback() throws Exception { - TextMessage[] outbound = new TextMessage[] {session.createTextMessage("First Message"), - session.createTextMessage("Second Message")}; - - // lets consume any outstanding messages from prev test runs - beginTx(); - while (consumer.receive(1000) != null) { - } - commitTx(); - - // - beginTx(); - producer.send(outbound[0]); - producer.send(outbound[1]); - commitTx(); - - LOG.info("Sent 0: " + outbound[0]); - LOG.info("Sent 1: " + outbound[1]); - - ArrayList<Message> messages = new ArrayList<Message>(); - beginTx(); - Message message = consumer.receive(1000); - assert message instanceof TextMessage; - assertEquals(outbound[0].getText(), ((TextMessage) message).getText()); - - message = consumer.receive(1000); - assertNotNull(message); - assert message instanceof TextMessage; - assertEquals(outbound[1].getText(), ((TextMessage) message).getText()); - rollbackTx(); - - // Consume again.. the prev message should - // get redelivered. - beginTx(); - message = consumer.receive(5000); - assertNotNull("Should have re-received the first message again!", message); - messages.add(message); - assert message instanceof TextMessage; - assertEquals(outbound[0].getText(), ((TextMessage) message).getText()); - message = consumer.receive(5000); - assertNotNull("Should have re-received the second message again!", message); - messages.add(message); - assert message instanceof TextMessage; - assertEquals(outbound[1].getText(), ((TextMessage) message).getText()); - - assertNull(consumer.receiveNoWait()); - commitTx(); - - Message inbound[] = new Message[messages.size()]; - messages.toArray(inbound); - assertTextMessagesEqual("Rollback did not work", outbound, inbound); - } - - /** - * Sends a batch of messages and validates that the rollbacked message was - * not consumed. - * - * @throws Exception - */ - public void testSendReceiveWithPrefetchOne() throws Exception { - Message[] outbound = new Message[] {session.createTextMessage("First Message"), - session.createTextMessage("Second Message"), - session.createTextMessage("Third Message"), - session.createTextMessage("Fourth Message")}; - - beginTx(); - for (int i = 0; i < outbound.length; i++) { - // sends a message - producer.send(outbound[i]); - } - commitTx(); - - // receives the first message - beginTx(); - for (int i = 0; i < outbound.length; i++) { - LOG.info("About to consume message 1"); - Message message = consumer.receive(1000); - assertNotNull(message); - LOG.info("Received: " + message); - } - - // validates that the rollbacked was not consumed - commitTx(); - } - - /** - * Perform the test that validates if the rollbacked message was redelivered - * multiple times. - * - * @throws Exception - */ - public void testReceiveTwoThenRollbackManyTimes() throws Exception { - for (int i = 0; i < 5; i++) { - testReceiveTwoThenRollback(); - } - } - - /** - * Sends a batch of messages and validates that the rollbacked message was - * not consumed. This test differs by setting the message prefetch to one. - * - * @throws Exception - */ - public void testSendRollbackWithPrefetchOfOne() throws Exception { - testSendRollback(); - } - - /** - * Sends a batch of messages and and validates that the rollbacked message - * was redelivered. This test differs by setting the message prefetch to - * one. - * - * @throws Exception - */ - public void testReceiveRollbackWithPrefetchOfOne() throws Exception { - testReceiveRollback(); - } - - /** - * Tests if the messages can still be received if the consumer is closed - * (session is not closed). - * - * @throws Exception see http://jira.codehaus.org/browse/AMQ-143 - */ - public void testCloseConsumerBeforeCommit() throws Exception { - TextMessage[] outbound = new TextMessage[] {session.createTextMessage("First Message"), - session.createTextMessage("Second Message")}; - - // lets consume any outstanding messages from prev test runs - beginTx(); - while (consumer.receiveNoWait() != null) { - } - - commitTx(); - - // sends the messages - beginTx(); - producer.send(outbound[0]); - producer.send(outbound[1]); - commitTx(); - LOG.info("Sent 0: " + outbound[0]); - LOG.info("Sent 1: " + outbound[1]); - - beginTx(); - TextMessage message = (TextMessage)consumer.receive(1000); - assertEquals(outbound[0].getText(), message.getText()); - // Close the consumer before the commit. This should not cause the - // received message - // to rollback. - consumer.close(); - commitTx(); - reconnectSession(); - - // Create a new consumer - consumer = resourceProvider.createConsumer(session, destination); - LOG.info("Created consumer: " + consumer); - - beginTx(); - message = (TextMessage)consumer.receive(1000); - assert null != message; - assertEquals(outbound[1].getText(), message.getText()); - commitTx(); - } - - public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception { - ArrayList<String> list = new ArrayList<String>(); - list.add("First"); - Message outbound = session.createObjectMessage(list); - outbound.setStringProperty("foo", "abc"); - - beginTx(); - producer.send(outbound); - commitTx(); - - LOG.info("About to consume message 1"); - beginTx(); - Message message = consumer.receive(5000); - - List<String> body = assertReceivedObjectMessageWithListBody(message); - - // now lets try mutate it - try { - message.setStringProperty("foo", "def"); - fail("Cannot change properties of the object!"); - } catch (JMSException e) { - LOG.info("Caught expected exception: " + e, e); - } - body.clear(); - body.add("This should never be seen!"); - rollbackTx(); - - beginTx(); - message = consumer.receive(5000); - List<String> secondBody = assertReceivedObjectMessageWithListBody(message); - assertNotSame("Second call should return a different body", secondBody, body); - commitTx(); - } - - @SuppressWarnings("unchecked") - protected List<String> assertReceivedObjectMessageWithListBody(Message message) throws JMSException { - assertNotNull("Should have received a message!", message); - assertEquals("foo header", "abc", message.getStringProperty("foo")); - - assertTrue("Should be an object message but was: " + message, message instanceof ObjectMessage); - ObjectMessage objectMessage = (ObjectMessage)message; - List<String> body = (List<String>)objectMessage.getObject(); - LOG.info("Received body: " + body); - - assertEquals("Size of list should be 1", 1, body.size()); - assertEquals("element 0 of list", "First", body.get(0)); - return body; - } - - /** - * Recreates the connection. - * - * @throws JMSException - */ - protected void reconnect() throws Exception { - - if (connection != null) { - // Close the prev connection. - connection.close(); - } - session = null; - connection = resourceProvider.createConnection(connectionFactory); - reconnectSession(); - connection.start(); - } - - /** - * Recreates the connection. - * - * @throws JMSException - */ - protected void reconnectSession() throws JMSException { - if (session != null) { - session.close(); - } - - session = resourceProvider.createSession(connection); - destination = resourceProvider.createDestination(session, getSubject()); - producer = resourceProvider.createProducer(session, destination); - consumer = resourceProvider.createConsumer(session, destination); - } - - //This test won't work with xa tx so no beginTx() has been added. - public void testMessageListener() throws Exception { - consumer.setMessageListener(this); - // send messages - for (int i = 0; i < MESSAGE_COUNT; i++) { - producer.send(session.createTextMessage(MESSAGE_TEXT + i)); - } - commitTx(); - // wait receive - waitReceiveUnack(); - assertEquals(unackMessages.size(), MESSAGE_COUNT); - // resend phase - waitReceiveAck(); - assertEquals(ackMessages.size(), MESSAGE_COUNT); - // should no longer re-receive - consumer.setMessageListener(null); - assertNull(consumer.receive(500)); - reconnect(); - } - - public void onMessage(Message message) { - if (!resendPhase) { - unackMessages.add(message); - if (unackMessages.size() == MESSAGE_COUNT) { - try { - rollbackTx(); - resendPhase = true; - } catch (Exception e) { - e.printStackTrace(); - } - } - } else { - ackMessages.add(message); - if (ackMessages.size() == MESSAGE_COUNT) { - try { - commitTx(); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - } - - private void waitReceiveUnack() throws Exception { - for (int i = 0; i < 100 && !resendPhase; i++) { - Thread.sleep(100); - } - assertTrue(resendPhase); - } - - private void waitReceiveAck() throws Exception { - for (int i = 0; i < 100 && ackMessages.size() < MESSAGE_COUNT; i++) { - Thread.sleep(100); - } - assertFalse(ackMessages.size() < MESSAGE_COUNT); - } -}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/LoadTestBurnIn.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/LoadTestBurnIn.java b/hedwig-client-jms/src/test/java/org/apache/activemq/LoadTestBurnIn.java deleted file mode 100644 index 5593daf..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/LoadTestBurnIn.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import java.io.IOException; - -import org.apache.hedwig.jms.MessagingSessionFacade; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; - -import junit.framework.Test; - - - -import javax.jms.Destination; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Small burn test moves sends a moderate amount of messages through the broker, - * to checking to make sure that the broker does not lock up after a while of - * sustained messaging. - */ -public class LoadTestBurnIn extends JmsTestSupport { - private static final transient Logger LOG = LoggerFactory.getLogger(LoadTestBurnIn.class); - - public Destination destination; - public int deliveryMode; - public MessagingSessionFacade.DestinationType destinationType; - public boolean durableConsumer; - public int messageCount = 50000; - public int messageSize = 1024; - - public static Test suite() { - return suite(LoadTestBurnIn.class); - } - - protected void setUp() throws Exception { - LOG.info("Start: " + getName()); - super.setUp(); - } - - protected void tearDown() throws Exception { - try { - super.tearDown(); - } catch (Throwable e) { - e.printStackTrace(System.out); - } finally { - LOG.info("End: " + getName()); - } - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - - protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException { - return new HedwigConnectionFactoryImpl(); - } - - public void initCombosForTestSendReceive() { - addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), - Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("destinationType", new Object[] {MessagingSessionFacade.DestinationType.TOPIC}); - addCombinationValues("durableConsumer", new Object[] {Boolean.TRUE}); - addCombinationValues("messageSize", new Object[] {Integer.valueOf(101), Integer.valueOf(102), - Integer.valueOf(103), Integer.valueOf(104), - Integer.valueOf(105), Integer.valueOf(106), - Integer.valueOf(107), Integer.valueOf(108)}); - } - - public void testSendReceive() throws Exception { - - // Durable consumer combination is only valid with topics - if (durableConsumer && destinationType != MessagingSessionFacade.DestinationType.TOPIC) { - return; - } - - if (null == connection.getClientID()) connection.setClientID(getName()); - connection.start(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - destination = createDestination(session, destinationType); - MessageConsumer consumer; - if (durableConsumer) { - consumer = session.createDurableSubscriber((Topic)destination, "sub1:" - + System.currentTimeMillis()); - } else { - consumer = session.createConsumer(destination); - } - profilerPause("Ready: "); - - final CountDownLatch producerDoneLatch = new CountDownLatch(1); - - // Send the messages, async - new Thread() { - public void run() { - Connection connection2 = null; - try { - connection2 = factory.createConnection(); - Session session = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(deliveryMode); - for (int i = 0; i < messageCount; i++) { - BytesMessage m = session.createBytesMessage(); - m.writeBytes(new byte[messageSize]); - producer.send(m); - } - producer.close(); - } catch (JMSException e) { - e.printStackTrace(); - } finally { - safeClose(connection2); - producerDoneLatch.countDown(); - } - - } - }.start(); - - // Make sure all the messages were delivered. - Message message = null; - for (int i = 0; i < messageCount; i++) { - message = consumer.receive(5000); - assertNotNull("Did not get message: " + i, message); - } - - profilerPause("Done: "); - - assertNull(consumer.receiveNoWait()); - message.acknowledge(); - - // Make sure the producer thread finishes. - assertTrue(producerDoneLatch.await(5, TimeUnit.SECONDS)); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java deleted file mode 100644 index 735d701..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java +++ /dev/null @@ -1,193 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import javax.jms.Topic; - -import org.apache.hedwig.JmsTestBase; -import org.apache.hedwig.jms.SessionImpl; -import java.util.ArrayList; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import junit.framework.TestCase; - -import javax.jms.Destination; -import org.apache.hedwig.jms.message.MessageImpl; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MessageListenerRedeliveryTest extends JmsTestBase { - - private static final Logger LOG = LoggerFactory.getLogger(MessageListenerRedeliveryTest.class); - - private Connection connection; - - protected void setUp() throws Exception { - super.setUp(); - connection = createConnection(); - } - - /** - * @see junit.framework.TestCase#tearDown() - */ - protected void tearDown() throws Exception { - if (connection != null) { - connection.close(); - connection = null; - } - super.tearDown(); - } - - protected Connection createConnection() throws Exception { - HedwigConnectionFactoryImpl factory = new HedwigConnectionFactoryImpl(); - return factory.createConnection(); - } - - private class TestMessageListener implements MessageListener { - - public int counter = 0; - private Session session; - static final int ROLLBACK_COUNT = 5; - - public TestMessageListener(Session session) { - this.session = session; - } - - public void onMessage(Message message) { - try { - LOG.info("Message Received: " + message); - counter++; - if (counter < ROLLBACK_COUNT) { - LOG.info("Message Rollback."); - session.rollback(); - } else { - LOG.info("Message Commit."); - message.acknowledge(); - session.commit(); - } - } catch (JMSException e) { - LOG.error("Error when rolling back transaction"); - } - } - } - - public void testTopicRollbackConsumerListener() throws JMSException { - connection.start(); - - Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); - Topic queue = session.createTopic("queue-" + getName()); - MessageProducer producer = createProducer(session, queue); - Message message = createTextMessage(session); - MessageConsumer consumer = session.createConsumer(queue); - TestMessageListener listener = new TestMessageListener(session); - consumer.setMessageListener(listener); - producer.send(message); - session.commit(); - - - MessageConsumer mc = (MessageConsumer)consumer; - - - try { - Thread.sleep(500); - } catch (InterruptedException e) { - - } - - assertEquals(TestMessageListener.ROLLBACK_COUNT, listener.counter); - - session.close(); - } - - public void testTopicSessionListenerExceptionRetry() throws Exception { - connection.start(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic queue = session.createTopic("queue-" + getName()); - Message message = createTextMessage(session, "1"); - MessageConsumer consumer = session.createConsumer(queue); - - final int maxDeliveries = 2; - final CountDownLatch gotMessage = new CountDownLatch(2); - final AtomicInteger count = new AtomicInteger(0); - final ArrayList<String> received = new ArrayList<String>(); - consumer.setMessageListener(new MessageListener() { - public void onMessage(Message message) { - LOG.info("Message Received: " + message); - try { - received.add(((TextMessage) message).getText()); - } catch (JMSException e) { - e.printStackTrace(); - fail(e.toString()); - } - if (count.incrementAndGet() < maxDeliveries) { - throw new RuntimeException(getName() + " force a redelivery"); - } - // new blood - count.set(0); - gotMessage.countDown(); - } - }); - - MessageProducer producer = createProducer(session, queue); - producer.send(message); - message = createTextMessage(session, "2"); - producer.send(message); - - assertTrue("got message before retry expiry", gotMessage.await(20, TimeUnit.SECONDS)); - - for (int i=0; i<maxDeliveries; i++) { - assertEquals("got first redelivered: " + i, "1", received.get(i)); - } - for (int i=maxDeliveries; i<maxDeliveries*2; i++) { - assertEquals("got first redelivered: " + i, "2", received.get(i)); - } - session.close(); - } - - private TextMessage createTextMessage(Session session, String text) throws JMSException { - return session.createTextMessage(text); - } - private TextMessage createTextMessage(Session session) throws JMSException { - return session.createTextMessage("Hello"); - } - - private MessageProducer createProducer(Session session, Destination queue) throws JMSException { - MessageProducer producer = session.createProducer(queue); - producer.setDeliveryMode(getDeliveryMode()); - return producer; - } - - protected int getDeliveryMode() { - return DeliveryMode.PERSISTENT; - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java deleted file mode 100644 index 002fea0..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import javax.jms.Topic; -import org.apache.hedwig.jms.SessionImpl; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import junit.framework.Test; - - - -/** - * Test cases used to test the JMS message exclusive consumers. - * - * - */ -public class RedeliveryPolicyTest extends JmsTestSupport { - - public static Test suite() { - return suite(RedeliveryPolicyTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - - /** - * @throws Exception - */ - public void testExponentialRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception { - - connection.start(); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - Topic destination = SessionImpl.asTopic(getName()); - MessageProducer producer = session.createProducer(destination); - - MessageConsumer consumer = session.createConsumer(destination); - - // Send the messages - producer.send(session.createTextMessage("1st")); - producer.send(session.createTextMessage("2nd")); - session.commit(); - - TextMessage m; - m = (TextMessage)consumer.receive(1000); - assertNotNull(m); - assertEquals("1st", m.getText()); - session.rollback(); - - // No delay on first rollback.. - m = (TextMessage)consumer.receive(100); - assertNotNull(m); - session.rollback(); - - m = (TextMessage)consumer.receive(700); - assertNotNull(m); - assertEquals("1st", m.getText()); - session.rollback(); - - // Show re-delivery delay is incrementing exponentially - m = (TextMessage)consumer.receive(100); - assertNotNull(m); - assertEquals("1st", m.getText()); - - m = (TextMessage)consumer.receive(100); - assertNotNull(m); - assertEquals("2nd", m.getText()); - } - - - /** - * @throws Exception - */ - public void testInfiniteMaximumNumberOfRedeliveries() throws Exception { - - connection.start(); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - Topic destination = SessionImpl.asTopic("TEST"); - MessageProducer producer = session.createProducer(destination); - - MessageConsumer consumer = session.createConsumer(destination); - - // Send the messages - producer.send(session.createTextMessage("1st")); - producer.send(session.createTextMessage("2nd")); - session.commit(); - - TextMessage m; - - m = (TextMessage)consumer.receive(1000); - assertNotNull(m); - assertEquals("1st", m.getText()); - session.rollback(); - - //we should be able to get the 1st message redelivered until a session.commit is called - m = (TextMessage)consumer.receive(1000); - assertNotNull(m); - assertEquals("1st", m.getText()); - session.rollback(); - - m = (TextMessage)consumer.receive(1000); - assertNotNull(m); - assertEquals("1st", m.getText()); - session.rollback(); - - m = (TextMessage)consumer.receive(1000); - assertNotNull(m); - assertEquals("1st", m.getText()); - session.rollback(); - - m = (TextMessage)consumer.receive(1000); - assertNotNull(m); - assertEquals("1st", m.getText()); - session.rollback(); - - m = (TextMessage)consumer.receive(1000); - assertNotNull(m); - assertEquals("1st", m.getText()); - session.commit(); - - m = (TextMessage)consumer.receive(1000); - assertNotNull(m); - assertEquals("2nd", m.getText()); - session.commit(); - - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/TestSupport.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/TestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/TestSupport.java deleted file mode 100644 index fd838ab..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/TestSupport.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import javax.jms.Topic; -import java.io.File; -import java.io.IOException; -import java.util.Enumeration; -import java.util.Map; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.TextMessage; - - -import org.apache.hedwig.jms.SessionImpl; -import org.apache.hedwig.jms.message.MessageImpl; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import org.apache.hedwig.jms.spi.HedwigConnectionImpl; - - -/** - * Useful base class for unit test cases - */ -public abstract class TestSupport extends CombinationTestSupport { - - protected HedwigConnectionFactoryImpl connectionFactory; - protected boolean topic = true; - - protected MessageImpl createMessage() { - return new MessageImpl(null); - } - - protected Destination createDestination(String subject) { - if (topic) { - return SessionImpl.asTopic(subject); - } else { - throw new IllegalArgumentException("Queue NOT supported"); - } - } - - protected Destination createDestination() { - return createDestination(getDestinationString()); - } - - /** - * Returns the name of the destination used in this test case - */ - protected String getDestinationString() { - return getClass().getName() + "." + getName(true); - } - - /** - * @param messsage - * @param firstSet - * @param secondSet - */ - protected void assertTextMessagesEqual(String messsage, Message[] firstSet, Message[] secondSet) - throws JMSException { - assertEquals("Message count does not match: " + messsage, firstSet.length, secondSet.length); - for (int i = 0; i < secondSet.length; i++) { - TextMessage m1 = (TextMessage)firstSet[i]; - TextMessage m2 = (TextMessage)secondSet[i]; - assertFalse("Message " + (i + 1) + " did not match : " + messsage + ": expected {" + m1 - + "}, but was {" + m2 + "}", m1 == null ^ m2 == null); - assertEquals("Message " + (i + 1) + " did not match: " + messsage + ": expected {" + m1 - + "}, but was {" + m2 + "}", m1.getText(), m2.getText()); - } - } - - protected HedwigConnectionFactoryImpl createConnectionFactory() throws Exception { - return new HedwigConnectionFactoryImpl(); - } - - /** - * Factory method to create a new connection - */ - protected Connection createConnection() throws Exception { - return createConnection(true); - } - - protected Connection createConnection(boolean setClientId) throws Exception { - HedwigConnectionImpl connection = getConnectionFactory().createConnection(); - if (setClientId) connection.setClientID(getName()); - return connection; - } - - public HedwigConnectionFactoryImpl getConnectionFactory() throws Exception { - if (connectionFactory == null) { - connectionFactory = createConnectionFactory(); - assertTrue("Should have created a connection factory!", connectionFactory != null); - } - return connectionFactory; - } - - protected String getConsumerSubject() { - return getSubject(); - } - - protected String getProducerSubject() { - return getSubject(); - } - - protected String getSubject() { - return getName(); - } - - public static void recursiveDelete(File f) { - if (f.isDirectory()) { - File[] files = f.listFiles(); - for (int i = 0; i < files.length; i++) { - recursiveDelete(files[i]); - } - } - f.delete(); - } - - public static void removeMessageStore() { - if (System.getProperty("activemq.store.dir") != null) { - recursiveDelete(new File(System.getProperty("activemq.store.dir"))); - } - if (System.getProperty("derby.system.home") != null) { - recursiveDelete(new File(System.getProperty("derby.system.home"))); - } - } - - /** - * Test if base directory contains spaces - */ - protected void assertBaseDirectoryContainsSpaces() { - assertFalse("Base directory cannot contain spaces.", - new File(System.getProperty("basedir", ".")).getAbsoluteFile().toString().contains(" ")); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/TimeStampTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/TimeStampTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/TimeStampTest.java deleted file mode 100644 index 4b12ccc..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/TimeStampTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import javax.jms.Topic; -import javax.jms.Connection; - -import org.apache.hedwig.JmsTestBase; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import junit.framework.TestCase; - - -public class TimeStampTest extends JmsTestBase { - public void test() throws Exception { - // Create a ConnectionFactory - HedwigConnectionFactoryImpl connectionFactory = - new HedwigConnectionFactoryImpl(); - - // Create a Connection - Connection connection = connectionFactory.createConnection(); - connection.start(); - - // Create a Session - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - // Create the destination Queue - Destination destination = session.createTopic("TEST.FOO"); - - // Create a MessageProducer from the Session to the Topic or Queue - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - - // Create a MessageConsumer from the Session to the Topic or Queue - MessageConsumer consumer = session.createConsumer(destination); - // Create a messages - Message sentMessage = session.createMessage(); - - // Tell the producer to send the message - long beforeSend = System.currentTimeMillis(); - producer.send(sentMessage); - long afterSend = System.currentTimeMillis(); - - // assert message timestamp is in window - assertTrue(beforeSend <= sentMessage.getJMSTimestamp() && sentMessage.getJMSTimestamp() <= afterSend); - - - // Wait for a message - Message receivedMessage = consumer.receive(1000); - - // assert we got the same message ID we sent - assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID()); - - // assert message timestamp is in window - assertTrue("JMS Message Timestamp should be set during the send method: \n" + " beforeSend = " - + beforeSend + "\n" + " getJMSTimestamp = " - + receivedMessage.getJMSTimestamp() + "\n" + " afterSend = " - + afterSend + "\n", beforeSend <= receivedMessage.getJMSTimestamp() - && receivedMessage.getJMSTimestamp() <= afterSend); - - // assert message timestamp is unchanged - assertEquals("JMS Message Timestamp of recieved message should be the same as the sent message\n ", - sentMessage.getJMSTimestamp(), receivedMessage.getJMSTimestamp()); - - // Clean up - producer.close(); - consumer.close(); - session.close(); - connection.close(); - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/demo/SimpleConsumer.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/demo/SimpleConsumer.java b/hedwig-client-jms/src/test/java/org/apache/activemq/demo/SimpleConsumer.java deleted file mode 100644 index 1d18724..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/demo/SimpleConsumer.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * The SimpleQueueReceiver class consists only of a main method, - * which fetches one or more messages from a queue using - * synchronous message delivery. Run this program in conjunction - * with SimpleQueueSender. Specify a queue name on the command - * line when you run the program. - */ -package org.apache.activemq.demo; - -import javax.jms.Topic; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; - -/** - * A simple polymorphic JMS consumer which can work with Queues or Topics which - * uses JNDI to lookup the JMS connection factory and destination - */ -public final class SimpleConsumer { - - private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory - .getLog(SimpleConsumer.class); - - private SimpleConsumer() { - } - - /** - * @param args the queue used by the example - */ - public static void main(String[] args) { - String destinationName = null; - Context jndiContext = null; - ConnectionFactory connectionFactory = null; - Connection connection = null; - Session session = null; - Destination destination = null; - MessageConsumer consumer = null; - - /* - * Read destination name from command line and display it. - */ - if (args.length != 1) { - LOG.info("Usage: java SimpleConsumer <destination-name>"); - System.exit(1); - } - destinationName = args[0]; - LOG.info("Destination name is " + destinationName); - - /* - * Create a JNDI API InitialContext object - */ - try { - jndiContext = new InitialContext(); - } catch (NamingException e) { - LOG.info("Could not create JNDI API " + "context: " + e.toString()); - System.exit(1); - } - - /* - * Look up connection factory and destination. - */ - try { - connectionFactory = (ConnectionFactory)jndiContext.lookup("ConnectionFactory"); - destination = (Destination)jndiContext.lookup(destinationName); - } catch (NamingException e) { - LOG.info("JNDI API lookup failed: " + e.toString()); - System.exit(1); - } - - /* - * Create connection. Create session from connection; false means - * session is not transacted. Create receiver, then start message - * delivery. Receive all text messages from destination until a non-text - * message is received indicating end of message stream. Close - * connection. - */ - try { - connection = connectionFactory.createConnection(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumer = session.createConsumer(destination); - connection.start(); - while (true) { - Message m = consumer.receive(1); - if (m != null) { - if (m instanceof TextMessage) { - TextMessage message = (TextMessage)m; - LOG.info("Reading message: " + message.getText()); - } else { - break; - } - } - } - } catch (JMSException e) { - LOG.info("Exception occurred: " + e); - } finally { - if (connection != null) { - try { - connection.close(); - } catch (JMSException e) { - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/demo/SimpleProducer.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/demo/SimpleProducer.java b/hedwig-client-jms/src/test/java/org/apache/activemq/demo/SimpleProducer.java deleted file mode 100644 index 4facc3d..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/demo/SimpleProducer.java +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * The SimpleQueueSender class consists only of a main method, - * which sends several messages to a queue. - * - * Run this program in conjunction with SimpleQueueReceiver. - * Specify a queue name on the command line when you run the - * program. By default, the program sends one message. Specify - * a number after the queue name to send that number of messages. - */ -package org.apache.activemq.demo; - -// START SNIPPET: demo - -import javax.jms.Topic; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A simple polymorphic JMS producer which can work with Queues or Topics which - * uses JNDI to lookup the JMS connection factory and destination - */ -public final class SimpleProducer { - - private static final Logger LOG = LoggerFactory.getLogger(SimpleProducer.class); - - private SimpleProducer() { - } - - /** - * @param args the destination name to send to and optionally, the number of - * messages to send - */ - public static void main(String[] args) { - Context jndiContext = null; - ConnectionFactory connectionFactory = null; - Connection connection = null; - Session session = null; - Destination destination = null; - MessageProducer producer = null; - String destinationName = null; - final int numMsgs; - - if ((args.length < 1) || (args.length > 2)) { - LOG.info("Usage: java SimpleProducer <destination-name> [<number-of-messages>]"); - System.exit(1); - } - destinationName = args[0]; - LOG.info("Destination name is " + destinationName); - if (args.length == 2) { - numMsgs = (new Integer(args[1])).intValue(); - } else { - numMsgs = 1; - } - - /* - * Create a JNDI API InitialContext object - */ - try { - jndiContext = new InitialContext(); - } catch (NamingException e) { - LOG.info("Could not create JNDI API context: " + e.toString()); - System.exit(1); - } - - /* - * Look up connection factory and destination. - */ - try { - connectionFactory = (ConnectionFactory)jndiContext.lookup("ConnectionFactory"); - destination = (Destination)jndiContext.lookup(destinationName); - } catch (NamingException e) { - LOG.info("JNDI API lookup failed: " + e); - System.exit(1); - } - - /* - * Create connection. Create session from connection; false means - * session is not transacted. Create sender and text message. Send - * messages, varying text slightly. Send end-of-messages message. - * Finally, close connection. - */ - try { - connection = connectionFactory.createConnection(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer = session.createProducer(destination); - TextMessage message = session.createTextMessage(); - for (int i = 0; i < numMsgs; i++) { - message.setText("This is message " + (i + 1)); - LOG.info("Sending message: " + message.getText()); - producer.send(message); - } - - /* - * Send a non-text control message indicating end of messages. - */ - producer.send(session.createMessage()); - } catch (JMSException e) { - LOG.info("Exception occurred: " + e); - } finally { - if (connection != null) { - try { - connection.close(); - } catch (JMSException e) { - } - } - } - } -} - -// END SNIPPET: demo http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadClient.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadClient.java b/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadClient.java deleted file mode 100644 index c5d9b0c..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadClient.java +++ /dev/null @@ -1,222 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.load; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.perf.PerfRate; -import org.apache.hedwig.jms.LRUCacheSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class LoadClient implements Runnable{ - private static final Logger LOG = LoggerFactory.getLogger(LoadClient.class); - protected static int SLEEP_TIME = 2; - protected String name; - protected ConnectionFactory factory; - protected Connection connection; - protected Destination startDestination; - protected Destination nextDestination; - protected Session session; - protected MessageConsumer consumer; - protected MessageProducer producer; - protected PerfRate rate = new PerfRate(); - protected int deliveryMode = DeliveryMode.PERSISTENT; - protected boolean connectionPerMessage = false; - protected boolean running; - protected int timeout = 10000; - - public LoadClient(String name,ConnectionFactory factory) { - this.name=name; - this.factory = factory; - } - - public synchronized void start() throws JMSException { - if (!running) { - rate.reset(); - running = true; - if (!connectionPerMessage) { - connection = factory.createConnection(); - connection.start(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumer = session.createConsumer(getConsumeDestination()); - producer = session.createProducer(getSendDestination()); - producer.setDeliveryMode(this.deliveryMode); - } - Thread t = new Thread(this); - t.setName(name); - t.start(); - } - } - - public void stop() throws JMSException, InterruptedException { - running = false; - if(connection != null) { - connection.stop(); - } - } - - public void run() { - try { - while (running) { - String result = consume(); - if(result != null) { - send(result); - rate.increment(); - } - else if (running) { - LOG.error(name + " Failed to consume!"); - } - } - } catch (Throwable e) { - e.printStackTrace(); - } - } - - private LRUCacheSet<String> messageIdCache = new LRUCacheSet<String>(2048, false); - protected String consume() throws Exception { - Connection con = null; - MessageConsumer c = consumer; - if (connectionPerMessage){ - con = factory.createConnection(); - con.start(); - Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - c = s.createConsumer(getConsumeDestination()); - } - TextMessage result = (TextMessage) c.receive(timeout); - if (result != null) { - if (messageIdCache.contains(result.getJMSMessageID())) { - throw new JMSException("Received duplicate " + result.getText()); - } - messageIdCache.add(result.getJMSMessageID()); - - if (connectionPerMessage) { - Thread.sleep(SLEEP_TIME);//give the broker a chance - con.close(); - } - } - return result != null ? result.getText() : null; - } - - protected void send(String text) throws Exception { - Connection con = connection; - MessageProducer p = producer; - Session s = session; - if (connectionPerMessage){ - con = factory.createConnection(); - con.start(); - s = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - p = s.createProducer(getSendDestination()); - p.setDeliveryMode(deliveryMode); - } - TextMessage message = s.createTextMessage(text); - p.send(message); - if (connectionPerMessage) { - Thread.sleep(SLEEP_TIME);//give the broker a chance - con.close(); - } - } - - - - public String getName() { - return name; - } - - - - public void setName(String name) { - this.name = name; - } - - - - public Destination getStartDestination() { - return startDestination; - } - - - - public void setStartDestination(Destination startDestination) { - this.startDestination = startDestination; - } - - - - public Destination getNextDestination() { - return nextDestination; - } - - - - public void setNextDestination(Destination nextDestination) { - this.nextDestination = nextDestination; - } - - - - public int getDeliveryMode() { - return deliveryMode; - } - - - - public void setDeliveryMode(int deliveryMode) { - this.deliveryMode = deliveryMode; - } - - - - public boolean isConnectionPerMessage() { - return connectionPerMessage; - } - - - - public void setConnectionPerMessage(boolean connectionPerMessage) { - this.connectionPerMessage = connectionPerMessage; - } - - - - public int getTimeout() { - return timeout; - } - - - - public void setTimeout(int timeout) { - this.timeout = timeout; - } - - protected Destination getSendDestination() { - return nextDestination; - } - - protected Destination getConsumeDestination() { - return startDestination; - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadController.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadController.java b/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadController.java deleted file mode 100644 index 8149814..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadController.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.load; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; - -public class LoadController extends LoadClient{ - private int numberOfBatches=1; - private int batchSize =1000; - private int count; - private final CountDownLatch stopped = new CountDownLatch(1); - - public LoadController(String name,ConnectionFactory factory) { - super(name,factory); - } - - public int awaitTestComplete() throws InterruptedException { - boolean complete = stopped.await(60*5,TimeUnit.SECONDS); - return count; - } - - public void stop() throws JMSException, InterruptedException { - running = false; - stopped.countDown(); - if (connection != null) { - this.connection.stop(); - } - } - - public void run() { - try { - for (int i = 0; i < numberOfBatches; i++) { - for (int j = 0; j < batchSize; j++) { - String payLoad = "batch[" + i + "]no:" + j; - send(payLoad); - } - for (int j = 0; j < batchSize; j++) { - String result = consume(); - if (result != null) { - count++; - rate.increment(); - } - } - } - } catch (Throwable e) { - e.printStackTrace(); - } finally { - stopped.countDown(); - } - } - - - public int getNumberOfBatches() { - return numberOfBatches; - } - - - public void setNumberOfBatches(int numberOfBatches) { - this.numberOfBatches = numberOfBatches; - } - - - public int getBatchSize() { - return batchSize; - } - - - public void setBatchSize(int batchSize) { - this.batchSize = batchSize; - } - - protected Destination getSendDestination() { - return startDestination; - } - - protected Destination getConsumeDestination() { - return nextDestination; - } -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadTest.java deleted file mode 100644 index 66880af..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/load/LoadTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.load; - - -import javax.jms.Topic; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Session; -import junit.framework.TestCase; -import org.apache.hedwig.JmsTestBase; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; - -import org.junit.Ignore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -// For now, ignore it ... -@Ignore -public class LoadTest extends JmsTestBase { - - private static final Logger LOG = LoggerFactory.getLogger(LoadTest.class); - - protected LoadController controller; - protected LoadClient[] clients; - protected ConnectionFactory factory; - protected Destination destination; - protected int numberOfClients = 50; - protected int deliveryMode = DeliveryMode.PERSISTENT; - protected int batchSize = 1000; - protected int numberOfBatches = 10; - protected int timeout = Integer.MAX_VALUE; - protected boolean connectionPerMessage = false; - protected Connection managementConnection; - protected Session managementSession; - - /** - * Sets up a test where the producer and consumer have their own connection. - * - * @see junit.framework.TestCase#setUp() - */ - protected void setUp() throws Exception { - super.setUp(); - factory = createConnectionFactory(); - managementConnection = factory.createConnection(); - managementSession = managementConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Destination startDestination = createDestination(managementSession, getClass()+".start"); - Destination endDestination = createDestination(managementSession, getClass()+".end"); - LOG.info("Running with " + numberOfClients + " clients - sending " - + numberOfBatches + " batches of " + batchSize + " messages"); - controller = new LoadController("Controller",factory); - controller.setBatchSize(batchSize); - controller.setNumberOfBatches(numberOfBatches); - controller.setDeliveryMode(deliveryMode); - controller.setConnectionPerMessage(connectionPerMessage); - controller.setStartDestination(startDestination); - controller.setNextDestination(endDestination); - controller.setTimeout(timeout); - clients = new LoadClient[numberOfClients]; - for (int i = 0; i < numberOfClients; i++) { - Destination inDestination = null; - if (i==0) { - inDestination = startDestination; - }else { - inDestination = createDestination(managementSession, getClass() + ".client."+(i)); - } - Destination outDestination = null; - if (i==(numberOfClients-1)) { - outDestination = endDestination; - }else { - outDestination = createDestination(managementSession, getClass() + ".client."+(i+1)); - } - LoadClient client = new LoadClient("client("+i+")",factory); - client.setTimeout(timeout); - client.setDeliveryMode(deliveryMode); - client.setConnectionPerMessage(connectionPerMessage); - client.setStartDestination(inDestination); - client.setNextDestination(outDestination); - clients[i] = client; - } - super.setUp(); - } - - protected void tearDown() throws Exception { - super.tearDown(); - managementConnection.close(); - for (int i = 0; i < numberOfClients; i++) { - clients[i].stop(); - } - controller.stop(); - } - - protected Destination createDestination(Session s, String destinationName) throws JMSException { - return s.createTopic(destinationName); - } - - protected HedwigConnectionFactoryImpl createConnectionFactory() throws Exception { - return new HedwigConnectionFactoryImpl(); - } - - public void testLoad() throws JMSException, InterruptedException { - for (int i = 0; i < numberOfClients; i++) { - clients[i].start(); - } - controller.start(); - assertEquals((batchSize* numberOfBatches),controller.awaitTestComplete()); - } - -} http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java ---------------------------------------------------------------------- diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java b/hedwig-client-jms/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java deleted file mode 100644 index a47ba67..0000000 --- a/hedwig-client-jms/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.perf; - -import java.util.ArrayList; -import java.util.List; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import junit.framework.TestCase; -import org.apache.hedwig.JmsTestBase; -import org.apache.hedwig.jms.spi.HedwigConnectionImpl; -import org.apache.hedwig.jms.spi.HedwigConnectionFactoryImpl; - - -import org.junit.Ignore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -// For now, ignore it ... -@Ignore -public class ConnectionChurnTest extends JmsTestBase { - protected static final int CONNECTION_COUNT = 200; - private static final Logger LOG = LoggerFactory.getLogger(ConnectionChurnTest.class); - protected int topicCount; - - public void testPerformance() throws Exception { - ConnectionFactory factory = createConnectionFactory(); - List<Connection> list = new ArrayList<Connection>(); - for (int i = 0; i < CONNECTION_COUNT; i++) { - Connection connection = factory.createConnection(); - connection.start(); - list.add(connection); - LOG.info("Created " + i); - if (i % 100 == 0) { - closeConnections(list); - } - } - closeConnections(list); - } - - protected void closeConnections(List<Connection> list) throws JMSException { - for (Connection c : list) { - c.close(); - } - list.clear(); - } - - protected void setUp() throws Exception { - super.setUp(); - } - - protected void tearDown() throws Exception { - super.tearDown(); - } - - protected HedwigConnectionFactoryImpl createConnectionFactory() - throws Exception { - HedwigConnectionFactoryImpl cf = new HedwigConnectionFactoryImpl(); - return cf; - } - -}
