Author: jlim
Date: Fri Feb 17 03:05:10 2006
New Revision: 378483
URL: http://svn.apache.org/viewcvs?rev=378483&view=rev
Log:
test case to check sending and receiving of messages inside a transaction
(http://forums.activemq.org/posts/list/364.page)
Added:
incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnQueueConsumedMessageInTransactionTest.java
Added:
incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnQueueConsumedMessageInTransactionTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnQueueConsumedMessageInTransactionTest.java?rev=378483&view=auto
==============================================================================
---
incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnQueueConsumedMessageInTransactionTest.java
(added)
+++
incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnQueueConsumedMessageInTransactionTest.java
Fri Feb 17 03:05:10 2006
@@ -0,0 +1,180 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+
+import javax.jms.*;
+import java.util.List;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.io.File;
+
+
+public final class PublishOnQueueConsumedMessageInTransactionTest extends
TestCase implements MessageListener {
+
+ private Session producerSession;
+ private Session consumerSession;
+ private Destination queue;
+ private ActiveMQConnectionFactory factory;
+ private MessageProducer producer;
+ private MessageConsumer consumer;
+ private Connection connection;
+ private ObjectMessage objectMessage = null;
+ private List messages = createConcurrentList();
+ private final Object lock = new Object();
+ private String[] data;
+ private String DATAFILE_ROOT = "activemq-data";
+ private int messageCount = 3;
+ private String url = "vm://localhost";
+
+ // Invalid acknowledgment warning can be viewed on the console of a
remote broker
+ // The warning message is not thrown back to the client
+ //private String url = "tcp://localhost:61616";
+
+
+ protected void setUp() throws Exception {
+ File dataFile = new File(DATAFILE_ROOT);
+ recursiveDelete(dataFile);
+ try {
+ factory = new ActiveMQConnectionFactory(url);
+ connection = factory.createConnection();
+ producerSession = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ consumerSession = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ queue = new ActiveMQQueue("FOO.BAR");
+ data = new String[messageCount];
+
+ for (int i = 0; i < messageCount; i++) {
+ data[i] = "Message : " + i;
+ }
+ } catch (JMSException je) {
+ fail("Error setting up connection : " + je.toString());
+ }
+ }
+
+
+ public void testSendReceive() throws Exception {
+ sendMessage();
+
+ connection.start();
+ consumer = consumerSession.createConsumer(queue);
+ consumer.setMessageListener(this);
+ waitForMessagesToBeDelivered();
+ assertEquals("Messages received doesn't equal messages sent",
messages.size(),data.length);
+
+ }
+
+
+ protected void sendMessage() throws JMSException {
+ messages.clear();
+ try {
+ for (int i = 0; i < data.length; ++i) {
+ producer = producerSession.createProducer(queue);
+ objectMessage = producerSession.createObjectMessage(data[i]);
+ producer.send(objectMessage);
+ producerSession.commit();
+ System.out.println("sending message :" + objectMessage);
+ }
+ } catch (Exception e) {
+ if (producerSession != null) {
+ producerSession.rollback();
+ System.out.println("rollback");
+ producerSession.close();
+ }
+
+ e.printStackTrace();
+ }
+ }
+
+
+ public synchronized void onMessage(Message m) {
+ try {
+ objectMessage = (ObjectMessage) m;
+ consumeMessage(objectMessage,messages);
+
+ System.out.println("consumer received message :" + objectMessage);
+ consumerSession.commit();
+
+ } catch (Exception e) {
+ try {
+ consumerSession.rollback();
+ System.out.println("rolled back transaction");
+ } catch (JMSException e1) {
+ System.out.println(e1);
+ e1.printStackTrace();
+ }
+ System.out.println(e);
+ e.printStackTrace();
+ }
+ }
+
+
+ protected void consumeMessage(Message message, List messageList) {
+ messageList.add(message);
+ if (messageList.size() >= data.length) {
+ synchronized (lock) {
+ lock.notifyAll();
+ }
+ }
+
+ }
+
+
+ protected List createConcurrentList() {
+ return Collections.synchronizedList(new ArrayList());
+ }
+
+
+ protected void waitForMessagesToBeDelivered() {
+ long maxWaitTime = 5000;
+ long waitTime = maxWaitTime;
+ long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
+
+ synchronized (lock) {
+ while (messages.size() <= data.length && waitTime >= 0) {
+ try {
+ lock.wait(200);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ waitTime = maxWaitTime - (System.currentTimeMillis() - start);
+ }
+ }
+ }
+
+
+ protected static void recursiveDelete(File file) {
+ if( file.isDirectory() ) {
+ File[] files = file.listFiles();
+ for (int i = 0; i < files.length; i++) {
+ recursiveDelete(files[i]);
+ }
+ }
+ file.delete();
+ }
+
+ protected void tearDown() throws Exception {
+ if (connection != null) {
+ connection.close();
+ }
+
+ super.tearDown();
+ }
+}
\ No newline at end of file