Author: jstrachan
Date: Fri Dec 30 07:36:19 2005
New Revision: 360108
URL: http://svn.apache.org/viewcvs?rev=360108&view=rev
Log:
added test cases to demonstrate shared and individual DLQ strategies; which
highlight a bug in the rollback logic in the client
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
(with props)
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
(with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=360108&r1=360107&r2=360108&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Fri Dec 30 07:36:19 2005
@@ -175,13 +175,14 @@
Message message = node.getMessage();
if( message !=null ) {
- // TODO is this meant to be == null?
- if( message.getOriginalDestination()!=null )
+ // TODO is this meant to be == null - it was != ?
+ if( message.getOriginalDestination()==null )
message.setOriginalDestination(message.getDestination());
ActiveMQDestination originalDestination =
message.getOriginalDestination();
DeadLetterStrategy deadLetterStrategy =
node.getRegionDestination().getDeadLetterStrategy();
-
message.setDestination(deadLetterStrategy.getDeadLetterQueueFor(originalDestination));
+ ActiveMQDestination deadLetterDestination =
deadLetterStrategy.getDeadLetterQueueFor(originalDestination);
+ message.setDestination(deadLetterDestination);
if( message.getOriginalTransactionId()!=null )
message.setOriginalTransactionId(message.getTransactionId());
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=360108&r1=360107&r2=360108&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Fri Dec 30 07:36:19 2005
@@ -41,11 +41,17 @@
if (dispatchPolicy != null) {
queue.setDispatchPolicy(dispatchPolicy);
}
+ if (deadLetterStrategy != null) {
+ queue.setDeadLetterStrategy(deadLetterStrategy);
+ }
}
public void configure(Topic topic) {
if (dispatchPolicy != null) {
topic.setDispatchPolicy(dispatchPolicy);
+ }
+ if (deadLetterStrategy != null) {
+ topic.setDeadLetterStrategy(deadLetterStrategy);
}
if (subscriptionRecoveryPolicy != null) {
topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy);
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java?rev=360108&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
Fri Dec 30 07:36:19 2005
@@ -0,0 +1,91 @@
+/**
+ *
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.RedeliveryPolicy;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+
+/**
+ *
+ * @version $Revision$
+ */
+public class DeadLetterTest extends DeadLetterTestSupport {
+
+ private int rollbackCount;
+
+ protected void doTest() throws Exception {
+ connection.start();
+
+ ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
+ rollbackCount =
amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
+ System.out.println("Will redeliver messages: " + rollbackCount + "
times");
+
+ makeConsumer();
+ makeDlqConsumer();
+
+ sendMessages();
+
+ // now lets receive and rollback N times
+ for (int i = 0; i < messageCount; i++) {
+ consumeAndRollback(i);
+ }
+
+ for (int i = 0; i < messageCount; i++) {
+ Message msg = dlqConsumer.receive(1000);
+ assertMessage(msg, i);
+ assertNotNull("Should be a DLQ message for loop: " + i, msg);
+ }
+ }
+
+ protected void consumeAndRollback(int messageCounter) throws Exception {
+ for (int i = 0; i < rollbackCount; i++) {
+ Message message = consumer.receive(5000);
+ assertNotNull("No message received for message: " + messageCounter
+ " and rollback loop: " + i, message);
+ assertMessage(message, messageCounter);
+
+ session.rollback();
+ }
+ System.out.println("Rolled back: " + rollbackCount + " times");
+ }
+
+ protected void setUp() throws Exception {
+ transactedMode = true;
+ super.setUp();
+ }
+
+ protected ActiveMQConnectionFactory createConnectionFactory() throws
Exception {
+ ActiveMQConnectionFactory answer = super.createConnectionFactory();
+ RedeliveryPolicy policy = new RedeliveryPolicy();
+ policy.setMaximumRedeliveries(3);
+ policy.setBackOffMultiplier((short) 1);
+ policy.setInitialRedeliveryDelay(10);
+ policy.setUseExponentialBackOff(false);
+ answer.setRedeliveryPolicy(policy);
+ return answer;
+ }
+
+ protected Destination createDlqDestination() {
+ return new ActiveMQQueue("ActiveMQ.DLQ");
+ }
+
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java?rev=360108&r1=360107&r2=360108&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
Fri Dec 30 07:36:19 2005
@@ -18,9 +18,6 @@
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@@ -30,6 +27,7 @@
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.jms.Topic;
/**
@@ -49,6 +47,8 @@
protected Destination dlqDestination;
protected MessageConsumer dlqConsumer;
protected BrokerService broker;
+ protected boolean transactedMode = false;
+ protected int acknowledgeMode = Session.CLIENT_ACKNOWLEDGE;
protected void setUp() throws Exception {
super.setUp();
@@ -57,7 +57,7 @@
connection = createConnection();
connection.setClientID(toString());
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ session = connection.createSession(transactedMode, acknowledgeMode);
connection.start();
}
@@ -80,6 +80,7 @@
protected void makeConsumer() throws JMSException {
Destination destination = getDestination();
+ System.out.println("Consuming from: " + destination);
if (durableSubscriber) {
consumer = session.createDurableSubscriber((Topic) destination,
destination.toString());
}
@@ -96,17 +97,34 @@
}
protected void sendMessages() throws JMSException {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(getDestination());
producer.setDeliveryMode(deliveryMode);
producer.setTimeToLive(timeToLive);
System.out.println("Sending " + messageCount + " messages to: " +
getDestination());
for (int i = 0; i < messageCount; i++) {
- Message message = session.createTextMessage("msg: " + i);
+ Message message = createMessage(session, i);
producer.send(message);
}
}
+ protected TextMessage createMessage(Session session, int i) throws
JMSException {
+ return session.createTextMessage(getMessageText(i));
+ }
+
+ protected String getMessageText(int i) {
+ return "message: " + i;
+ }
+
+ protected void assertMessage(Message message, int i) throws Exception {
+ System.out.println("Received message: " + message);
+ assertNotNull("No message received for index: " + i, message);
+ assertTrue("Should be a TextMessage not: " + message, message
instanceof TextMessage);
+ TextMessage textMessage = (TextMessage) message;
+ assertEquals("text of message: " + i, getMessageText(i), textMessage
.getText());
+ }
+
protected abstract Destination createDlqDestination();
public void testTransientTopicMessage() throws Exception {
@@ -143,5 +161,4 @@
}
return destination;
}
-
}
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java?rev=360108&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
Fri Dec 30 07:36:19 2005
@@ -0,0 +1,52 @@
+/**
+ *
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+
+import javax.jms.Destination;
+
+/**
+ *
+ * @version $Revision$
+ */
+public class IndividualDeadLetterTest extends DeadLetterTest {
+
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService broker = super.createBroker();
+
+ PolicyEntry policy = new PolicyEntry();
+ policy.setDeadLetterStrategy(new IndividualDeadLetterStrategy());
+
+ PolicyMap pMap = new PolicyMap();
+ pMap.setDefaultEntry(policy);
+
+ broker.setDestinationPolicy(pMap);
+
+ return broker;
+ }
+
+ protected Destination createDlqDestination() {
+ return new ActiveMQQueue("ActiveMQ.DLQ.Queue." + getClass().getName()
+ "." + getName());
+ }
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/IndividualDeadLetterTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain