This is an automated email from the ASF dual-hosted git repository. rombert pushed a commit to annotated tag org.apache.sling.jms-1.0.0 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-jms.git
commit 47f7bfd5cf7b25920460d94039b695de3073f969 Author: Chetan Mehrotra <[email protected]> AuthorDate: Wed Sep 21 13:28:18 2016 +0000 SLING-5647 - Provide ActiveMQ implementation of the MoM API in SLING-5646 Map passed in add call may be immutable so add internal state on a copy and then ensure that such internal props are not passed to reader. Also fix the case around casting of numtries git-svn-id: https://svn.apache.org/repos/asf/sling/trunk/contrib/commons/mom/jms@1761729 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/sling/jms/JMSQueueManager.java | 19 +++++++++++++++---- .../org/apache/sling/jms/JMSQueueManagerTest.java | 3 ++- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/apache/sling/jms/JMSQueueManager.java b/src/main/java/org/apache/sling/jms/JMSQueueManager.java index 7c7b83b..4fcc904 100644 --- a/src/main/java/org/apache/sling/jms/JMSQueueManager.java +++ b/src/main/java/org/apache/sling/jms/JMSQueueManager.java @@ -30,6 +30,7 @@ import java.io.Closeable; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** @@ -46,6 +47,7 @@ public class JMSQueueManager implements QueueManager { private static final Logger LOGGER = LoggerFactory.getLogger(JMSQueueManager.class); private static final String NRETRIES = "_nr"; + private static final Set<String> INTERNAL_PROPS = Collections.singleton(NRETRIES); @Reference private ConnectionFactoryService connectionFactoryService; @@ -92,8 +94,10 @@ public class JMSQueueManager implements QueueManager { Session session = null; try { session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - message.put(NRETRIES, 0L); // set the number of retries to 0. - TextMessage textMessage = session.createTextMessage(Json.toJson(message)); + //TODO Instead of copy do addition at JSON writer level + Map<String, Object> msgCopy = new HashMap<>(message); + msgCopy.put(NRETRIES, 0L); // set the number of retries to 0. + TextMessage textMessage = session.createTextMessage(Json.toJson(msgCopy)); textMessage.setJMSType(JMSMessageTypes.JSON.toString()); LOGGER.info("Sending to {} message {} ", name, textMessage); session.createProducer(session.createQueue(name.toString())).send(textMessage); @@ -189,6 +193,13 @@ public class JMSQueueManager implements QueueManager { } } + private static Map<String,Object> filter(Map<String, Object> map) { + //Filter out internal properties + for (String internalKey : INTERNAL_PROPS){ + map.remove(internalKey); + } + return map; + } public static class JMSQueueSession implements Closeable, MessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(JMSQueueSession.class); @@ -248,7 +259,7 @@ public class JMSQueueManager implements QueueManager { final Map<String, Object> mapMessage = Json.toMap(textMessage.getText()); Types.QueueName queueName = Types.queueName(queue.getQueueName()); if (queueName.equals(name) && messageFilter.accept(queueName, mapMessage)) { - queueReader.onMessage(queueName, mapMessage); + queueReader.onMessage(queueName, filter(mapMessage)); session.commit(); // all ok. committed = true; @@ -260,7 +271,7 @@ public class JMSQueueManager implements QueueManager { LOGGER.info("QueueReader requested requeue of message ", e); if (retryByRequeue && textMessage != null) { Map<String, Object> mapMessage = Json.toMap(textMessage.getText()); - if ((int)mapMessage.get(NRETRIES) < maxRetries) { + if ((long)mapMessage.get(NRETRIES) < maxRetries) { mapMessage.put(NRETRIES, ((long) mapMessage.get(NRETRIES)) + 1); TextMessage retryMessage = session.createTextMessage(Json.toJson(mapMessage)); retryMessage.setJMSType(JMSMessageTypes.JSON.toString()); diff --git a/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java b/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java index ddf20a7..4c3aa36 100644 --- a/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java +++ b/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import javax.jms.*; import java.lang.reflect.Field; +import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.Map; @@ -204,7 +205,7 @@ public class JMSQueueManagerTest { // make the test map unique, if the dequeue fails, then the message wont be the first. testMap.put("testing", queueName + System.currentTimeMillis()); LOGGER.info("Sending message to queue"); - jmsQueueManager.add(Types.queueName(queueName), testMap); + jmsQueueManager.add(Types.queueName(queueName), Collections.unmodifiableMap(testMap)); LOGGER.info("Sent message to queue ... receiving from queue"); checkMessagesInQueue(queueName, 1); -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
