Repository: activemq-artemis Updated Branches: refs/heads/master 25e9fd78d -> db5a9597a
[ARTEMIS-1209] JMS OpenWire client cannot read notifications from activemq.notifications topic Issue: https://issues.apache.org/jira/browse/ARTEMIS-1209 Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/45321c65 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/45321c65 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/45321c65 Branch: refs/heads/master Commit: 45321c65bd5e95bf92b1c29c072334f11ed4ca00 Parents: 25e9fd7 Author: Ingo Weiss <[email protected]> Authored: Tue Jun 6 16:46:58 2017 +0100 Committer: Clebert Suconic <[email protected]> Committed: Wed Jun 7 16:26:33 2017 -0400 ---------------------------------------------------------------------- .../openwire/OpenWireMessageConverter.java | 5 ++++- .../openwire/SimpleOpenWireTest.java | 22 ++++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/45321c65/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index dd7879c..508bac9 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -94,6 +94,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE"; private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED"; + private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications"; + private final WireFormat marshaller; public OpenWireMessageConverter(WireFormat marshaller) { @@ -774,7 +776,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag if (props != null) { for (SimpleString s : props) { String keyStr = s.toString(); - if (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) { + if ((keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) && + !(actualDestination.toString().contains(AMQ_NOTIFICATIONS_DESTINATION))) { continue; } Object prop = coreMessage.getObjectProperty(s); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/45321c65/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index 6eb45a8..cb4bd11 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -38,6 +38,7 @@ import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; import javax.jms.TextMessage; import javax.jms.Topic; +import javax.jms.TopicConnection; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; @@ -1467,6 +1468,27 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { } } + @Test + public void testNotificationProperties() throws Exception { + try (TopicConnection topicConnection = factory.createTopicConnection()) { + TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + Topic notificationsTopic = topicSession.createTopic("activemq.notifications"); + TopicSubscriber subscriber = topicSession.createSubscriber(notificationsTopic); + List<Message> receivedMessages = new ArrayList<>(); + subscriber.setMessageListener(receivedMessages::add); + topicConnection.start(); + + while (receivedMessages.size() == 0) { + Thread.sleep(1000); + } + + for (Message message : receivedMessages) { + assertNotNull(message); + assertNotNull(message.getStringProperty("_AMQ_NotifType")); + } + } + } + private void checkQueueEmpty(String qName) { PostOffice po = server.getPostOffice(); LocalQueueBinding binding = (LocalQueueBinding) po.getBinding(SimpleString.toSimpleString(qName));
