Durable subscriber receives nothing when reconnecting with a prefetch size
less than the number of messages that don't match its message selector
------------------------------------------------------------------------------------------------------------------------------------------------
Key: AMQ-2580
URL: https://issues.apache.org/activemq/browse/AMQ-2580
Project: ActiveMQ
Issue Type: Bug
Components: Message Store
Affects Versions: 5.3.0
Environment: phillip:~ henryp$ uname -a
Darwin phillip.fritz.box 9.8.0 Darwin Kernel Version 9.8.0: Wed Jul 15 16:55:01
PDT 2009; root:xnu-1228.15.4~1/RELEASE_I386 i386 i386
phillip:~ henryp$ java -version
java version "1.5.0_20"
Java(TM) 2 Runtime Environment, Standard Edition (build 1.5.0_20-b02-315)
Java HotSpot(TM) Client VM (build 1.5.0_20-141, mixed mode, sharing)
Reporter: Phillip Henry
1. Create a connection factory with a message prefetch size of PREFETCH_SIZE.
2. Create a durable subscriber to a Topic with a message selector of "a=X".
3. Disconnect.
4. More than PREFETCH_SIZE messages are then put onto the Topic with a string
property "a=Y".
5. Just one message is put onto the Topic with string property "a=X".
6. The durable subscriber connects again but it does not get the message with
string property "a=X". In fact, it gets nothing.
It appears that, upon reconnecting, the message selector is not respected when
the stored messages are retrieved from the message store.
I've got a unit test to demonstrate this plus a proposed fix.
{code}
### Eclipse Workspace Patch 1.0
#P activemq
Index:
activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
===================================================================
---
activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
(revision 900353)
+++
activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
(working copy)
@@ -306,7 +306,7 @@
count++;
container.setBatchEntry(msg.getMessageId(),
entry);
} else {
- break;
+ //break;
}
} else {
container.reset();
Index:
activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
===================================================================
---
activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
(revision 900353)
+++
activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
(working copy)
@@ -46,10 +46,11 @@
public boolean recoverMessage(Message message) throws Exception {
if (listener.hasSpace()) {
- listener.recoverMessage(message);
- lastRecovered = message.getMessageId();
- count++;
- return true;
+ if (listener.recoverMessage(message)) {
+ lastRecovered = message.getMessageId();
+ count++;
+ return true;
+ }
}
return false;
}
Index: activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java
===================================================================
--- activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java
(revision 0)
+++ activemq-pool/src/test/java/org/apache/activemq/pool/PrefetchTest.java
(revision 0)
@@ -0,0 +1,174 @@
+package org.apache.activemq.pool;
+
+import java.io.File;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.PersistenceAdapter;
+
+public class PrefetchTest extends TestCase {
+
+ private static final String TOPIC_NAME = "topicName";
+ private static final String CLIENT_ID = "client_id";
+ private static final String textOfSelectedMsg = "good_message";
+
+ protected TopicConnection connection;
+
+ private Topic topic;
+ private Session session;
+ private MessageProducer producer;
+ private PooledConnectionFactory connectionFactory;
+ private TopicConnection topicConnection;
+ private String bindAddress;
+ private BrokerService service;
+
+ protected void setUp() throws Exception {
+ bindAddress = "tcp://localhost:61616";
+ super.setUp();
+ initDurableBroker();
+ initConnectionFactory();
+ initTopic();
+
+ }
+
+ protected void tearDown() throws Exception {
+ shutdownClient();
+ connectionFactory.stop();
+ service.stop();
+ super.tearDown();
+ }
+
+ private void initConnection() throws JMSException {
+ System.out.println("Initializing connection");
+ connection = (TopicConnection)
connectionFactory.createConnection();
+ connection.start();
+ }
+
+ public void testTopicIsDurableSmokeTest() throws Exception {
+
+ initClient();
+ MessageConsumer consumer = createMessageConsumer();
+ System.out.println("Consuming message");
+ assertNull(consumer.receive(1));
+ shutdownClient();
+ consumer.close();
+
+ sendMessages();
+ shutdownClient();
+
+ initClient();
+ consumer = createMessageConsumer();
+
+ System.out.println("Consuming message");
+ TextMessage answer1 = (TextMessage)consumer.receive(1000);
+ assertNotNull(answer1);
+
+ consumer.close();
+ }
+
+ private MessageConsumer createMessageConsumer() throws JMSException {
+ System.out.println("creating durable subscriber");
+ return session.createDurableSubscriber(topic,
+ TOPIC_NAME,
+ "name='value'",
+ false);
+ }
+
+ private void initClient() throws JMSException {
+ System.out.println("Initializing client");
+
+ initConnection();
+ initSession();
+ }
+
+ private void shutdownClient()
+ throws JMSException {
+ System.out.println("Closing session and connection");
+ session.close();
+ connection.close();
+ session = null;
+ connection = null;
+ }
+
+ private void sendMessages()
+ throws JMSException {
+ initConnection();
+
+ initSession();
+
+ System.out.println("Creating producer");
+ producer = session.createProducer(topic);
+
+ sendMessageThatFailsSelection();
+
+ sendMessage(textOfSelectedMsg, "value");
+ }
+
+ private void initSession() throws JMSException {
+ System.out.println("Initializing session");
+ session = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
+ }
+
+ private void sendMessageThatFailsSelection() throws JMSException {
+ for (int i = 0 ; i < 5 ; i++) {
+ String textOfNotSelectedMsg = "Msg_" + i;
+ sendMessage(textOfNotSelectedMsg, "not_value");
+ System.out.println("#");
+ }
+ }
+
+ private void sendMessage(
+ String msgText,
+ String propertyValue) throws JMSException {
+ System.out.println("Creating message: " + msgText);
+ TextMessage messageToSelect =
session.createTextMessage(msgText);
+ messageToSelect.setStringProperty("name", propertyValue);
+ System.out.println("Sending message");
+ producer.send(messageToSelect);
+ }
+
+ protected void initConnectionFactory() {
+ ActiveMQConnectionFactory activeMqConnectionFactory =
createActiveMqConnectionFactory();
+ connectionFactory = new
PooledConnectionFactory(activeMqConnectionFactory);
+ }
+
+
+ private ActiveMQConnectionFactory createActiveMqConnectionFactory() {
+ ActiveMQConnectionFactory activeMqConnectionFactory = new
ActiveMQConnectionFactory();
+ ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+ prefetchPolicy.setDurableTopicPrefetch(2);
+ activeMqConnectionFactory.setPrefetchPolicy(prefetchPolicy );
+ activeMqConnectionFactory.setClientID(CLIENT_ID);
+ return activeMqConnectionFactory;
+ }
+
+ private void initDurableBroker() throws Exception {
+ service = new BrokerService();
+ PersistenceAdapter persistenceAdaptor =
service.getPersistenceAdapter();
+ File file = new File("phills_durable_dir");
+ persistenceAdaptor.setDirectory(file);
+ service.setTransportConnectorURIs(new String[] { bindAddress }
);
+ service.setPersistent(true);
+ service.setUseJmx(true);
+ service.start();
+
+ }
+
+ private void initTopic() throws JMSException {
+ topicConnection = (TopicConnection)
connectionFactory.createConnection();
+ TopicSession topicSession =
topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ topic = topicSession.createTopic(TOPIC_NAME);
+ }
+}
{code}
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.