This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 91debf25db ARTEMIS-4418 use consumer delivery sequence in messageId
for openwire broker sequence id, makes delivery count calculation independent
of message order
91debf25db is described below
commit 91debf25dbf0f3924d511ee615c6f9b0545e2a5f
Author: Gary Tully <[email protected]>
AuthorDate: Wed Sep 6 17:51:47 2023 +0100
ARTEMIS-4418 use consumer delivery sequence in messageId for openwire
broker sequence id, makes delivery count calculation independent of message
order
---
.../openwire/OpenWireMessageConverter.java | 9 +++++----
.../core/protocol/openwire/amq/AMQConsumer.java | 6 ++++--
.../openwire/OpenWireMessageConverterTest.java | 22 +++++++++++-----------
.../core/server/impl/ServerConsumerImpl.java | 1 +
.../PrefetchRedeliveryCountOpenwireTest.java | 14 ++++++++++++--
5 files changed, 33 insertions(+), 19 deletions(-)
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 915809d377..e6d69757d3 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -481,15 +481,16 @@ public final class OpenWireMessageConverter {
public static MessageDispatch createMessageDispatch(MessageReference
reference,
ICoreMessage message,
WireFormat marshaller,
- AMQConsumer consumer,
UUID serverNodeUUID) throws IOException {
+ AMQConsumer consumer,
+ UUID serverNodeUUID,
+ long
consumerDeliverySequenceId) throws IOException {
ActiveMQMessage amqMessage = toAMQMessage(reference, message,
marshaller, consumer, serverNodeUUID);
- //we can use core message id for sequenceId
- amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
+
amqMessage.getMessageId().setBrokerSequenceId(consumerDeliverySequenceId);
MessageDispatch md = new MessageDispatch();
md.setConsumerId(consumer.getId());
md.setRedeliveryCounter(reference.getDeliveryCount() - 1);
-
md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId());
+ md.setDeliverySequenceId(consumerDeliverySequenceId);
md.setMessage(amqMessage);
ActiveMQDestination destination = amqMessage.getDestination();
md.setDestination(destination);
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 3165b14faf..38a249e04c 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -81,6 +82,7 @@ public class AMQConsumer {
private boolean internalAddress = false;
private volatile Set<MessageReference> rolledbackMessageRefs;
private ScheduledFuture<?> delayedDispatchPrompter;
+ private AtomicLong deliveredSequenceId = new AtomicLong(0);
public AMQConsumer(AMQSession amqSession,
org.apache.activemq.command.ActiveMQDestination d,
@@ -292,7 +294,7 @@ public class AMQConsumer {
message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
}
//handleDeliver is performed by an executor (see JBPAPP-6030): any
AMQConsumer can share the session.wireFormat()
- dispatch = OpenWireMessageConverter.createMessageDispatch(reference,
message, session.wireFormat(), this,
session.getCoreServer().getNodeManager().getUUID());
+ dispatch = OpenWireMessageConverter.createMessageDispatch(reference,
message, session.wireFormat(), this,
session.getCoreServer().getNodeManager().getUUID(),
deliveredSequenceId.getAndIncrement());
int size = dispatch.getMessage().getSize();
reference.setProtocolData(MessageId.class,
dispatch.getMessage().getMessageId());
session.deliverMessage(dispatch);
@@ -446,7 +448,7 @@ public class AMQConsumer {
// treat as delivered
return true;
}
- if (ref.getMessageID() <= info.getLastDeliveredSequenceId() &&
!isRolledBack(ref)) {
+ if (ref.getProtocolData(MessageId.class).getBrokerSequenceId() <=
info.getLastDeliveredSequenceId() && !isRolledBack(ref)) {
// treat as delivered
return true;
}
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
index 92a1bfb3c1..0580f5cad0 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
@@ -73,7 +73,7 @@ public class OpenWireMessageConverterTest {
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
- MessageDispatch dispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, msg,
openWireFormat, amqConsumer, nodeUUID);
+ MessageDispatch dispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, msg,
openWireFormat, amqConsumer, nodeUUID, i);
MessageId messageId = dispatch.getMessage().getMessageId();
assertFalse(mqMessageAuditNoSync.isDuplicate(messageId));
@@ -95,7 +95,7 @@ public class OpenWireMessageConverterTest {
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
- MessageDispatch dispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, msg,
openWireFormat, amqConsumer, nodeUUID);
+ MessageDispatch dispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, msg,
openWireFormat, amqConsumer, nodeUUID, i);
MessageId messageId = dispatch.getMessage().getMessageId();
assertFalse(mqMessageAuditNoSync.isDuplicate(messageId));
@@ -114,7 +114,7 @@ public class OpenWireMessageConverterTest {
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
- MessageDispatch messageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage,
openWireFormat, amqConsumer, nodeUUID);
+ MessageDispatch messageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage,
openWireFormat, amqConsumer, nodeUUID, 0);
assertTrue(messageDispatch.getMessage().getProperty(bytesPropertyKey)
instanceof String);
}
@@ -130,7 +130,7 @@ public class OpenWireMessageConverterTest {
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
- MessageDispatch marshalled = (MessageDispatch)
openWireFormat.unmarshal(openWireFormat.marshal(OpenWireMessageConverter.createMessageDispatch(messageReference,
coreMessage, openWireFormat, amqConsumer, nodeUUID)));
+ MessageDispatch marshalled = (MessageDispatch)
openWireFormat.unmarshal(openWireFormat.marshal(OpenWireMessageConverter.createMessageDispatch(messageReference,
coreMessage, openWireFormat, amqConsumer, nodeUUID, 0)));
assertEquals(5, marshalled.getMessage().getProperties().keySet().size());
Message converted =
OpenWireMessageConverter.inbound(marshalled.getMessage(), openWireFormat, null);
for (int i = 0; i < 5; i++) {
@@ -161,7 +161,7 @@ public class OpenWireMessageConverterTest {
MessageReference messageReference = new
MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
- MessageDispatch classicMessageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage)
artemisMessage, openWireFormat, amqConsumer, nodeUUID);
+ MessageDispatch classicMessageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage)
artemisMessage, openWireFormat, amqConsumer, nodeUUID, 0);
assertEquals(PRODUCER_ID,
classicMessageDispatch.getMessage().getProducerId().toString());
}
@@ -178,7 +178,7 @@ public class OpenWireMessageConverterTest {
MessageReference messageReference = new
MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
- MessageDispatch messageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage,
openWireFormat, amqConsumer, nodeUUID);
+ MessageDispatch messageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage,
openWireFormat, amqConsumer, nodeUUID, 0);
assertEquals(PRODUCER_ID,
messageDispatch.getMessage().getProducerId().toString());
}
@@ -194,7 +194,7 @@ public class OpenWireMessageConverterTest {
MessageReference messageReference = new
MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
- MessageDispatch classicMessageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage)
artemisMessage, openWireFormat, amqConsumer, nodeUUID);
+ MessageDispatch classicMessageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage)
artemisMessage, openWireFormat, amqConsumer, nodeUUID, 0);
assertEquals(MESSAGE_ID,
classicMessageDispatch.getMessage().getMessageId().toString());
}
@@ -211,7 +211,7 @@ public class OpenWireMessageConverterTest {
MessageReference messageReference = new
MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
- MessageDispatch messageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage,
openWireFormat, amqConsumer, nodeUUID);
+ MessageDispatch messageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage,
openWireFormat, amqConsumer, nodeUUID, 0);
assertEquals(MESSAGE_ID,
messageDispatch.getMessage().getMessageId().toString());
}
@@ -228,7 +228,7 @@ public class OpenWireMessageConverterTest {
MessageReference messageReference = new
MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
- MessageDispatch messageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage,
openWireFormat, amqConsumer, nodeUUID);
+ MessageDispatch messageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage,
openWireFormat, amqConsumer, nodeUUID, 0);
assertEquals("queue://" + DESTINATION,
messageDispatch.getMessage().getOriginalDestination().toString());
}
@@ -245,7 +245,7 @@ public class OpenWireMessageConverterTest {
MessageReference messageReference = new
MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
- MessageDispatch messageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage,
openWireFormat, amqConsumer, nodeUUID);
+ MessageDispatch messageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage,
openWireFormat, amqConsumer, nodeUUID, 0);
assertEquals("queue://" + DESTINATION,
messageDispatch.getMessage().getReplyTo().toString());
}
@@ -269,7 +269,7 @@ public class OpenWireMessageConverterTest {
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
- MessageDispatch messageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage,
openWireFormat, amqConsumer, nodeUUID);
+ MessageDispatch messageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage,
openWireFormat, amqConsumer, nodeUUID, 0);
assertNull(messageDispatch.getMessage().getProperty(hdrArrival));
assertNull(messageDispatch.getMessage().getProperty(hdrBrokerInTime));
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index fa389b0912..c9d1215ae1 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -656,6 +656,7 @@ public class ServerConsumerImpl implements ServerConsumer,
ReadyListener {
browserDeliverer.close();
} else {
messageQueue.removeConsumer(this);
+ messageQueue.deliverAsync();
}
session.removeConsumer(id);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
index b5dab5f344..9d086a1f54 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
@@ -55,11 +55,20 @@ public class PrefetchRedeliveryCountOpenwireTest extends
OpenWireTestBase {
}
@Test(timeout = 60_000)
- public void testConsumerSingleMessageLoop() throws Exception {
+ public void testConsumerSingleMessageLoopExclusive() throws Exception {
+ doTestConsumerSingleMessageLoop(true);
+ }
+
+ @Test(timeout = 60_000)
+ public void testConsumerSingleMessageLoopNonExclusive() throws Exception {
+ doTestConsumerSingleMessageLoop(false);
+ }
+
+ public void doTestConsumerSingleMessageLoop(boolean exclusive) throws
Exception {
Connection exConn = null;
SimpleString durableQueue = new SimpleString("exampleQueue");
- this.server.createQueue(new
QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
+ this.server.createQueue(new
QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(exclusive));
try {
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
@@ -157,6 +166,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends
OpenWireTestBase {
}
session.commit();
+ // force a local socket close such that the broker sees an
exception on the connection and fails the consumer via close
((FailoverTransport)((org.apache.activemq.ActiveMQConnection)exConn).getTransport().narrow(FailoverTransport.class)).stop();
exConn.close();
}