Repository: activemq-artemis Updated Branches: refs/heads/master 303d97c76 -> 5391d42e4
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java new file mode 100644 index 0000000..9f6b4ea --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.plugin; + +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SESSION_METADATA_ADDED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DESTROY_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SESSION_METADATA_ADDED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.BridgeConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; +import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory; +import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.impl.AckReason; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.junit.Before; +import org.junit.Test; + +public class CorePluginTest extends JMSTestBase { + + private Queue queue; + + private final Map<String, AtomicInteger> methodCalls = new HashMap<>(); + private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls); + public static final String INVM_CONNECTOR_FACTORY = InVMConnectorFactory.class.getCanonicalName(); + + @Override + protected Configuration createDefaultConfig(boolean netty) throws Exception { + Configuration config = super.createDefaultConfig(netty); + config.registerBrokerPlugin(verifier); + return config; + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + queue = createQueue("queue1"); + } + + + @Test + public void testSendReceive() throws Exception { + conn = cf.createConnection(); + conn.start(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer prod = sess.createProducer(queue); + MessageConsumer cons = sess.createConsumer(queue); + + TextMessage msg1 = sess.createTextMessage("test"); + prod.send(msg1); + TextMessage received1 = (TextMessage)cons.receive(1000); + assertNotNull(received1); + + conn.close(); + + verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE, + BEFORE_DESTROY_QUEUE, AFTER_DESTROY_QUEUE); + verifier.validatePluginMethodsEquals(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, + BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, + BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, + AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER); + verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, + AFTER_CLOSE_SESSION); + } + + @Test + public void testDestroyQueue() throws Exception { + conn = cf.createConnection(); + conn.start(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + sess.createProducer(queue); + conn.close(); + + server.destroyQueue(new SimpleString(queue.getQueueName())); + + verifier.validatePluginMethodsEquals(1, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, BEFORE_DESTROY_QUEUE, + AFTER_DESTROY_QUEUE); + } + + @Test + public void testMessageExpireServer() throws Exception { + server.registerBrokerPlugin(new ExpiredPluginVerifier()); + + conn = cf.createConnection(); + conn.setClientID("test"); + conn.start(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer prod = sess.createProducer(queue); + prod.setTimeToLive(1); + MessageConsumer cons = sess.createConsumer(queue); + Thread.sleep(100); + TextMessage msg1 = sess.createTextMessage("test"); + prod.send(msg1); + Thread.sleep(100); + assertNull(cons.receive(100)); + + conn.close(); + + verifier.validatePluginMethodsEquals(0, BEFORE_DELIVER, AFTER_DELIVER, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE); + verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, + BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, + BEFORE_SESSION_METADATA_ADDED, AFTER_SESSION_METADATA_ADDED, + BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, + AFTER_MESSAGE_ROUTE, MESSAGE_EXPIRED); + verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, + AFTER_CLOSE_SESSION); + + } + + @Test + public void testMessageExpireClient() throws Exception { + server.registerBrokerPlugin(new ExpiredPluginVerifier()); + + conn = cf.createConnection(); + conn.start(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer prod = sess.createProducer(queue); + prod.setTimeToLive(500); + MessageConsumer cons = sess.createConsumer(queue); + + TextMessage msg1 = sess.createTextMessage("test"); + prod.send(msg1); + Thread.sleep(500); + assertNull(cons.receive(500)); + + conn.close(); + + verifier.validatePluginMethodsEquals(0, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE); + verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, + BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, + BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, + AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER, MESSAGE_EXPIRED); + verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, + AFTER_CLOSE_SESSION); + + } + + @Test + public void testSimpleBridge() throws Exception { + server.stop(); + ActiveMQServer server0; + ActiveMQServer server1; + + Map<String, Object> server0Params = new HashMap<>(); + server0 = createClusteredServerWithParams(false, 0, false, server0Params); + + Map<String, Object> server1Params = new HashMap<>(); + server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1); + server1 = createClusteredServerWithParams(false, 1, false, server1Params); + + final String testAddress = "testAddress"; + final String queueName0 = "queue0"; + final String forwardAddress = "forwardAddress"; + + TransportConfiguration server1tc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params); + + HashMap<String, TransportConfiguration> connectors = new HashMap<>(); + connectors.put(server1tc.getName(), server1tc); + server0.getConfiguration().setConnectorConfigurations(connectors); + server0.registerBrokerPlugin(verifier); + + ArrayList<String> connectorConfig = new ArrayList<>(); + connectorConfig.add(server1tc.getName()); + BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1") + .setQueueName(queueName0) + .setForwardingAddress(forwardAddress) + .setRetryInterval(1000) + .setReconnectAttemptsOnSameNode(-1) + .setUseDuplicateDetection(false) + .setStaticConnectors(connectorConfig); + + List<BridgeConfiguration> bridgeConfigs = new ArrayList<>(); + bridgeConfigs.add(bridgeConfiguration); + server0.getConfiguration().setBridgeConfigurations(bridgeConfigs); + + CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0); + List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>(); + queueConfigs0.add(queueConfig0); + server0.getConfiguration().setQueueConfigurations(queueConfigs0); + + server1.start(); + server0.start(); + + verifier.validatePluginMethodsEquals(1, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE); + + server0.stop(); + server1.stop(); + + } + + private class ExpiredPluginVerifier implements ActiveMQServerPlugin { + + @Override + public void messageAcknowledged(MessageReference ref, AckReason reason) { + assertEquals(AckReason.EXPIRED, reason); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java new file mode 100644 index 0000000..14aa4a1 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java @@ -0,0 +1,276 @@ +/** + * + */ +package org.apache.activemq.artemis.tests.integration.plugin; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Preconditions; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.BridgeConfiguration; +import org.apache.activemq.artemis.core.persistence.OperationContext; +import org.apache.activemq.artemis.core.postoffice.RoutingStatus; +import org.apache.activemq.artemis.core.security.SecurityAuth; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueConfig; +import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.ServerConsumer; +import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.server.cluster.Bridge; +import org.apache.activemq.artemis.core.server.impl.AckReason; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; + + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +public class MethodCalledVerifier implements ActiveMQServerPlugin { + + private final Map<String, AtomicInteger> methodCalls; + + public static final String AFTER_CREATE_CONNECTION = "afterCreateConnection"; + public static final String AFTER_DESTROY_CONNECTION = "afterDestroyConnection"; + public static final String BEFORE_CREATE_SESSION = "beforeCreateSession"; + public static final String AFTER_CREATE_SESSION = "afterCreateSession"; + public static final String BEFORE_CLOSE_SESSION = "beforeCloseSession"; + public static final String AFTER_CLOSE_SESSION = "afterCloseSession"; + public static final String BEFORE_SESSION_METADATA_ADDED = "beforeSessionMetadataAdded"; + public static final String AFTER_SESSION_METADATA_ADDED = "afterSessionMetadataAdded"; + public static final String BEFORE_CREATE_CONSUMER = "beforeCreateConsumer"; + public static final String AFTER_CREATE_CONSUMER = "afterCreateConsumer"; + public static final String BEFORE_CLOSE_CONSUMER = "beforeCloseConsumer"; + public static final String AFTER_CLOSE_CONSUMER = "afterCloseConsumer"; + public static final String BEFORE_CREATE_QUEUE = "beforeCreateQueue"; + public static final String AFTER_CREATE_QUEUE = "afterCreateQueue"; + public static final String BEFORE_DESTROY_QUEUE = "beforeDestroyQueue"; + public static final String AFTER_DESTROY_QUEUE = "afterDestroyQueue"; + public static final String MESSAGE_EXPIRED = "messageExpired"; + public static final String MESSAGE_ACKED = "messageAcknowledged"; + public static final String BEFORE_SEND = "beforeSend"; + public static final String AFTER_SEND = "afterSend"; + public static final String BEFORE_MESSAGE_ROUTE = "beforeMessageRoute"; + public static final String AFTER_MESSAGE_ROUTE = "afterMessageRoute"; + public static final String BEFORE_DELIVER = "beforeDeliver"; + public static final String AFTER_DELIVER = "afterDeliver"; + public static final String BEFORE_DEPLOY_BRIDGE = "beforeDeployBridge"; + public static final String AFTER_DEPLOY_BRIDGE = "afterDeployBridge"; + + /** + * @param methods + */ + public MethodCalledVerifier(Map<String, AtomicInteger> methodCalls) { + super(); + this.methodCalls = methodCalls; + } + + @Override + public void afterCreateConnection(RemotingConnection connection) { + Preconditions.checkNotNull(connection); + methodCalled(AFTER_CREATE_CONNECTION); + } + + @Override + public void afterDestroyConnection(RemotingConnection connection) { + Preconditions.checkNotNull(connection); + methodCalled(AFTER_DESTROY_CONNECTION); + } + + @Override + public void beforeCreateSession(String name, String username, int minLargeMessageSize, RemotingConnection connection, + boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, + String defaultAddress, SessionCallback callback, boolean autoCreateQueues, + OperationContext context, Map<SimpleString, RoutingType> prefixes) { + Preconditions.checkNotNull(connection); + methodCalled(BEFORE_CREATE_SESSION); + } + + @Override + public void afterCreateSession(ServerSession session) { + Preconditions.checkNotNull(session); + methodCalled(AFTER_CREATE_SESSION); + } + + @Override + public void beforeCloseSession(ServerSession session, boolean failed) { + Preconditions.checkNotNull(session); + methodCalled(BEFORE_CLOSE_SESSION); + } + + @Override + public void afterCloseSession(ServerSession session, boolean failed) { + Preconditions.checkNotNull(session); + methodCalled(AFTER_CLOSE_SESSION); + } + + @Override + public void beforeSessionMetadataAdded(ServerSession session, String key, String data) { + Preconditions.checkNotNull(key); + methodCalled(BEFORE_SESSION_METADATA_ADDED); + } + + @Override + public void afterSessionMetadataAdded(ServerSession session, String key, String data) { + Preconditions.checkNotNull(key); + methodCalled(AFTER_SESSION_METADATA_ADDED); + } + + @Override + public void beforeCreateConsumer(long consumerID, SimpleString queueName, SimpleString filterString, + boolean browseOnly, boolean supportLargeMessage) { + Preconditions.checkNotNull(queueName); + methodCalled(BEFORE_CREATE_CONSUMER); + } + + @Override + public void afterCreateConsumer(ServerConsumer consumer) { + Preconditions.checkNotNull(consumer); + methodCalled(AFTER_CREATE_CONSUMER); + } + + @Override + public void beforeCloseConsumer(ServerConsumer consumer, boolean failed) { + Preconditions.checkNotNull(consumer); + methodCalled(BEFORE_CLOSE_CONSUMER); + } + + @Override + public void afterCloseConsumer(ServerConsumer consumer, boolean failed) { + Preconditions.checkNotNull(consumer); + methodCalled(AFTER_CLOSE_CONSUMER); + } + + @Override + public void beforeCreateQueue(QueueConfig queueConfig) { + Preconditions.checkNotNull(queueConfig); + methodCalled(BEFORE_CREATE_QUEUE); + } + + @Override + public void afterCreateQueue(org.apache.activemq.artemis.core.server.Queue queue) { + Preconditions.checkNotNull(queue); + methodCalled(AFTER_CREATE_QUEUE); + } + + @Override + public void beforeDestroyQueue(SimpleString queueName, SecurityAuth session, boolean checkConsumerCount, + boolean removeConsumers, boolean autoDeleteAddress) { + Preconditions.checkNotNull(queueName); + methodCalled(BEFORE_DESTROY_QUEUE); + } + + @Override + public void afterDestroyQueue(Queue queue, SimpleString address, SecurityAuth session, boolean checkConsumerCount, + boolean removeConsumers, boolean autoDeleteAddress) { + Preconditions.checkNotNull(queue); + methodCalled(AFTER_DESTROY_QUEUE); + } + + @Override + public void messageExpired(MessageReference message, SimpleString messageExpiryAddress) { + Preconditions.checkNotNull(message); + methodCalled(MESSAGE_EXPIRED); + } + + @Override + public void messageAcknowledged(MessageReference ref, AckReason reason) { + Preconditions.checkNotNull(ref); + Preconditions.checkNotNull(reason); + methodCalled(MESSAGE_ACKED); + } + + @Override + public void beforeSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) { + Preconditions.checkNotNull(message); + methodCalled(BEFORE_SEND); + } + + @Override + public void afterSend(Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue, + RoutingStatus result) { + Preconditions.checkNotNull(message); + Preconditions.checkNotNull(result); + methodCalled(AFTER_SEND); + } + + @Override + public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) { + Preconditions.checkNotNull(message); + Preconditions.checkNotNull(context); + methodCalled(BEFORE_MESSAGE_ROUTE); + } + + @Override + public void afterMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates, + RoutingStatus result) { + Preconditions.checkNotNull(message); + Preconditions.checkNotNull(context); + Preconditions.checkNotNull(result); + methodCalled(AFTER_MESSAGE_ROUTE); + } + + @Override + public void beforeDeliver(MessageReference reference) { + Preconditions.checkNotNull(reference); + methodCalled(BEFORE_DELIVER); + } + + @Override + public void afterDeliver(MessageReference reference) { + Preconditions.checkNotNull(reference); + methodCalled(AFTER_DELIVER); + } + + @Override + public void beforeDeployBridge(BridgeConfiguration config) { + Preconditions.checkNotNull(config); + methodCalled(BEFORE_DEPLOY_BRIDGE); + } + + @Override + public void afterDeployBridge(Bridge bridge) { + Preconditions.checkNotNull(bridge); + methodCalled(AFTER_DEPLOY_BRIDGE); + } + + public void validatePluginMethodsEquals(int count, String... names) { + Arrays.asList(names).forEach(name -> { + assertEquals("validating method " + name, count, methodCalls.getOrDefault(name, new AtomicInteger()).get()); + }); + } + + public void validatePluginMethodsAtLeast(int count, String... names) { + Arrays.asList(names).forEach(name -> { + assertTrue("validating method " + name, count <= methodCalls.getOrDefault(name, new AtomicInteger()).get()); + }); + } + + private void methodCalled(String name) { + methodCalls.computeIfAbsent(name, k -> new AtomicInteger()).incrementAndGet(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java new file mode 100644 index 0000000..5e7f127 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.plugin; + +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED; + +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession; +import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider; +import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport; +import org.apache.activemq.artemis.utils.ConcurrentHashSet; +import org.junit.Before; +import org.junit.Test; + +public class MqttPluginTest extends MQTTTestSupport { + + + private final Map<String, AtomicInteger> methodCalls = new HashMap<>(); + private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls); + + @Override + @Before + public void setUp() throws Exception { + Field sessions = MQTTSession.class.getDeclaredField("SESSIONS"); + sessions.setAccessible(true); + sessions.set(null, new ConcurrentHashMap<>()); + + Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS"); + connectedClients.setAccessible(true); + connectedClients.set(null, new ConcurrentHashSet<>()); + super.setUp(); + + } + + @Override + public void configureBroker() throws Exception { + super.configureBroker(); + server.registerBrokerPlugin(verifier); + } + + @Test(timeout = 60 * 1000) + public void testSendAndReceiveMQTT() throws Exception { + final MQTTClientProvider subscriptionProvider = getMQTTClientProvider(); + initializeConnection(subscriptionProvider); + + subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE); + + final CountDownLatch latch = new CountDownLatch(NUM_MESSAGES); + + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + for (int i = 0; i < NUM_MESSAGES; i++) { + try { + byte[] payload = subscriptionProvider.receive(10000); + assertNotNull("Should get a message", payload); + latch.countDown(); + } catch (Exception e) { + e.printStackTrace(); + break; + } + + } + } + }); + thread.start(); + + final MQTTClientProvider publishProvider = getMQTTClientProvider(); + initializeConnection(publishProvider); + + for (int i = 0; i < NUM_MESSAGES; i++) { + String payload = "Message " + i; + publishProvider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE); + } + + latch.await(10, TimeUnit.SECONDS); + assertEquals(0, latch.getCount()); + subscriptionProvider.disconnect(); + publishProvider.disconnect(); + + verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE); + verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION, + AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER, + AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, + MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, + AFTER_DELIVER); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java new file mode 100644 index 0000000..afb6841 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/OpenwirePluginTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.plugin; + +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SESSION_METADATA_ADDED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SESSION_METADATA_ADDED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; + +public class OpenwirePluginTest extends BasicOpenWireTest { + + private final Map<String, AtomicInteger> methodCalls = new HashMap<>(); + private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls); + + @Override + protected ActiveMQServer createServer(boolean realFiles, Configuration configuration, long pageSize, + long maxAddressSize, Map<String, AddressSettings> settings) { + ActiveMQServer server = super.createServer(realFiles, configuration, pageSize, maxAddressSize, settings); + server.registerBrokerPlugin(verifier); + return server; + } + + @Test + public void testAckedMessageAreConsumed() throws JMSException { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNotNull(msg); + + // Reset the session. + session.close(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Attempt to Consume the message... + consumer = session.createConsumer(queue); + msg = consumer.receive(1000); + assertNull(msg); + + session.close(); + connection.close(); + + verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE, + BEFORE_SESSION_METADATA_ADDED, AFTER_SESSION_METADATA_ADDED); + verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION, + AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER, + AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, + MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, + AFTER_DELIVER); + + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java new file mode 100644 index 0000000..c771272 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/StompPluginTest.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.plugin; + + +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_QUEUE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_SESSION; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED; +import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.core.protocol.stomp.Stomp; +import org.apache.activemq.artemis.jms.server.JMSServerManager; +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase; +import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV12; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class StompPluginTest extends StompTestBase { + + private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER; + public static final String CLIENT_ID = "myclientid"; + + private StompClientConnectionV12 conn; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + conn = (StompClientConnectionV12)StompClientConnectionFactory.createClientConnection("1.2", hostname, port); + } + + @Override + @After + public void tearDown() throws Exception { + try { + boolean connected = conn != null && conn.isConnected(); + log.debug("Connection 1.2 : " + connected); + if (connected) { + conn.disconnect(); + } + } finally { + super.tearDown(); + } + } + + private final Map<String, AtomicInteger> methodCalls = new HashMap<>(); + private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls); + + @Override + protected JMSServerManager createServer() throws Exception { + JMSServerManager server = super.createServer(); + server.getActiveMQServer().registerBrokerPlugin(verifier); + return server; + } + + @Test + public void testSendAndReceive() throws Exception { + + // subscribehoward county escaped + StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); + newConn.connect(defUser, defPass); + subscribe(newConn, "a-sub"); + + send(newConn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 1!"); + ClientStompFrame frame = newConn.receiveFrame(); + + System.out.println("received " + frame); + Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand()); + + // unsub + unsubscribe(newConn, "a-sub"); + + newConn.disconnect(); + + verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE); + verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION, BEFORE_CREATE_SESSION, + AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION, BEFORE_CREATE_CONSUMER, + AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, + MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, + AFTER_DELIVER); + + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java ---------------------------------------------------------------------- diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java index c0a4112..121a4b0 100644 --- a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java +++ b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueImplTest.java @@ -67,7 +67,10 @@ public class QueueImplTest extends ActiveMQTestBase { @Test public void testScheduledNoConsumer() throws Exception { - QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())); + QueueImpl queue = + new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, + false, scheduledExecutor, null, null, null, + Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null); // Send one scheduled @@ -132,7 +135,10 @@ public class QueueImplTest extends ActiveMQTestBase { @Test public void testScheduled() throws Exception { - QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())); + QueueImpl queue = + new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, false, true, + false, scheduledExecutor, null, null, null, + Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null); FakeConsumer consumer = null; @@ -230,7 +236,10 @@ public class QueueImplTest extends ActiveMQTestBase { public void disconnect() { } }; - QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, scheduledExecutor, null, null, null, Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory())); + QueueImpl queue = + new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, null, null, false, true, false, + scheduledExecutor, null, null, null, + Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()), null); MessageReference messageReference = generateReference(queue, 1); queue.addConsumer(consumer); messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java index 78179a8..0a08eb6 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java @@ -1309,6 +1309,7 @@ public class QueueImplTest extends ActiveMQTestBase { } private QueueImpl getQueue(SimpleString name, boolean durable, boolean temporary, Filter filter) { - return new QueueImpl(1, QueueImplTest.address1, name, filter, null, durable, temporary, false, scheduledExecutor, new FakePostOffice(), null, null, executor); + return new QueueImpl(1, QueueImplTest.address1, name, filter, null, durable, temporary, false, scheduledExecutor, + new FakePostOffice(), null, null, executor, null); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1e1ede84/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java index 06c7e1e..40c117a 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java @@ -40,7 +40,9 @@ public final class FakeQueueFactory implements QueueFactory { @Override public Queue createQueueWith(final QueueConfig config) { - return new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, null, null, executor); + return new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), + config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), + scheduledExecutor, postOffice, null, null, executor, null); } @Deprecated @@ -54,7 +56,8 @@ public final class FakeQueueFactory implements QueueFactory { final boolean durable, final boolean temporary, final boolean autoCreated) { - return new QueueImpl(persistenceID, address, name, filter, subscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, null, null, executor); + return new QueueImpl(persistenceID, address, name, filter, subscription, user, durable, temporary, autoCreated, + scheduledExecutor, postOffice, null, null, executor, null); } @Override
