This is an automated email from the ASF dual-hosted git repository.

michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new bd1162d  ARTEMIS-2316 AMQPMessage missing ApplicationProperties from 
custom transformer
     new 7c6e5e6  This closes #2634
bd1162d is described below

commit bd1162d9b8b3563437dd4fc2dafa0fd4e56cd8df
Author: Luis De Bello <[email protected]>
AuthorDate: Tue Apr 23 16:34:13 2019 -0300

    ARTEMIS-2316 AMQPMessage missing ApplicationProperties from custom 
transformer
---
 .../artemis/protocol/amqp/broker/AMQPMessage.java  |  16 ++-
 .../amqp/AmqpBridgeApplicationProperties.java      | 121 +++++++++++++++++++++
 .../integration/amqp/AmqpClientTestSupport.java    |  22 +++-
 .../BridgeApplicationPropertiesTransformer.java    |  32 ++++++
 .../DivertApplicationPropertiesTransformer.java    |  34 ++++++
 5 files changed, 220 insertions(+), 5 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index df35115..a1c5830 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -1223,7 +1223,9 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public Object removeProperty(String key) {
-      return getApplicationPropertiesMap(false).remove(key);
+      Object removed = getApplicationPropertiesMap(false).remove(key);
+      messageChanged();
+      return removed;
    }
 
    @Override
