This is an automated email from the ASF dual-hosted git repository.
michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new bd1162d ARTEMIS-2316 AMQPMessage missing ApplicationProperties from custom transformer
new 7c6e5e6 This closes #2634
bd1162d is described below
commit bd1162d9b8b3563437dd4fc2dafa0fd4e56cd8df
Author: Luis De Bello <[email protected]>
AuthorDate: Tue Apr 23 16:34:13 2019 -0300
ARTEMIS-2316 AMQPMessage missing ApplicationProperties from custom transformer
---
.../artemis/protocol/amqp/broker/AMQPMessage.java | 16 ++-
.../amqp/AmqpBridgeApplicationProperties.java | 121 +++++++++++++++++++++
.../integration/amqp/AmqpClientTestSupport.java | 22 +++-
.../BridgeApplicationPropertiesTransformer.java | 32 ++++++
.../DivertApplicationPropertiesTransformer.java | 34 ++++++
5 files changed, 220 insertions(+), 5 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index df35115..a1c5830 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -1223,7 +1223,9 @@ public class AMQPMessage extends RefCountMessage {
@Override
public Object removeProperty(String key) {
- return getApplicationPropertiesMap(false).remove(key);
+ Object removed = getApplicationPropertiesMap(false).remove(key);
+ messageChanged();
+ return removed;
}
@Override
@@ -1395,60 +1397,70 @@ public class AMQPMessage extends RefCountMessage {
@Override
public org.apache.activemq.artemis.api.core.Message
putBooleanProperty(String key, boolean value) {
getApplicationPropertiesMap(true).put(key, Boolean.valueOf(value));
+ messageChanged();
return this;
}
@Override
public org.apache.activemq.artemis.api.core.Message putByteProperty(String
key, byte value) {
getApplicationPropertiesMap(true).put(key, Byte.valueOf(value));
+ messageChanged();
return this;
}
@Override
public org.apache.activemq.artemis.api.core.Message putBytesProperty(String
key, byte[] value) {
getApplicationPropertiesMap(true).put(key, value);
+ messageChanged();
return this;
}
@Override
public org.apache.activemq.artemis.api.core.Message putShortProperty(String
key, short value) {
getApplicationPropertiesMap(true).put(key, Short.valueOf(value));
+ messageChanged();
return this;
}
@Override
public org.apache.activemq.artemis.api.core.Message putCharProperty(String
key, char value) {
getApplicationPropertiesMap(true).put(key, Character.valueOf(value));
+ messageChanged();
return this;
}
@Override
public org.apache.activemq.artemis.api.core.Message putIntProperty(String
key, int value) {
getApplicationPropertiesMap(true).put(key, Integer.valueOf(value));
+ messageChanged();
return this;
}
@Override
public org.apache.activemq.artemis.api.core.Message putLongProperty(String
key, long value) {
getApplicationPropertiesMap(true).put(key, Long.valueOf(value));
+ messageChanged();
return this;
}
@Override
public org.apache.activemq.artemis.api.core.Message putFloatProperty(String
key, float value) {
getApplicationPropertiesMap(true).put(key, Float.valueOf(value));
+ messageChanged();
return this;
}
@Override
public org.apache.activemq.artemis.api.core.Message
putDoubleProperty(String key, double value) {
getApplicationPropertiesMap(true).put(key, Double.valueOf(value));
+ messageChanged();
return this;
}
@Override
public org.apache.activemq.artemis.api.core.Message
putBooleanProperty(SimpleString key, boolean value) {
getApplicationPropertiesMap(true).put(key.toString(),
Boolean.valueOf(value));
+ messageChanged();
return this;
}
@@ -1495,12 +1507,14 @@ public class AMQPMessage extends RefCountMessage {
@Override
public org.apache.activemq.artemis.api.core.Message
putStringProperty(String key, String value) {
getApplicationPropertiesMap(true).put(key, value);
+ messageChanged();
return this;
}
@Override
public org.apache.activemq.artemis.api.core.Message
putObjectProperty(String key, Object value) throws
ActiveMQPropertyConversionException {
getApplicationPropertiesMap(true).put(key, value);
+ messageChanged();
return this;
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeApplicationProperties.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeApplicationProperties.java
new file mode 100644
index 0000000..886d65e
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeApplicationProperties.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.BridgeConfiguration;
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import org.apache.activemq.artemis.core.config.TransformerConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import
org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AmqpBridgeApplicationProperties extends AmqpClientTestSupport {
+
+ private ActiveMQServer server0;
+ private ActiveMQServer server1;
+
+ private SimpleString customNotificationQueue;
+ private SimpleString frameworkNotificationsQueue;
+ private SimpleString bridgeNotificationsQueue;
+ private SimpleString notificationsQueue;
+
+ private String getServer0URL() {
+ return "tcp://localhost:61616";
+ }
+
+ private String getServer1URL() {
+ return "tcp://localhost:61617";
+ }
+
+ @Override
+ public URI getBrokerAmqpConnectionURI() {
+ try {
+ return new URI(getServer0URL());
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ server0 = createServer(false, createBasicConfig());
+ server1 = createServer(false, createBasicConfig());
+
+ server0.getConfiguration().addAcceptorConfiguration("acceptor",
getServer0URL());
+
server0.getConfiguration().addConnectorConfiguration("notification-broker",
getServer1URL());
+ server1.getConfiguration().addAcceptorConfiguration("acceptor",
getServer1URL());
+
+ DivertConfiguration customNotificationsDivert = new
DivertConfiguration().setName("custom-notifications-divert").setAddress("*.Provider.*.Agent.*.CustomNotification").setForwardingAddress("FrameworkNotifications").setExclusive(true).setTransformerConfiguration(new
TransformerConfiguration(DivertApplicationPropertiesTransformer.class.getCanonicalName()));
+ DivertConfiguration frameworkNotificationsDivert = new
DivertConfiguration().setName("framework-notifications-divert").setAddress("BridgeNotifications").setForwardingAddress("Notifications").setRoutingType(ComponentConfigurationRoutingType.MULTICAST).setExclusive(true);
+
+
server0.getConfiguration().addDivertConfiguration(customNotificationsDivert);
+
server1.getConfiguration().addDivertConfiguration(frameworkNotificationsDivert);
+
+ customNotificationQueue =
SimpleString.toSimpleString("*.Provider.*.Agent.*.CustomNotification");
+ frameworkNotificationsQueue =
SimpleString.toSimpleString("FrameworkNotifications");
+ bridgeNotificationsQueue =
SimpleString.toSimpleString("BridgeNotifications");
+ notificationsQueue = SimpleString.toSimpleString("Notifications");
+
+ server0.start();
+ server1.start();
+
+ server0.createQueue(customNotificationQueue, RoutingType.ANYCAST,
customNotificationQueue, null, true, false);
+ server0.createQueue(frameworkNotificationsQueue, RoutingType.ANYCAST,
frameworkNotificationsQueue, null, true, false);
+ server1.createQueue(bridgeNotificationsQueue, RoutingType.ANYCAST,
bridgeNotificationsQueue, null, true, false);
+ server1.createQueue(notificationsQueue, RoutingType.MULTICAST,
notificationsQueue, null, true, false);
+
+ server0.deployBridge(new
BridgeConfiguration().setName("notifications-bridge").setQueueName(frameworkNotificationsQueue.toString()).setForwardingAddress(bridgeNotificationsQueue.toString()).setConfirmationWindowSize(10).setStaticConnectors(Arrays.asList("notification-broker")).setTransformerConfiguration(new
TransformerConfiguration(BridgeApplicationPropertiesTransformer.class.getCanonicalName())));
+ }
+
+ @Test
+ public void testApplicationPropertiesFromTransformerForwardBridge() throws
Exception {
+ Map<String, Object> applicationProperties = new HashMap<>();
+ applicationProperties.put(DivertApplicationPropertiesTransformer.TRX_ID,
"100");
+
+
sendMessages("uswest.Provider.AMC.Agent.f261d0fa-51bd-44bd-abe0-ce22d2a387cd.CustomNotification",
1, RoutingType.ANYCAST, true);
+
+ try (ServerLocator locator =
ActiveMQClient.createServerLocator(getServer1URL()); ClientSessionFactory
sessionFactory = locator.createSessionFactory(); ClientSession session =
sessionFactory.createSession(); ClientConsumer consumer =
session.createConsumer(notificationsQueue)) {
+
+ session.start();
+
+ ClientMessage message = consumer.receive(5000);
+ assertNotNull(message);
+
+ assertEquals("1", message.getStringProperty("A"));
+ assertEquals("2", message.getStringProperty("B"));
+ assertEquals("3", message.getStringProperty("C"));
+ assertEquals("4", message.getStringProperty("D"));
+ }
+ }
+}
\ No newline at end of file
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 5d8c681..ca34c23 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -16,15 +16,15 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
import java.net.URI;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import javax.management.MBeanServer;
-import javax.management.MBeanServerFactory;
-
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -316,7 +316,18 @@ public class AmqpClientTestSupport extends AmqpTestSupport
{
sendMessages(destinationName, count, routingType, false);
}
- protected void sendMessages(String destinationName, int count, RoutingType
routingType, boolean durable) throws Exception {
+ protected void sendMessages(String destinationName,
+ int count,
+ RoutingType routingType,
+ boolean durable) throws Exception {
+ sendMessages(destinationName, count, routingType, durable,
Collections.emptyMap());
+ }
+
+ protected void sendMessages(String destinationName,
+ int count,
+ RoutingType routingType,
+ boolean durable,
+ Map<String, Object> applicationProperties)
throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
@@ -325,6 +336,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
for (int i = 0; i < count; ++i) {
AmqpMessage message = new AmqpMessage();
+ for (Map.Entry<String, Object> entry :
applicationProperties.entrySet()) {
+ message.setApplicationProperty(entry.getKey(),
entry.getValue());
+ }
message.setMessageId("MessageID:" + i);
message.setDurable(durable);
if (routingType != null) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BridgeApplicationPropertiesTransformer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BridgeApplicationPropertiesTransformer.java
new file mode 100644
index 0000000..9efb2af
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/BridgeApplicationPropertiesTransformer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.server.transformer.Transformer;
+
+public class BridgeApplicationPropertiesTransformer implements Transformer {
+
+ @Override
+ public Message transform(final Message message) {
+
+ message.putStringProperty("C", "3");
+ message.putStringProperty("D", "4");
+
+ return message;
+ }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertApplicationPropertiesTransformer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertApplicationPropertiesTransformer.java
new file mode 100644
index 0000000..6620dc4
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/DivertApplicationPropertiesTransformer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.server.transformer.Transformer;
+
+public class DivertApplicationPropertiesTransformer implements Transformer {
+
+ public static final String TRX_ID = "trxId";
+
+ @Override
+ public Message transform(final Message message) {
+
+ message.putStringProperty("A", "1");
+ message.putStringProperty("B", "2");
+
+ return message;
+ }
+}