Author: chetanm
Date: Wed Sep 21 13:28:18 2016
New Revision: 1761729

URL: http://svn.apache.org/viewvc?rev=1761729&view=rev
Log:
SLING-5647 - Provide ActiveMQ implementation of the MoM API in SLING-5646

Map passed in add call may be immutable so add internal state on a copy and 
then ensure that such internal props are not passed to reader.

Also fix the case around casting of numtries

Modified:
    
sling/trunk/contrib/commons/mom/jms/src/main/java/org/apache/sling/jms/JMSQueueManager.java
    
sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java

Modified: 
sling/trunk/contrib/commons/mom/jms/src/main/java/org/apache/sling/jms/JMSQueueManager.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/jms/src/main/java/org/apache/sling/jms/JMSQueueManager.java?rev=1761729&r1=1761728&r2=1761729&view=diff
==============================================================================
--- 
sling/trunk/contrib/commons/mom/jms/src/main/java/org/apache/sling/jms/JMSQueueManager.java
 (original)
+++ 
sling/trunk/contrib/commons/mom/jms/src/main/java/org/apache/sling/jms/JMSQueueManager.java
 Wed Sep 21 13:28:18 2016
@@ -30,6 +30,7 @@ import java.io.Closeable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -46,6 +47,7 @@ public class JMSQueueManager implements
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(JMSQueueManager.class);
     private static final String NRETRIES = "_nr";
+    private static final Set<String> INTERNAL_PROPS = 
Collections.singleton(NRETRIES);
 
     @Reference
     private ConnectionFactoryService connectionFactoryService;
@@ -92,8 +94,10 @@ public class JMSQueueManager implements
         Session session = null;
         try {
             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-            message.put(NRETRIES, 0L); // set the number of retries to 0.
-            TextMessage textMessage = 
session.createTextMessage(Json.toJson(message));
+            //TODO Instead of copy do addition at JSON writer level
+            Map<String, Object> msgCopy = new HashMap<>(message);
+            msgCopy.put(NRETRIES, 0L); // set the number of retries to 0.
+            TextMessage textMessage = 
session.createTextMessage(Json.toJson(msgCopy));
             textMessage.setJMSType(JMSMessageTypes.JSON.toString());
             LOGGER.info("Sending to {} message {} ", name, textMessage);
             
session.createProducer(session.createQueue(name.toString())).send(textMessage);
@@ -189,6 +193,13 @@ public class JMSQueueManager implements
         }
     }
 
+    private static Map<String,Object> filter(Map<String, Object> map) {
+        //Filter out internal properties
+        for (String internalKey : INTERNAL_PROPS){
+            map.remove(internalKey);
+        }
+        return map;
+    }
 
     public static class JMSQueueSession implements Closeable, MessageListener {
         private static final Logger LOGGER = 
LoggerFactory.getLogger(JMSQueueSession.class);
@@ -248,7 +259,7 @@ public class JMSQueueManager implements
                             final Map<String, Object> mapMessage = 
Json.toMap(textMessage.getText());
                             Types.QueueName queueName = 
Types.queueName(queue.getQueueName());
                             if (queueName.equals(name) && 
messageFilter.accept(queueName, mapMessage)) {
-                                queueReader.onMessage(queueName, mapMessage);
+                                queueReader.onMessage(queueName, 
filter(mapMessage));
                                 session.commit();
                                 // all ok.
                                 committed = true;
@@ -260,7 +271,7 @@ public class JMSQueueManager implements
                     LOGGER.info("QueueReader requested requeue of message ", 
e);
                     if (retryByRequeue && textMessage != null) {
                         Map<String, Object> mapMessage = 
Json.toMap(textMessage.getText());
-                        if ((int)mapMessage.get(NRETRIES) < maxRetries) {
+                        if ((long)mapMessage.get(NRETRIES) < maxRetries) {
                             mapMessage.put(NRETRIES, ((long) 
mapMessage.get(NRETRIES)) + 1);
                             TextMessage retryMessage = 
session.createTextMessage(Json.toJson(mapMessage));
                             
retryMessage.setJMSType(JMSMessageTypes.JSON.toString());

Modified: 
sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java?rev=1761729&r1=1761728&r2=1761729&view=diff
==============================================================================
--- 
sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
 (original)
+++ 
sling/trunk/contrib/commons/mom/jms/src/test/java/org/apache/sling/jms/JMSQueueManagerTest.java
 Wed Sep 21 13:28:18 2016
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.jms.*;
 import java.lang.reflect.Field;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
@@ -204,7 +205,7 @@ public class JMSQueueManagerTest {
         // make the test map unique, if the dequeue fails, then the message 
wont be the first.
         testMap.put("testing", queueName + System.currentTimeMillis());
         LOGGER.info("Sending message to queue");
-        jmsQueueManager.add(Types.queueName(queueName), testMap);
+        jmsQueueManager.add(Types.queueName(queueName), 
Collections.unmodifiableMap(testMap));
         LOGGER.info("Sent message to queue ... receiving from queue");
 
         checkMessagesInQueue(queueName, 1);


Reply via email to