Author: gtully
Date: Tue Nov  2 12:05:00 2010
New Revision: 1030013

URL: http://svn.apache.org/viewvc?rev=1030013&view=rev
Log:
resolve duplicate issue with concurrent durable subs and dlq, suppress the 
duplicates in the default Dead letter strategy. With no duplicates, 
concurrentStoreAndDispatchQueues true is fine. resolve 
https://issues.apache.org/activemq/browse/AMQ-2584

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1030013&r1=1030012&r2=1030013&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
 Tue Nov  2 12:05:00 2010
@@ -832,7 +832,8 @@ public class RegionBroker extends EmptyB
                                                }
                                        } else {
                                            if (LOG.isDebugEnabled()) {
-                                               LOG.debug("Expired message with 
no DLQ strategy in place");
+                                               LOG.debug("Dead Letter message 
with no DLQ strategy in place, message id: "
+                                    + message.getMessageId() + ", destination: 
" + message.getDestination());
                                            }
                                        }
                                }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java?rev=1030013&r1=1030012&r2=1030013&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
 Tue Nov  2 12:05:00 2010
@@ -16,7 +16,10 @@
  */
 package org.apache.activemq.broker.region.policy;
 
+import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.command.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * A strategy for choosing which destination is used for dead letter queue
@@ -25,13 +28,21 @@ import org.apache.activemq.command.Messa
  * @version $Revision: 426366 $
  */
 public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy 
{
+    private static final Log LOG = 
LogFactory.getLog(AbstractDeadLetterStrategy.class);
     private boolean processNonPersistent = false;
     private boolean processExpired = true;
+    private ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
 
     public boolean isSendToDeadLetterQueue(Message message) {
         boolean result = false;
         if (message != null) {
             result = true;
+            if (audit.isDuplicate(message)) {
+                result = false;
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Not adding duplicate to DLQ: " + 
message.getMessageId() + ", dest: " + message.getDestination());
+                }
+            }
             if (!message.isPersistent() && !processNonPersistent) {
                 result = false;
             }

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java?rev=1030013&r1=1030012&r2=1030013&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java
 Tue Nov  2 12:05:00 2010
@@ -224,7 +224,12 @@ public class AMQ2584ConcurrentDlqTest ex
         properties.put("maxFileLength", maxFileLengthVal);
         properties.put("cleanupInterval", "2000");
         properties.put("checkpointInterval", "2000");
-        properties.put("concurrentStoreAndDispatchQueues", "false");
+        // there are problems with duplicate dispatch in the cursor, which 
maintain
+        // a map of messages. A dup dispatch can be dropped.
+        // see: org.apache.activemq.broker.region.cursors.OrderedPendingList
+        // Adding duplicate detection to the default DLQ strategy removes the 
problem
+        // which means we can leave the default for concurrent store and 
dispatch q
+        //properties.put("concurrentStoreAndDispatchQueues", "false");
 
         IntrospectionSupport.setProperties(persistenceAdapter, properties);
     }


Reply via email to