Author: gtully
Date: Wed Feb 25 23:03:10 2009
New Revision: 747951
URL: http://svn.apache.org/viewvc?rev=747951&view=rev
Log:
Resolve AMQ-2123: deal with the topic dispatch case where a subscription
arrives between the store of a message and the dispatch of that message.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=747951&r1=747950&r2=747951&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
Wed Feb 25 23:03:10 2009
@@ -164,8 +164,11 @@
}
}
}else{
- //no message held
- removeMessage = true;
+
+ if (ackContainer.isEmpty() ||
isUnreferencedBySubscribers(subscriberMessages, messageId)) {
+ // no message reference held
+ removeMessage = true;
+ }
}
}
}finally {
@@ -174,6 +177,28 @@
return removeMessage;
}
+ // Verify that no subscriber has a reference to this message. In the case where the subscribers'
+ // references are persisted but more than the persisted consumers get the message, the ack from the
+ // non-persisted consumer would remove the message in error.
+ //
+ // see: https://issues.apache.org/activemq/browse/AMQ-2123
+ private boolean isUnreferencedBySubscribers(
+ Map<String, TopicSubContainer> subscriberContainers, MessageId
messageId) {
+ boolean isUnreferenced = true;
+ for (TopicSubContainer container: subscriberContainers.values()) {
+ if (!container.isEmpty()) {
+ for (Iterator i = container.iterator(); i.hasNext();) {
+ ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
+ if (messageId.equals(ref.getMessageId())) {
+ isUnreferenced = false;
+ break;
+ }
+ }
+ }
+ }
+ return isUnreferenced;
+ }
+
public void acknowledge(ConnectionContext context,
String clientId, String subscriptionName, MessageId
messageId) throws IOException {
acknowledgeReference(context, clientId, subscriptionName,
messageId);
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=747951&r1=747950&r2=747951&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
Wed Feb 25 23:03:10 2009
@@ -16,6 +16,13 @@
*/
package org.apache.activemq.bugs;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -28,6 +35,7 @@
import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.logging.Log;
@@ -44,15 +52,92 @@
protected BrokerService broker;
protected String bindAddress="tcp://localhost:61616";
-
-
-
-
- protected byte[] payload = new byte[1024*16];
+ protected byte[] payload = new byte[1024*32];
protected ConnectionFactory factory;
+ protected Vector<Exception> exceptions = new Vector<Exception>();
+ public void testConcurrentDurableConsumer() throws Exception {
+ factory = createConnectionFactory();
+ final String topicName = getName();
+ final int numMessages = 500;
+ int numConsumers = 20;
+ final CountDownLatch counsumerStarted = new CountDownLatch(0);
+ final AtomicInteger receivedCount = new AtomicInteger();
+ Runnable consumer = new Runnable() {
+ public void run() {
+ final String consumerName = Thread.currentThread().getName();
+ int acked = 0;
+ int received = 0;
+
+
+ try {
+ while (acked < numMessages/2) {
+ // take one message and close, ack on occasion
+ Connection consumerConnection =
factory.createConnection();
+
((ActiveMQConnection)consumerConnection).setWatchTopicAdvisories(false);
+ consumerConnection.setClientID(consumerName);
+ Session consumerSession =
consumerConnection.createSession(false,
+ Session.CLIENT_ACKNOWLEDGE);
+ Topic topic = consumerSession.createTopic(topicName);
+ consumerConnection.start();
+
+ MessageConsumer consumer = consumerSession
+ .createDurableSubscriber(topic, consumerName);
+
+ counsumerStarted.countDown();
+ Message msg = null;
+ do {
+ msg = consumer.receive(5000);
+ if (msg != null) {
+ receivedCount.incrementAndGet();
+ if (received++ % 2 == 0) {
+ msg.acknowledge();
+ acked++;
+ }
+ }
+ } while (msg == null);
+
+ consumerConnection.close();
+ }
+ assertTrue(received >= acked);
+ } catch (Exception e) {
+ e.printStackTrace();
+ exceptions.add(e);
+ }
+ }
+ };
+
+ ExecutorService executor = Executors.newCachedThreadPool();
+
+ for (int i=0; i<numConsumers ; i++) {
+ executor.execute(consumer);
+ }
+
+ assertTrue(counsumerStarted.await(30, TimeUnit.SECONDS));
+
+ Connection producerConnection = factory.createConnection();
+
((ActiveMQConnection)producerConnection).setWatchTopicAdvisories(false);
+ Session producerSession = producerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Topic topic = producerSession.createTopic(topicName);
+ MessageProducer producer = producerSession.createProducer(topic);
+ producerConnection.start();
+ for (int i =0; i < numMessages; i++) {
+ BytesMessage msg = producerSession.createBytesMessage();
+ msg.writeBytes(payload);
+ producer.send(msg);
+ if (i != 0 && i%100==0) {
+ LOG.info("Sent msg " + i);
+ }
+ }
+
+ Thread.sleep(2000);
+ executor.shutdown();
+ executor.awaitTermination(30, TimeUnit.SECONDS);
+ assertTrue("got some messages: " + receivedCount.get(),
receivedCount.get() > numMessages);
+ assertTrue(exceptions.isEmpty());
+ }
public void testConsumer() throws Exception{
factory = createConnectionFactory();
@@ -107,8 +192,6 @@
if (broker == null) {
broker = createBroker(true);
}
-
-
super.setUp();
}
@@ -144,6 +227,8 @@
answer.setDeleteAllMessagesOnStartup(deleteStore);
answer.addConnector(bindAddress);
answer.setUseShutdownHook(false);
+ answer.setUseJmx(false);
+ answer.setAdvisorySupport(false);
}
protected ActiveMQConnectionFactory createConnectionFactory() throws
Exception {