Repository: activemq
Updated Branches:
  refs/heads/trunk a2c5c22ec -> d5470254a
https://issues.apache.org/jira/browse/AMQ-5530 - default mqtt subscription prefetch

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d5470254
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d5470254
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d5470254

Branch: refs/heads/trunk
Commit: d5470254afc9f83ed5718fd710f871936fd4992e
Parents: a2c5c22
Author: Dejan Bosanac <[email protected]>
Authored: Mon Jan 26 12:53:29 2015 +0100
Committer: Dejan Bosanac <[email protected]>
Committed: Mon Jan 26 12:53:56 2015 +0100

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java | 2 +-
 .../strategy/MQTTDefaultSubscriptionStrategy.java | 8 +++++++-
 .../MQTTVirtualTopicSubscriptionStrategy.java | 14 +++++++++++---
 .../org/apache/activemq/transport/mqtt/MQTTTest.java | 1 +
 4 files changed, 20 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq/blob/d5470254/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 5f34f17..4e0b0df 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -115,7 +115,7 @@ public class MQTTProtocolConverter {
     private CONNECT connect;
     private String clientId;
     private long defaultKeepAlive;
-    private int activeMQSubscriptionPrefetch = 1;
+    private int activeMQSubscriptionPrefetch = -1;
     private final MQTTPacketIdGenerator packetIdGenerator;
     private boolean
publishDollarTopics; http://git-wip-us.apache.org/repos/asf/activemq/blob/d5470254/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java index 14530bd..61619d2 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ConsumerInfo; @@ -70,12 +71,17 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId()); consumerInfo.setDestination(destination); - consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch()); + consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH); consumerInfo.setRetroactive(true); consumerInfo.setDispatchAsync(true); // create durable subscriptions only when clean session is false if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) { consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName); + consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH); + } + + if (protocol.getActiveMQSubscriptionPrefetch() > 0) { + consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch()); } return 
doSubscribe(consumerInfo, topicName, requestedQoS); http://git-wip-us.apache.org/repos/asf/activemq/blob/d5470254/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java index 835a5f8..99917c7 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Set; import java.util.StringTokenizer; +import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -85,21 +86,25 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti @Override public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException { ActiveMQDestination destination = null; + ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId()); if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) { String converted = VIRTUALTOPIC_CONSUMER_PREFIX + protocol.getClientId() + ":" + requestedQoS + "." 
+ VIRTUALTOPIC_PREFIX + convertMQTTToActiveMQ(topicName); destination = new ActiveMQQueue(converted); + consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH); } else { String converted = convertMQTTToActiveMQ(topicName); if (!converted.startsWith(VIRTUALTOPIC_PREFIX)) { converted = VIRTUALTOPIC_PREFIX + convertMQTTToActiveMQ(topicName); } destination = new ActiveMQTopic(converted); + consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH); } - ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId()); consumerInfo.setDestination(destination); - consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch()); + if (protocol.getActiveMQSubscriptionPrefetch() > 0) { + consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch()); + } consumerInfo.setRetroactive(true); consumerInfo.setDispatchAsync(true); @@ -211,7 +216,10 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId()); consumerInfo.setDestination(queue); - consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch()); + consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH); + if (protocol.getActiveMQSubscriptionPrefetch() > 0) { + consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch()); + } consumerInfo.setRetroactive(true); consumerInfo.setDispatchAsync(true); http://git-wip-us.apache.org/repos/asf/activemq/blob/d5470254/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 3bb8758..5f5af92 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -151,6 +151,7 @@ public class MQTTTest extends MQTTTestSupport { for (int i = 0; i < NUM_MESSAGES; i++) { String payload = "Message " + i; if (i == NUM_MESSAGES / 2) { + latch.await(20, TimeUnit.SECONDS); subscriptionProvider.unsubscribe(topic); } publishProvider.publish(topic, payload.getBytes(), AT_LEAST_ONCE);
