[ 
https://issues.apache.org/jira/browse/ARTEMIS-3285?focusedWorklogId=601359&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-601359
 ]

ASF GitHub Bot logged work on ARTEMIS-3285:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/May/21 19:49
            Start Date: 24/May/21 19:49
    Worklog Time Spent: 10m 
      Work Description: clebertsuconic commented on a change in pull request 
#3583:
URL: https://github.com/apache/activemq-artemis/pull/3583#discussion_r638231866



##########
File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
##########
@@ -56,9 +58,27 @@
 @SuppressWarnings("ALL")
 public class LastValueQueue extends QueueImpl {
 
+   private static final Logger logger = Logger.getLogger(LastValueQueue.class);
    private final Map<SimpleString, HolderReference> map = new 
ConcurrentHashMap<>();
    private final SimpleString lastValueKey;
 
+   // only use this within synchronized methods or synchronized(this) blocks
+   protected volatile LinkedList<MessageReference> nextDeliveries = new 
LinkedList<>();

Review comment:
       I almost forgot.. replace volatile by final here please?
   
   it used to be an element on your logic. .but as it's a collection now it 
needs to be a final linkedList (at least on the way it is implemented now.

##########
File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
##########
@@ -56,9 +58,27 @@
 @SuppressWarnings("ALL")
 public class LastValueQueue extends QueueImpl {
 
+   private static final Logger logger = Logger.getLogger(LastValueQueue.class);
    private final Map<SimpleString, HolderReference> map = new 
ConcurrentHashMap<>();
    private final SimpleString lastValueKey;
 
+   // only use this within synchronized methods or synchronized(this) blocks
+   protected volatile LinkedList<MessageReference> nextDeliveries = new 
LinkedList<>();

Review comment:
       nice! that's what I thought you were going to need!

##########
File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
##########
@@ -928,7 +927,7 @@ public Queue getQueue() {
 
             acks++;
          }
-         while (ref.getMessageID() != messageID);
+         while (ref != null && ref.getMessageID() != messageID);

Review comment:
       was this on purpose? it seems a left over from a debug, since it was 
never supposed to hit null here (the throw exception is done when ref==null)

##########
File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
##########
@@ -918,7 +917,7 @@ public Queue getQueue() {
             }
 
             if (ref == null) {
-               ActiveMQIllegalStateException ils = 
ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, 
messageQueue.getName());
+               ActiveMQIllegalStateException ils = new 
ActiveMQIllegalStateException("Cannot find ref to ack " + messageID);

Review comment:
       Was this on purpose? The former seemed more complete to me.

##########
File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
##########
@@ -151,25 +171,20 @@ public synchronized void addTail(final MessageReference 
ref, final boolean direc
          HolderReference hr = map.get(prop);
 
          if (hr != null) {
-            // We need to overwrite the old ref with the new one and ack the 
old one
-
-            replaceLVQMessage(ref, hr);
-
-            if (isNonDestructive() && hr.isDelivered()) {
-               hr.resetDelivered();
-               // 
--------------------------------------------------------------------------------
-               // If non Destructive, and if a reference was previously 
delivered
-               // we would not be able to receive this message again
-               // unless we reset the iterators
-               // The message is not removed, so we can't actually remove it
-               // a result of this operation is that previously delivered 
messages
-               // will probably be delivered again.
-               // if we ever want to avoid other redeliveries we would have to 
implement a reset or redeliver
-               // operation on the iterator for a single message
-               resetAllIterators();
-               deliverAsync();
-            }
+            if (isNonDestructive() && hr.isInDelivery()) {
+               // if the ref is already being delivered we'll do the replace 
in the postAcknowledge
+               hr.setReplacementRef(ref);
+            } else {
+               // We need to overwrite the old ref with the new one and ack 
the old one
+               replaceLVQMessage(ref, hr);
 
+               if (isNonDestructive() && hr.isDelivered()) {
+                  hr.resetDelivered();
+                  // since we're replacing a ref that was already delivered we 
want to trigger a "special" delivery for this new replacement

Review comment:
       I - stale comment... the "special" word...
   
   ii - just an idea... what about rename repeatNextDelivery(messageReference) 
as nextDelivery(messageReference)
   
   and call it here instead of using the collection directly? 
   
   just a suggestion!
   

##########
File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
##########
@@ -588,4 +599,87 @@ private void sendLVQTombstone(ConnectionSupplier 
producerConnectionSupplier, Str
          producer.send(message1, javax.jms.Message.DEFAULT_DELIVERY_MODE, 
javax.jms.Message.DEFAULT_PRIORITY, tombstoneTimeToLive);
       }
    }
+
+   @Test
+   public void testMultipleLastValuesCore() throws Exception {
+      testMultipleLastValues(CoreConnection);
+   }
+
+   @Test
+   public void testMultipleLastValuesAMQP() throws Exception {
+      testMultipleLastValues(AMQPConnection);
+   }
+
+   private void testMultipleLastValues(ConnectionSupplier connectionSupplier) 
throws Exception {
+      HashMap<String, List<String>> results = new HashMap<>();
+      results.put("0", new ArrayList<>());
+      results.put("1", new ArrayList<>());
+      results.put("2", new ArrayList<>());
+      HashMap<String, Integer> dups = new HashMap<>();
+
+      try (Connection connection = connectionSupplier.createConnection();
+           Connection connection2 = connectionSupplier.createConnection()) {
+         Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+         Session session2 = connection2.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME);
+
+         MessageProducer producer = session2.createProducer(queue);
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+
+         Thread t = new Thread(() -> {
+            for (int i = 0; i < 100; i++) {
+               String lastval = "" + (i % 3);
+               try {
+                  TextMessage message = session2.createTextMessage();
+                  message.setText("" + i);
+                  message.setStringProperty("data", "" + i);
+                  message.setStringProperty("lastval", lastval);
+                  
message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), lastval);
+                  producer.send(message);
+               } catch (JMSException e) {
+                  e.printStackTrace();
+               }
+            }
+         });
+
+         t.start();
+         while (true) {
+            TextMessage tm = (TextMessage) consumer.receive(500);
+            if (tm == null) {
+               break;
+            }
+            results.get(tm.getStringProperty("lastval")).add(tm.getText());
+            tm.acknowledge();
+         }
+      }
+      for (Map.Entry<String, List<String>> entry : results.entrySet()) {
+         StringBuilder message = new StringBuilder();
+         message.append("Messages received with lastval=" + entry.getKey() + " 
(");
+         for (String s : entry.getValue()) {
+            int occurrences = Collections.frequency(entry.getValue(), s);
+            if (occurrences > 1 && !dups.containsValue(Integer.parseInt(s))) {
+               dups.put(s, occurrences);
+            }
+            message.append(s + ",");
+         }
+         logger.info(message + ")");
+
+      }
+      if (dups.size() > 0) {
+         StringBuffer sb = new StringBuffer();
+         for (Map.Entry<String, Integer> stringIntegerEntry : dups.entrySet()) 
{
+            sb.append(stringIntegerEntry.getKey() + "(" + 
stringIntegerEntry.getValue() + "),");
+         }
+         Assert.fail("Duplicate messages received " + sb);
+      }
+
+      if (persistenceEnabled) {
+         Wait.assertTrue(() -> {

Review comment:
       Ohh.. I see. I didn't know it was running non persistently. thanks




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 601359)
    Time Spent: 5h  (was: 4h 50m)

> Potential duplicate messages with LVQ + non-destructive
> -------------------------------------------------------
>
>                 Key: ARTEMIS-3285
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3285
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>            Reporter: Justin Bertram
>            Assignee: Justin Bertram
>            Priority: Major
>          Time Spent: 5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to