This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 27008758fe ARTEMIS-3986 CME when using LVQ
     new 0c95fff865 This closes #4218
27008758fe is described below

commit 27008758fe7ea1290e51845576ae6f806b567340
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Sep 14 22:39:34 2022 -0500

    ARTEMIS-3986 CME when using LVQ
    
    The map used by LastValueQueue was inadvertently changed to a
    non-thread-safe implementation in
    4a4765c39cb73438ea2199b6e0937566d3556c10. This resulted in an occasional
    ConcurrentModificationException from the hashCode implementation.
    
    This commit restores the thread-safe map implementation and adds a test
    which brute-forces a CME when using the non-thread-safe implementation.
---
 .../artemis/core/server/impl/LastValueQueue.java   |  4 +-
 .../artemis/tests/integration/server/LVQTest.java  | 71 ++++++++++++++++++++++
 2 files changed, 73 insertions(+), 2 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 82cc672450..52631c3a4e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -17,9 +17,9 @@
 package org.apache.activemq.artemis.core.server.impl;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
@@ -54,7 +54,7 @@ import org.jboss.logging.Logger;
 public class LastValueQueue extends QueueImpl {
 
    private static final Logger logger = Logger.getLogger(LastValueQueue.class);
-   private final Map<SimpleString, MessageReference> map = new HashMap<>();
+   private final Map<SimpleString, MessageReference> map = new ConcurrentHashMap<>();
    private final SimpleString lastValueKey;
 
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
index e9a1049ebb..c12cf08be9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
@@ -16,6 +16,10 @@
  */
 package org.apache.activemq.artemis.tests.integration.server;
 
+import java.util.ConcurrentModificationException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -33,6 +37,7 @@ import org.apache.activemq.artemis.core.server.impl.LastValueQueue;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.RetryMethod;
 import org.apache.activemq.artemis.utils.RetryRule;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
@@ -829,6 +834,72 @@ public class LVQTest extends ActiveMQTestBase {
       assertEquals(queue.getDeliveringSize(), 0);
    }
 
+   @Test
+   public void testConcurrency() throws Exception { // ARTEMIS-3986: hashCode() on the queue must not throw a CME while messages flow through it
+      AtomicBoolean cme = new AtomicBoolean(false); // set to true once hashCode() throws ConcurrentModificationException
+      // Thread 1: call hashCode() on the located queue in a tight loop until stopped or a CME is seen.
+      AtomicBoolean hash = new AtomicBoolean(true); // cleared in the finally block to end the hashCode loop
+      Queue lvq = server.locateQueue(qName1);
+      Thread hashCodeThread = new Thread(() -> {
+         while (hash.get()) {
+            try {
+               int hashCode = lvq.hashCode(); // result is unused; only the call itself matters
+            } catch (ConcurrentModificationException e) {
+               cme.set(true);
+               return;
+            }
+         }
+      });
+      hashCodeThread.start();
+      // Thread 2: receive, acknowledge and commit messages from the same queue until stopped.
+      AtomicBoolean consume = new AtomicBoolean(true); // cleared in the finally block to end the consumer loop
+      ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
+      clientSessionTxReceives.start();
+      Thread consumerThread = new Thread(() -> {
+         while (consume.get()) {
+            try {
+               ClientMessage m = consumer.receive(); // NOTE(review): no timeout and no null check on m - confirm this cannot block or NPE once the producer stops
+               m.acknowledge();
+               clientSessionTxReceives.commit();
+            } catch (ActiveMQException e) {
+               e.printStackTrace();
+               return;
+            }
+         }
+      });
+      consumerThread.start();
+      // Thread 3: send messages that all carry the same last-value property, committing after each send.
+      ClientProducer producer = clientSessionTxSends.createProducer(address);
+      SimpleString lastValue = RandomUtil.randomSimpleString(); // one value shared by every message sent below
+      AtomicBoolean produce = new AtomicBoolean(true); // cleared in the finally block to end the producer loop
+      Thread producerThread = new Thread(() -> {
+         for (int i = 0; !cme.get() && produce.get(); i++) { // stop early as soon as a CME has been recorded
+            ClientMessage m = createTextMessage(clientSession, "m" + i, false);
+            m.putStringProperty(Message.HDR_LAST_VALUE_NAME, lastValue);
+            try {
+               producer.send(m);
+               clientSessionTxSends.commit();
+            } catch (ActiveMQException e) {
+               e.printStackTrace();
+               return;
+            }
+         }
+      });
+      producerThread.start();
+      producerThread.join(5000); // wait at most 5000 ms; the producer only exits sooner on a CME or a send/commit failure
+      // Fail if a CME was observed, then stop and join all three threads whatever the outcome.
+      try {
+         assertFalse(cme.get());
+      } finally {
+         produce.set(false);
+         producerThread.join();
+         consume.set(false);
+         consumerThread.join();
+         hash.set(false);
+         hashCodeThread.join();
+      }
+   }
+
    @Override
    @Before
    public void setUp() throws Exception {

Reply via email to