Updated Branches: refs/heads/trunk fe36820b8 -> 6683eb652
Fix for https://issues.apache.org/jira/browse/AMQ-4927 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6683eb65 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6683eb65 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6683eb65 Branch: refs/heads/trunk Commit: 6683eb652f0f4efe6130bb72410ae36e93b6868a Parents: fe36820 Author: rajdavies <[email protected]> Authored: Tue Dec 10 13:35:39 2013 +0000 Committer: rajdavies <[email protected]> Committed: Tue Dec 10 13:36:09 2013 +0000 ---------------------------------------------------------------------- .../activemq/broker/jmx/ProducerView.java | 4 +- .../activemq/broker/jmx/ProducerViewMBean.java | 2 +- .../activemq/broker/jmx/SubscriptionView.java | 4 +- .../activemq/broker/region/AbstractRegion.java | 3 +- .../broker/region/AbstractSubscription.java | 19 +++-- .../activemq/broker/region/Subscription.java | 14 ++-- .../apache/activemq/command/ProducerInfo.java | 17 ++++- .../transport/mqtt/MQTTProtocolConverter.java | 41 +++++----- .../transport/mqtt/MQTTRetainedMessages.java | 80 ++++++++++++++++++++ .../transport/mqtt/FuseMQQTTClientProvider.java | 7 +- .../transport/mqtt/MQTTClientProvider.java | 1 + .../activemq/transport/mqtt/MQTTTest.java | 51 ++++++++++++- .../region/QueueDuplicatesFromStoreTest.java | 12 ++- 13 files changed, 208 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java index 6905c72..e211b75 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java @@ -188,12 +188,12 @@ public class ProducerView implements ProducerViewMBean { @Override public void resetStatistics() { if (info != null){ - info.getSentCount().reset(); + info.resetSentCount(); } } @Override public long getSentCount() { - return info != null ? info.getSentCount().getCount() :0; + return info != null ? info.getSentCount() :0; } } http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java index 14c2073..4776283 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ProducerViewMBean.java @@ -102,7 +102,7 @@ public interface ProducerViewMBean { @MBeanInfo("Resets statistics.") void resetStatistics(); - @MBeanInfo("Messages consumed") + @MBeanInfo("Messages dispatched by Producer") long getSentCount(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java index 443a266..deefdb4 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/SubscriptionView.java @@ -421,12 +421,12 @@ public class SubscriptionView implements SubscriptionViewMBean { @Override public void resetStatistics() { if (subscription != null){ - subscription.getConsumedCount().reset(); + subscription.resetConsumedCount(); } } @Override public long getConsumedCount() { - return subscription != null ? subscription.getConsumedCount().getCount() : 0; + return subscription != null ? subscription.getConsumedCount() : 0; } } http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index 16deed4..efa02cb 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -392,8 +392,9 @@ public abstract class AbstractRegion implements Region { } producerExchange.getRegionDestination().send(producerExchange, messageSend); + if (producerExchange.getProducerState() != null && producerExchange.getProducerState().getInfo() != null){ - producerExchange.getProducerState().getInfo().getSentCount().increment(); + producerExchange.getProducerState().getInfo().incrementSentCount(); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java index b2ff01c..3a2e2ee 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java @@ -20,11 +20,11 @@ import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.management.ObjectName; - import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; @@ -36,7 +36,6 @@ import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.LogicExpression; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.NoLocalExpression; -import org.apache.activemq.management.CountStatisticImpl; import org.apache.activemq.selector.SelectorParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +53,7 @@ public abstract class AbstractSubscription implements Subscription { private int cursorMemoryHighWaterMark = 70; private boolean slowConsumer; private long lastAckTime; - private CountStatisticImpl consumedCount = new CountStatisticImpl("consumed","The number of messages consumed"); + private AtomicLong consumedCount = new AtomicLong(); public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { this.broker = broker; @@ -90,7 +89,7 @@ public abstract class AbstractSubscription implements Subscription { @Override public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception { this.lastAckTime = System.currentTimeMillis(); - this.consumedCount.increment(); + this.consumedCount.incrementAndGet(); } @Override @@ -280,7 +279,15 @@ public abstract class AbstractSubscription implements Subscription { this.lastAckTime = value; } - public CountStatisticImpl getConsumedCount(){ - return consumedCount; + public long getConsumedCount(){ + return consumedCount.get(); + } + + public void incrementConsumedCount(){ + consumedCount.incrementAndGet(); + } + + public void resetConsumedCount(){ + consumedCount.set(0); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java index b79b37e..a2c4502 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java @@ -21,7 +21,6 @@ import java.util.List; import javax.jms.InvalidSelectorException; import javax.management.ObjectName; - import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerInfo; @@ -30,7 +29,6 @@ import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.Response; import org.apache.activemq.filter.MessageEvaluationContext; -import org.apache.activemq.management.CountStatisticImpl; /** * @@ -48,7 +46,6 @@ public interface Subscription extends SubscriptionRecovery { /** * Used when client acknowledge receipt of dispatched message. - * @param node * @throws IOException * @throws Exception */ @@ -70,7 +67,7 @@ public interface Subscription extends SubscriptionRecovery { /** * Is the subscription interested in messages in the destination? - * @param context + * @param destination * @return */ boolean matches(ActiveMQDestination destination); @@ -93,7 +90,6 @@ public interface Subscription extends SubscriptionRecovery { /** * The ConsumerInfo object that created the subscription. - * @param destination */ ConsumerInfo getConsumerInfo(); @@ -200,7 +196,7 @@ public interface Subscription extends SubscriptionRecovery { /** * Informs the Broker if the subscription needs to intervention to recover it's state * e.g. DurableTopicSubscriber may do - * @see org.apache.activemq.region.cursors.PendingMessageCursor + * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor * @return true if recovery required */ boolean isRecoveryRequired(); @@ -235,6 +231,10 @@ public interface Subscription extends SubscriptionRecovery { */ long getTimeOfLastMessageAck(); - CountStatisticImpl getConsumedCount(); + long getConsumedCount(); + + void incrementConsumedCount(); + + void resetConsumedCount(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java b/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java index 05ef3a4..7189347 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ProducerInfo.java @@ -16,7 +16,8 @@ */ package org.apache.activemq.command; -import org.apache.activemq.management.CountStatisticImpl; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.activemq.state.CommandVisitor; /** @@ -33,7 +34,7 @@ public class ProducerInfo extends BaseCommand { protected BrokerId[] brokerPath; protected boolean dispatchAsync; protected int windowSize; - protected CountStatisticImpl sentCount = new CountStatisticImpl("sentCount","number of messages sent to a broker"); + protected AtomicLong sentCount = new AtomicLong(); public ProducerInfo() { } @@ -137,8 +138,16 @@ public class ProducerInfo extends BaseCommand { this.windowSize = windowSize; } - public CountStatisticImpl getSentCount(){ - return sentCount; + public long getSentCount(){ + return sentCount.get(); + } + + public void incrementSentCount(){ + sentCount.incrementAndGet(); + } + + public void resetSentCount(){ + sentCount.set(0); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index d4c05eb..34f53a4 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -17,7 +17,6 @@ package org.apache.activemq.transport.mqtt; import java.io.IOException; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -28,8 +27,6 @@ import java.util.zip.Inflater; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; - -import org.apache.activemq.broker.BrokerContext; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.*; import org.apache.activemq.store.PersistenceAdapterSupport; @@ -44,21 +41,7 @@ import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.UTF8Buffer; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; -import org.fusesource.mqtt.codec.CONNACK; -import org.fusesource.mqtt.codec.CONNECT; -import org.fusesource.mqtt.codec.DISCONNECT; -import org.fusesource.mqtt.codec.MQTTFrame; -import org.fusesource.mqtt.codec.PINGREQ; -import org.fusesource.mqtt.codec.PINGRESP; -import org.fusesource.mqtt.codec.PUBACK; -import org.fusesource.mqtt.codec.PUBCOMP; -import org.fusesource.mqtt.codec.PUBLISH; -import org.fusesource.mqtt.codec.PUBREC; -import org.fusesource.mqtt.codec.PUBREL; -import org.fusesource.mqtt.codec.SUBACK; -import org.fusesource.mqtt.codec.SUBSCRIBE; -import org.fusesource.mqtt.codec.UNSUBACK; -import org.fusesource.mqtt.codec.UNSUBSCRIBE; +import org.fusesource.mqtt.codec.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,10 +79,12 @@ public class MQTTProtocolConverter { private long defaultKeepAlive; private int activeMQSubscriptionPrefetch=1; private final String QOS_PROPERTY_NAME = "QoSPropertyName"; + private final MQTTRetainedMessages retainedMessages; public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) { this.mqttTransport = mqttTransport; this.brokerService = brokerService; + this.retainedMessages = MQTTRetainedMessages.getMQTTRetainedMessages(brokerService); this.defaultKeepAlive = 0; } @@ -319,6 +304,23 @@ public class MQTTProtocolConverter { } else { LOG.warn("No topics defined for Subscription " + command); } + //check retained messages + if (topics != null){ + for (Topic topic:topics){ + Buffer buffer = retainedMessages.getMessage(topic.name().toString()); + if (buffer != null){ + PUBLISH msg = new PUBLISH(); + msg.payload(buffer); + msg.topicName(topic.name()); + try { + getMQTTTransport().sendToMQTT(msg.encode()); + } catch (IOException e) { + LOG.warn("Couldn't send retained message " + msg, e); + } + } + } + } + } QoS onSubscribe(Topic topic) throws MQTTProtocolException { @@ -415,6 +417,9 @@ public class MQTTProtocolConverter { void onMQTTPublish(PUBLISH command) throws IOException, JMSException { checkConnected(); + if (command.retain()){ + retainedMessages.addMessage(command.topicName().toString(),command.payload()); + } ActiveMQMessage message = convertMessage(command); message.setProducerId(producerId); message.onSend(); http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java new file mode 100644 index 0000000..e502dce --- /dev/null +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import org.apache.activemq.Service; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.LRUCache; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.ServiceSupport; +import org.fusesource.hawtbuf.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MQTTRetainedMessages extends ServiceSupport { + private static final Logger LOG = LoggerFactory.getLogger(MQTTRetainedMessages.class); + private static final Object LOCK = new Object(); + private LRUCache<String,Buffer> cache = new LRUCache<String, Buffer>(10000); + + private MQTTRetainedMessages(){ + } + + @Override + protected void doStop(ServiceStopper stopper) throws Exception { + cache.clear(); + } + + @Override + protected void doStart() throws Exception { + } + + public void addMessage(String destination,Buffer payload){ + cache.put(destination,payload); + } + + public Buffer getMessage(String destination){ + return cache.get(destination); + } + + public static MQTTRetainedMessages getMQTTRetainedMessages(BrokerService broker){ + MQTTRetainedMessages result = null; + if (broker != null){ + synchronized (LOCK){ + Service[] services = broker.getServices(); + if (services != null){ + for (Service service:services){ + if (service instanceof MQTTRetainedMessages){ + return (MQTTRetainedMessages) service; + } + } + } + result = new MQTTRetainedMessages(); + broker.addService(result); + if (broker != null && broker.isStarted()){ + try { + result.start(); + } catch (Exception e) { + LOG.warn("Couldn't start MQTTRetainedMessages"); + } + } + } + } + + + return result; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java index 257517b..d329066 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/FuseMQQTTClientProvider.java @@ -48,7 +48,12 @@ class FuseMQQTTClientProvider implements MQTTClientProvider { @Override public void publish(String topic, byte[] payload, int qos) throws Exception { - connection.publish(topic,payload, QoS.values()[qos],false); + publish(topic,payload,qos,false); + } + + @Override + public void publish(String topic, byte[] payload, int qos, boolean retained) throws Exception { + connection.publish(topic,payload, QoS.values()[qos],retained); } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java index e5d411a..574a6d8 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTClientProvider.java @@ -19,6 +19,7 @@ package org.apache.activemq.transport.mqtt; public interface MQTTClientProvider { void connect(String host) throws Exception; void disconnect() throws Exception; + public void publish(String topic, byte[] payload, int qos, boolean retained) throws Exception; void publish(String topic,byte[] payload,int qos) throws Exception; void subscribe(String topic,int qos) throws Exception; void unsubscribe(String topic) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 994be67..bf4fac5 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -16,9 +16,18 @@ */ package org.apache.activemq.transport.mqtt; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.TransportConnector; @@ -35,9 +44,6 @@ import org.fusesource.mqtt.codec.MQTTFrame; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import javax.jms.*; - import static org.junit.Assert.assertArrayEquals; public class MQTTTest extends AbstractMQTTTest { @@ -281,6 +287,45 @@ public class MQTTTest extends AbstractMQTTTest { } @Test(timeout=60 * 1000) + public void testSendAndReceiveRetainedMessages() throws Exception { + + addMQTTConnector(); + brokerService.start(); + + final MQTTClientProvider publisher = getMQTTClientProvider(); + initializeConnection(publisher); + + final MQTTClientProvider subscriber = getMQTTClientProvider(); + initializeConnection(subscriber); + + String RETAINED = "retained"; + publisher.publish("foo",RETAINED.getBytes(),AT_LEAST_ONCE,true); + + List<String> messages = new ArrayList<String>(); + for (int i = 0; i < 10; i++){ + messages.add("TEST MESSAGE:" + i); + } + + subscriber.subscribe("foo",AT_LEAST_ONCE); + + for (int i = 0; i < 10; i++) { + publisher.publish("foo", messages.get(i).getBytes(), AT_LEAST_ONCE); + } + byte[] msg = subscriber.receive(5000); + assertNotNull(msg); + assertEquals(RETAINED,new String(msg)); + + for (int i =0; i < 10; i++){ + msg = subscriber.receive(5000); + assertNotNull(msg); + assertEquals(messages.get(i),new String(msg)); + } + subscriber.disconnect(); + publisher.disconnect(); + } + + + @Test(timeout=60 * 1000) public void testSendMQTTReceiveJMS() throws Exception { addMQTTConnector(); TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0"); http://git-wip-us.apache.org/repos/asf/activemq/blob/6683eb65/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java index 9da839d..e9c6664 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java @@ -342,8 +342,16 @@ public class QueueDuplicatesFromStoreTest extends TestCase { } @Override - public CountStatisticImpl getConsumedCount() { - return null; + public long getConsumedCount() { + return 0; + } + + public void incrementConsumedCount(){ + + } + + public void resetConsumedCount(){ + } };
