This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 3621258 ARTEMIS-3285 potential duplicate messages with LVQ +
non-destructive
3621258 is described below
commit 36212584582779f8b57dea06f80292eb6f89599a
Author: Justin Bertram <[email protected]>
AuthorDate: Thu May 6 14:22:17 2021 -0500
ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive
---
.../org/apache/activemq/artemis/utils/Wait.java | 10 +-
.../artemis/core/server/impl/LastValueQueue.java | 74 +++++++++---
.../artemis/core/server/impl/QueueImpl.java | 44 +++++---
.../core/server/impl/ServerConsumerImpl.java | 2 +-
.../integration/amqp/JMSNonDestructiveTest.java | 125 +++++++++++++++++++++
5 files changed, 221 insertions(+), 34 deletions(-)
diff --git
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
index 8e8d61d..62c8c2d 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
@@ -66,10 +66,16 @@ public class Wait {
}
public static void assertEquals(Long size, LongCondition condition, long
timeout, long sleepMillis) throws Exception {
- boolean result = waitFor(() -> condition.getCount() == size, timeout,
sleepMillis);
+ assertEquals(size, condition, timeout, sleepMillis, true);
+ }
+
+ public static void assertEquals(Long size, LongCondition condition, long
timeout, long sleepMillis, boolean printThreadDump) throws Exception {
+ boolean result = waitFor(() -> condition.getCount() == size, timeout,
sleepMillis, printThreadDump);
if (!result) {
- System.out.println(ThreadDumpUtil.threadDump("thread dump"));
+ if (printThreadDump) {
+ System.out.println(ThreadDumpUtil.threadDump("thread dump"));
+ }
Assert.fail(size + " != " + condition.getCount());
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 4305585..1ea280c 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.impl;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -44,6 +45,7 @@ import
org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.jboss.logging.Logger;
/**
* A queue that will discard messages if a newer message with the same
@@ -56,9 +58,27 @@ import
org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
@SuppressWarnings("ALL")
public class LastValueQueue extends QueueImpl {
+ private static final Logger logger = Logger.getLogger(LastValueQueue.class);
private final Map<SimpleString, HolderReference> map = new
ConcurrentHashMap<>();
private final SimpleString lastValueKey;
+ // only use this within synchronized methods or synchronized(this) blocks
+ protected final LinkedList<MessageReference> nextDeliveries = new
LinkedList<>();
+
+
+ /* in certain cases we need to redeliver a message */
+ @Override
+ protected MessageReference nextDelivery() {
+ return nextDeliveries.poll();
+ }
+
+ @Override
+ protected void repeatNextDelivery(MessageReference reference) {
+ // put the ref back onto the head of the list so that the next time
poll() is called this ref is returned
+ nextDeliveries.addFirst(reference);
+ }
+
+
@Deprecated
public LastValueQueue(final long persistenceID,
final SimpleString address,
@@ -151,25 +171,20 @@ public class LastValueQueue extends QueueImpl {
HolderReference hr = map.get(prop);
if (hr != null) {
- // We need to overwrite the old ref with the new one and ack the
old one
-
- replaceLVQMessage(ref, hr);
-
- if (isNonDestructive() && hr.isDelivered()) {
- hr.resetDelivered();
- //
--------------------------------------------------------------------------------
- // If non Destructive, and if a reference was previously
delivered
- // we would not be able to receive this message again
- // unless we reset the iterators
- // The message is not removed, so we can't actually remove it
- // a result of this operation is that previously delivered
messages
- // will probably be delivered again.
- // if we ever want to avoid other redeliveries we would have to
implement a reset or redeliver
- // operation on the iterator for a single message
- resetAllIterators();
- deliverAsync();
- }
+ if (isNonDestructive() && hr.isInDelivery()) {
+ // if the ref is already being delivered we'll do the replace
in the postAcknowledge
+ hr.setReplacementRef(ref);
+ } else {
+ // We need to overwrite the old ref with the new one and ack
the old one
+ replaceLVQMessage(ref, hr);
+ if (isNonDestructive() && hr.isDelivered()) {
+ hr.resetDelivered();
+ // since we're replacing a ref that was already delivered we
want to trigger a delivery for this new replacement
+ nextDeliveries.add(hr);
+ deliverAsync();
+ }
+ }
} else {
hr = new HolderReference(prop, ref);
@@ -247,6 +262,19 @@ public class LastValueQueue extends QueueImpl {
}
@Override
+ public void postAcknowledge(final MessageReference ref, AckReason reason) {
+ if (isNonDestructive()) {
+ if (ref instanceof HolderReference) {
+ HolderReference hr = (HolderReference) ref;
+ if (hr.getReplacementRef() != null) {
+ replaceLVQMessage(hr.getReplacementRef(), hr);
+ }
+ }
+ }
+ super.postAcknowledge(ref, reason);
+ }
+
+ @Override
public boolean allowsReferenceCallback() {
return false;
}
@@ -358,11 +386,21 @@ public class LastValueQueue extends QueueImpl {
private volatile MessageReference ref;
+ private volatile MessageReference replacementRef;
+
private long consumerID;
private boolean hasConsumerID = false;
+ public MessageReference getReplacementRef() {
+ return replacementRef;
+ }
+
+ public void setReplacementRef(MessageReference replacementRef) {
+ this.replacementRef = replacementRef;
+ }
+
public void resetDelivered() {
delivered = false;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 082c9cb..a8ce01d 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -340,6 +340,17 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
private volatile long ringSize;
+ /* in certain cases we need to redeliver a message directly.
+ * it's useful for usecases last LastValueQueue */
+ protected MessageReference nextDelivery() {
+ return null;
+ }
+
+ protected void repeatNextDelivery(MessageReference reference) {
+
+ }
+
+
/**
* This is to avoid multi-thread races on calculating direct delivery,
* to guarantee ordering will be always be correct
@@ -2969,12 +2980,16 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
holder.iter = messageReferences.iterator();
}
- if (holder.iter.hasNext()) {
+ ref = nextDelivery();
+ boolean nextDelivery = false;
+ if (ref != null) {
+ nextDelivery = true;
+ }
+
+ if (ref == null && holder.iter.hasNext()) {
ref = holder.iter.next();
- } else {
- ref = null;
- existingMemoryEstimate = 0;
}
+
if (ref == null) {
noDelivery++;
} else {
@@ -3022,14 +3037,18 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
handled++;
consumers.reset();
} else if (status == HandleStatus.BUSY) {
- try {
- holder.iter.repeat();
- } catch (NoSuchElementException e) {
- // this could happen if there was an exception on the
queue handling
- // and it returned BUSY because of that exception
- //
- // We will just log it as there's nothing else we can do
now.
- logger.warn(e.getMessage(), e);
+ if (nextDelivery) {
+ repeatNextDelivery(ref);
+ } else {
+ try {
+ holder.iter.repeat();
+ } catch (NoSuchElementException e) {
+ // this could happen if there was an exception on the
queue handling
+ // and it returned BUSY because of that exception
+ //
+ // We will just log it as there's nothing else we can
do now.
+ logger.warn(e.getMessage(), e);
+ }
}
noDelivery++;
@@ -3067,7 +3086,6 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
if (handledconsumer != null) {
proceedDeliver(handledconsumer, ref);
}
-
}
return true;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 10b131a..6c4519b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -984,7 +984,7 @@ public class ServerConsumerImpl implements ServerConsumer,
ReadyListener {
}
if (ref == null) {
- ActiveMQIllegalStateException ils = new
ActiveMQIllegalStateException("Cannot find ref to ack " + messageID);
+ ActiveMQIllegalStateException ils =
ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID,
messageQueue.getName());
tx.markAsRollbackOnly(ils);
throw ils;
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
index c39381a..609d712 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
@@ -24,8 +24,13 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -44,6 +49,8 @@ import
org.apache.activemq.artemis.core.server.impl.LastValueQueue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RetryRule;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -52,6 +59,8 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class JMSNonDestructiveTest extends JMSClientTestSupport {
+ private static final Logger logger =
Logger.getLogger(JMSNonDestructiveTest.class);
+
@Rule
public RetryRule retryRule = new RetryRule(2);
@@ -588,4 +597,120 @@ public class JMSNonDestructiveTest extends
JMSClientTestSupport {
producer.send(message1, javax.jms.Message.DEFAULT_DELIVERY_MODE,
javax.jms.Message.DEFAULT_PRIORITY, tombstoneTimeToLive);
}
}
+
+ @Test
+ public void testMultipleLastValuesCore() throws Exception {
+ testMultipleLastValues(CoreConnection);
+ }
+
+ @Test
+ public void testMultipleLastValuesAMQP() throws Exception {
+ testMultipleLastValues(AMQPConnection);
+ }
+
+ private void testMultipleLastValues(ConnectionSupplier connectionSupplier)
throws Exception {
+ final int GROUP_COUNT = 5;
+ final int MESSAGE_COUNT_PER_GROUP = 25;
+ final int PRODUCER_COUNT = 5;
+
+ HashMap<String, List<String>> results = new HashMap<>();
+ for (int i = 0; i < GROUP_COUNT; i++) {
+ results.put(i + "", new ArrayList<>());
+ }
+
+ HashMap<String, Integer> dups = new HashMap<>();
+ List<Producer> producers = new ArrayList<>();
+
+ try (Connection connection = connectionSupplier.createConnection()) {
+ Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME);
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+
+ for (int i = 0; i < PRODUCER_COUNT; i++) {
+ producers.add(new Producer(connectionSupplier,
MESSAGE_COUNT_PER_GROUP, GROUP_COUNT, i));
+ }
+
+ for (Producer producer : producers) {
+ new Thread(producer).start();
+ }
+
+ while (true) {
+ TextMessage tm = (TextMessage) consumer.receive(500);
+ if (tm == null) {
+ break;
+ }
+ results.get(tm.getStringProperty("lastval")).add(tm.getText());
+ tm.acknowledge();
+ }
+
+ for (Producer producer : producers) {
+ assertFalse("Producer failed!", producer.failed);
+ }
+ }
+ for (Map.Entry<String, List<String>> entry : results.entrySet()) {
+ StringBuilder logMessage = new StringBuilder();
+ logMessage.append("Messages received with lastval=" + entry.getKey()
+ " (");
+ for (String s : entry.getValue()) {
+ int occurrences = Collections.frequency(entry.getValue(), s);
+ if (occurrences > 1 && !dups.containsValue(Integer.parseInt(s))) {
+ dups.put(s, occurrences);
+ }
+ logMessage.append(s + ",");
+ }
+ logger.info(logMessage + ")");
+ }
+ if (dups.size() > 0) {
+ StringBuffer sb = new StringBuffer();
+ for (Map.Entry<String, Integer> stringIntegerEntry : dups.entrySet())
{
+ sb.append(stringIntegerEntry.getKey() + "(" +
stringIntegerEntry.getValue() + "),");
+ }
+ Assert.fail("Duplicate messages received " + sb);
+ }
+
+ Wait.assertEquals((long) GROUP_COUNT, () ->
server.locateQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME).getMessageCount(), 2000,
100, false);
+ }
+
+ private class Producer implements Runnable {
+ private final ConnectionSupplier connectionSupplier;
+ private final int messageCount;
+ private final int groupCount;
+ private final int offset;
+
+ public boolean failed = false;
+
+ Producer(ConnectionSupplier connectionSupplier, int messageCount, int
groupCount, int offset) {
+ this.connectionSupplier = connectionSupplier;
+ this.messageCount = messageCount;
+ this.groupCount = groupCount;
+ this.offset = offset;
+ }
+
+ @Override
+ public void run() {
+ try (Connection connection = connectionSupplier.createConnection()) {
+ Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME);
+ MessageProducer producer = session.createProducer(queue);
+
+ int startingPoint = offset * (messageCount * groupCount);
+ int messagesToSend = messageCount * groupCount;
+
+ for (int i = startingPoint; i < messagesToSend + startingPoint;
i++) {
+ String lastval = "" + (i % groupCount);
+ TextMessage message = session.createTextMessage();
+ message.setText("" + i);
+ message.setStringProperty("data", "" + i);
+ message.setStringProperty("lastval", lastval);
+
message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), lastval);
+ producer.send(message);
+ }
+ } catch (JMSException e) {
+ e.printStackTrace();
+ failed = true;
+ return;
+ }
+ }
+ }
}
\ No newline at end of file