ARTEMIS-607 Added interceptor support for MQTT protocol.

Also updated the maintainer's guide to clarify what is run in the PR builder.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7c746c71
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7c746c71
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7c746c71

Branch: refs/heads/master
Commit: 7c746c719e5018a0560fef27062ecd888dcc75d4
Parents: 05ac53a
Author: John D. Ament <[email protected]>
Authored: Sun Jul 3 22:30:13 2016 -0400
Committer: Clebert Suconic <[email protected]>
Committed: Tue Jul 5 19:43:00 2016 -0400

----------------------------------------------------------------------
 .../core/protocol/mqtt/MQTTInterceptor.java     | 26 ++++++++
 .../core/protocol/mqtt/MQTTProtocolHandler.java |  6 +-
 .../core/protocol/mqtt/MQTTProtocolManager.java | 35 ++++++++---
 .../mqtt/MQTTProtocolManagerFactory.java        | 10 +--
 .../protocol/stomp/StompProtocolManager.java    | 21 +------
 .../core/protocol/AbstractProtocolManager.java  | 46 ++++++++++++++
 docs/hacking-guide/en/maintainers.md            |  2 +-
 docs/user-manual/en/intercepting-operations.md  | 21 +++++--
 .../integration/mqtt/imported/MQTTTest.java     |  9 ++-
 .../mqtt/imported/MQTTTestSupport.java          | 64 +++++++++++++++++---
 10 files changed, 192 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7c746c71/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTInterceptor.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTInterceptor.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTInterceptor.java