@@ -1395,60 +1397,70 @@ public class AMQPMessage extends RefCountMessage {
    @Override
    public org.apache.activemq.artemis.api.core.Message 
putBooleanProperty(String key, boolean value) {
       getApplicationPropertiesMap(true).put(key, Boolean.valueOf(value));
+      messageChanged();
       return this;
    }
 
    @Override
    public org.apache.activemq.artemis.api.core.Message putByteProperty(String 
key, byte value) {
       getApplicationPropertiesMap(true).put(key, Byte.valueOf(value));
+      messageChanged();
       return this;
    }
 
    @Override
    public org.apache.activemq.artemis.api.core.Message putBytesProperty(String 
key, byte[] value) {
       getApplicationPropertiesMap(true).put(key, value);
+      messageChanged();
       return this;
    }
 
    @Override
    public org.apache.activemq.artemis.api.core.Message putShortProperty(String 
key, short value) {
       getApplicationPropertiesMap(true).put(key, Short.valueOf(value));
+      messageChanged();
       return this;
    }
 
    @Override
    public org.apache.activemq.artemis.api.core.Message putCharProperty(String 
key, char value) {
       getApplicationPropertiesMap(true).put(key, Character.valueOf(value));
+      messageChanged();
       return this;
    }
 
    @Override
    public org.apache.activemq.artemis.api.core.Message putIntProperty(String 
key, int value) {
       getApplicationPropertiesMap(true).put(key, Integer.valueOf(value));
+      messageChanged();
       return this;
    }
 
    @Override
    public org.apache.activemq.artemis.api.core.Message putLongProperty(String 
key, long value) {
       getApplicationPropertiesMap(true).put(key, Long.valueOf(value));
+      messageChanged();
       return this;
    }
 
    @Override
    public org.apache.activemq.artemis.api.core.Message putFloatProperty(String 
key, float value) {
       getApplicationPropertiesMap(true).put(key, Float.valueOf(value));
+      messageChanged();
       return this;
    }
 
    @Override
    public org.apache.activemq.artemis.api.core.Message 
putDoubleProperty(String key, double value) {
       getApplicationPropertiesMap(true).put(key, Double.valueOf(value));
+      messageChanged();
       return this;
    }
 
    @Override
    public org.apache.activemq.artemis.api.core.Message 
putBooleanProperty(SimpleString key, boolean value) {
       getApplicationPropertiesMap(true).put(key.toString(), 
Boolean.valueOf(value));
+      messageChanged();
       return this;
    }
 
@@ -1495,12 +1507,14 @@ public class AMQPMessage extends RefCountMessage {
    @Override
    public org.apache.activemq.artemis.api.core.Message 
putStringProperty(String key, String value) {
       getApplicationPropertiesMap(true).put(key, value);
+      messageChanged();
       return this;
    }
 
    @Override
    public org.apache.activemq.artemis.api.core.Message 
putObjectProperty(String key, Object value) throws 
ActiveMQPropertyConversionException {
       getApplicationPropertiesMap(true).put(key, value);
+      messageChanged();
       return this;
    }
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeApplicationProperties.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeApplicationProperties.java
new file mode 100644
index 0000000..886d65e
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeApplicationProperties.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.BridgeConfiguration;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import org.apache.activemq.artemis.core.config.TransformerConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import 
org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AmqpBridgeApplicationProperties extends AmqpClientTestSupport {
+
+   private ActiveMQServer server0;
+   private ActiveMQServer server1;
+
+   private SimpleString customNotificationQueue;
+   private SimpleString frameworkNotificationsQueue;
+   private SimpleString bridgeNotificationsQueue;
+   private SimpleString notificationsQueue;
+
+   private String getServer0URL() {
+      return "tcp://localhost:61616";
+   }
+
+   private String getServer1URL() {
+      return "tcp://localhost:61617";
+   }
+
+   @Override
+   public URI getBrokerAmqpConnectionURI() {
+      try {
+         return new URI(getServer0URL());
+      } catch (URISyntaxException e) {
+         throw new RuntimeException(e);
+      }
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      server0 = createServer(false, createBasicConfig());
+      server1 = createServer(false, createBasicConfig());
+
+      server0.getConfiguration().addAcceptorConfiguration("acceptor", 
getServer0URL());
+      
server0.getConfiguration().addConnectorConfiguration("notification-broker", 
getServer1URL());
+      server1.getConfiguration().addAcceptorConfiguration("acceptor", 
getServer1URL());
+
+      DivertConfiguration customNotificationsDivert = new 
DivertConfiguration().setName("custom-notifications-divert").setAddress("*.Provider.*.Agent.*.CustomNotification").setForwardingAddress("FrameworkNotifications").setExclusive(true).setTransformerConfiguration(new
 
TransformerConfiguration(DivertApplicationPropertiesTransformer.class.getCanonicalName()));
+      DivertConfiguration frameworkNotificationsDivert = new 
DivertConfiguration().setName("framework-notifications-divert").setAddress("BridgeNotifications").setForwardingAddress("Notifications").setRoutingType(ComponentConfigurationRoutingType.MULTICAST).setExclusive(true);
+
+      
server0.getConfiguration().addDivertConfiguration(customNotificationsDivert);
+      
server1.getConfiguration().addDivertConfiguration(frameworkNotificationsDivert);
+
+      customNotificationQueue = 
SimpleString.toSimpleString("*.Provider.*.Agent.*.CustomNotification");
+      frameworkNotificationsQueue = 
SimpleString.toSimpleString("FrameworkNotifications");
+      bridgeNotificationsQueue = 
SimpleString.toSimpleString("BridgeNotifications");
+      notificationsQueue = SimpleString.toSimpleString("Notifications");
+
+      server0.start();
+      server1.start();
+
+      server0.createQueue(customNotificationQueue, RoutingType.ANYCAST, 
customNotificationQueue, null, true, false);
+      server0.createQueue(frameworkNotificationsQueue, RoutingType.ANYCAST, 
frameworkNotificationsQueue, null, true, false);
+      server1.createQueue(bridgeNotificationsQueue, RoutingType.ANYCAST, 
bridgeNotificationsQueue, null, true, false);
+      server1.createQueue(notificationsQueue, RoutingType.MULTICAST, 
notificationsQueue, null, true, false);
+
+      server0.deployBridge(new 
BridgeConfiguration().setName("notifications-bridge").setQueueName(frameworkNotificationsQueue.toString()).setForwardingAddress(bridgeNotificationsQueue.toString()).setConfirmationWindowSize(10).setStaticConnectors(Arrays.asList("notification-broker")).setTransformerConfiguration(new
 
TransformerConfiguration(BridgeApplicationPropertiesTransformer.class.getCanonicalName())));
+   }
+
+   @Test
+   public void testApplicationPropertiesFromTransformerForwardBridge() throws 
Exception {
+      Map<String, Object> applicationProperties = new HashMap<>();
+      applicationProperties.put(DivertApplicationPropertiesTransformer.TRX_ID, 
"100");
+
+      
sendMessages("uswest.Provider.AMC.Agent.f261d0fa-51bd-44bd-abe0-ce22d2a387cd.CustomNotification",
 1, RoutingType.ANYCAST, true);
+
+      try (ServerLocator locator = 
ActiveMQClient.createServerLocator(getServer1URL()); ClientSessionFactory 
sessionFactory = locator.createSessionFactory(); ClientSession session = 
sessionFactory.createSession(); ClientConsumer consumer = 
session.createConsumer(notificationsQueue)) {
+
+         session.start();
+
+         ClientMessage message = consumer.receive(5000);
+         assertNotNull(message);
+
+         assertEquals("1", message.getStringProperty("A"));
+         assertEquals("2", message.getStringProperty("B"));
+         assertEquals("3", message.getStringProperty("C"));
+         assertEquals("4", message.getStringProperty("D"));
+      }
+   }
+}
\ No newline at end of file
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 5d8c681..ca34c23 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -16,15 +16,15 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
 import java.net.URI;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import javax.management.MBeanServer;
-import javax.management.MBeanServerFactory;
-
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -316,7 +316,18 @@ public class AmqpClientTestSupport extends AmqpTestSupport 
{
       sendMessages(destinationName, count, routingType, false);
    }
 
-   protected void sendMessages(String destinationName, int count, RoutingType 
routingType, boolean durable) throws Exception {
+   protected void sendMessages(String destinationName,
+                               int count,
+                               RoutingType routingType,
+                               boolean durable) throws Exception {
+      sendMessages(destinationName, count, routingType, durable, 
Collections.emptyMap());
+   }
+
+   protected void sendMessages(String destinationName,
+                               int count,
+                               RoutingType routingType,
+                               boolean durable,
+                               Map<String, Object> applicationProperties) 
throws Exception {
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
       try {
@@ -325,6 +336,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
 
          for (int i = 0; i < count; ++i) {
             AmqpMessage message = new AmqpMessage();
+            for (Map.Entry<String, Object> entry : 
applicationProperties.entrySet()) {
+               message.setApplicationProperty(entry.getKey(), 
entry.getValue());
+            }
             message.setMessageId("MessageID:" + i);
             message.setDurable(durable);
             if (routingType != null) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BridgeApplicationPropertiesTransformer.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BridgeApplicationPropertiesTransformer.java
new file mode 100644
index 0000000..9efb2af
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BridgeApplicationPropertiesTransformer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.server.transformer.Transformer;
+
+public class BridgeApplicationPropertiesTransformer implements Transformer {
+
+   @Override
+   public Message transform(final Message message) {
+
+      message.putStringProperty("C", "3");
+      message.putStringProperty("D", "4");
+
+      return message;
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertApplicationPropertiesTransformer.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertApplicationPropertiesTransformer.java
new file mode 100644
index 0000000..6620dc4
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertApplicationPropertiesTransformer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.server.transformer.Transformer;
+
+public class DivertApplicationPropertiesTransformer implements Transformer {
+
+   public static final String TRX_ID = "trxId";
+
+   @Override
+   public Message transform(final Message message) {
+
+      message.putStringProperty("A", "1");
+      message.putStringProperty("B", "2");
+
+      return message;
+   }
+}

Reply via email to