Repository: activemq-artemis
Updated Branches:
  refs/heads/master 303d97c76 -> 5391d42e4


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
new file mode 100644
index 0000000..9f6b4ea
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_QUEUE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SESSION_METADATA_ADDED;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DESTROY_QUEUE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SESSION_METADATA_ADDED;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.BridgeConfiguration;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
+import 
org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
+import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CorePluginTest extends JMSTestBase {
+
+   private Queue queue;
+
+   private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
+   private final MethodCalledVerifier verifier = new 
MethodCalledVerifier(methodCalls);
+   public static final String INVM_CONNECTOR_FACTORY = 
InVMConnectorFactory.class.getCanonicalName();
+
+   @Override
+   protected Configuration createDefaultConfig(boolean netty) throws Exception 
{
+      Configuration config = super.createDefaultConfig(netty);
+      config.registerBrokerPlugin(verifier);
+      return config;
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      queue = createQueue("queue1");
+   }
+
+
+   @Test
+   public void testSendReceive() throws Exception {
+      conn = cf.createConnection();
+      conn.start();
+      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageProducer prod = sess.createProducer(queue);
+      MessageConsumer cons = sess.createConsumer(queue);
+
+      TextMessage msg1 = sess.createTextMessage("test");
+      prod.send(msg1);
+      TextMessage received1 = (TextMessage)cons.receive(1000);
+      assertNotNull(received1);
+
+      conn.close();
+
+      verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, 
BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE,
+            BEFORE_DESTROY_QUEUE, AFTER_DESTROY_QUEUE);
+      verifier.validatePluginMethodsEquals(1, AFTER_CREATE_CONNECTION, 
AFTER_DESTROY_CONNECTION,
+            BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, 
BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
+            BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, 
BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
+            AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER);
+      verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, 
AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
+            AFTER_CLOSE_SESSION);
+   }
+
+   @Test
+   public void testDestroyQueue() throws Exception {
+      conn = cf.createConnection();
+      conn.start();
+      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      sess.createProducer(queue);
+      conn.close();
+
+      server.destroyQueue(new SimpleString(queue.getQueueName()));
+
+      verifier.validatePluginMethodsEquals(1, BEFORE_CREATE_QUEUE, 
AFTER_CREATE_QUEUE, BEFORE_DESTROY_QUEUE,
+            AFTER_DESTROY_QUEUE);
+   }
+
+   @Test
+   public void testMessageExpireServer() throws Exception {
+      server.registerBrokerPlugin(new ExpiredPluginVerifier());
+
+      conn = cf.createConnection();
+      conn.setClientID("test");
+      conn.start();
+      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageProducer prod = sess.createProducer(queue);
+      prod.setTimeToLive(1);
+      MessageConsumer cons = sess.createConsumer(queue);
+      Thread.sleep(100);
+      TextMessage msg1 = sess.createTextMessage("test");
+      prod.send(msg1);
+      Thread.sleep(100);
+      assertNull(cons.receive(100));
+
+      conn.close();
+
+      verifier.validatePluginMethodsEquals(0, BEFORE_DELIVER, AFTER_DELIVER, 
BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
+      verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, 
AFTER_DESTROY_CONNECTION,
+            BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, 
BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
+            BEFORE_SESSION_METADATA_ADDED, AFTER_SESSION_METADATA_ADDED,
+            BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, 
BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
+            AFTER_MESSAGE_ROUTE, MESSAGE_EXPIRED);
+      verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, 
AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
+            AFTER_CLOSE_SESSION);
+
+   }
+
+   @Test
+   public void testMessageExpireClient() throws Exception {
+      server.registerBrokerPlugin(new ExpiredPluginVerifier());
+
+      conn = cf.createConnection();
+      conn.start();
+      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageProducer prod = sess.createProducer(queue);
+      prod.setTimeToLive(500);
+      MessageConsumer cons = sess.createConsumer(queue);
+
+      TextMessage msg1 = sess.createTextMessage("test");
+      prod.send(msg1);
+      Thread.sleep(500);
+      assertNull(cons.receive(500));
+
+      conn.close();
+
+      verifier.validatePluginMethodsEquals(0, BEFORE_DEPLOY_BRIDGE, 
AFTER_DEPLOY_BRIDGE);
+      verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, 
AFTER_DESTROY_CONNECTION,
+            BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, 
BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
+            BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, 
BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
+            AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER, 
MESSAGE_EXPIRED);
+      verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, 
AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
+            AFTER_CLOSE_SESSION);
+
+   }
+
+   @Test
+   public void testSimpleBridge() throws Exception {
+      server.stop();
+      ActiveMQServer server0;
+      ActiveMQServer server1;
+
+      Map<String, Object> server0Params = new HashMap<>();
+      server0 = createClusteredServerWithParams(false, 0, false, 
server0Params);
+
+      Map<String, Object> server1Params = new HashMap<>();
+      server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      server1 = createClusteredServerWithParams(false, 1, false, 
server1Params);
+
+      final String testAddress = "testAddress";
+      final String queueName0 = "queue0";
+      final String forwardAddress = "forwardAddress";
+
+      TransportConfiguration server1tc = new 
TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params);
+
+      HashMap<String, TransportConfiguration> connectors = new HashMap<>();
+      connectors.put(server1tc.getName(), server1tc);
+      server0.getConfiguration().setConnectorConfigurations(connectors);
+      server0.registerBrokerPlugin(verifier);
+
+      ArrayList<String> connectorConfig = new ArrayList<>();
+      connectorConfig.add(server1tc.getName());
+      BridgeConfiguration bridgeConfiguration = new 
BridgeConfiguration().setName("bridge1")
+                                                                         
.setQueueName(queueName0)
+                                                                         
.setForwardingAddress(forwardAddress)
+                                                                         
.setRetryInterval(1000)
+                                                                         
.setReconnectAttemptsOnSameNode(-1)
+                                                                         
.setUseDuplicateDetection(false)
+                                                                         
.setStaticConnectors(connectorConfig);
+
+      List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
+      bridgeConfigs.add(bridgeConfiguration);
+      server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+      CoreQueueConfiguration queueConfig0 = new 
CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
+      List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
+      queueConfigs0.add(queueConfig0);
+      server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+      server1.start();
+      server0.start();
+
+      verifier.validatePluginMethodsEquals(1, BEFORE_DEPLOY_BRIDGE, 
AFTER_DEPLOY_BRIDGE);
+
+      server0.stop();
+      server1.stop();
+
+   }
+
+   private class ExpiredPluginVerifier implements ActiveMQServerPlugin {
+
+      @Override
+      public void messageAcknowledged(MessageReference ref, AckReason reason) {
+         assertEquals(AckReason.EXPIRED, reason);
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
new file mode 100644
index 0000000..14aa4a1
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
@@ -0,0 +1,276 @@
+/**
+ *
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.BridgeConfiguration;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
+import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.QueueConfig;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.cluster.Bridge;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+public class MethodCalledVerifier implements ActiveMQServerPlugin {
+
+   private final Map<String, AtomicInteger> methodCalls;
+
+   public static final String AFTER_CREATE_CONNECTION = 
"afterCreateConnection";
+   public static final String AFTER_DESTROY_CONNECTION = 
"afterDestroyConnection";
+   public static final String BEFORE_CREATE_SESSION = "beforeCreateSession";
+   public static final String AFTER_CREATE_SESSION = "afterCreateSession";
+   public static final String BEFORE_CLOSE_SESSION = "beforeCloseSession";
+   public static final String AFTER_CLOSE_SESSION = "afterCloseSession";
+   public static final String BEFORE_SESSION_METADATA_ADDED = 
"beforeSessionMetadataAdded";
+   public static final String AFTER_SESSION_METADATA_ADDED = 
"afterSessionMetadataAdded";
+   public static final String BEFORE_CREATE_CONSUMER = "beforeCreateConsumer";
+   public static final String AFTER_CREATE_CONSUMER = "afterCreateConsumer";
+   public static final String BEFORE_CLOSE_CONSUMER = "beforeCloseConsumer";
+   public static final String AFTER_CLOSE_CONSUMER = "afterCloseConsumer";
+   public static final String BEFORE_CREATE_QUEUE = "beforeCreateQueue";
+   public static final String AFTER_CREATE_QUEUE = "afterCreateQueue";
+   public static final String BEFORE_DESTROY_QUEUE = "beforeDestroyQueue";
+   public static final String AFTER_DESTROY_QUEUE = "afterDestroyQueue";
+   public static final String MESSAGE_EXPIRED = "messageExpired";
+   public static final String MESSAGE_ACKED = "messageAcknowledged";
+   public static final String BEFORE_SEND = "beforeSend";
+   public static final String AFTER_SEND = "afterSend";
+   public static final String BEFORE_MESSAGE_ROUTE = "beforeMessageRoute";
+   public static final String AFTER_MESSAGE_ROUTE = "afterMessageRoute";
+   public static final String BEFORE_DELIVER = "beforeDeliver";
+   public static final String AFTER_DELIVER = "afterDeliver";
+   public static final String BEFORE_DEPLOY_BRIDGE = "beforeDeployBridge";
+   public static final String AFTER_DEPLOY_BRIDGE = "afterDeployBridge";
+
+   /**
+    * @param methods
+    */
+   public MethodCalledVerifier(Map<String, AtomicInteger> methodCalls) {
+      super();
+      this.methodCalls = methodCalls;
+   }
+
+   @Override
+   public void afterCreateConnection(RemotingConnection connection) {
+      Preconditions.checkNotNull(connection);
+      methodCalled(AFTER_CREATE_CONNECTION);
+   }
+
+   @Override
+   public void afterDestroyConnection(RemotingConnection connection) {
+      Preconditions.checkNotNull(connection);
+      methodCalled(AFTER_DESTROY_CONNECTION);
+   }
+
+   @Override
+   public void beforeCreateSession(String name, String username, int 
minLargeMessageSize, RemotingConnection connection,
+                                   boolean autoCommitSends, boolean 
autoCommitAcks, boolean preAcknowledge, boolean xa,
+                                   String defaultAddress, SessionCallback 
callback, boolean autoCreateQueues,
+                                   OperationContext context, Map<SimpleString, 
RoutingType> prefixes) {
+      Preconditions.checkNotNull(connection);
+      methodCalled(BEFORE_CREATE_SESSION);
+   }
+
+   @Override
+   public void afterCreateSession(ServerSession session) {
+      Preconditions.checkNotNull(session);
+      methodCalled(AFTER_CREATE_SESSION);
+   }
+
+   @Override
+   public void beforeCloseSession(ServerSession session, boolean failed) {
+      Preconditions.checkNotNull(session);
+      methodCalled(BEFORE_CLOSE_SESSION);
+   }
+
+   @Override
+   public void afterCloseSession(ServerSession session, boolean failed) {
+      Preconditions.checkNotNull(session);
+      methodCalled(AFTER_CLOSE_SESSION);
+   }
+
+   @Override
+   public void beforeSessionMetadataAdded(ServerSession session, String key, 
String data) {
+      Preconditions.checkNotNull(key);
+      methodCalled(BEFORE_SESSION_METADATA_ADDED);
+   }
+
+   @Override
+   public void afterSessionMetadataAdded(ServerSession session, String key, 
String data) {
+      Preconditions.checkNotNull(key);
+      methodCalled(AFTER_SESSION_METADATA_ADDED);
+   }
+
+   @Override
+   public void beforeCreateConsumer(long consumerID, SimpleString queueName, 
SimpleString filterString,
+                                    boolean browseOnly, boolean 
supportLargeMessage) {
+      Preconditions.checkNotNull(queueName);
+      methodCalled(BEFORE_CREATE_CONSUMER);
+   }
+
+   @Override
+   public void afterCreateConsumer(ServerConsumer consumer) {
+      Preconditions.checkNotNull(consumer);
+      methodCalled(AFTER_CREATE_CONSUMER);
+   }
+
+   @Override
+   public void beforeCloseConsumer(ServerConsumer consumer, boolean failed) {
+      Preconditions.checkNotNull(consumer);
+      methodCalled(BEFORE_CLOSE_CONSUMER);
+   }
+
+   @Override
+   public void afterCloseConsumer(ServerConsumer consumer, boolean failed) {
+      Preconditions.checkNotNull(consumer);
+      methodCalled(AFTER_CLOSE_CONSUMER);
+   }
+
+   @Override
+   public void beforeCreateQueue(QueueConfig queueConfig) {
+      Preconditions.checkNotNull(queueConfig);
+      methodCalled(BEFORE_CREATE_QUEUE);
+   }
+
+   @Override
+   public void afterCreateQueue(org.apache.activemq.artemis.core.server.Queue 
queue) {
+      Preconditions.checkNotNull(queue);
+      methodCalled(AFTER_CREATE_QUEUE);
+   }
+
+   @Override
+   public void beforeDestroyQueue(SimpleString queueName, SecurityAuth 
session, boolean checkConsumerCount,
+         boolean removeConsumers, boolean autoDeleteAddress) {
+      Preconditions.checkNotNull(queueName);
+      methodCalled(BEFORE_DESTROY_QUEUE);
+   }
+
+   @Override
+   public void afterDestroyQueue(Queue queue, SimpleString address, 
SecurityAuth session, boolean checkConsumerCount,
+         boolean removeConsumers, boolean autoDeleteAddress) {
+      Preconditions.checkNotNull(queue);
+      methodCalled(AFTER_DESTROY_QUEUE);
+   }
+
+   @Override
+   public void messageExpired(MessageReference message, SimpleString 
messageExpiryAddress) {
+      Preconditions.checkNotNull(message);
+      methodCalled(MESSAGE_EXPIRED);
+   }
+
+   @Override
+   public void messageAcknowledged(MessageReference ref, AckReason reason) {
+      Preconditions.checkNotNull(ref);
+      Preconditions.checkNotNull(reason);
+      methodCalled(MESSAGE_ACKED);
+   }
+
+   @Override
+   public void beforeSend(Transaction tx, Message message, boolean direct, 
boolean noAutoCreateQueue) {
+      Preconditions.checkNotNull(message);
+      methodCalled(BEFORE_SEND);
+   }
+
+   @Override
+   public void afterSend(Transaction tx, Message message, boolean direct, 
boolean noAutoCreateQueue,
+                         RoutingStatus result) {
+      Preconditions.checkNotNull(message);
+      Preconditions.checkNotNull(result);
+      methodCalled(AFTER_SEND);
+   }
+
+   @Override
+   public void beforeMessageRoute(Message message, RoutingContext context, 
boolean direct, boolean rejectDuplicates) {
+      Preconditions.checkNotNull(message);
+      Preconditions.checkNotNull(context);
+      methodCalled(BEFORE_MESSAGE_ROUTE);
+   }
+
+   @Override
+   public void afterMessageRoute(Message message, RoutingContext context, 
boolean direct, boolean rejectDuplicates,
+                                 RoutingStatus result) {
+      Preconditions.checkNotNull(message);
+      Preconditions.checkNotNull(context);
+      Preconditions.checkNotNull(result);
+      methodCalled(AFTER_MESSAGE_ROUTE);
+   }
+
+   @Override
+   public void beforeDeliver(MessageReference reference) {
+      Preconditions.checkNotNull(reference);
+      methodCalled(BEFORE_DELIVER);
+   }
+
+   @Override
+   public void afterDeliver(MessageReference reference) {
+      Preconditions.checkNotNull(reference);
+      methodCalled(AFTER_DELIVER);
+   }
+
+   @Override
+   public void beforeDeployBridge(BridgeConfiguration config) {
+      Preconditions.checkNotNull(config);
+      methodCalled(BEFORE_DEPLOY_BRIDGE);
+   }
+
+   @Override
+   public void afterDeployBridge(Bridge bridge) {
+      Preconditions.checkNotNull(bridge);
+      methodCalled(AFTER_DEPLOY_BRIDGE);
+   }
+
+   public void validatePluginMethodsEquals(int count, String... names) {
+      Arrays.asList(names).forEach(name -> {
+         assertEquals("validating method " + name, count, 
methodCalls.getOrDefault(name, new AtomicInteger()).get());
+      });
+   }
+
+   public void validatePluginMethodsAtLeast(int count, String... names) {
+      Arrays.asList(names).forEach(name -> {
+         assertTrue("validating method " + name, count <= 
methodCalls.getOrDefault(name, new AtomicInteger()).get());
+      });
+   }
+
+   private void methodCalled(String name) {
+      methodCalls.computeIfAbsent(name, k -> new 
AtomicInteger()).incrementAndGet();
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java
new file mode 100644
index 0000000..5e7f127
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
+import 
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider;
+import 
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
+import org.apache.activemq.artemis.utils.ConcurrentHashSet;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MqttPluginTest extends MQTTTestSupport {
+
+
+   private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
+   private final MethodCalledVerifier verifier = new 
MethodCalledVerifier(methodCalls);
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      Field sessions = MQTTSession.class.getDeclaredField("SESSIONS");
+      sessions.setAccessible(true);
+      sessions.set(null, new ConcurrentHashMap<>());
+
+      Field connectedClients = 
MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
+      connectedClients.setAccessible(true);
+      connectedClients.set(null, new ConcurrentHashSet<>());
+      super.setUp();
+
+   }
+
+   @Override
+   public void configureBroker() throws Exception {
+      super.configureBroker();
+      server.registerBrokerPlugin(verifier);
+   }
+
+   @Test(timeout = 60 * 1000)
+   public void testSendAndReceiveMQTT() throws Exception {
+      final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
+      initializeConnection(subscriptionProvider);
+
+      subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
+
+      final CountDownLatch latch = new CountDownLatch(NUM_MESSAGES);
+
+      Thread thread = new Thread(new Runnable() {
+         @Override
+         public void run() {
+            for (int i = 0; i < NUM_MESSAGES; i++) {
+               try {
+                  byte[] payload = subscriptionProvider.receive(10000);
+                  assertNotNull("Should get a message", payload);
+                  latch.countDown();
+               } catch (Exception e) {
+                  e.printStackTrace();
+                  break;
+               }
+
+            }
+         }
+      });
+      thread.start();
+
+      final MQTTClientProvider publishProvider = getMQTTClientProvider();
+      initializeConnection(publishProvider);
+
+      for (int i = 0; i < NUM_MESSAGES; i++) {
+         String payload = "Message " + i;
+         publishProvider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
+      }
+
+      latch.await(10, TimeUnit.SECONDS);
+      assertEquals(0, latch.getCount());
+      subscriptionProvider.disconnect();
+      publishProvider.disconnect();
+
+      verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, 
BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
+      verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, 
AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION,
+            AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, 
BEFORE_CREATE_CONSUMER,
+            AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, 
AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE,
+            MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, 
AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
+            AFTER_DELIVER);
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java
new file mode 100644
index 0000000..afb6841
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SESSION_METADATA_ADDED;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SESSION_METADATA_ADDED;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import 
org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Test;
+
+public class OpenwirePluginTest extends BasicOpenWireTest {
+
+   private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
+   private final MethodCalledVerifier verifier = new 
MethodCalledVerifier(methodCalls);
+
+   @Override
+   protected ActiveMQServer createServer(boolean realFiles, Configuration 
configuration, long pageSize,
+                                         long maxAddressSize, Map<String, 
AddressSettings> settings) {
+      ActiveMQServer server = super.createServer(realFiles, configuration, 
pageSize, maxAddressSize, settings);
+      server.registerBrokerPlugin(verifier);
+      return server;
+   }
+
+   @Test
+   public void testAckedMessageAreConsumed() throws JMSException {
+      connection.start();
+      Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(queueName);
+      MessageProducer producer = session.createProducer(queue);
+      producer.send(session.createTextMessage("Hello"));
+
+      // Consume the message...
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message msg = consumer.receive(1000);
+      assertNotNull(msg);
+
+      // Reset the session.
+      session.close();
+      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      // Attempt to Consume the message...
+      consumer = session.createConsumer(queue);
+      msg = consumer.receive(1000);
+      assertNull(msg);
+
+      session.close();
+      connection.close();
+
+      verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, 
BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE,
+            BEFORE_SESSION_METADATA_ADDED, AFTER_SESSION_METADATA_ADDED);
+      verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, 
AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION,
+            AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, 
BEFORE_CREATE_CONSUMER,
+            AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, 
AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE,
+            MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, 
AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
+            AFTER_DELIVER);
+
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java
new file mode 100644
index 0000000..c771272
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.plugin;
+
+
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
+import static 
org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.jms.server.JMSServerManager;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
+import 
org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import 
org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import 
org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
+import 
org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV12;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class StompPluginTest extends StompTestBase {
+
+   private static final transient IntegrationTestLogger log = 
IntegrationTestLogger.LOGGER;
+   public static final String CLIENT_ID = "myclientid";
+
+   private StompClientConnectionV12 conn;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      conn = 
(StompClientConnectionV12)StompClientConnectionFactory.createClientConnection("1.2",
 hostname, port);
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      try {
+         boolean connected = conn != null && conn.isConnected();
+         log.debug("Connection 1.2 : " + connected);
+         if (connected) {
+            conn.disconnect();
+         }
+      } finally {
+         super.tearDown();
+      }
+   }
+
+   private final Map<String, AtomicInteger> methodCalls = new HashMap<>();
+   private final MethodCalledVerifier verifier = new 
MethodCalledVerifier(methodCalls);
+
+   @Override
+   protected JMSServerManager createServer() throws Exception {
+      JMSServerManager server = super.createServer();
+      server.getActiveMQServer().registerBrokerPlugin(verifier);
+      return server;
+   }
+
+   @Test
+   public void testSendAndReceive() throws Exception {
+
+      // subscribehoward county escaped
+      StompClientConnection newConn = 
StompClientConnectionFactory.createClientConnection("1.2", hostname, port);
+      newConn.connect(defUser, defPass);
+      subscribe(newConn, "a-sub");
+
+      send(newConn, getQueuePrefix() + getQueueName(), "text/plain", "Hello 
World 1!");
+      ClientStompFrame frame = newConn.receiveFrame();
+
+      System.out.println("received " + frame);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+
+      // unsub
+      unsubscribe(newConn, "a-sub");
+
+      newConn.disconnect();
+
+      verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, 
BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
+      verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, 
AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION,
+            AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, 
BEFORE_CREATE_CONSUMER,
+            AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, 
AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE,
+            MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, 
AFTER_MESSAGE_ROUTE, BEFORE_DELIVER,
+            AFTER_DELIVER);
+
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
----------------------------------------------------------------------
diff --git 
a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
 
b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
index c0a4112..121a4b0 100644
--- 
a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
+++ 
b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java
@@ -67,7 +67,10 @@ public class QueueImplTest extends ActiveMQTestBase {
 
    @Test
    public void testScheduledNoConsumer() throws Exception {
-      QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new 
SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, 
null, null, null, 
Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()));
+      QueueImpl queue =
+               new QueueImpl(1, new SimpleString("address1"), new 
SimpleString("queue1"), null, null, false, true,
+                             false, scheduledExecutor, null, null, null,
+                             
Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()),
 null);
 
       // Send one scheduled
 
@@ -132,7 +135,10 @@ public class QueueImplTest extends ActiveMQTestBase {
 
    @Test
    public void testScheduled() throws Exception {
-      QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new 
SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, 
null, null, null, 
Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()));
+      QueueImpl queue =
+               new QueueImpl(1, new SimpleString("address1"), new 
SimpleString("queue1"), null, null, false, true,
+                             false, scheduledExecutor, null, null, null,
+                             
Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()),
 null);
 
       FakeConsumer consumer = null;
 
@@ -230,7 +236,10 @@ public class QueueImplTest extends ActiveMQTestBase {
          public void disconnect() {
          }
       };
-      QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), 
QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, 
null, null, 
Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()));
+      QueueImpl queue =
+               new QueueImpl(1, new SimpleString("address1"), 
QueueImplTest.queue1, null, null, false, true, false,
+                             scheduledExecutor, null, null, null,
+                             
Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()),
 null);
       MessageReference messageReference = generateReference(queue, 1);
       queue.addConsumer(consumer);
       messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 
