Author: dejanb
Date: Tue Dec  1 17:20:29 2009
New Revision: 885841

URL: http://svn.apache.org/viewvc?rev=885841&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2519 - duplicate messages and jdbc store

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=885841&r1=885840&r2=885841&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
 Tue Dec  1 17:20:29 2009
@@ -18,6 +18,8 @@
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.broker.ConnectionContext;
@@ -25,24 +27,28 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.store.AbstractMessageStore;
-import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.ByteSequenceData;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * @version $Revision: 1.10 $
  */
 public class JDBCMessageStore extends AbstractMessageStore {
 
+    private static final Log LOG = LogFactory.getLog(JDBCMessageStore.class);
     protected final WireFormat wireFormat;
     protected final JDBCAdapter adapter;
     protected final JDBCPersistenceAdapter persistenceAdapter;
     protected AtomicLong lastMessageId = new AtomicLong(-1);
+    protected Map<ProducerId, Long> addedMessages = new HashMap<ProducerId, 
Long>();
 
     public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, 
JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination) {
         super(destination);
@@ -53,22 +59,32 @@
 
     public void addMessage(ConnectionContext context, Message message) throws 
IOException {
 
+        MessageId messageId = message.getMessageId();
+        Long lastAddedMessage = addedMessages.get(messageId.getProducerId());
+        if (lastAddedMessage != null && lastAddedMessage >= 
messageId.getProducerSequenceId()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Message " + message + " already added to the 
database. Skipping.");
+            }
+            return;
+        }
+        
         // Serialize the Message..
         byte data[];
         try {
             ByteSequence packet = wireFormat.marshal(message);
             data = ByteSequenceData.toByteArray(packet);
         } catch (IOException e) {
-            throw IOExceptionSupport.create("Failed to broker message: " + 
message.getMessageId() + " in container: " + e, e);
+            throw IOExceptionSupport.create("Failed to broker message: " + 
messageId + " in container: " + e, e);
         }
 
         // Get a connection and insert the message into the DB.
         TransactionContext c = 
persistenceAdapter.getTransactionContext(context);
         try {
-            adapter.doAddMessage(c, message.getMessageId(), destination, data, 
message.getExpiration());
+            adapter.doAddMessage(c, messageId, destination, data, 
message.getExpiration());
+            addedMessages.put(messageId.getProducerId(), 
messageId.getProducerSequenceId());
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
-            throw IOExceptionSupport.create("Failed to broker message: " + 
message.getMessageId() + " in container: " + e, e);
+            throw IOExceptionSupport.create("Failed to broker message: " + 
messageId + " in container: " + e, e);
         } finally {
             c.close();
         }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java?rev=885841&r1=885840&r2=885841&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterTest.java
 Tue Dec  1 17:20:29 2009
@@ -20,6 +20,11 @@
 
 import junit.framework.AssertionFailedError;
 
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.PersistenceAdapterTestSupport;
 import org.apache.derby.jdbc.EmbeddedDataSource;
@@ -42,14 +47,4 @@
         return jdbc;
     }
     
-    @Override
-    public void testStoreCanHandleDupMessages() throws Exception {
-        try {
-            super.testStoreCanHandleDupMessages();
-            fail("We expect this test to fail as it would be too expensive to 
add additional " +
-                 "unique constraints in the JDBC implementation to detect the 
duplicate messages.");
-        } catch (AssertionFailedError expected) {
-        }
-    }
-    
 }


Reply via email to