http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 000c72a..b3fc5ac 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -68,7 +68,6 @@ import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.QueueCreator; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.ServerConsumer; @@ -167,8 +166,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener { private final OperationContext context; - private QueueCreator queueCreator; - // Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here protected final Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<>(); @@ -203,7 +200,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final SimpleString defaultAddress, final SessionCallback callback, final OperationContext context, - final QueueCreator queueCreator, final PagingManager pagingManager) throws Exception { this.username = username; @@ -251,8 +247,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener { remotingConnection.addFailureListener(this); this.context = context; - this.queueCreator = queueCreator; - if (!xa) { tx = newTransaction(); } @@ -390,11 +384,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } } - @Override - public QueueCreator getQueueCreator() { - return queueCreator; - } - protected void securityCheck(SimpleString address, CheckType checkType, SecurityAuth auth) throws Exception { if (securityEnabled) { securityStore.check(address, checkType, auth); @@ -500,7 +489,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final SimpleString filterString, final boolean temporary, final boolean durable) throws Exception { - return createQueue(address, name, filterString, temporary, durable, null, null); + return createQueue(address, name, filterString, temporary, durable, null, null, false); } @Override @@ -510,7 +499,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final boolean temporary, final boolean durable, final Integer maxConsumers, - final Boolean deleteOnNoConsumers) throws Exception { + final Boolean deleteOnNoConsumers, + final Boolean autoCreated) throws Exception { if (durable) { // make sure the user has privileges to create this queue securityCheck(address, CheckType.CREATE_DURABLE_QUEUE, this); @@ -520,7 +510,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { server.checkQueueCreationLimit(getUsername()); - Queue queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, maxConsumers, deleteOnNoConsumers); + Queue queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers); if (temporary) { // Temporary queue in core simply means the queue will be deleted if @@ -1485,7 +1475,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } private void installJMSHooks() { - this.queueCreator = server.getJMSDestinationCreator(); } private Map<SimpleString, Pair<UUID, AtomicLong>> cloneTargetAddresses() { @@ -1605,11 +1594,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } try { - if (noAutoCreateQueue) { - result = postOffice.route(msg, null, routingContext, direct); - } else { - result = postOffice.route(msg, queueCreator, routingContext, direct); - } + result = postOffice.route(msg, routingContext, direct); Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java index 0ade3f1..242cbc7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java @@ -464,7 +464,6 @@ public class ManagementServiceImpl implements ManagementService { public synchronized void registerInRegistry(final String resourceName, final Object managedResource) { unregisterFromRegistry(resourceName); - ActiveMQServerLogger.LOGGER.info("Registering: " + resourceName); registry.put(resourceName, managedResource); } @@ -653,7 +652,7 @@ public class ManagementServiceImpl implements ManagementService { notificationMessage.putStringProperty(new SimpleString("foobar"), new SimpleString(notification.getUID())); } - postOffice.route(notificationMessage, null, false); + postOffice.route(notificationMessage, false); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 41af399..3e9865d 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -46,12 +46,14 @@ import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.LegacyLDAPSecuritySettingPlugin; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.junit.Assert; import org.junit.Test; +import static org.apache.activemq.artemis.core.server.impl.AddressInfo.RoutingType.ANYCAST; +import static org.apache.activemq.artemis.core.server.impl.AddressInfo.RoutingType.MULTICAST; + public class FileConfigurationTest extends ConfigurationImplTest { private final String fullConfigurationName = "ConfigurationTest-full-config.xml"; @@ -376,7 +378,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { // Addr 1 CoreAddressConfiguration addressConfiguration = conf.getAddressConfigurations().get(0); assertEquals("addr1", addressConfiguration.getName()); - assertEquals(AddressInfo.RoutingType.ANYCAST, addressConfiguration.getRoutingType()); + assertEquals(ANYCAST, addressConfiguration.getRoutingType()); assertEquals(2, addressConfiguration.getQueueConfigurations().size()); // Addr 1 Queue 1 @@ -402,7 +404,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { // Addr 2 addressConfiguration = conf.getAddressConfigurations().get(1); assertEquals("addr2", addressConfiguration.getName()); - assertEquals(AddressInfo.RoutingType.MULTICAST, addressConfiguration.getRoutingType()); + assertEquals(MULTICAST, addressConfiguration.getRoutingType()); assertEquals(2, addressConfiguration.getQueueConfigurations().size()); // Addr 2 Queue 1 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java ---------------------------------------------------------------------- diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java index a5a5015..4d89e6d 100644 --- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java +++ b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/artemis/integration/vertx/IncomingVertxEventHandler.java @@ -154,7 +154,7 @@ class IncomingVertxEventHandler implements ConnectorService { manualEncodeVertxMessageBody(msg.getBodyBuffer(), message.body(), type); try { - postOffice.route(msg, null, false); + postOffice.route(msg, false); } catch (Exception e) { ActiveMQVertxLogger.LOGGER.error("failed to route msg " + msg, e); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/BridgeTestBase.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/BridgeTestBase.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/BridgeTestBase.java index 5e0345f..c47d026 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/BridgeTestBase.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/BridgeTestBase.java @@ -506,8 +506,9 @@ public abstract class BridgeTestBase extends ActiveMQTestBase { managementService = server1.getManagementService(); } AddressControl topicControl = (AddressControl) managementService.getResource(ResourceNames.ADDRESS + topic.getTopicName()); - Assert.assertEquals(0, topicControl.getQueueNames().length); - + if (topicControl != null) { + Assert.assertEquals(0, topicControl.getQueueNames().length); + } } protected void removeAllMessages(final String queueName, final int index) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java index 59408ab..5c56224 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java @@ -63,6 +63,7 @@ import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; @@ -775,6 +776,7 @@ public class ProtonTest extends ProtonTestBase { Exception expectedException = null; try { session.createSender("AnAddressThatDoesNotExist"); + fail("Creating a sender here on an address that doesn't exist should fail"); } catch (Exception e) { expectedException = e; } @@ -896,7 +898,7 @@ public class ProtonTest extends ProtonTestBase { //create request message for getQueueNames query AmqpMessage request = new AmqpMessage(); - request.setApplicationProperty("_AMQ_ResourceName", "core.server"); + request.setApplicationProperty("_AMQ_ResourceName", ResourceNames.BROKER); request.setApplicationProperty("_AMQ_OperationName", "getQueueNames"); request.setReplyToAddress(destinationAddress); request.setText("[]"); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java index 5094eba..006bef1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoCreateJmsDestinationTest.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.UUID; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.Queue; @@ -144,7 +145,7 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase { connection.close(); - assertNotNull(server.getManagementService().getResource("core.address.test")); + assertNotNull(server.getManagementService().getResource(ResourceNames.ADDRESS + "test")); } @Test @@ -181,11 +182,11 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase { connection.start(); assertNotNull(consumer.receive(500)); - assertNotNull(server.getManagementService().getResource("core.address." + topicName)); + assertNotNull(server.getManagementService().getResource(ResourceNames.ADDRESS + topicName)); connection.close(); - assertNull(server.getManagementService().getResource("core.address." + topicName)); + assertNull(server.getManagementService().getResource(ResourceNames.ADDRESS + topicName)); } @Test @@ -204,7 +205,7 @@ public class AutoCreateJmsDestinationTest extends JMSTestBase { connection.close(); - assertNotNull(server.getManagementService().getResource("core.address.test")); + assertNotNull(server.getManagementService().getResource(ResourceNames.ADDRESS + "test")); assertNotNull(server.locateQueue(SimpleString.toSimpleString("myClientID.myDurableSub"))); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 159a285..8822015 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -513,7 +513,7 @@ public class HangConsumerTest extends ActiveMQTestBase { } /* (non-Javadoc) - * @see SessionCallback#sendMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, int) + * @see SessionCallback#sendJmsMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, int) */ @Override public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) { @@ -592,7 +592,7 @@ public class HangConsumerTest extends ActiveMQTestBase { SessionCallback callback, OperationContext context, boolean autoCreateQueue) throws Exception { - return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, null, getPagingManager()); + return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, getPagingManager()); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java index 2f72d8b..7f97100 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java @@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.CountDownSessionFailureListener; @@ -228,6 +229,7 @@ public class SessionTest extends ActiveMQTestBase { @Test public void testQueueQueryNoQ() throws Exception { + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateJmsQueues(false)); cf = createSessionFactory(locator); ClientSession clientSession = cf.createSession(false, true, true); QueueQuery resp = clientSession.queueQuery(new SimpleString(queueName)); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/AutoCreateQueueClusterTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/AutoCreateQueueClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/AutoCreateQueueClusterTest.java index cda5494..eb3d184 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/AutoCreateQueueClusterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/AutoCreateQueueClusterTest.java @@ -58,13 +58,15 @@ public class AutoCreateQueueClusterTest extends JMSClusteredTestBase { Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer prod1 = session1.createProducer(ActiveMQJMSClient.createQueue("myQueue")); + // TODO the "jms." prefix is required here because the cluster connection only works for queues which start with "jms" + MessageProducer prod1 = session1.createProducer(ActiveMQJMSClient.createQueue("jms.myQueue")); prod1.setDeliveryMode(DeliveryMode.PERSISTENT); prod1.send(session1.createTextMessage("m1")); - MessageConsumer cons2 = session2.createConsumer(ActiveMQJMSClient.createQueue("myQueue")); + // TODO the "jms." prefix is required here because the cluster connection only works for queues which start with "jms" + MessageConsumer cons2 = session2.createConsumer(ActiveMQJMSClient.createQueue("jms.myQueue")); TextMessage received = (TextMessage) cons2.receive(5000); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/NonExistentQueueTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/NonExistentQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/NonExistentQueueTest.java index 88dc68b..64d2af3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/NonExistentQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/jms2client/NonExistentQueueTest.java @@ -25,9 +25,7 @@ import javax.jms.JMSException; import javax.jms.JMSProducer; import javax.jms.JMSRuntimeException; import javax.jms.MessageProducer; -import javax.jms.Queue; import javax.jms.Session; -import java.util.Random; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; @@ -36,26 +34,14 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.JMSTestBase; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; public class NonExistentQueueTest extends JMSTestBase { - private JMSContext context; - private final Random random = new Random(); - private Queue queue; - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - context = createContext(); - queue = createQueue(JmsContextTest.class.getSimpleName() + "Queue1"); - } - @Test public void sendToNonExistentDestination() throws Exception { server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateJmsQueues(false)); + server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateJmsTopics(false)); Destination destination = ActiveMQJMSClient.createTopic("DoesNotExist"); TransportConfiguration transportConfiguration = new TransportConfiguration(InVMConnectorFactory.class.getName()); ConnectionFactory localConnectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transportConfiguration); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java index 29f280b..7863021 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java @@ -19,7 +19,6 @@ package org.apache.activemq.artemis.tests.integration.persistence; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; -import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; @@ -28,7 +27,6 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.util.Arrays; import java.util.Collection; -import java.util.List; import java.util.UUID; import org.apache.activemq.artemis.api.core.Message; @@ -42,7 +40,6 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; -import org.apache.activemq.artemis.api.jms.JMSFactoryType; import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporter; import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter; import org.apache.activemq.artemis.core.persistence.impl.journal.BatchingIDGenerator; @@ -51,7 +48,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMess import org.apache.activemq.artemis.core.registry.JndiBindingRegistry; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; -import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.server.JMSServerManager; import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; import org.apache.activemq.artemis.tests.unit.util.InVMContext; @@ -436,163 +432,6 @@ public class XmlImportExportTest extends ActiveMQTestBase { } @Test - public void testJmsConnectionFactoryBinding() throws Exception { - final String clientId = "myClientId"; - final long clientFailureCheckPeriod = 1; - final long connectionTTl = 2; - final long callTimeout = 3; - final long callFailoverTimeout = 4; - final boolean cacheLargeMessagesClient = true; - final int minLargeMessageSize = 5; - final boolean compressLargeMessages = true; - final int consumerWindowSize = 6; - final int consumerMaxRate = 7; - final int confirmationWindowSize = 8; - final int producerWindowSize = 9; - final int producerMaxrate = 10; - final boolean blockOnAcknowledge = true; - final boolean blockOnDurableSend = false; - final boolean blockOnNonDurableSend = true; - final boolean autoGroup = true; - final boolean preacknowledge = true; - final String loadBalancingPolicyClassName = "myPolicy"; - final int transactionBatchSize = 11; - final int dupsOKBatchSize = 12; - final boolean useGlobalPools = true; - final int scheduledThreadPoolMaxSize = 13; - final int threadPoolMaxSize = 14; - final long retryInterval = 15; - final double retryIntervalMultiplier = 10.0; - final long maxRetryInterval = 16; - final int reconnectAttempts = 17; - final boolean failoverOnInitialConnection = true; - final String groupId = "myGroupId"; - final String name = "myFirstConnectionFactoryName"; - final String jndi_binding1 = name + "Binding1"; - final String jndi_binding2 = name + "Binding2"; - final JMSFactoryType type = JMSFactoryType.CF; - final boolean ha = true; - final List<String> connectors = Arrays.asList("in-vm1", "in-vm2"); - - ClientSession session = basicSetUp(); - - jmsServer.createConnectionFactory(name, ha, type, connectors, clientId, clientFailureCheckPeriod, connectionTTl, callTimeout, callFailoverTimeout, cacheLargeMessagesClient, minLargeMessageSize, compressLargeMessages, consumerWindowSize, consumerMaxRate, confirmationWindowSize, producerWindowSize, producerMaxrate, blockOnAcknowledge, blockOnDurableSend, blockOnNonDurableSend, autoGroup, preacknowledge, loadBalancingPolicyClassName, transactionBatchSize, dupsOKBatchSize, useGlobalPools, scheduledThreadPoolMaxSize, threadPoolMaxSize, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, failoverOnInitialConnection, groupId, jndi_binding1, jndi_binding2); - - jmsServer.createConnectionFactory("mySecondConnectionFactoryName", false, JMSFactoryType.CF, Arrays.asList("in-vm1", "in-vm2"), "mySecondConnectionFactoryName1", "mySecondConnectionFactoryName2"); - - session.close(); - locator.close(); - server.stop(); - - ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream(); - XmlDataExporter xmlDataExporter = new XmlDataExporter(); - xmlDataExporter.process(xmlOutputStream, server.getConfiguration().getBindingsLocation().getAbsolutePath(), server.getConfiguration().getJournalLocation().getAbsolutePath(), server.getConfiguration().getPagingLocation().getAbsolutePath(), server.getConfiguration().getLargeMessagesLocation().getAbsolutePath()); - System.out.print(new String(xmlOutputStream.toByteArray())); - - clearDataRecreateServerDirs(); - server.start(); - checkForLongs(); - locator = createInVMNonHALocator(); - factory = createSessionFactory(locator); - session = factory.createSession(false, true, true); - - ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); - XmlDataImporter xmlDataImporter = new XmlDataImporter(); - xmlDataImporter.process(xmlInputStream, session); - - ConnectionFactory cf1 = (ConnectionFactory) namingContext.lookup(jndi_binding1); - assertNotNull(cf1); - ActiveMQConnectionFactory hcf1 = (ActiveMQConnectionFactory) cf1; - assertEquals(ha, hcf1.isHA()); - assertEquals(type.intValue(), hcf1.getFactoryType()); - assertEquals(clientId, hcf1.getClientID()); - assertEquals(clientFailureCheckPeriod, hcf1.getClientFailureCheckPeriod()); - assertEquals(connectionTTl, hcf1.getConnectionTTL()); - assertEquals(callTimeout, hcf1.getCallTimeout()); - // Assert.assertEquals(callFailoverTimeout, hcf1.getCallFailoverTimeout()); // this value isn't currently persisted by org.apache.activemq.artemis.jms.server.config.impl.ConnectionFactoryConfigurationImpl.encode() - // Assert.assertEquals(cacheLargeMessagesClient, hcf1.isCacheLargeMessagesClient()); // this value isn't currently supported by org.apache.activemq.artemis.api.jms.management.JMSServerControl.createConnectionFactory(java.lang.String, boolean, boolean, int, java.lang.String, java.lang.String, java.lang.String, long, long, long, long, int, boolean, int, int, int, int, int, boolean, boolean, boolean, boolean, boolean, java.lang.String, int, int, boolean, int, int, long, double, long, int, boolean, java.lang.String) - assertEquals(minLargeMessageSize, hcf1.getMinLargeMessageSize()); - // Assert.assertEquals(compressLargeMessages, hcf1.isCompressLargeMessage()); // this value isn't currently handled properly by org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl.createConnectionFactory(java.lang.String, boolean, org.apache.activemq.artemis.api.jms.JMSFactoryType, java.util.List<java.lang.String>, java.lang.String, long, long, long, long, boolean, int, boolean, int, int, int, int, int, boolean, boolean, boolean, boolean, boolean, java.lang.String, int, int, boolean, int, int, long, double, long, int, boolean, java.lang.String, java.lang.String...)() - assertEquals(consumerWindowSize, hcf1.getConsumerWindowSize()); - assertEquals(consumerMaxRate, hcf1.getConsumerMaxRate()); - assertEquals(confirmationWindowSize, hcf1.getConfirmationWindowSize()); - assertEquals(producerWindowSize, hcf1.getProducerWindowSize()); - assertEquals(producerMaxrate, hcf1.getProducerMaxRate()); - assertEquals(blockOnAcknowledge, hcf1.isBlockOnAcknowledge()); - assertEquals(blockOnDurableSend, hcf1.isBlockOnDurableSend()); - assertEquals(blockOnNonDurableSend, hcf1.isBlockOnNonDurableSend()); - assertEquals(autoGroup, hcf1.isAutoGroup()); - assertEquals(preacknowledge, hcf1.isPreAcknowledge()); - assertEquals(loadBalancingPolicyClassName, hcf1.getConnectionLoadBalancingPolicyClassName()); - assertEquals(transactionBatchSize, hcf1.getTransactionBatchSize()); - assertEquals(dupsOKBatchSize, hcf1.getDupsOKBatchSize()); - assertEquals(useGlobalPools, hcf1.isUseGlobalPools()); - assertEquals(scheduledThreadPoolMaxSize, hcf1.getScheduledThreadPoolMaxSize()); - assertEquals(threadPoolMaxSize, hcf1.getThreadPoolMaxSize()); - assertEquals(retryInterval, hcf1.getRetryInterval()); - assertEquals(retryIntervalMultiplier, hcf1.getRetryIntervalMultiplier(), 0); - assertEquals(maxRetryInterval, hcf1.getMaxRetryInterval()); - assertEquals(reconnectAttempts, hcf1.getReconnectAttempts()); - assertEquals(failoverOnInitialConnection, hcf1.isFailoverOnInitialConnection()); - assertEquals(groupId, hcf1.getGroupID()); - - assertNotNull(namingContext.lookup(jndi_binding2)); - assertNotNull(namingContext.lookup("mySecondConnectionFactoryName1")); - assertNotNull(namingContext.lookup("mySecondConnectionFactoryName2")); - } - - @Test - public void testJmsDestination() throws Exception { - ClientSession session = basicSetUp(); - - jmsServer.createQueue(true, "myQueue", null, true, "myQueueJndiBinding1", "myQueueJndiBinding2"); - jmsServer.createTopic(true, "myTopic", "myTopicJndiBinding1", "myTopicJndiBinding2"); - - session.close(); - locator.close(); - server.stop(); - - ByteArrayOutputStream xmlOutputStream = new ByteArrayOutputStream(); - XmlDataExporter xmlDataExporter = new XmlDataExporter(); - xmlDataExporter.process(xmlOutputStream, server.getConfiguration().getBindingsDirectory(), server.getConfiguration().getJournalDirectory(), server.getConfiguration().getPagingDirectory(), server.getConfiguration().getLargeMessagesDirectory()); - System.out.print(new String(xmlOutputStream.toByteArray())); - - clearDataRecreateServerDirs(); - server.start(); - checkForLongs(); - locator = createInVMNonHALocator(); - factory = createSessionFactory(locator); - session = factory.createSession(false, true, true); - - ByteArrayInputStream xmlInputStream = new ByteArrayInputStream(xmlOutputStream.toByteArray()); - XmlDataImporter xmlDataImporter = new XmlDataImporter(); - xmlDataImporter.process(xmlInputStream, session); - - assertNotNull(namingContext.lookup("myQueueJndiBinding1")); - assertNotNull(namingContext.lookup("myQueueJndiBinding2")); - assertNotNull(namingContext.lookup("myTopicJndiBinding1")); - assertNotNull(namingContext.lookup("myTopicJndiBinding2")); - - jmsServer.createConnectionFactory("test-cf", false, JMSFactoryType.CF, Arrays.asList("in-vm1"), "test-cf"); - - ConnectionFactory cf = (ConnectionFactory) namingContext.lookup("test-cf"); - Connection connection = cf.createConnection(); - Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = jmsSession.createProducer((Destination) namingContext.lookup("myQueueJndiBinding1")); - producer.send(jmsSession.createTextMessage()); - MessageConsumer consumer = jmsSession.createConsumer((Destination) namingContext.lookup("myQueueJndiBinding2")); - connection.start(); - assertNotNull(consumer.receive(3000)); - - consumer = jmsSession.createConsumer((Destination) namingContext.lookup("myTopicJndiBinding1")); - producer = jmsSession.createProducer((Destination) namingContext.lookup("myTopicJndiBinding2")); - producer.send(jmsSession.createTextMessage()); - assertNotNull(consumer.receive(3000)); - - connection.close(); - } - - @Test public void testLargeMessage() throws Exception { server = createServer(true); server.start(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ConcurrentStompTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ConcurrentStompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ConcurrentStompTest.java deleted file mode 100644 index 6917cfb..0000000 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ConcurrentStompTest.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.integration.stomp; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.apache.activemq.artemis.core.protocol.stomp.Stomp; -import org.junit.Assert; -import org.junit.Test; - -public class ConcurrentStompTest extends StompTestBase { - - private Socket stompSocket_2; - - private ByteArrayOutputStream inputBuffer_2; - - /** - * Send messages on 1 socket and receives them concurrently on another socket. - */ - @Test - public void testSendManyMessages() throws Exception { - try { - String connect = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - - sendFrame(connect); - String connected = receiveFrame(10000); - Assert.assertTrue(connected.startsWith("CONNECTED")); - - stompSocket_2 = createSocket(); - inputBuffer_2 = new ByteArrayOutputStream(); - - sendFrame(stompSocket_2, connect); - connected = receiveFrame(stompSocket_2, inputBuffer_2, 10000); - Assert.assertTrue(connected.startsWith("CONNECTED")); - - final int count = 1000; - final CountDownLatch latch = new CountDownLatch(count); - - String subscribe = "SUBSCRIBE\n" + - "destination:" + getQueuePrefix() + getQueueName() + "\n" + - "ack:auto\n\n" + - Stomp.NULL; - sendFrame(stompSocket_2, subscribe); - Thread.sleep(2000); - - new Thread() { - @Override - public void run() { - int i = 0; - while (true) { - try { - String frame = receiveFrame(stompSocket_2, inputBuffer_2, 10000); - Assert.assertTrue(frame.startsWith("MESSAGE")); - Assert.assertTrue(frame.indexOf("destination:") > 0); - System.out.println("<<< " + i++); - latch.countDown(); - } catch (Exception e) { - break; - } - } - } - }.start(); - - String send = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n"; - for (int i = 1; i <= count; i++) { - // Thread.sleep(1); - System.out.println(">>> " + i); - sendFrame(send + "count:" + i + "\n\n" + Stomp.NULL); - } - - assertTrue(latch.await(60, TimeUnit.SECONDS)); - - } finally { - stompSocket_2.close(); - inputBuffer_2.close(); - } - - } - - // Implementation methods - // ------------------------------------------------------------------------- - public void sendFrame(Socket socket, String data) throws Exception { - byte[] bytes = data.getBytes(StandardCharsets.UTF_8); - OutputStream outputStream = socket.getOutputStream(); - for (byte b : bytes) { - outputStream.write(b); - } - outputStream.flush(); - } - - public String receiveFrame(Socket socket, ByteArrayOutputStream input, long timeOut) throws Exception { - socket.setSoTimeout((int) timeOut); - InputStream is = socket.getInputStream(); - int c = 0; - for (;;) { - c = is.read(); - if (c < 0) { - throw new IOException("socket closed."); - } else if (c == 0) { - c = is.read(); - if (c != '\n') { - byte[] ba = input.toByteArray(); - System.out.println(new String(ba, StandardCharsets.UTF_8)); - } - Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n'); - byte[] ba = input.toByteArray(); - input.reset(); - return new String(ba, StandardCharsets.UTF_8); - } else { - input.write(c); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ExtraStompTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ExtraStompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ExtraStompTest.java deleted file mode 100644 index a0dcdbf..0000000 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ExtraStompTest.java +++ /dev/null @@ -1,848 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.integration.stomp; - -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.QueueBrowser; -import javax.jms.TextMessage; -import java.util.ArrayList; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.activemq.artemis.api.core.Interceptor; -import org.apache.activemq.artemis.api.core.TransportConfiguration; -import org.apache.activemq.artemis.api.core.client.ActiveMQClient; -import org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.protocol.core.Packet; -import org.apache.activemq.artemis.core.protocol.stomp.Stomp; -import org.apache.activemq.artemis.core.protocol.stomp.StompFrame; -import org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor; -import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory; -import org.apache.activemq.artemis.core.registry.JndiBindingRegistry; -import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory; -import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory; -import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.ActiveMQServers; -import org.apache.activemq.artemis.jms.server.JMSServerManager; -import org.apache.activemq.artemis.jms.server.config.JMSConfiguration; -import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; -import org.apache.activemq.artemis.jms.server.config.impl.JMSQueueConfigurationImpl; -import org.apache.activemq.artemis.jms.server.config.impl.TopicConfigurationImpl; -import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase; -import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; -import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; -import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; -import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class ExtraStompTest extends StompTestBase { - - @Override - @Before - public void setUp() throws Exception { - autoCreateServer = false; - super.setUp(); - } - - @Test - public void testConnectionTTL() throws Exception { - try { - server = createServerWithTTL("2000"); - server.start(); - - setUpAfterServer(); - - String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL; - sendFrame(connect_frame); - - String f = receiveFrame(10000); - Assert.assertTrue(f.startsWith("CONNECTED")); - Assert.assertTrue(f.indexOf("response-id:1") >= 0); - - String frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World 1" + Stomp.NULL; - sendFrame(frame); - - assertChannelClosed(); - - MessageConsumer consumer = session.createConsumer(queue); - - TextMessage message = (TextMessage) consumer.receiveNoWait(); - Assert.assertNotNull(message); - - message = (TextMessage) consumer.receiveNoWait(); - Assert.assertNull(message); - } finally { - cleanUp(); - server.stop(); - } - } - - @Test - public void testEnableMessageID() throws Exception { - enableMessageIDTest(true); - } - - @Test - public void testDisableMessageID() throws Exception { - enableMessageIDTest(false); - } - - @Test - public void testDefaultEnableMessageID() throws Exception { - enableMessageIDTest(null); - } - - //stomp sender -> large -> stomp receiver - @Test - public void testSendReceiveLargePersistentMessages() throws Exception { - try { - server = createPersistentServerWithStompMinLargeSize(2048); - server.start(); - - setUpAfterServer(); - - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - frame = receiveFrame(10000); - - Assert.assertTrue(frame.startsWith("CONNECTED")); - int count = 10; - int szBody = 1024 * 1024; - char[] contents = new char[szBody]; - for (int i = 0; i < szBody; i++) { - contents[i] = 'A'; - } - String body = new String(contents); - - frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "persistent:true\n" + "\n\n" + body + Stomp.NULL; - - for (int i = 0; i < count; i++) { - sendFrame(frame); - } - - frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL; - sendFrame(frame); - - for (int i = 0; i < count; i++) { - frame = receiveFrame(60000); - Assert.assertNotNull(frame); - System.out.println("part of frame: " + frame.substring(0, 200)); - Assert.assertTrue(frame.startsWith("MESSAGE")); - Assert.assertTrue(frame.indexOf("destination:") > 0); - int index = frame.indexOf("AAAA"); - assertEquals(szBody, (frame.length() - index)); - } - - // remove suscription - frame = "UNSUBSCRIBE\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "receipt:567\n" + - "\n\n" + - Stomp.NULL; - sendFrame(frame); - waitForReceipt(); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - } catch (Exception ex) { - ex.printStackTrace(); - throw ex; - } finally { - cleanUp(); - server.stop(); - } - } - - //core sender -> large -> stomp receiver - @Test - public void testReceiveLargePersistentMessagesFromCore() throws Exception { - try { - server = createPersistentServerWithStompMinLargeSize(2048); - server.start(); - - setUpAfterServer(); - - int msgSize = 3 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; - char[] contents = new char[msgSize]; - for (int i = 0; i < msgSize; i++) { - contents[i] = 'B'; - } - String msg = new String(contents); - - int count = 10; - for (int i = 0; i < count; i++) { - this.sendMessage(msg); - } - - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - frame = receiveFrame(10000); - - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL; - sendFrame(frame); - - for (int i = 0; i < count; i++) { - frame = receiveFrame(60000); - Assert.assertNotNull(frame); - System.out.println("part of frame: " + frame.substring(0, 250)); - Assert.assertTrue(frame.startsWith("MESSAGE")); - Assert.assertTrue(frame.indexOf("destination:") > 0); - int index = frame.indexOf("BBBB"); - assertEquals(msgSize, (frame.length() - index)); - } - - // remove suscription - frame = "UNSUBSCRIBE\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "receipt:567\n" + - "\n\n" + - Stomp.NULL; - sendFrame(frame); - waitForReceipt(); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - } catch (Exception ex) { - ex.printStackTrace(); - throw ex; - } finally { - cleanUp(); - server.stop(); - } - } - - //stomp v12 sender -> large -> stomp v12 receiver - @Test - public void testSendReceiveLargePersistentMessagesV12() throws Exception { - try { - server = createPersistentServerWithStompMinLargeSize(2048); - server.start(); - - setUpAfterServer(); - - StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port); - connV12.connect(defUser, defPass); - - int count = 10; - int szBody = 1024 * 1024; - char[] contents = new char[szBody]; - for (int i = 0; i < szBody; i++) { - contents[i] = 'A'; - } - String body = new String(contents); - - ClientStompFrame frame = connV12.createFrame("SEND"); - frame.addHeader("destination", getQueuePrefix() + getQueueName()); - frame.addHeader("persistent", "true"); - frame.setBody(body); - - for (int i = 0; i < count; i++) { - connV12.sendFrame(frame); - } - - ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - connV12.sendFrame(subFrame); - - for (int i = 0; i < count; i++) { - ClientStompFrame receiveFrame = connV12.receiveFrame(30000); - - Assert.assertNotNull(receiveFrame); - System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20)); - Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE")); - Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName()); - assertEquals(szBody, receiveFrame.getBody().length()); - } - - // remove susbcription - ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - connV12.sendFrame(unsubFrame); - - connV12.disconnect(); - } catch (Exception ex) { - ex.printStackTrace(); - throw ex; - } finally { - cleanUp(); - server.stop(); - } - } - - //core sender -> large -> stomp v12 receiver - @Test - public void testReceiveLargePersistentMessagesFromCoreV12() throws Exception { - try { - server = createPersistentServerWithStompMinLargeSize(2048); - server.start(); - - setUpAfterServer(); - - int msgSize = 3 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; - char[] contents = new char[msgSize]; - for (int i = 0; i < msgSize; i++) { - contents[i] = 'B'; - } - String msg = new String(contents); - - int count = 10; - for (int i = 0; i < count; i++) { - this.sendMessage(msg); - } - - StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port); - connV12.connect(defUser, defPass); - - ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - connV12.sendFrame(subFrame); - - for (int i = 0; i < count; i++) { - ClientStompFrame receiveFrame = connV12.receiveFrame(30000); - - Assert.assertNotNull(receiveFrame); - System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20)); - Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE")); - Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName()); - assertEquals(msgSize, receiveFrame.getBody().length()); - } - - // remove susbcription - ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - connV12.sendFrame(unsubFrame); - - connV12.disconnect(); - } catch (Exception ex) { - ex.printStackTrace(); - throw ex; - } finally { - cleanUp(); - server.stop(); - } - } - - //core sender -> large (compressed regular) -> stomp v10 receiver - @Test - public void testReceiveLargeCompressedToRegularPersistentMessagesFromCore() throws Exception { - try { - server = createPersistentServerWithStompMinLargeSize(2048); - server.start(); - - setUpAfterServer(true); - - LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true); - LargeMessageTestBase.adjustLargeCompression(true, input, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); - - char[] contents = input.toArray(); - String msg = new String(contents); - - String leadingPart = msg.substring(0, 100); - - int count = 10; - for (int i = 0; i < count; i++) { - this.sendMessage(msg); - } - - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - frame = receiveFrame(10000); - - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL; - sendFrame(frame); - - for (int i = 0; i < count; i++) { - frame = receiveFrame(60000); - Assert.assertNotNull(frame); - System.out.println("part of frame: " + frame.substring(0, 250)); - Assert.assertTrue(frame.startsWith("MESSAGE")); - Assert.assertTrue(frame.indexOf("destination:") > 0); - int index = frame.indexOf(leadingPart); - assertEquals(msg.length(), (frame.length() - index)); - } - - // remove suscription - frame = "UNSUBSCRIBE\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "receipt:567\n" + - "\n\n" + - Stomp.NULL; - sendFrame(frame); - waitForReceipt(); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - } catch (Exception ex) { - ex.printStackTrace(); - throw ex; - } finally { - cleanUp(); - server.stop(); - } - } - - //core sender -> large (compressed regular) -> stomp v12 receiver - @Test - public void testReceiveLargeCompressedToRegularPersistentMessagesFromCoreV12() throws Exception { - try { - server = createPersistentServerWithStompMinLargeSize(2048); - server.start(); - - setUpAfterServer(true); - - LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true); - LargeMessageTestBase.adjustLargeCompression(true, input, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); - - char[] contents = input.toArray(); - String msg = new String(contents); - - int count = 10; - for (int i = 0; i < count; i++) { - this.sendMessage(msg); - } - - StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port); - connV12.connect(defUser, defPass); - - ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - connV12.sendFrame(subFrame); - - for (int i = 0; i < count; i++) { - ClientStompFrame receiveFrame = connV12.receiveFrame(30000); - - Assert.assertNotNull(receiveFrame); - System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20)); - Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE")); - Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName()); - assertEquals(contents.length, receiveFrame.getBody().length()); - } - - // remove susbcription - ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - connV12.sendFrame(unsubFrame); - - connV12.disconnect(); - } catch (Exception ex) { - ex.printStackTrace(); - throw ex; - } finally { - cleanUp(); - server.stop(); - } - } - - //core sender -> large (compressed large) -> stomp v12 receiver - @Test - public void testReceiveLargeCompressedToLargePersistentMessagesFromCoreV12() throws Exception { - try { - server = createPersistentServerWithStompMinLargeSize(2048); - server.start(); - - setUpAfterServer(true); - - LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true); - input.setSize(10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); - LargeMessageTestBase.adjustLargeCompression(false, input, 10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); - - char[] contents = input.toArray(); - String msg = new String(contents); - - int count = 10; - for (int i = 0; i < count; i++) { - this.sendMessage(msg); - } - - StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port); - connV12.connect(defUser, defPass); - - ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE"); - subFrame.addHeader("id", "a-sub"); - subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); - subFrame.addHeader("ack", "auto"); - - connV12.sendFrame(subFrame); - - for (int i = 0; i < count; i++) { - ClientStompFrame receiveFrame = connV12.receiveFrame(30000); - - Assert.assertNotNull(receiveFrame); - System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20)); - Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE")); - Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName()); - assertEquals(contents.length, receiveFrame.getBody().length()); - } - - // remove susbcription - ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE"); - unsubFrame.addHeader("id", "a-sub"); - connV12.sendFrame(unsubFrame); - - connV12.disconnect(); - } catch (Exception ex) { - ex.printStackTrace(); - throw ex; - } finally { - cleanUp(); - server.stop(); - } - } - - //core sender -> large (compressed large) -> stomp v10 receiver - @Test - public void testReceiveLargeCompressedToLargePersistentMessagesFromCore() throws Exception { - try { - server = createPersistentServerWithStompMinLargeSize(2048); - server.start(); - - setUpAfterServer(true); - - LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true); - input.setSize(10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); - LargeMessageTestBase.adjustLargeCompression(false, input, 10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); - - char[] contents = input.toArray(); - String msg = new String(contents); - - String leadingPart = msg.substring(0, 100); - - int count = 10; - for (int i = 0; i < count; i++) { - this.sendMessage(msg); - } - - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - frame = receiveFrame(10000); - - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL; - sendFrame(frame); - - for (int i = 0; i < count; i++) { - frame = receiveFrame(60000); - Assert.assertNotNull(frame); - System.out.println("part of frame: " + frame.substring(0, 250)); - Assert.assertTrue(frame.startsWith("MESSAGE")); - Assert.assertTrue(frame.indexOf("destination:") > 0); - int index = frame.indexOf(leadingPart); - assertEquals(msg.length(), (frame.length() - index)); - } - - // remove suscription - frame = "UNSUBSCRIBE\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n" + - "receipt:567\n" + - "\n\n" + - Stomp.NULL; - sendFrame(frame); - waitForReceipt(); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - sendFrame(frame); - } catch (Exception ex) { - ex.printStackTrace(); - throw ex; - } finally { - cleanUp(); - server.stop(); - } - } - - protected JMSServerManager createPersistentServerWithStompMinLargeSize(int sz) throws Exception { - Map<String, Object> params = new HashMap<>(); - params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME); - params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT); - params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1"); - params.put(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE, sz); - TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params); - - Configuration config = createBasicConfig().setPersistenceEnabled(true).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())); - - ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass)); - - JMSConfiguration jmsConfig = new JMSConfigurationImpl(); - jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName(getQueueName()).setBindings(getQueueName())); - jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl().setName(getTopicName()).setBindings(getTopicName())); - server = new JMSServerManagerImpl(activeMQServer, jmsConfig); - server.setRegistry(new JndiBindingRegistry((new InVMNamingContext()))); - return server; - } - - private void enableMessageIDTest(Boolean enable) throws Exception { - try { - server = createServerWithExtraStompOptions(null, enable); - server.start(); - - setUpAfterServer(); - - String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL; - sendFrame(connect_frame); - - String f = receiveFrame(10000); - Assert.assertTrue(f.startsWith("CONNECTED")); - Assert.assertTrue(f.indexOf("response-id:1") >= 0); - - String frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World 1" + Stomp.NULL; - sendFrame(frame); - - frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World 2" + Stomp.NULL; - - sendFrame(frame); - - QueueBrowser browser = session.createBrowser(queue); - - Enumeration enu = browser.getEnumeration(); - - while (enu.hasMoreElements()) { - Message msg = (Message) enu.nextElement(); - String msgId = msg.getStringProperty("amqMessageId"); - if (enable != null && enable.booleanValue()) { - assertNotNull(msgId); - assertTrue(msgId.indexOf("STOMP") == 0); - } else { - assertNull(msgId); - } - } - - browser.close(); - - MessageConsumer consumer = session.createConsumer(queue); - - TextMessage message = (TextMessage) consumer.receive(1000); - Assert.assertNotNull(message); - - message = (TextMessage) consumer.receive(1000); - Assert.assertNotNull(message); - - message = (TextMessage) consumer.receive(2000); - Assert.assertNull(message); - } finally { - cleanUp(); - server.stop(); - } - } - - protected JMSServerManager createServerWithTTL(String ttl) throws Exception { - return createServerWithExtraStompOptions(ttl, null); - } - - protected JMSServerManager createServerWithExtraStompOptions(String ttl, Boolean enableMessageID) throws Exception { - - Map<String, Object> params = new HashMap<>(); - params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME); - params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT); - if (ttl != null) { - params.put(TransportConstants.CONNECTION_TTL, ttl); - } - if (enableMessageID != null) { - params.put(TransportConstants.STOMP_ENABLE_MESSAGE_ID, enableMessageID); - } - params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1"); - TransportConfiguration stompTransport = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); - - Configuration config = createBasicConfig().setPersistenceEnabled(false).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)); - - ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass)); - - JMSConfiguration jmsConfig = new JMSConfigurationImpl(); - jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName(getQueueName()).setDurable(false).setBindings(getQueueName())); - jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl().setName(getTopicName()).setBindings(getTopicName())); - server = new JMSServerManagerImpl(activeMQServer, jmsConfig); - server.setRegistry(new JndiBindingRegistry(new InVMNamingContext())); - return server; - } - - public static class MyCoreInterceptor implements Interceptor { - - static List<Packet> incomingInterceptedFrames = new ArrayList<>(); - - @Override - public boolean intercept(Packet packet, RemotingConnection connection) { - incomingInterceptedFrames.add(packet); - return true; - } - - } - - public static class MyIncomingStompFrameInterceptor implements StompFrameInterceptor { - - static List<StompFrame> incomingInterceptedFrames = new ArrayList<>(); - - @Override - public boolean intercept(StompFrame stompFrame, RemotingConnection connection) { - incomingInterceptedFrames.add(stompFrame); - stompFrame.addHeader("incomingInterceptedProp", "incomingInterceptedVal"); - return true; - } - } - - public static class MyOutgoingStompFrameInterceptor implements StompFrameInterceptor { - - static List<StompFrame> outgoingInterceptedFrames = new ArrayList<>(); - - @Override - public boolean intercept(StompFrame stompFrame, RemotingConnection connection) { - outgoingInterceptedFrames.add(stompFrame); - stompFrame.addHeader("outgoingInterceptedProp", "outgoingInterceptedVal"); - return true; - } - } - - @Test - public void stompFrameInterceptor() throws Exception { - MyIncomingStompFrameInterceptor.incomingInterceptedFrames.clear(); - MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.clear(); - try { - List<String> incomingInterceptorList = new ArrayList<>(); - incomingInterceptorList.add("org.apache.activemq.artemis.tests.integration.stomp.ExtraStompTest$MyIncomingStompFrameInterceptor"); - incomingInterceptorList.add("org.apache.activemq.artemis.tests.integration.stomp.ExtraStompTest$MyCoreInterceptor"); - List<String> outgoingInterceptorList = new ArrayList<>(); - outgoingInterceptorList.add("org.apache.activemq.artemis.tests.integration.stomp.ExtraStompTest$MyOutgoingStompFrameInterceptor"); - - server = createServerWithStompInterceptor(incomingInterceptorList, outgoingInterceptorList); - server.start(); - - setUpAfterServer(); // This will make some calls through core - - // So we clear them here - MyCoreInterceptor.incomingInterceptedFrames.clear(); - - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(100000); - - frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL; - sendFrame(frame); - - assertEquals(0, MyCoreInterceptor.incomingInterceptedFrames.size()); - sendMessage(getName()); - - // Something was supposed to be called on sendMessages - assertTrue("core interceptor is not working", MyCoreInterceptor.incomingInterceptedFrames.size() > 0); - - receiveFrame(10000); - - frame = "SEND\n" + "destination:" + - getQueuePrefix() + - getQueueName() + - "\n\n" + - "Hello World" + - Stomp.NULL; - sendFrame(frame); - - receiveFrame(10000); - - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; - - sendFrame(frame); - - } finally { - cleanUp(); - server.stop(); - } - - List<String> incomingCommands = new ArrayList<>(4); - incomingCommands.add("CONNECT"); - incomingCommands.add("SUBSCRIBE"); - incomingCommands.add("SEND"); - incomingCommands.add("DISCONNECT"); - - List<String> outgoingCommands = new ArrayList<>(3); - outgoingCommands.add("CONNECTED"); - outgoingCommands.add("MESSAGE"); - outgoingCommands.add("MESSAGE"); - - long timeout = System.currentTimeMillis() + 1000; - - // Things are async, giving some time to things arrive before we actually assert - while (MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size() < 4 && - MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size() < 3 && - timeout > System.currentTimeMillis()) { - Thread.sleep(10); - } - - Assert.assertEquals(4, MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size()); - Assert.assertEquals(3, MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size()); - - for (int i = 0; i < MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size(); i++) { - Assert.assertEquals(incomingCommands.get(i), MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getCommand()); - Assert.assertEquals("incomingInterceptedVal", MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getHeader("incomingInterceptedProp")); - } - - for (int i = 0; i < MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size(); i++) { - Assert.assertEquals(outgoingCommands.get(i), MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(i).getCommand()); - } - - Assert.assertEquals("incomingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("incomingInterceptedProp")); - Assert.assertEquals("outgoingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("outgoingInterceptedProp")); - } - - protected JMSServerManager createServerWithStompInterceptor(List<String> stompIncomingInterceptor, - List<String> stompOutgoingInterceptor) throws Exception { - - Map<String, Object> params = new HashMap<>(); - params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME); - params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT); - params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1"); - TransportConfiguration stompTransport = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); - - Configuration config = createBasicConfig().setPersistenceEnabled(false).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)).setIncomingInterceptorClassNames(stompIncomingInterceptor).setOutgoingInterceptorClassNames(stompOutgoingInterceptor); - - ActiveMQServer hornetQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass)); - - JMSConfiguration jmsConfig = new JMSConfigurationImpl(); - jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName(getQueueName()).setDurable(false).setBindings(getQueueName())); - jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl().setName(getTopicName()).setBindings(getTopicName())); - server = new JMSServerManagerImpl(hornetQServer, jmsConfig); - server.setRegistry(new JndiBindingRegistry(new InVMNamingContext())); - return server; - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompConnectionCleanupTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompConnectionCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompConnectionCleanupTest.java index 419b339..ac89c1d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompConnectionCleanupTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompConnectionCleanupTest.java @@ -19,32 +19,20 @@ package org.apache.activemq.artemis.tests.integration.stomp; import javax.jms.Message; import javax.jms.MessageConsumer; -import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.jms.server.JMSServerManager; +import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; import org.junit.Test; -public class StompConnectionCleanupTest extends StompTestBase { +public class StompConnectionCleanupTest extends StompTest { private static final long CONNECTION_TTL = 2000; // ARTEMIS-231 @Test public void testConnectionCleanupWithTopicSubscription() throws Exception { - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - frame = receiveFrame(10000); + conn.connect(defUser, defPass); - //We send and consumer a message to ensure a STOMP connection and server session is created - - System.out.println("Received frame: " + frame); - - assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + getTopicPrefix() + getTopicName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = "DISCONNECT\n\n" + Stomp.NULL; - sendFrame(frame); + subscribeTopic(conn, null, "auto", null); // Now we wait until the connection is cleared on the server, which will happen some time after ttl, since no data // is being sent @@ -72,25 +60,16 @@ public class StompConnectionCleanupTest extends StompTestBase { @Test public void testConnectionCleanup() throws Exception { - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - frame = receiveFrame(10000); + conn.connect(defUser, defPass); - //We send and consumer a message to ensure a STOMP connection and server session is created + subscribe(conn, null, "auto", null); - System.out.println("Received frame: " + frame); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World"); - assertTrue(frame.startsWith("CONNECTED")); + ClientStompFrame frame = conn.receiveFrame(10000); - frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(10000); - assertTrue(frame.startsWith("MESSAGE")); - assertTrue(frame.indexOf("destination:") > 0); + assertTrue(frame.getCommand().equals("MESSAGE")); + assertTrue(frame.getHeader("destination").equals(getQueuePrefix() + getQueueName())); // Now we wait until the connection is cleared on the server, which will happen some time after ttl, since no data // is being sent @@ -118,13 +97,7 @@ public class StompConnectionCleanupTest extends StompTestBase { @Test public void testConnectionNotCleanedUp() throws Exception { - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - frame = receiveFrame(10000); - - //We send and consumer a message to ensure a STOMP connection and server session is created - - assertTrue(frame.startsWith("CONNECTED")); + conn.connect(defUser, defPass); MessageConsumer consumer = session.createConsumer(queue); @@ -136,8 +109,7 @@ public class StompConnectionCleanupTest extends StompTestBase { while (true) { //Send and receive a msg - frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; - sendFrame(frame); + send(conn, getQueuePrefix() + getQueueName(), null, "Hello World"); Message msg = consumer.receive(1000); assertNotNull(msg); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java deleted file mode 100644 index 138e37c..0000000 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.integration.stomp; - -import java.nio.charset.StandardCharsets; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.http.DefaultFullHttpRequest; -import io.netty.handler.codec.http.DefaultHttpContent; -import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.HttpHeaders; -import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.HttpRequestEncoder; -import io.netty.handler.codec.http.HttpResponseDecoder; -import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.codec.string.StringDecoder; -import io.netty.handler.codec.string.StringEncoder; - -public class StompOverHttpTest extends StompTest { - - @Override - protected void addChannelHandlers(int index, SocketChannel ch) { - ch.pipeline().addLast(new HttpRequestEncoder()); - ch.pipeline().addLast(new HttpResponseDecoder()); - ch.pipeline().addLast(new HttpHandler()); - ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8)); - ch.pipeline().addLast("encoder", new StringEncoder(StandardCharsets.UTF_8)); - ch.pipeline().addLast(new StompClientHandler(index)); - } - - @Override - public String receiveFrame(long timeOut) throws Exception { - //we are request/response so may need to send an empty request so we get responses piggy backed - sendFrame(new byte[]{}); - return super.receiveFrame(timeOut); - } - - class HttpHandler extends ChannelDuplexHandler { - - @Override - public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { - if (msg instanceof DefaultHttpContent) { - DefaultHttpContent response = (DefaultHttpContent) msg; - ctx.fireChannelRead(response.content()); - } - } - - @Override - public void write(final ChannelHandlerContext ctx, final Object msg, ChannelPromise promise) throws Exception { - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; - FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "", buf); - httpRequest.headers().add(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buf.readableBytes())); - ctx.write(httpRequest, promise); - } else { - ctx.write(msg, promise); - } - } - } -}