2000);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
----------------------------------------------------------------------
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index 78179a8..0a08eb6 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -1309,6 +1309,7 @@ public class QueueImplTest extends ActiveMQTestBase {
    }
 
    private QueueImpl getQueue(SimpleString name, boolean durable, boolean 
temporary, Filter filter) {
-      return new QueueImpl(1, QueueImplTest.address1, name, filter, null, 
durable, temporary, false, scheduledExecutor, new FakePostOffice(), null, null, 
executor);
+      return new QueueImpl(1, QueueImplTest.address1, name, filter, null, 
durable, temporary, false, scheduledExecutor,
+                           new FakePostOffice(), null, null, executor, null);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
----------------------------------------------------------------------
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
index 06c7e1e..40c117a 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
@@ -40,7 +40,9 @@ public final class FakeQueueFactory implements QueueFactory {
 
    @Override
    public Queue createQueueWith(final QueueConfig config) {
-      return new QueueImpl(config.id(), config.address(), config.name(), 
config.filter(), config.pageSubscription(), config.user(), config.isDurable(), 
config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, 
null, null, executor);
+      return new QueueImpl(config.id(), config.address(), config.name(), 
config.filter(), config.pageSubscription(),
+                           config.user(), config.isDurable(), 
config.isTemporary(), config.isAutoCreated(),
+                           scheduledExecutor, postOffice, null, null, 
executor, null);
    }
 
    @Deprecated
@@ -54,7 +56,8 @@ public final class FakeQueueFactory implements QueueFactory {
                             final boolean durable,
                             final boolean temporary,
                             final boolean autoCreated) {
-      return new QueueImpl(persistenceID, address, name, filter, subscription, 
user, durable, temporary, autoCreated, scheduledExecutor, postOffice, null, 
null, executor);
+      return new QueueImpl(persistenceID, address, name, filter, subscription, 
user, durable, temporary, autoCreated,
+                           scheduledExecutor, postOffice, null, null, 
executor, null);
    }
 
    @Override

Reply via email to