This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 27008758fe ARTEMIS-3986 CME when using LVQ
new 0c95fff865 This closes #4218
27008758fe is described below
commit 27008758fe7ea1290e51845576ae6f806b567340
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Sep 14 22:39:34 2022 -0500
ARTEMIS-3986 CME when using LVQ
The map used by LastValueQueue was inadvertently changed to a
non-thread-safe implementation in
4a4765c39cb73438ea2199b6e0937566d3556c10. This resulted in an occasional
ConcurrentModificationException from the hashCode implementation.
This commit restores the thread-safe map implementation and adds a test
which brute-forces a CME when using the non-thread-safe implementation.
---
.../artemis/core/server/impl/LastValueQueue.java | 4 +-
.../artemis/tests/integration/server/LVQTest.java | 71 ++++++++++++++++++++++
2 files changed, 73 insertions(+), 2 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 82cc672450..52631c3a4e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -17,9 +17,9 @@
package org.apache.activemq.artemis.core.server.impl;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
@@ -54,7 +54,7 @@ import org.jboss.logging.Logger;
public class LastValueQueue extends QueueImpl {
private static final Logger logger = Logger.getLogger(LastValueQueue.class);
- private final Map<SimpleString, MessageReference> map = new HashMap<>();
+ private final Map<SimpleString, MessageReference> map = new ConcurrentHashMap<>();
private final SimpleString lastValueKey;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
index e9a1049ebb..c12cf08be9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
@@ -16,6 +16,10 @@
*/
package org.apache.activemq.artemis.tests.integration.server;
+import java.util.ConcurrentModificationException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -33,6 +37,7 @@ import org.apache.activemq.artemis.core.server.impl.LastValueQueue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.RetryMethod;
import org.apache.activemq.artemis.utils.RetryRule;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
@@ -829,6 +834,72 @@ public class LVQTest extends ActiveMQTestBase {
assertEquals(queue.getDeliveringSize(), 0);
}
+ @Test
+ public void testConcurrency() throws Exception {
+ AtomicBoolean cme = new AtomicBoolean(false);
+
+ AtomicBoolean hash = new AtomicBoolean(true);
+ Queue lvq = server.locateQueue(qName1);
+ Thread hashCodeThread = new Thread(() -> {
+ while (hash.get()) {
+ try {
+ int hashCode = lvq.hashCode();
+ } catch (ConcurrentModificationException e) {
+ cme.set(true);
+ return;
+ }
+ }
+ });
+ hashCodeThread.start();
+
+ AtomicBoolean consume = new AtomicBoolean(true);
+ ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
+ clientSessionTxReceives.start();
+ Thread consumerThread = new Thread(() -> {
+ while (consume.get()) {
+ try {
+ ClientMessage m = consumer.receive();
+ m.acknowledge();
+ clientSessionTxReceives.commit();
+ } catch (ActiveMQException e) {
+ e.printStackTrace();
+ return;
+ }
+ }
+ });
+ consumerThread.start();
+
+ ClientProducer producer = clientSessionTxSends.createProducer(address);
+ SimpleString lastValue = RandomUtil.randomSimpleString();
+ AtomicBoolean produce = new AtomicBoolean(true);
+ Thread producerThread = new Thread(() -> {
+ for (int i = 0; !cme.get() && produce.get(); i++) {
+ ClientMessage m = createTextMessage(clientSession, "m" + i, false);
+ m.putStringProperty(Message.HDR_LAST_VALUE_NAME, lastValue);
+ try {
+ producer.send(m);
+ clientSessionTxSends.commit();
+ } catch (ActiveMQException e) {
+ e.printStackTrace();
+ return;
+ }
+ }
+ });
+ producerThread.start();
+ producerThread.join(5000);
+
+ try {
+ assertFalse(cme.get());
+ } finally {
+ produce.set(false);
+ producerThread.join();
+ consume.set(false);
+ consumerThread.join();
+ hash.set(false);
+ hashCodeThread.join();
+ }
+ }
+
@Override
@Before
public void setUp() throws Exception {