new file mode 100644
index 0000000..ba22f25
--- /dev/null
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTInterceptor.java
@@ -0,0 +1,26 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+import io.netty.handler.codec.mqtt.MqttMessage;
+import org.apache.activemq.artemis.api.core.BaseInterceptor;
+
+public interface MQTTInterceptor extends BaseInterceptor<MqttMessage> {
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7c746c71/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index 306d146..17fc978 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -53,6 +53,7 @@ public class MQTTProtocolHandler extends 
ChannelInboundHandlerAdapter {
    private MQTTSession session;
 
    private ActiveMQServer server;
+   private MQTTProtocolManager protocolManager;
 
    // This Channel Handler is not sharable, therefore it can only ever be 
associated with a single ctx.
    private ChannelHandlerContext ctx;
@@ -61,8 +62,9 @@ public class MQTTProtocolHandler extends 
ChannelInboundHandlerAdapter {
 
    private boolean stopped = false;
 
-   public MQTTProtocolHandler(ActiveMQServer server) {
+   public MQTTProtocolHandler(ActiveMQServer server, MQTTProtocolManager 
protocolManager) {
       this.server = server;
+      this.protocolManager = protocolManager;
    }
 
    void setConnection(MQTTConnection connection, ConnectionEntry entry) throws 
Exception {
@@ -188,6 +190,7 @@ public class MQTTProtocolHandler extends 
ChannelInboundHandlerAdapter {
    }
 
    void handlePublish(MqttPublishMessage message) throws Exception {
+      this.protocolManager.invokeIncoming(message, this.connection);
       
session.getMqttPublishManager().handleMessage(message.variableHeader().messageId(),
 message.variableHeader().topicName(), 
message.fixedHeader().qosLevel().value(), message.payload(), 
message.fixedHeader().isRetain());
    }
 
@@ -281,6 +284,7 @@ public class MQTTProtocolHandler extends 
ChannelInboundHandlerAdapter {
       MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, 
redelivery, MqttQoS.valueOf(qosLevel), false, 0);
       MqttPublishVariableHeader varHeader = new 
MqttPublishVariableHeader(topicName, messageId);
       MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
+      this.protocolManager.invokeOutgoing(publish, connection);
 
       ctx.write(publish);
       ctx.flush();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7c746c71/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index d2c0793..1d38fcf 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -16,35 +16,42 @@
  */
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
-import java.util.List;
-
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.mqtt.MqttDecoder;
 import io.netty.handler.codec.mqtt.MqttEncoder;
+import io.netty.handler.codec.mqtt.MqttMessage;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import 
org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationListener;
+import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
-import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * MQTTProtocolManager
  */
-class MQTTProtocolManager implements ProtocolManager, NotificationListener {
+class MQTTProtocolManager extends 
AbstractProtocolManager<MqttMessage,MQTTInterceptor,MQTTConnection>
+        implements NotificationListener {
 
    private ActiveMQServer server;
 
    private MQTTLogger log = MQTTLogger.LOGGER;
+   private final List<MQTTInterceptor> incomingInterceptors = new 
ArrayList<>();
+   private final List<MQTTInterceptor> outgoingInterceptors = new 
ArrayList<>();
 
-   MQTTProtocolManager(ActiveMQServer server) {
+   MQTTProtocolManager(ActiveMQServer server, List<BaseInterceptor> 
incomingInterceptors, List<BaseInterceptor> outgoingInterceptors) {
       this.server = server;
+      this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
    }
 
    @Override
@@ -58,8 +65,12 @@ class MQTTProtocolManager implements ProtocolManager, 
NotificationListener {
    }
 
    @Override
-   public void updateInterceptors(List incomingInterceptors, List 
outgoingInterceptors) {
-      // TODO handle interceptors
+   public void updateInterceptors(List incoming, List outgoing) {
+      this.incomingInterceptors.clear();
+      
this.incomingInterceptors.addAll(getFactory().filterInterceptors(incoming));
+
+      this.outgoingInterceptors.clear();
+      
this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
    }
 
    @Override
@@ -100,7 +111,7 @@ class MQTTProtocolManager implements ProtocolManager, 
NotificationListener {
       pipeline.addLast(new MqttEncoder());
       pipeline.addLast(new MqttDecoder(MQTTUtil.MAX_MESSAGE_SIZE));
 
-      pipeline.addLast(new MQTTProtocolHandler(server));
+      pipeline.addLast(new MQTTProtocolHandler(server, this));
    }
 
    @Override
@@ -126,4 +137,12 @@ class MQTTProtocolManager implements ProtocolManager, 
NotificationListener {
    @Override
    public void handshake(NettyServerConnection connection, ActiveMQBuffer 
buffer) {
    }
+
+   public void invokeIncoming(MqttMessage mqttMessage, MQTTConnection 
connection) {
+      super.invokeInterceptors(this.incomingInterceptors, mqttMessage, 
connection);
+   }
+
+   public void invokeOutgoing(MqttMessage mqttMessage, MQTTConnection 
connection) {
+      super.invokeInterceptors(this.outgoingInterceptors, mqttMessage, 
connection);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7c746c71/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
index 982723f..58826f6 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
@@ -22,12 +22,13 @@ import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import 
org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.osgi.service.component.annotations.Component;
 
 @Component(service = ProtocolManagerFactory.class)
-public class MQTTProtocolManagerFactory implements 
ProtocolManagerFactory<BaseInterceptor> {
+public class MQTTProtocolManagerFactory extends 
AbstractProtocolManagerFactory<MQTTInterceptor> {
 
    public static final String MQTT_PROTOCOL_NAME = "MQTT";
 
@@ -40,13 +41,12 @@ public class MQTTProtocolManagerFactory implements 
ProtocolManagerFactory<BaseIn
                                                 final Map<String, Object> 
parameters,
                                                 List<BaseInterceptor> 
incomingInterceptors,
                                                 List<BaseInterceptor> 
outgoingInterceptors) {
-      return new MQTTProtocolManager(server);
+      return new MQTTProtocolManager(server, incomingInterceptors, 
outgoingInterceptors);
    }
 
    @Override
-   public List filterInterceptors(List list) {
-      // TODO Add support for interceptors.
-      return null;
+   public List<MQTTInterceptor> filterInterceptors(List<BaseInterceptor> 
interceptors) {
+      return internalFilterInterceptors(MQTTInterceptor.class, interceptors);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7c746c71/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 4b6f5ba..601d833 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -37,9 +37,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
-import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
@@ -52,7 +52,7 @@ import static 
org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProto
 /**
  * StompProtocolManager
  */
-class StompProtocolManager implements ProtocolManager<StompFrameInterceptor> {
+class StompProtocolManager extends 
AbstractProtocolManager<StompFrame,StompFrameInterceptor,StompConnection> {
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------
@@ -410,21 +410,4 @@ class StompProtocolManager implements 
ProtocolManager<StompFrameInterceptor> {
    public ActiveMQServer getServer() {
       return server;
    }
-
-   private void invokeInterceptors(List<StompFrameInterceptor> interceptors,
-                                   final StompFrame frame,
-                                   final StompConnection connection) {
-      if (interceptors != null && !interceptors.isEmpty()) {
-         for (StompFrameInterceptor interceptor : interceptors) {
-            try {
-               if (!interceptor.intercept(frame, connection)) {
-                  break;
-               }
-            }
-            catch (Exception e) {
-               ActiveMQServerLogger.LOGGER.error(e);
-            }
-         }
-      }
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7c746c71/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java
new file mode 100644
index 0000000..ddefe86
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManager.java
@@ -0,0 +1,46 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.activemq.artemis.spi.core.protocol;
+
+import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+
+import java.util.List;
+
+public abstract class AbstractProtocolManager<P, I extends BaseInterceptor<P>, 
C extends RemotingConnection>
+      implements ProtocolManager<I> {
+
+   protected void invokeInterceptors(final List<I> interceptors,
+                                     final P message,
+                                     final C connection) {
+      if (interceptors != null && !interceptors.isEmpty()) {
+         for (I interceptor : interceptors) {
+            try {
+               if (!interceptor.intercept(message, connection)) {
+                  break;
+               }
+            }
+            catch (Exception e) {
+               ActiveMQServerLogger.LOGGER.error(e);
+            }
+         }
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7c746c71/docs/hacking-guide/en/maintainers.md
----------------------------------------------------------------------
diff --git a/docs/hacking-guide/en/maintainers.md b/docs/hacking-guide/en/maintainers.md
index e0e6dad..4319f5a 100644
--- a/docs/hacking-guide/en/maintainers.md
+++ b/docs/hacking-guide/en/maintainers.md
@@ -11,7 +11,7 @@ What does it mean to be reasonably confident? If the 
developer has run the same
 builds are running they can be reasonably confident. Currently the [PR 
build](https://builds.apache.org/job/ActiveMQ-Artemis-PR-Build/)
 runs this command:
 
-    mvn compile test-compile javadoc:javadoc -Pfast-tests -Pextra-tests test
+    mvn -Pfast-tests -Pextra-tests install
 
 However, if the changes are significant, touches a wide area of code, or even 
if the developer just wants a second
 opinion they are encouraged to engage other members of the community to obtain 
an additional review prior to pushing.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7c746c71/docs/user-manual/en/intercepting-operations.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/intercepting-operations.md b/docs/user-manual/en/intercepting-operations.md
index 6c6a4af..2b1d0d4 100644
--- a/docs/user-manual/en/intercepting-operations.md
+++ b/docs/user-manual/en/intercepting-operations.md
@@ -9,7 +9,9 @@ makes interceptors powerful, but also potentially dangerous.
 
 ## Implementing The Interceptors
 
-An interceptor must implement the `Interceptor interface`:
+All interceptors are protocol specific.
+
+An interceptor for the core protocol must implement the interface 
`Interceptor`:
 
 ``` java
 package org.apache.artemis.activemq.api.core.interceptor;
@@ -20,14 +22,25 @@ public interface Interceptor
 }
 ```
 
-For stomp protocol an interceptor must implement the `StompFrameInterceptor 
class`:
+For stomp protocol an interceptor must implement the interface 
`StompFrameInterceptor`:
 
 ``` java
 package org.apache.activemq.artemis.core.protocol.stomp;
 
-public interface StompFrameInterceptor
+public interface StompFrameInterceptor extends BaseInterceptor<StompFrame>
+{
+   boolean intercept(StompFrame stompFrame, RemotingConnection connection);
+}
+```
+
+Likewise for MQTT protocol, an interceptor must implement the interface 
`MQTTInterceptor`:
+ 
+``` java
+package org.apache.activemq.artemis.core.protocol.mqtt;
+
+public interface MQTTInterceptor extends BaseInterceptor<MqttMessage>
 {
-   public abstract boolean intercept(StompFrame stompFrame, RemotingConnection 
connection);
+    boolean intercept(MqttMessage mqttMessage, RemotingConnection connection);
 }
 ```
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7c746c71/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index b305c80..1b0964b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -246,7 +246,9 @@ public class MQTTTest extends MQTTTestSupport {
    }
 
    @Test(timeout = 60 * 1000)
-   public void testSendAndReceiveExactlyOnce() throws Exception {
+   public void testSendAndReceiveExactlyOnceWithInterceptors() throws 
Exception {
+      MQTTIncomingInterceptor.clear();
+      MQTTOutoingInterceptor.clear();
       final MQTTClientProvider publisher = getMQTTClientProvider();
       initializeConnection(publisher);
 
@@ -263,6 +265,8 @@ public class MQTTTest extends MQTTTestSupport {
       }
       subscriber.disconnect();
       publisher.disconnect();
+      assertEquals(NUM_MESSAGES, MQTTIncomingInterceptor.getMessageCount());
+      assertEquals(NUM_MESSAGES, MQTTOutoingInterceptor.getMessageCount());
    }
 
    @Test(timeout = 60 * 1000)
@@ -380,7 +384,8 @@ public class MQTTTest extends MQTTTestSupport {
          Message msg = connection.receive(5, TimeUnit.SECONDS);
          do {
             assertNotNull("RETAINED null " + wildcard, msg);
-            assertTrue("RETAINED prefix " + wildcard, new 
String(msg.getPayload()).startsWith(RETAINED));
+            String msgPayload = new String(msg.getPayload());
+            assertTrue("RETAINED prefix " + wildcard + " msg " + msgPayload, 
msgPayload.startsWith(RETAINED));
             assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(), 
pattern.matcher(msg.getTopic()).matches());
             msg.ack();
             msg = connection.receive(5000, TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7c746c71/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
index 73489af..010949c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
@@ -29,13 +29,19 @@ import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.handler.codec.mqtt.MqttMessage;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.fusesource.mqtt.client.MQTT;
 import org.fusesource.mqtt.client.Tracer;
@@ -47,6 +53,8 @@ import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.Collections.singletonList;
+
 public class MQTTTestSupport extends ActiveMQTestBase {
 
    private ActiveMQServer server;
@@ -79,11 +87,6 @@ public class MQTTTestSupport extends ActiveMQTestBase {
       return new File(new 
File(protectionDomain.getCodeSource().getLocation().getPath()), 
"../..").getCanonicalFile();
    }
 
-   public MQTTTestSupport(String connectorScheme, boolean useSSL) {
-      this.protocolScheme = connectorScheme;
-      this.useSSL = useSSL;
-   }
-
    @Override
    public String getName() {
       return name.getMethodName();
@@ -120,7 +123,7 @@ public class MQTTTestSupport extends ActiveMQTestBase {
    public void startBroker() throws Exception {
       // TODO Add SSL
       super.setUp();
-      server = createServer(true, true);
+      server = createServerForMQTT();
       addCoreConnector();
       addMQTTConnector();
       AddressSettings addressSettings = new AddressSettings();
@@ -132,12 +135,19 @@ public class MQTTTestSupport extends ActiveMQTestBase {
       server.waitForActivation(10, TimeUnit.SECONDS);
    }
 
+   private ActiveMQServer createServerForMQTT() throws Exception {
+      Configuration defaultConfig = createDefaultConfig(true)
+              
.setIncomingInterceptorClassNames(singletonList(MQTTIncomingInterceptor.class.getName()))
+              
.setOutgoingInterceptorClassNames(singletonList(MQTTOutoingInterceptor.class.getName()));
+      return createServer(true, defaultConfig);
+   }
+
    protected void addCoreConnector() throws Exception {
       // Overrides of this method can add additional configuration options or 
add multiple
       // MQTT transport connectors as needed, the port variable is always 
supposed to be
       // assigned the primary MQTT connector's port.
 
-      HashMap<String, Object> params = new HashMap<>();
+      Map<String, Object> params = new HashMap<>();
       params.put(TransportConstants.PORT_PROP_NAME, "" + 5445);
       params.put(TransportConstants.PROTOCOLS_PROP_NAME, "CORE");
       TransportConfiguration transportConfiguration = new 
TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
@@ -151,7 +161,7 @@ public class MQTTTestSupport extends ActiveMQTestBase {
       // MQTT transport connectors as needed, the port variable is always 
supposed to be
       // assigned the primary MQTT connector's port.
 
-      HashMap<String, Object> params = new HashMap<>();
+      Map<String, Object> params = new HashMap<>();
       params.put(TransportConstants.PORT_PROP_NAME, "" + port);
       params.put(TransportConstants.PROTOCOLS_PROP_NAME, "MQTT");
       TransportConfiguration transportConfiguration = new 
TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
@@ -336,4 +346,42 @@ public class MQTTTestSupport extends ActiveMQTestBase {
          return new X509Certificate[0];
       }
    }
+
+   public static class MQTTIncomingInterceptor implements MQTTInterceptor {
+
+      private static int messageCount = 0;
+
+      @Override
+      public boolean intercept(MqttMessage packet, RemotingConnection 
connection) throws ActiveMQException {
+         messageCount++;
+         return true;
+      }
+
+      public static void clear() {
+         messageCount = 0;
+      }
+
+      public static int getMessageCount() {
+         return messageCount;
+      }
+   }
+
+   public static class MQTTOutoingInterceptor implements MQTTInterceptor {
+
+      private static int messageCount = 0;
+
+      @Override
+      public boolean intercept(MqttMessage packet, RemotingConnection 
connection) throws ActiveMQException {
+         messageCount++;
+         return true;
+      }
+
+      public static void clear() {
+         messageCount = 0;
+      }
+
+      public static int getMessageCount() {
+         return messageCount;
+      }
+   }
 }

Reply via email to