This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new b084eac71d ARTEMIS-5065 Remove Mirrored Properties on send for
OpenWire and Core
b084eac71d is described below
commit b084eac71d168303019b799e0d267210c19677f7
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Sep 25 10:06:10 2024 -0400
ARTEMIS-5065 Remove Mirrored Properties on send for OpenWire and Core
---
.../artemis/utils/collections/TypedProperties.java | 42 +++++++++++++-
.../artemis/utils/TypedPropertiesTest.java | 14 +++++
.../apache/activemq/artemis/api/core/Message.java | 9 +++
.../artemis/core/message/impl/CoreMessage.java | 12 +++-
.../core/postoffice/impl/PostOfficeImpl.java | 3 +
.../tests/integration/amqp/AmqpTestSupport.java | 2 +-
.../integration/amqp/connect/BrokerInSyncTest.java | 66 +++++++++++++++++++++-
7 files changed, 140 insertions(+), 8 deletions(-)
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
index 6450cd3bf1..c64db1de29 100644
---
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
@@ -66,13 +66,21 @@ public class TypedProperties {
private final Predicate<SimpleString> internalPropertyPredicate;
private boolean internalProperties;
+ private final Predicate<SimpleString> amqpPropertyPredicate;
+ private boolean amqpProperties;
public TypedProperties() {
this.internalPropertyPredicate = null;
+ this.amqpPropertyPredicate = null;
}
public TypedProperties(Predicate<SimpleString> internalPropertyPredicate) {
+ this(internalPropertyPredicate, null);
+ }
+
+ public TypedProperties(Predicate<SimpleString> internalPropertyPredicate,
Predicate<SimpleString> amqpPropertyPredicate) {
this.internalPropertyPredicate = internalPropertyPredicate;
+ this.amqpPropertyPredicate = amqpPropertyPredicate;
}
/**
@@ -96,6 +104,8 @@ public class TypedProperties {
size = other.size;
internalPropertyPredicate = other.internalPropertyPredicate;
internalProperties = other.internalProperties;
+ amqpPropertyPredicate = other.amqpPropertyPredicate;
+ amqpProperties = other.amqpProperties;
}
}
@@ -340,6 +350,10 @@ public class TypedProperties {
return internalProperties && removeInternalProperties();
}
+ public synchronized boolean clearAMQPProperties() {
+ return amqpProperties && removeAMQPProperties();
+ }
+
private synchronized boolean removeInternalProperties() {
if (internalPropertyPredicate == null) {
return false;
@@ -350,20 +364,40 @@ public class TypedProperties {
if (properties.isEmpty()) {
return false;
}
+ boolean removed = removePredicate(internalPropertyPredicate);
+ internalProperties = false;
+ return removed;
+ }
+
+ private synchronized boolean removeAMQPProperties() {
+ if (amqpPropertyPredicate == null) {
+ return false;
+ }
+ if (properties == null) {
+ return false;
+ }
+ if (properties.isEmpty()) {
+ return false;
+ }
+ boolean removed = removePredicate(amqpPropertyPredicate);
+ amqpProperties = false;
+ return removed;
+ }
+
+ private boolean removePredicate(Predicate<SimpleString> predicate) {
int removedBytes = 0;
boolean removed = false;
final Iterator<Entry<SimpleString, PropertyValue>> keyNameIterator =
properties.entrySet().iterator();
while (keyNameIterator.hasNext()) {
final Entry<SimpleString, PropertyValue> entry =
keyNameIterator.next();
final SimpleString propertyName = entry.getKey();
- if (internalPropertyPredicate.test(propertyName)) {
+ if (predicate.test(propertyName)) {
final PropertyValue propertyValue = entry.getValue();
removedBytes += propertyName.sizeof() + propertyValue.encodeSize();
keyNameIterator.remove();
removed = true;
}
}
- internalProperties = false;
size -= removedBytes;
return removed;
}
@@ -645,6 +679,10 @@ public class TypedProperties {
internalProperties = true;
}
+ if (!amqpProperties && amqpPropertyPredicate != null &&
amqpPropertyPredicate.test(key)) {
+ amqpProperties = true;
+ }
+
if (properties == null) {
properties = new HashMap<>();
}
diff --git
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
index 476f3f06de..d75de7d4f8 100644
---
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
+++
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
@@ -239,6 +239,7 @@ public class TypedPropertiesTest {
}
private static final SimpleString PROP_NAME = SimpleString.of("TEST_PROP");
+ private static final SimpleString AMQP_NAME = SimpleString.of("AMQP_NAME");
@Test
public void testCannotClearInternalPropertiesIfEmpty() {
@@ -254,6 +255,19 @@ public class TypedPropertiesTest {
assertFalse(properties.containsProperty(PROP_NAME));
}
+ @Test
+ public void testClearAMQPPropertiesIfAny() {
+ TypedProperties properties = new TypedProperties(PROP_NAME::equals,
AMQP_NAME::equals);
+ properties.putBooleanProperty(PROP_NAME, RandomUtil.randomBoolean());
+ properties.putBooleanProperty(AMQP_NAME, RandomUtil.randomBoolean());
+ assertTrue(properties.clearInternalProperties());
+ assertFalse(properties.containsProperty(PROP_NAME));
+ assertTrue(properties.containsProperty(AMQP_NAME));
+ assertTrue(properties.clearAMQPProperties());
+ assertFalse(properties.clearInternalProperties());
+ assertFalse(properties.containsProperty(AMQP_NAME));
+ }
+
@Test
public void testCannotClearInternalPropertiesTwiceIfAny() {
TypedProperties properties = new TypedProperties(PROP_NAME::equals);
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 2c6beaefda..81ae3c2b4b 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -80,12 +80,18 @@ public interface Message {
// The value is somewhat higher on 64 bit architectures, probably due to
different alignment
int memoryOffset = 352;
+ SimpleString PREFIX_AMQP_ANNOTATIONS = SimpleString.of("x-opt-amq");
+
// We use properties to establish routing context on clustering.
// However if the client resends the message after receiving, it needs to
be removed, so we mark these internal
Predicate<SimpleString> INTERNAL_PROPERTY_NAMES_PREDICATE =
name -> (name.startsWith(Message.HDR_ROUTE_TO_IDS) &&
!name.equals(Message.HDR_ROUTE_TO_IDS)) ||
(name.startsWith(Message.HDR_ROUTE_TO_ACK_IDS) &&
!name.equals(Message.HDR_ROUTE_TO_ACK_IDS));
+ // We use certain AMQP properteis in mirror. We have to remove them in case
of protocol conversion between AMQP and CORE
+ Predicate<SimpleString> AMQP_PROPERTY_PREDICATE =
+ name -> (name.startsWith(Message.PREFIX_AMQP_ANNOTATIONS));
+
SimpleString HDR_ROUTE_TO_IDS = SimpleString.of("_AMQ_ROUTE_TO");
SimpleString HDR_SCALEDOWN_TO_IDS = SimpleString.of("_AMQ_SCALEDOWN_TO");
@@ -201,6 +207,9 @@ public interface Message {
// only on core
}
+ default void clearAMQPProperties() {
+ }
+
/**
* Search for the existence of the property: an implementor can save
* the message to be decoded, if possible.
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 8575fc573b..aa890f0584 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -160,6 +160,14 @@ public class CoreMessage extends RefCountMessage
implements ICoreMessage {
}
}
+ @Override
+ public void clearAMQPProperties() {
+ final TypedProperties properties = this.properties;
+ if (properties != null && properties.clearAMQPProperties()) {
+ messageChanged();
+ }
+ }
+
@Override
public Persister<Message> getPersister() {
return CoreMessagePersister.getInstance();
@@ -621,7 +629,7 @@ public class CoreMessage extends RefCountMessage implements
ICoreMessage {
try {
TypedProperties properties = this.properties;
if (properties == null) {
- properties = new
TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE);
+ properties = new
TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE, AMQP_PROPERTY_PREDICATE);
if (buffer != null && propertiesLocation >= 0) {
final ByteBuf byteBuf =
buffer.duplicate().readerIndex(propertiesLocation);
properties.decode(byteBuf, coreMessageObjectPools == null ?
null : coreMessageObjectPools.getPropertiesDecoderPools());
@@ -737,7 +745,7 @@ public class CoreMessage extends RefCountMessage implements
ICoreMessage {
properties = null;
propertiesLocation = buffer.readerIndex();
} else {
- properties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE);
+ properties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE,
AMQP_PROPERTY_PREDICATE);
properties.decode(buffer, pools == null ? null :
pools.getPropertiesDecoderPools());
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 1d87833c06..8d38b131d4 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1189,6 +1189,9 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
} else {
startedTX = false;
}
+ if (context.getMirrorSource() == null) {
+ message.clearAMQPProperties();
+ }
message.clearInternalProperties();
Bindings bindings;
final AddressInfo addressInfo = checkAddress(context, address);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
index 1759d01b1f..faed01ad3b 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
@@ -224,7 +224,7 @@ public class AmqpTestSupport extends ActiveMQTestBase {
}
protected String getConfiguredProtocols() {
- return "AMQP,OPENWIRE";
+ return "AMQP,OPENWIRE,CORE";
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
index 25849f6ba8..0ef3e5085f 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
@@ -29,9 +29,11 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import java.io.PrintStream;
import java.net.URI;
+import java.util.Enumeration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -159,9 +161,22 @@ public class BrokerInSyncTest extends
AmqpClientTestSupport {
server.stop();
}
-
@Test
public void testSingleMessage() throws Exception {
+ testSingleMessage("AMQP");
+ }
+
+ @Test
+ public void testSingleMessageCore() throws Exception {
+ testSingleMessage("CORE");
+ }
+
+ @Test
+ public void testSingleMessageOpenWire() throws Exception {
+ testSingleMessage("OPENWIRE");
+ }
+
+ public void testSingleMessage(String protocol) throws Exception {
server.setIdentity("Server1");
{
AMQPBrokerConnectConfiguration amqpConnection = new
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" +
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
@@ -187,12 +202,12 @@ public class BrokerInSyncTest extends
AmqpClientTestSupport {
Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
- ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:" + AMQP_PORT);
+ ConnectionFactory cf1 = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:" + AMQP_PORT);
Connection connection1 = cf1.createConnection();
Session session1 = connection1.createSession(true,
Session.SESSION_TRANSACTED);
connection1.start();
- ConnectionFactory cf2 = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:" + AMQP_PORT_2);
+ ConnectionFactory cf2 = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:" + AMQP_PORT_2);
Connection connection2 = cf2.createConnection();
Session session2 = connection2.createSession(true,
Session.SESSION_TRANSACTED);
connection2.start();
@@ -229,14 +244,59 @@ public class BrokerInSyncTest extends
AmqpClientTestSupport {
Wait.assertEquals(2, queueOnServer1::getMessageCount);
Wait.assertEquals(2, queueOnServer2::getMessageCount);
+
+ connection2.start();
+ try (MessageConsumer consumer = session2.createConsumer(queue)) {
+ javax.jms.Message receivedMessage = consumer.receive(5000);
+ assertNotNull(message);
+ checkProperties(connection2, receivedMessage);
+ session2.commit();
+ }
+
+ Wait.assertEquals(1L, queueOnServer1::getMessageCount, 5000, 100);
+ Wait.assertEquals(1L, queueOnServer2::getMessageCount, 5000, 100);
+
+ connection1.start();
+ try (MessageConsumer consumer = session1.createConsumer(queue)) {
+ javax.jms.Message receivedMessage = consumer.receive(5000);
+ assertNotNull(message);
+ checkProperties(connection1, receivedMessage);
+ session1.commit();
+ }
+
connection1.close();
connection2.close();
+ Wait.assertEquals(0L, queueOnServer1::getMessageCount, 5000, 100);
+ Wait.assertEquals(0L, queueOnServer2::getMessageCount, 5000, 100);
+
server_2.stop();
server.stop();
}
+ private void checkProperties(Connection connection, javax.jms.Message
message) throws Exception {
+ try (Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE)) {
+ TemporaryQueue temporaryQueue = session.createTemporaryQueue();
+ MessageProducer producer = session.createProducer(temporaryQueue);
+ producer.send(message);
+ connection.start();
+ MessageConsumer consumer = session.createConsumer(temporaryQueue);
+ javax.jms.Message receivedMessage = consumer.receive(5000);
+ assertNotNull(receivedMessage);
+
+ // The cleanup for x-opt happens on server's side.
+ // we may receive if coming directly from a mirrored queue,
+ // however we should cleanup on the next send to avoid invalid IDs on
the server
+ Enumeration propertyNames = receivedMessage.getPropertyNames();
+ while (propertyNames.hasMoreElements()) {
+ String property = String.valueOf(propertyNames.nextElement());
+ assertFalse(property.startsWith("x-opt"));
+ }
+ }
+ }
+
+
private org.apache.activemq.artemis.core.server.Queue
locateQueueWithWait(ActiveMQServer server, String queueName) throws Exception {
Wait.assertTrue(() -> server.locateQueue(queueName) != null);
org.apache.activemq.artemis.core.server.Queue queue =
server.locateQueue(queueName);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact