This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 3621258  ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive
3621258 is described below

commit 36212584582779f8b57dea06f80292eb6f89599a
Author: Justin Bertram <[email protected]>
AuthorDate: Thu May 6 14:22:17 2021 -0500

    ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive
---
 .../org/apache/activemq/artemis/utils/Wait.java    |  10 +-
 .../artemis/core/server/impl/LastValueQueue.java   |  74 +++++++++---
 .../artemis/core/server/impl/QueueImpl.java        |  44 +++++---
 .../core/server/impl/ServerConsumerImpl.java       |   2 +-
 .../integration/amqp/JMSNonDestructiveTest.java    | 125 +++++++++++++++++++++
 5 files changed, 221 insertions(+), 34 deletions(-)

diff --git 
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
index 8e8d61d..62c8c2d 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
@@ -66,10 +66,16 @@ public class Wait {
    }
 
    public static void assertEquals(Long size, LongCondition condition, long 
timeout, long sleepMillis) throws Exception {
-      boolean result = waitFor(() -> condition.getCount() == size, timeout, 
sleepMillis);
+      assertEquals(size, condition, timeout, sleepMillis, true);
+   }
+
+   public static void assertEquals(Long size, LongCondition condition, long 
timeout, long sleepMillis, boolean printThreadDump) throws Exception {
+      boolean result = waitFor(() -> condition.getCount() == size, timeout, 
sleepMillis, printThreadDump);
 
       if (!result) {
-         System.out.println(ThreadDumpUtil.threadDump("thread dump"));
+         if (printThreadDump) {
+            System.out.println(ThreadDumpUtil.threadDump("thread dump"));
+         }
          Assert.fail(size + " != " + condition.getCount());
       }
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 4305585..1ea280c 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.server.impl;
 
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -44,6 +45,7 @@ import 
org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.jboss.logging.Logger;
 
 /**
  * A queue that will discard messages if a newer message with the same
@@ -56,9 +58,27 @@ import 
org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 @SuppressWarnings("ALL")
 public class LastValueQueue extends QueueImpl {
 
+   private static final Logger logger = Logger.getLogger(LastValueQueue.class);
    private final Map<SimpleString, HolderReference> map = new 
ConcurrentHashMap<>();
    private final SimpleString lastValueKey;
 
+   // only use this within synchronized methods or synchronized(this) blocks
+   protected final LinkedList<MessageReference> nextDeliveries = new 
LinkedList<>();
+
+
+   /* in certain cases we need to redeliver a message */
+   @Override
+   protected MessageReference nextDelivery() {
+      return nextDeliveries.poll();
+   }
+
+   @Override
+   protected void repeatNextDelivery(MessageReference reference) {
+      // put the ref back onto the head of the list so that the next time poll() is called this ref is returned
+      nextDeliveries.addFirst(reference);
+   }
+
+
    @Deprecated
    public LastValueQueue(final long persistenceID,
                          final SimpleString address,
@@ -151,25 +171,20 @@ public class LastValueQueue extends QueueImpl {
          HolderReference hr = map.get(prop);
 
          if (hr != null) {
-            // We need to overwrite the old ref with the new one and ack the 
old one
-
-            replaceLVQMessage(ref, hr);
-
-            if (isNonDestructive() && hr.isDelivered()) {
-               hr.resetDelivered();
-               // 
--------------------------------------------------------------------------------
-               // If non Destructive, and if a reference was previously 
delivered
-               // we would not be able to receive this message again
-               // unless we reset the iterators
-               // The message is not removed, so we can't actually remove it
-               // a result of this operation is that previously delivered 
messages
-               // will probably be delivered again.
-               // if we ever want to avoid other redeliveries we would have to 
implement a reset or redeliver
-               // operation on the iterator for a single message
-               resetAllIterators();
-               deliverAsync();
-            }
+            if (isNonDestructive() && hr.isInDelivery()) {
+               // if the ref is already being delivered we'll do the replace in the postAcknowledge
+               hr.setReplacementRef(ref);
+            } else {
+               // We need to overwrite the old ref with the new one and ack the old one
+               replaceLVQMessage(ref, hr);
 
+               if (isNonDestructive() && hr.isDelivered()) {
+                  hr.resetDelivered();
+                  // since we're replacing a ref that was already delivered we want to trigger a delivery for this new replacement
+                  nextDeliveries.add(hr);
+                  deliverAsync();
+               }
+            }
          } else {
             hr = new HolderReference(prop, ref);
 
@@ -247,6 +262,19 @@ public class LastValueQueue extends QueueImpl {
    }
 
    @Override
+   public void postAcknowledge(final MessageReference ref, AckReason reason) {
+      if (isNonDestructive()) {
+         if (ref instanceof HolderReference) {
+            HolderReference hr = (HolderReference) ref;
+            if (hr.getReplacementRef() != null) {
+               replaceLVQMessage(hr.getReplacementRef(), hr);
+            }
+         }
+      }
+      super.postAcknowledge(ref, reason);
+   }
+
+   @Override
    public boolean allowsReferenceCallback() {
       return false;
    }
@@ -358,11 +386,21 @@ public class LastValueQueue extends QueueImpl {
 
       private volatile MessageReference ref;
 
+      private volatile MessageReference replacementRef;
+
       private long consumerID;
 
       private boolean hasConsumerID = false;
 
 
+      public MessageReference getReplacementRef() {
+         return replacementRef;
+      }
+
+      public void setReplacementRef(MessageReference replacementRef) {
+         this.replacementRef = replacementRef;
+      }
+
       public void resetDelivered() {
          delivered = false;
       }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 082c9cb..a8ce01d 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -340,6 +340,17 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
    private volatile long ringSize;
 
+   /* in certain cases we need to redeliver a message directly.
+   * it's useful for use cases like LastValueQueue */
+   protected MessageReference nextDelivery() {
+      return null;
+   }
+
+   protected void repeatNextDelivery(MessageReference reference) {
+
+   }
+
+
    /**
     * This is to avoid multi-thread races on calculating direct delivery,
     * to guarantee ordering will be always be correct
@@ -2969,12 +2980,16 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                holder.iter = messageReferences.iterator();
             }
 
-            if (holder.iter.hasNext()) {
+            ref = nextDelivery();
+            boolean nextDelivery = false;
+            if (ref != null) {
+               nextDelivery = true;
+            }
+
+            if (ref == null && holder.iter.hasNext()) {
                ref = holder.iter.next();
-            } else {
-               ref = null;
-               existingMemoryEstimate = 0;
             }
+
             if (ref == null) {
                noDelivery++;
             } else {
@@ -3022,14 +3037,18 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                   handled++;
                   consumers.reset();
                } else if (status == HandleStatus.BUSY) {
-                  try {
-                     holder.iter.repeat();
-                  } catch (NoSuchElementException e) {
-                     // this could happen if there was an exception on the 
queue handling
-                     // and it returned BUSY because of that exception
-                     //
-                     // We will just log it as there's nothing else we can do 
now.
-                     logger.warn(e.getMessage(), e);
+                  if (nextDelivery) {
+                     repeatNextDelivery(ref);
+                  } else {
+                     try {
+                        holder.iter.repeat();
+                     } catch (NoSuchElementException e) {
+                        // this could happen if there was an exception on the 
queue handling
+                        // and it returned BUSY because of that exception
+                        //
+                        // We will just log it as there's nothing else we can 
do now.
+                        logger.warn(e.getMessage(), e);
+                     }
                   }
 
                   noDelivery++;
@@ -3067,7 +3086,6 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
          if (handledconsumer != null) {
             proceedDeliver(handledconsumer, ref);
          }
-
       }
 
       return true;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 10b131a..6c4519b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -984,7 +984,7 @@ public class ServerConsumerImpl implements ServerConsumer, 
ReadyListener {
          }
 
          if (ref == null) {
-            ActiveMQIllegalStateException ils = new 
ActiveMQIllegalStateException("Cannot find ref to ack " + messageID);
+            ActiveMQIllegalStateException ils = 
ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, 
messageQueue.getName());
             tx.markAsRollbackOnly(ils);
             throw ils;
          }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
index c39381a..609d712 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
@@ -24,8 +24,13 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -44,6 +49,8 @@ import 
org.apache.activemq.artemis.core.server.impl.LastValueQueue;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.RetryRule;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -52,6 +59,8 @@ import org.junit.runners.Parameterized;
 @RunWith(Parameterized.class)
 public class JMSNonDestructiveTest extends JMSClientTestSupport {
 
+   private static final Logger logger = 
Logger.getLogger(JMSNonDestructiveTest.class);
+
    @Rule
    public RetryRule retryRule = new RetryRule(2);
 
@@ -588,4 +597,120 @@ public class JMSNonDestructiveTest extends 
JMSClientTestSupport {
          producer.send(message1, javax.jms.Message.DEFAULT_DELIVERY_MODE, 
javax.jms.Message.DEFAULT_PRIORITY, tombstoneTimeToLive);
       }
    }
+
+   @Test
+   public void testMultipleLastValuesCore() throws Exception {
+      testMultipleLastValues(CoreConnection);
+   }
+
+   @Test
+   public void testMultipleLastValuesAMQP() throws Exception {
+      testMultipleLastValues(AMQPConnection);
+   }
+
+   private void testMultipleLastValues(ConnectionSupplier connectionSupplier) 
throws Exception {
+      final int GROUP_COUNT = 5;
+      final int MESSAGE_COUNT_PER_GROUP = 25;
+      final int PRODUCER_COUNT = 5;
+
+      HashMap<String, List<String>> results = new HashMap<>();
+      for (int i = 0; i < GROUP_COUNT; i++) {
+         results.put(i + "", new ArrayList<>());
+      }
+
+      HashMap<String, Integer> dups = new HashMap<>();
+      List<Producer> producers = new ArrayList<>();
+
+      try (Connection connection = connectionSupplier.createConnection()) {
+         Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+         Queue queue = session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME);
+
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+
+         for (int i = 0; i < PRODUCER_COUNT; i++) {
+            producers.add(new Producer(connectionSupplier, 
MESSAGE_COUNT_PER_GROUP, GROUP_COUNT, i));
+         }
+
+         for (Producer producer : producers) {
+            new Thread(producer).start();
+         }
+
+         while (true) {
+            TextMessage tm = (TextMessage) consumer.receive(500);
+            if (tm == null) {
+               break;
+            }
+            results.get(tm.getStringProperty("lastval")).add(tm.getText());
+            tm.acknowledge();
+         }
+
+         for (Producer producer : producers) {
+            assertFalse("Producer failed!", producer.failed);
+         }
+      }
+      for (Map.Entry<String, List<String>> entry : results.entrySet()) {
+         StringBuilder logMessage = new StringBuilder();
+         logMessage.append("Messages received with lastval=" + entry.getKey() 
+ " (");
+         for (String s : entry.getValue()) {
+            int occurrences = Collections.frequency(entry.getValue(), s);
+            if (occurrences > 1 && !dups.containsValue(Integer.parseInt(s))) {
+               dups.put(s, occurrences);
+            }
+            logMessage.append(s + ",");
+         }
+         logger.info(logMessage + ")");
+      }
+      if (dups.size() > 0) {
+         StringBuffer sb = new StringBuffer();
+         for (Map.Entry<String, Integer> stringIntegerEntry : dups.entrySet()) 
{
+            sb.append(stringIntegerEntry.getKey() + "(" + 
stringIntegerEntry.getValue() + "),");
+         }
+         Assert.fail("Duplicate messages received " + sb);
+      }
+
+      Wait.assertEquals((long) GROUP_COUNT, () -> 
server.locateQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME).getMessageCount(), 2000, 
100, false);
+   }
+
+   private class Producer implements Runnable {
+      private final ConnectionSupplier connectionSupplier;
+      private final int messageCount;
+      private final int groupCount;
+      private final int offset;
+
+      public boolean failed = false;
+
+      Producer(ConnectionSupplier connectionSupplier, int messageCount, int 
groupCount, int offset) {
+         this.connectionSupplier = connectionSupplier;
+         this.messageCount = messageCount;
+         this.groupCount = groupCount;
+         this.offset = offset;
+      }
+
+      @Override
+      public void run() {
+         try (Connection connection = connectionSupplier.createConnection()) {
+            Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+            Queue queue = session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME);
+            MessageProducer producer = session.createProducer(queue);
+
+            int startingPoint = offset * (messageCount * groupCount);
+            int messagesToSend = messageCount * groupCount;
+
+            for (int i = startingPoint; i < messagesToSend + startingPoint; 
i++) {
+               String lastval = "" + (i % groupCount);
+               TextMessage message = session.createTextMessage();
+               message.setText("" + i);
+               message.setStringProperty("data", "" + i);
+               message.setStringProperty("lastval", lastval);
+               
message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), lastval);
+               producer.send(message);
+            }
+         } catch (JMSException e) {
+            e.printStackTrace();
+            failed = true;
+            return;
+         }
+      }
+   }
 }
\ No newline at end of file

Reply via email to