Author: jstrachan
Date: Mon Dec 19 09:43:22 2005
New Revision: 357732
URL: http://svn.apache.org/viewcvs?rev=357732&view=rev
Log:
* added test case to demonstrate query-based subscription recovery policy in
action.
* minor refactor to the SubscriptionRecoveryPolicy API to make it easy to
generate messages from inside the recovery policy
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/DummyMessageQuery.java
(with props)
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
(with props)
incubator/activemq/trunk/activemq-core/src/test/resources/org/activemq/test/retroactive/activemq-message-query.xml
(with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageTransformation.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/MessageQuery.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageTransformation.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageTransformation.java?rev=357732&r1=357731&r2=357732&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageTransformation.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQMessageTransformation.java
Mon Dec 19 09:43:22 2005
@@ -48,6 +48,11 @@
import org.activemq.command.ActiveMQTextMessage;
import org.activemq.command.ActiveMQTopic;
+/**
+ * A helper class for converting normal JMS interfaces into ActiveMQ specific
ones.
+ *
+ * @version $Revision: 1.1 $
+ */
public class ActiveMQMessageTransformation {
/**
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java?rev=357732&r1=357731&r2=357732&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/Topic.java
Mon Dec 19 09:43:22 2005
@@ -90,7 +90,7 @@
}
else {
if (sub.getConsumerInfo().isRetroactive()) {
- subscriptionRecoveryPolicy.recover(context, sub);
+ subscriptionRecoveryPolicy.recover(context, this, sub);
}
consumers.add(sub);
}
@@ -272,8 +272,9 @@
dispatchValve.increment();
MessageEvaluationContext msgContext =
context.getMessageEvaluationContext();
try {
-
- subscriptionRecoveryPolicy.add(context, message);
+ if (! subscriptionRecoveryPolicy.add(context, message)) {
+ return;
+ }
if (consumers.isEmpty())
return;
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java?rev=357732&r1=357731&r2=357732&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java
Mon Dec 19 09:43:22 2005
@@ -24,6 +24,7 @@
import org.activemq.broker.ConnectionContext;
import org.activemq.broker.region.MessageReference;
import org.activemq.broker.region.Subscription;
+import org.activemq.broker.region.Topic;
import org.activemq.filter.MessageEvaluationContext;
import org.activemq.memory.list.DestinationBasedMessageList;
import org.activemq.memory.list.MessageList;
@@ -44,11 +45,12 @@
private int maximumSize = 100 * 64 * 1024;
private boolean useSharedBuffer = true;
- public void add(ConnectionContext context, MessageReference message)
throws Throwable {
+ public boolean add(ConnectionContext context, MessageReference message)
throws Throwable {
buffer.add(message);
+ return true;
}
- public void recover(ConnectionContext context, Subscription sub) throws
Throwable {
+ public void recover(ConnectionContext context, Topic topic, Subscription
sub) throws Throwable {
// Re-dispatch the messages from the buffer.
List copy = buffer.getMessages(sub);
if( !copy.isEmpty() ) {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java?rev=357732&r1=357731&r2=357732&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java
Mon Dec 19 09:43:22 2005
@@ -21,6 +21,7 @@
import org.activemq.broker.ConnectionContext;
import org.activemq.broker.region.MessageReference;
import org.activemq.broker.region.Subscription;
+import org.activemq.broker.region.Topic;
import org.activemq.filter.MessageEvaluationContext;
/**
@@ -35,11 +36,12 @@
volatile private MessageReference lastImage;
- public void add(ConnectionContext context, MessageReference node) throws
Throwable {
+ public boolean add(ConnectionContext context, MessageReference node)
throws Throwable {
lastImage = node;
+ return true;
}
- public void recover(ConnectionContext context, Subscription sub) throws
Throwable {
+ public void recover(ConnectionContext context, Topic topic, Subscription
sub) throws Throwable {
// Re-dispatch the last message seen.
MessageReference node = lastImage;
if( node != null ){
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/MessageQuery.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/MessageQuery.java?rev=357732&r1=357731&r2=357732&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/MessageQuery.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/MessageQuery.java
Mon Dec 19 09:43:22 2005
@@ -18,11 +18,12 @@
package org.activemq.broker.region.policy;
import org.activemq.command.ActiveMQDestination;
+import org.activemq.command.Message;
import javax.jms.MessageListener;
/**
- * Represents some kind of query which will load messages from some source.
+ * Represents some kind of query which will load initial messages from some
source for a new topic subscriber.
*
* @version $Revision$
*/
@@ -34,6 +35,20 @@
* @param destination the destination on which the query is to be performed
* @param listener is the listener to notify as each message is created or
loaded
*/
- public void execute(ActiveMQDestination destination, MessageListener
listener);
+ public void execute(ActiveMQDestination destination, MessageListener
listener) throws Exception;
+
+ /**
+ * Returns true if the given update is valid and does not overlap with the
initial message query.
+ * When performing an initial load from some source, there is a chance
that an update may occur which is logically before
+ * the message sent on the initial load - so this method provides a hook
where the query instance can keep track of the version IDs
+ * of the messages sent so that if an older version is sent as an update
it can be excluded to avoid going backwards in time.
+ *
+ * e.g. if the execute() method creates version 2 of an object and then an
update message is sent for version 1, this method should return false to
+ * hide the old update message.
+ *
+ * @param message the update message which may have been sent before the
query actually completed
+ * @return true if the update message is valid otherwise false in which
case the update message will be discarded.
+ */
+ public boolean validateUpdate(Message message);
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java?rev=357732&r1=357731&r2=357732&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java
Mon Dec 19 09:43:22 2005
@@ -21,6 +21,7 @@
import org.activemq.broker.ConnectionContext;
import org.activemq.broker.region.MessageReference;
import org.activemq.broker.region.Subscription;
+import org.activemq.broker.region.Topic;
/**
* This is the default Topic recovery policy which does not recover any
messages.
@@ -31,10 +32,11 @@
*/
public class NoSubscriptionRecoveryPolicy implements
SubscriptionRecoveryPolicy {
- public void add(ConnectionContext context, MessageReference node) throws
Throwable {
+ public boolean add(ConnectionContext context, MessageReference node)
throws Throwable {
+ return true;
}
- public void recover(ConnectionContext context, Subscription sub) throws
Throwable {
+ public void recover(ConnectionContext context, Topic topic, Subscription
sub) throws Throwable {
}
public void start() throws Exception {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java?rev=357732&r1=357731&r2=357732&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java
Mon Dec 19 09:43:22 2005
@@ -18,13 +18,22 @@
**/
package org.activemq.broker.region.policy;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
+
import org.activemq.ActiveMQMessageTransformation;
import org.activemq.broker.ConnectionContext;
+import org.activemq.broker.region.Destination;
import org.activemq.broker.region.MessageReference;
import org.activemq.broker.region.Subscription;
+import org.activemq.broker.region.Topic;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.ActiveMQMessage;
+import org.activemq.command.ConnectionId;
+import org.activemq.command.MessageId;
+import org.activemq.command.ProducerId;
+import org.activemq.command.SessionId;
import org.activemq.filter.MessageEvaluationContext;
+import org.activemq.util.IdGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -44,18 +53,25 @@
private static final Log log =
LogFactory.getLog(QueryBasedSubscriptionRecoveryPolicy.class);
private MessageQuery query;
+ private AtomicLong messageSequence = new AtomicLong(0);
+ private IdGenerator idGenerator = new IdGenerator();
+ private ProducerId producerId = createProducerId();
+
+ public QueryBasedSubscriptionRecoveryPolicy() {
+ }
- public void add(ConnectionContext context, MessageReference message)
throws Throwable {
+ public boolean add(ConnectionContext context, MessageReference message)
throws Throwable {
+ return query.validateUpdate(message.getMessage());
}
- public void recover(ConnectionContext context, final Subscription sub)
throws Throwable {
+ public void recover(ConnectionContext context, final Topic topic, final
Subscription sub) throws Throwable {
if (query != null) {
final MessageEvaluationContext msgContext =
context.getMessageEvaluationContext();
try {
ActiveMQDestination destination =
sub.getConsumerInfo().getDestination();
query.execute(destination, new MessageListener() {
public void onMessage(Message message) {
- dispatchInitialMessage(message, msgContext, sub);
+ dispatchInitialMessage(message, topic, msgContext,
sub);
}
});
}
@@ -66,7 +82,7 @@
}
public void start() throws Exception {
- if (query != null) {
+ if (query == null) {
throw new IllegalArgumentException("No query property configured");
}
}
@@ -85,10 +101,17 @@
this.query = query;
}
- protected void dispatchInitialMessage(Message message,
MessageEvaluationContext msgContext, Subscription sub) {
+ protected void dispatchInitialMessage(Message message, Destination
regionDestination, MessageEvaluationContext msgContext, Subscription sub) {
try {
ActiveMQMessage activeMessage =
ActiveMQMessageTransformation.transformMessage(message, null);
- msgContext.setDestination(activeMessage.getDestination());
+ ActiveMQDestination destination = activeMessage.getDestination();
+ if (destination == null) {
+ destination = sub.getConsumerInfo().getDestination();
+ activeMessage.setDestination(destination);
+ }
+ activeMessage.setRegionDestination(regionDestination);
+ configure(activeMessage);
+ msgContext.setDestination(destination);
msgContext.setMessageReference(activeMessage);
if (sub.matches(activeMessage, msgContext)) {
sub.add(activeMessage);
@@ -97,5 +120,19 @@
catch (Throwable e) {
log.warn("Failed to dispatch initial message: " + message + " into
subscription. Reason: " + e, e);
}
+ }
+
+ protected void configure(ActiveMQMessage msg) {
+ long sequenceNumber = messageSequence.incrementAndGet();
+ msg.setMessageId(new MessageId(producerId, sequenceNumber));
+ msg.onSend();
+ msg.setProducerId(producerId);
+ }
+
+ protected ProducerId createProducerId() {
+ String id = idGenerator.generateId();
+ ConnectionId connectionId = new ConnectionId(id);
+ SessionId sessionId = new SessionId(connectionId, 1);
+ return new ProducerId(sessionId, 1);
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java?rev=357732&r1=357731&r2=357732&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java
Mon Dec 19 09:43:22 2005
@@ -22,6 +22,7 @@
import org.activemq.broker.ConnectionContext;
import org.activemq.broker.region.MessageReference;
import org.activemq.broker.region.Subscription;
+import org.activemq.broker.region.Topic;
/**
* Abstraction to allow different recovery policies to be plugged
@@ -37,17 +38,20 @@
*
* @param context
* @param node
+ * @return TODO
* @throws Throwable
*/
- void add(ConnectionContext context, MessageReference message) throws
Throwable;
+ boolean add(ConnectionContext context, MessageReference message) throws
Throwable;
/**
* Let a subscription recover message held by the policy.
*
* @param context
+ * @param topic TODO
+ * @param topic
* @param node
* @throws Throwable
*/
- void recover(ConnectionContext context, Subscription sub) throws Throwable;
+ void recover(ConnectionContext context, Topic topic, Subscription sub)
throws Throwable;
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java?rev=357732&r1=357731&r2=357732&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java
Mon Dec 19 09:43:22 2005
@@ -27,6 +27,7 @@
import org.activemq.broker.ConnectionContext;
import org.activemq.broker.region.MessageReference;
import org.activemq.broker.region.Subscription;
+import org.activemq.broker.region.Topic;
import org.activemq.filter.MessageEvaluationContext;
import org.activemq.thread.Scheduler;
@@ -66,11 +67,12 @@
}
};
- public void add(ConnectionContext context, MessageReference message)
throws Throwable {
+ public boolean add(ConnectionContext context, MessageReference message)
throws Throwable {
buffer.add(new TimestampWrapper(message, lastGCRun));
+ return true;
}
- public void recover(ConnectionContext context, Subscription sub) throws
Throwable {
+ public void recover(ConnectionContext context, Topic topic, Subscription
sub) throws Throwable {
// Re-dispatch the messages from the buffer.
ArrayList copy = new ArrayList(buffer);
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/DummyMessageQuery.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/DummyMessageQuery.java?rev=357732&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/DummyMessageQuery.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/DummyMessageQuery.java
Mon Dec 19 09:43:22 2005
@@ -0,0 +1,47 @@
+/**
+ *
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.activemq.test.retroactive;
+
+import org.activemq.broker.region.policy.MessageQuery;
+import org.activemq.command.ActiveMQDestination;
+import org.activemq.command.ActiveMQTextMessage;
+import org.activemq.command.Message;
+
+import javax.jms.MessageListener;
+
+/**
+ *
+ * @version $Revision$
+ */
+public class DummyMessageQuery implements MessageQuery {
+
+ public static int messageCount = 10;
+
+ public void execute(ActiveMQDestination destination, MessageListener
listener) throws Exception {
+ System.out.println("Initial query is creating: " + messageCount + "
messages");
+ for (int i = 0; i < messageCount; i++) {
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setText("Initial message: " + i + " loaded from query");
+ listener.onMessage(message);
+ }
+ }
+
+ public boolean validateUpdate(Message message) {
+ return true;
+ }
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/DummyMessageQuery.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/DummyMessageQuery.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/DummyMessageQuery.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java?rev=357732&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
Mon Dec 19 09:43:22 2005
@@ -0,0 +1,108 @@
+/**
+ *
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.activemq.test.retroactive;
+
+import org.activemq.ActiveMQConnectionFactory;
+import org.activemq.EmbeddedBrokerTestSupport;
+import org.activemq.broker.BrokerService;
+import org.activemq.util.MessageList;
+import org.activemq.xbean.BrokerFactoryBean;
+import org.springframework.core.io.ClassPathResource;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import java.util.Date;
+
+/**
+ *
+ * @version $Revision$
+ */
+public class RetroactiveConsumerWithMessageQueryTest extends
EmbeddedBrokerTestSupport {
+ protected int messageCount = 20;
+ protected Connection connection;
+ protected Session session;
+
+ public void testConsumeAndReceiveInitialQueryBeforeUpdates() throws
Exception {
+
+ // lets some messages
+ connection = createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ connection.start();
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageList listener = new MessageList();
+ listener.setVerbose(true);
+ consumer.setMessageListener(listener);
+
+ MessageProducer producer = session.createProducer(destination);
+ int updateMessageCount = messageCount - DummyMessageQuery.messageCount;
+ for (int i = 0; i < updateMessageCount; i++) {
+ TextMessage message = session.createTextMessage("Update Message: "
+ i + " sent at: " + new Date());
+ producer.send(message);
+ }
+ producer.close();
+ System.out.println("Sent: " + updateMessageCount + " update messages");
+
+ listener.assertMessagesReceived(messageCount);
+ }
+
+ protected void setUp() throws Exception {
+ useTopic = true;
+ bindAddress = "vm://localhost";
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception {
+ if (session != null) {
+ session.close();
+ session = null;
+ }
+ if (connection != null) {
+ connection.close();
+ }
+ super.tearDown();
+ }
+
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ ActiveMQConnectionFactory answer = new
ActiveMQConnectionFactory(bindAddress);
+ answer.setUseRetroactiveConsumer(true);
+ return answer;
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ String uri = getBrokerXml();
+ System.out.println("Loading broker configuration from the classpath
with URI: " + uri);
+ BrokerFactoryBean factory = new BrokerFactoryBean(new
ClassPathResource(uri));
+ factory.afterPropertiesSet();
+ return factory.getBroker();
+ }
+
+ protected void startBroker() throws Exception {
+ // broker already started by XBean
+ }
+
+ protected String getBrokerXml() {
+ return "org/activemq/test/retroactive/activemq-message-query.xml";
+ }
+
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/test/retroactive/RetroactiveConsumerWithMessageQueryTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
incubator/activemq/trunk/activemq-core/src/test/resources/org/activemq/test/retroactive/activemq-message-query.xml
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/org/activemq/test/retroactive/activemq-message-query.xml?rev=357732&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/resources/org/activemq/test/retroactive/activemq-message-query.xml
(added)
+++
incubator/activemq/trunk/activemq-core/src/test/resources/org/activemq/test/retroactive/activemq-message-query.xml
Mon Dec 19 09:43:22 2005
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: xbean -->
+<beans xmlns="http://activemq.org/config/1.0"
+ xmlns:s="http://xbean.org/spring/">
+
+ <broker persistent="false">
+
+ <destinationPolicy>
+ <policyMap>
+ <policyEntries>
+ <policyEntry topic="org.activemq.test.>">
+ <subscriptionRecoveryPolicy>
+ <queryBasedSubscriptionRecoveryPolicy query="#myQuery" />
+ </subscriptionRecoveryPolicy>
+ </policyEntry>
+ </policyEntries>
+ </policyMap>
+ </destinationPolicy>
+ </broker>
+
+ <bean id="myQuery"
+ class="org.activemq.test.retroactive.DummyMessageQuery" />
+</beans>
+<!-- END SNIPPET: xbean -->
Propchange:
incubator/activemq/trunk/activemq-core/src/test/resources/org/activemq/test/retroactive/activemq-message-query.xml
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/activemq/trunk/activemq-core/src/test/resources/org/activemq/test/retroactive/activemq-message-query.xml
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
incubator/activemq/trunk/activemq-core/src/test/resources/org/activemq/test/retroactive/activemq-message-query.xml
------------------------------------------------------------------------------
svn:mime-type = text/xml