This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new b084eac71d ARTEMIS-5065 Remove Mirrored Properties on send for 
OpenWire and Core
b084eac71d is described below

commit b084eac71d168303019b799e0d267210c19677f7
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Sep 25 10:06:10 2024 -0400

    ARTEMIS-5065 Remove Mirrored Properties on send for OpenWire and Core
---
 .../artemis/utils/collections/TypedProperties.java | 42 +++++++++++++-
 .../artemis/utils/TypedPropertiesTest.java         | 14 +++++
 .../apache/activemq/artemis/api/core/Message.java  |  9 +++
 .../artemis/core/message/impl/CoreMessage.java     | 12 +++-
 .../core/postoffice/impl/PostOfficeImpl.java       |  3 +
 .../tests/integration/amqp/AmqpTestSupport.java    |  2 +-
 .../integration/amqp/connect/BrokerInSyncTest.java | 66 +++++++++++++++++++++-
 7 files changed, 140 insertions(+), 8 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
index 6450cd3bf1..c64db1de29 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/TypedProperties.java
@@ -66,13 +66,21 @@ public class TypedProperties {
 
    private final Predicate<SimpleString> internalPropertyPredicate;
    private boolean internalProperties;
+   private final Predicate<SimpleString> amqpPropertyPredicate;
+   private boolean amqpProperties;
 
    public TypedProperties() {
       this.internalPropertyPredicate = null;
+      this.amqpPropertyPredicate = null;
    }
 
    public TypedProperties(Predicate<SimpleString> internalPropertyPredicate) {
+      this(internalPropertyPredicate, null);
+   }
+
+   public TypedProperties(Predicate<SimpleString> internalPropertyPredicate, 
Predicate<SimpleString> amqpPropertyPredicate) {
       this.internalPropertyPredicate = internalPropertyPredicate;
+      this.amqpPropertyPredicate = amqpPropertyPredicate;
    }
 
    /**
@@ -96,6 +104,8 @@ public class TypedProperties {
          size = other.size;
          internalPropertyPredicate = other.internalPropertyPredicate;
          internalProperties = other.internalProperties;
+         amqpPropertyPredicate = other.amqpPropertyPredicate;
+         amqpProperties = other.amqpProperties;
       }
    }
 
@@ -340,6 +350,10 @@ public class TypedProperties {
       return internalProperties && removeInternalProperties();
    }
 
+   public synchronized boolean clearAMQPProperties() {
+      return amqpProperties && removeAMQPProperties();
+   }
+
    private synchronized boolean removeInternalProperties() {
       if (internalPropertyPredicate == null) {
          return false;
@@ -350,20 +364,40 @@ public class TypedProperties {
       if (properties.isEmpty()) {
          return false;
       }
+      boolean removed = removePredicate(internalPropertyPredicate);
+      internalProperties = false;
+      return removed;
+   }
+
+   private synchronized boolean removeAMQPProperties() {
+      if (amqpPropertyPredicate == null) {
+         return false;
+      }
+      if (properties == null) {
+         return false;
+      }
+      if (properties.isEmpty()) {
+         return false;
+      }
+      boolean removed = removePredicate(amqpPropertyPredicate);
+      amqpProperties = false;
+      return removed;
+   }
+
+   private boolean removePredicate(Predicate<SimpleString> predicate) {
       int removedBytes = 0;
       boolean removed = false;
       final Iterator<Entry<SimpleString, PropertyValue>> keyNameIterator = 
properties.entrySet().iterator();
       while (keyNameIterator.hasNext()) {
          final Entry<SimpleString, PropertyValue> entry = 
keyNameIterator.next();
          final SimpleString propertyName = entry.getKey();
-         if (internalPropertyPredicate.test(propertyName)) {
+         if (predicate.test(propertyName)) {
             final PropertyValue propertyValue = entry.getValue();
             removedBytes += propertyName.sizeof() + propertyValue.encodeSize();
             keyNameIterator.remove();
             removed = true;
          }
       }
-      internalProperties = false;
       size -= removedBytes;
       return removed;
    }
@@ -645,6 +679,10 @@ public class TypedProperties {
          internalProperties = true;
       }
 
+      if (!amqpProperties && amqpPropertyPredicate != null && 
amqpPropertyPredicate.test(key)) {
+         amqpProperties = true;
+      }
+
       if (properties == null) {
          properties = new HashMap<>();
       }
diff --git 
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
index 476f3f06de..d75de7d4f8 100644
--- 
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
+++ 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/TypedPropertiesTest.java
@@ -239,6 +239,7 @@ public class TypedPropertiesTest {
    }
 
    private static final SimpleString PROP_NAME = SimpleString.of("TEST_PROP");
+   private static final SimpleString AMQP_NAME = SimpleString.of("AMQP_NAME");
 
    @Test
    public void testCannotClearInternalPropertiesIfEmpty() {
@@ -254,6 +255,19 @@ public class TypedPropertiesTest {
       assertFalse(properties.containsProperty(PROP_NAME));
    }
 
+   @Test
+   public void testClearAMQPPropertiesIfAny() {
+      TypedProperties properties = new TypedProperties(PROP_NAME::equals, 
AMQP_NAME::equals);
+      properties.putBooleanProperty(PROP_NAME, RandomUtil.randomBoolean());
+      properties.putBooleanProperty(AMQP_NAME, RandomUtil.randomBoolean());
+      assertTrue(properties.clearInternalProperties());
+      assertFalse(properties.containsProperty(PROP_NAME));
+      assertTrue(properties.containsProperty(AMQP_NAME));
+      assertTrue(properties.clearAMQPProperties());
+      assertFalse(properties.clearInternalProperties());
+      assertFalse(properties.containsProperty(AMQP_NAME));
+   }
+
    @Test
    public void testCannotClearInternalPropertiesTwiceIfAny() {
       TypedProperties properties = new TypedProperties(PROP_NAME::equals);
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 2c6beaefda..81ae3c2b4b 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -80,12 +80,18 @@ public interface Message {
    // The value is somewhat higher on 64 bit architectures, probably due to 
different alignment
    int memoryOffset = 352;
 
+   SimpleString PREFIX_AMQP_ANNOTATIONS = SimpleString.of("x-opt-amq");
+
    // We use properties to establish routing context on clustering.
    // However if the client resends the message after receiving, it needs to 
be removed, so we mark these internal
    Predicate<SimpleString> INTERNAL_PROPERTY_NAMES_PREDICATE =
       name -> (name.startsWith(Message.HDR_ROUTE_TO_IDS) && 
!name.equals(Message.HDR_ROUTE_TO_IDS)) ||
       (name.startsWith(Message.HDR_ROUTE_TO_ACK_IDS) && 
!name.equals(Message.HDR_ROUTE_TO_ACK_IDS));
 
+   // We use certain AMQP properteis in mirror. We have to remove them in case 
of protocol conversion between AMQP and CORE
+   Predicate<SimpleString> AMQP_PROPERTY_PREDICATE =
+      name -> (name.startsWith(Message.PREFIX_AMQP_ANNOTATIONS));
+
    SimpleString HDR_ROUTE_TO_IDS = SimpleString.of("_AMQ_ROUTE_TO");
 
    SimpleString HDR_SCALEDOWN_TO_IDS = SimpleString.of("_AMQ_SCALEDOWN_TO");
@@ -201,6 +207,9 @@ public interface Message {
       // only on core
    }
 
+   default void clearAMQPProperties() {
+   }
+
    /**
     * Search for the existence of the property: an implementor can save
     * the message to be decoded, if possible.
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 8575fc573b..aa890f0584 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -160,6 +160,14 @@ public class CoreMessage extends RefCountMessage 
implements ICoreMessage {
       }
    }
 
+   @Override
+   public void clearAMQPProperties() {
+      final TypedProperties properties = this.properties;
+      if (properties != null && properties.clearAMQPProperties()) {
+         messageChanged();
+      }
+   }
+
    @Override
    public Persister<Message> getPersister() {
       return CoreMessagePersister.getInstance();
@@ -621,7 +629,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
       try {
          TypedProperties properties = this.properties;
          if (properties == null) {
-            properties = new 
TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE);
+            properties = new 
TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE, AMQP_PROPERTY_PREDICATE);
             if (buffer != null && propertiesLocation >= 0) {
                final ByteBuf byteBuf = 
buffer.duplicate().readerIndex(propertiesLocation);
                properties.decode(byteBuf, coreMessageObjectPools == null ? 
null : coreMessageObjectPools.getPropertiesDecoderPools());
@@ -737,7 +745,7 @@ public class CoreMessage extends RefCountMessage implements 
ICoreMessage {
          properties = null;
          propertiesLocation = buffer.readerIndex();
       } else {
-         properties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE);
+         properties = new TypedProperties(INTERNAL_PROPERTY_NAMES_PREDICATE, 
AMQP_PROPERTY_PREDICATE);
          properties.decode(buffer, pools == null ? null : 
pools.getPropertiesDecoderPools());
       }
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 1d87833c06..8d38b131d4 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1189,6 +1189,9 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
       } else {
          startedTX = false;
       }
+      if (context.getMirrorSource() == null) {
+         message.clearAMQPProperties();
+      }
       message.clearInternalProperties();
       Bindings bindings;
       final AddressInfo addressInfo = checkAddress(context, address);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
index 1759d01b1f..faed01ad3b 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
@@ -224,7 +224,7 @@ public class AmqpTestSupport extends ActiveMQTestBase {
    }
 
    protected String getConfiguredProtocols() {
-      return "AMQP,OPENWIRE";
+      return "AMQP,OPENWIRE,CORE";
    }
 
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
index 25849f6ba8..0ef3e5085f 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/BrokerInSyncTest.java
@@ -29,9 +29,11 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TemporaryQueue;
 import javax.jms.TextMessage;
 import java.io.PrintStream;
 import java.net.URI;
+import java.util.Enumeration;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -159,9 +161,22 @@ public class BrokerInSyncTest extends 
AmqpClientTestSupport {
       server.stop();
    }
 
-
    @Test
    public void testSingleMessage() throws Exception {
+      testSingleMessage("AMQP");
+   }
+
+   @Test
+   public void testSingleMessageCore() throws Exception {
+      testSingleMessage("CORE");
+   }
+
+   @Test
+   public void testSingleMessageOpenWire() throws Exception {
+      testSingleMessage("OPENWIRE");
+   }
+
+   public void testSingleMessage(String protocol) throws Exception {
       server.setIdentity("Server1");
       {
          AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + 
AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
@@ -187,12 +202,12 @@ public class BrokerInSyncTest extends 
AmqpClientTestSupport {
       Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
       Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
 
-      ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:" + AMQP_PORT);
+      ConnectionFactory cf1 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + AMQP_PORT);
       Connection connection1 = cf1.createConnection();
       Session session1 = connection1.createSession(true, 
Session.SESSION_TRANSACTED);
       connection1.start();
 
-      ConnectionFactory cf2 = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:" + AMQP_PORT_2);
+      ConnectionFactory cf2 = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:" + AMQP_PORT_2);
       Connection connection2 = cf2.createConnection();
       Session session2 = connection2.createSession(true, 
Session.SESSION_TRANSACTED);
       connection2.start();
@@ -229,14 +244,59 @@ public class BrokerInSyncTest extends 
AmqpClientTestSupport {
       Wait.assertEquals(2, queueOnServer1::getMessageCount);
       Wait.assertEquals(2, queueOnServer2::getMessageCount);
 
+
+      connection2.start();
+      try (MessageConsumer consumer = session2.createConsumer(queue)) {
+         javax.jms.Message receivedMessage = consumer.receive(5000);
+         assertNotNull(message);
+         checkProperties(connection2, receivedMessage);
+         session2.commit();
+      }
+
+      Wait.assertEquals(1L, queueOnServer1::getMessageCount, 5000, 100);
+      Wait.assertEquals(1L, queueOnServer2::getMessageCount, 5000, 100);
+
+      connection1.start();
+      try (MessageConsumer consumer = session1.createConsumer(queue)) {
+         javax.jms.Message receivedMessage = consumer.receive(5000);
+         assertNotNull(message);
+         checkProperties(connection1, receivedMessage);
+         session1.commit();
+      }
+
       connection1.close();
       connection2.close();
 
+      Wait.assertEquals(0L, queueOnServer1::getMessageCount, 5000, 100);
+      Wait.assertEquals(0L, queueOnServer2::getMessageCount, 5000, 100);
+
       server_2.stop();
       server.stop();
    }
 
 
+   private void checkProperties(Connection connection, javax.jms.Message 
message) throws Exception {
+      try (Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE)) {
+         TemporaryQueue temporaryQueue = session.createTemporaryQueue();
+         MessageProducer producer = session.createProducer(temporaryQueue);
+         producer.send(message);
+         connection.start();
+         MessageConsumer consumer = session.createConsumer(temporaryQueue);
+         javax.jms.Message receivedMessage = consumer.receive(5000);
+         assertNotNull(receivedMessage);
+
+         // The cleanup for x-opt happens on server's side.
+         // we may receive if coming directly from a mirrored queue,
+         // however we should cleanup on the next send to avoid invalid IDs on 
the server
+         Enumeration propertyNames = receivedMessage.getPropertyNames();
+         while (propertyNames.hasMoreElements()) {
+            String property = String.valueOf(propertyNames.nextElement());
+            assertFalse(property.startsWith("x-opt"));
+         }
+      }
+   }
+
+
    private org.apache.activemq.artemis.core.server.Queue 
locateQueueWithWait(ActiveMQServer server, String queueName) throws Exception {
       Wait.assertTrue(() -> server.locateQueue(queueName) != null);
       org.apache.activemq.artemis.core.server.Queue queue = 
server.locateQueue(queueName);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to