Repository: activemq
Updated Branches:
  refs/heads/trunk a2c5c22ec -> d5470254a


https://issues.apache.org/jira/browse/AMQ-5530 - default mqtt subscription 
prefetch


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d5470254
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d5470254
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d5470254

Branch: refs/heads/trunk
Commit: d5470254afc9f83ed5718fd710f871936fd4992e
Parents: a2c5c22
Author: Dejan Bosanac <[email protected]>
Authored: Mon Jan 26 12:53:29 2015 +0100
Committer: Dejan Bosanac <[email protected]>
Committed: Mon Jan 26 12:53:56 2015 +0100

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java         |  2 +-
 .../strategy/MQTTDefaultSubscriptionStrategy.java     |  8 +++++++-
 .../MQTTVirtualTopicSubscriptionStrategy.java         | 14 +++++++++++---
 .../org/apache/activemq/transport/mqtt/MQTTTest.java  |  1 +
 4 files changed, 20 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/d5470254/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 5f34f17..4e0b0df 100644
--- 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -115,7 +115,7 @@ public class MQTTProtocolConverter {
     private CONNECT connect;
     private String clientId;
     private long defaultKeepAlive;
-    private int activeMQSubscriptionPrefetch = 1;
+    private int activeMQSubscriptionPrefetch = -1;
     private final MQTTPacketIdGenerator packetIdGenerator;
     private boolean publishDollarTopics;
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/d5470254/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
index 14530bd..61619d2 100644
--- 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
+++ 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConsumerInfo;
@@ -70,12 +71,17 @@ public class MQTTDefaultSubscriptionStrategy extends 
AbstractMQTTSubscriptionStr
 
         ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
         consumerInfo.setDestination(destination);
-        
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
+        
consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH);
         consumerInfo.setRetroactive(true);
         consumerInfo.setDispatchAsync(true);
         // create durable subscriptions only when clean session is false
         if (!protocol.isCleanSession() && protocol.getClientId() != null && 
requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
             consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName);
+            
consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH);
+        }
+
+        if (protocol.getActiveMQSubscriptionPrefetch() > 0) {
+            
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
         }
 
         return doSubscribe(consumerInfo, topicName, requestedQoS);

http://git-wip-us.apache.org/repos/asf/activemq/blob/d5470254/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
index 835a5f8..99917c7 100644
--- 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
+++ 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.StringTokenizer;
 
+import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -85,21 +86,25 @@ public class MQTTVirtualTopicSubscriptionStrategy extends 
AbstractMQTTSubscripti
     @Override
     public byte onSubscribe(String topicName, QoS requestedQoS) throws 
MQTTProtocolException {
         ActiveMQDestination destination = null;
+        ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
         if (!protocol.isCleanSession() && protocol.getClientId() != null && 
requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
             String converted = VIRTUALTOPIC_CONSUMER_PREFIX + 
protocol.getClientId() + ":" + requestedQoS + "." +
                                VIRTUALTOPIC_PREFIX + 
convertMQTTToActiveMQ(topicName);
             destination = new ActiveMQQueue(converted);
+            
consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH);
         } else {
             String converted = convertMQTTToActiveMQ(topicName);
             if (!converted.startsWith(VIRTUALTOPIC_PREFIX)) {
                 converted = VIRTUALTOPIC_PREFIX + 
convertMQTTToActiveMQ(topicName);
             }
             destination = new ActiveMQTopic(converted);
+            
consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH);
         }
 
-        ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
         consumerInfo.setDestination(destination);
-        
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
+        if (protocol.getActiveMQSubscriptionPrefetch() > 0) {
+            
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
+        }
         consumerInfo.setRetroactive(true);
         consumerInfo.setDispatchAsync(true);
 
@@ -211,7 +216,10 @@ public class MQTTVirtualTopicSubscriptionStrategy extends 
AbstractMQTTSubscripti
 
                 ConsumerInfo consumerInfo = new 
ConsumerInfo(getNextConsumerId());
                 consumerInfo.setDestination(queue);
-                
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
+                
consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH);
+                if (protocol.getActiveMQSubscriptionPrefetch() > 0) {
+                    
consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
+                }
                 consumerInfo.setRetroactive(true);
                 consumerInfo.setDispatchAsync(true);
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/d5470254/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 3bb8758..5f5af92 100644
--- 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -151,6 +151,7 @@ public class MQTTTest extends MQTTTestSupport {
         for (int i = 0; i < NUM_MESSAGES; i++) {
             String payload = "Message " + i;
             if (i == NUM_MESSAGES / 2) {
+                latch.await(20, TimeUnit.SECONDS);
                 subscriptionProvider.unsubscribe(topic);
             }
             publishProvider.publish(topic, payload.getBytes(), AT_LEAST_ONCE);

Reply via email to