http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java index 26f2a2f..bcac436 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java @@ -26,34 +26,17 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import java.io.IOException; -import java.net.Socket; -import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.string.StringDecoder; -import io.netty.handler.codec.string.StringEncoder; +import java.util.UUID; + import org.apache.activemq.artemis.api.core.TransportConfiguration; import 
org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManagerFactory; +import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory; import org.apache.activemq.artemis.core.registry.JndiBindingRegistry; import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory; @@ -63,6 +46,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; import org.apache.activemq.artemis.jms.server.JMSServerManager; @@ -73,13 +57,16 @@ import org.apache.activemq.artemis.jms.server.config.impl.TopicConfigurationImpl import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; -import org.junit.After; import org.junit.Before; public abstract class StompTestBase extends ActiveMQTestBase { + protected String hostname = "127.0.0.1"; + protected final int port = 61613; private ConnectionFactory connectionFactory; @@ -98,98 +85,56 @@ public abstract class StompTestBase extends ActiveMQTestBase { protected String defPass = "wombats"; - 
protected boolean autoCreateServer = true; - - private List<Bootstrap> bootstraps = new ArrayList<>(); - - // private Channel channel; - - private List<BlockingQueue<String>> priorityQueues = new ArrayList<>(); - - private List<EventLoopGroup> groups = new ArrayList<>(); - - private List<Channel> channels = new ArrayList<>(); - // Implementation methods // ------------------------------------------------------------------------- - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - if (autoCreateServer) { - server = createServer(); - addServer(server.getActiveMQServer()); - server.start(); - connectionFactory = createConnectionFactory(); - createBootstrap(); - - if (isSecurityEnabled()) { - connection = connectionFactory.createConnection("brianm", "wombats"); - } else { - connection = connectionFactory.createConnection(); - } - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - queue = session.createQueue(getQueueName()); - topic = session.createTopic(getTopicName()); - connection.start(); - } + public boolean isCompressLargeMessages() { + return false; } - private void createBootstrap() { - createBootstrap(0, port); + public boolean isSecurityEnabled() { + return false; } - protected void createBootstrap(int port) { - createBootstrap(0, port); + public boolean isPersistenceEnabled() { + return false; } - protected void createBootstrap(final int index, int port) { - priorityQueues.add(index, new ArrayBlockingQueue<String>(1000)); - groups.add(index, new NioEventLoopGroup()); - bootstraps.add(index, new Bootstrap()); - bootstraps.get(index).group(groups.get(index)).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - addChannelHandlers(index, ch); - } - }); - - // Start the client. 
- try { - channels.add(index, bootstraps.get(index).connect("localhost", port).sync().channel()); - handshake(); - } catch (InterruptedException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - + public boolean isEnableStompMessageId() { + return false; } - protected void handshake() throws InterruptedException { + public Integer getStompMinLargeMessageSize() { + return null; } - protected void addChannelHandlers(int index, SocketChannel ch) throws URISyntaxException { - ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8)); - ch.pipeline().addLast("encoder", new StringEncoder(StandardCharsets.UTF_8)); - ch.pipeline().addLast(new StompClientHandler(index)); + public List<String> getIncomingInterceptors() { + return null; } - protected void setUpAfterServer() throws Exception { - setUpAfterServer(false); + public List<String> getOutgoingInterceptors() { + return null; } - protected void setUpAfterServer(boolean jmsCompressLarge) throws Exception { + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + server = createServer(); + server.start(); connectionFactory = createConnectionFactory(); - ActiveMQConnectionFactory activeMQConnectionFactory = (ActiveMQConnectionFactory) connectionFactory; - activeMQConnectionFactory.setCompressLargeMessage(jmsCompressLarge); - createBootstrap(); + ((ActiveMQConnectionFactory)connectionFactory).setCompressLargeMessage(isCompressLargeMessages()); - connection = connectionFactory.createConnection(); - connection.start(); + if (isSecurityEnabled()) { + connection = connectionFactory.createConnection("brianm", "wombats"); + } else { + connection = connectionFactory.createConnection(); + } session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); queue = session.createQueue(getQueueName()); topic = session.createTopic(getTopicName()); - + connection.start(); } /** @@ -198,14 +143,30 @@ public abstract class 
StompTestBase extends ActiveMQTestBase { */ protected JMSServerManager createServer() throws Exception { Map<String, Object> params = new HashMap<>(); - params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME); + params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + "," + MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME); params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT); params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1"); + if (isEnableStompMessageId()) { + params.put(TransportConstants.STOMP_ENABLE_MESSAGE_ID, true); + } + if (getStompMinLargeMessageSize() != null) { + params.put(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE, 2048); + } TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params); - TransportConfiguration allTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName()); - Configuration config = createBasicConfig().setSecurityEnabled(isSecurityEnabled()).setPersistenceEnabled(true).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())); - config.addAcceptorConfiguration(allTransport); + Configuration config = createBasicConfig().setSecurityEnabled(isSecurityEnabled()) + .setPersistenceEnabled(isPersistenceEnabled()) + .addAcceptorConfiguration(stompTransport) + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())) + .setConnectionTtlCheckInterval(500); + + if (getIncomingInterceptors() != null) { + config.setIncomingInterceptorClassNames(getIncomingInterceptors()); + } + + if (getOutgoingInterceptors() != null) { + config.setOutgoingInterceptorClassNames(getOutgoingInterceptors()); + } ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass)); @@ -222,195 +183,350 @@ public abstract class 
StompTestBase extends ActiveMQTestBase { } JMSConfiguration jmsConfig = new JMSConfigurationImpl(); - jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName(getQueueName()).setDurable(false).setBindings(getQueueName())); + jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName(getQueueName()).setBindings(getQueueName())); jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl().setName(getTopicName()).setBindings(getTopicName())); server = new JMSServerManagerImpl(activeMQServer, jmsConfig); server.setRegistry(new JndiBindingRegistry(new InVMNamingContext())); return server; } - @Override - @After - public void tearDown() throws Exception { - if (autoCreateServer) { - connection.close(); - - for (EventLoopGroup group : groups) { - if (group != null) { - for (Channel channel : channels) { - channel.close(); - } - group.shutdownGracefully(0, 5000, TimeUnit.MILLISECONDS); - } - } - } - super.tearDown(); + protected ConnectionFactory createConnectionFactory() { + return new ActiveMQJMSConnectionFactory(false, new TransportConfiguration(InVMConnectorFactory.class.getName())); } - protected void cleanUp() throws Exception { - connection.close(); - if (groups.get(0) != null) { - groups.get(0).shutdown(); - } + protected String getQueueName() { + return "testQueue"; } - protected void reconnect() throws Exception { - reconnect(0); + protected String getQueuePrefix() { + return ""; } - protected void reconnect(long sleep) throws Exception { - groups.get(0).shutdown(); + protected String getTopicName() { + return "testtopic"; + } - if (sleep > 0) { - Thread.sleep(sleep); - } + protected String getTopicPrefix() { + return ""; + } - createBootstrap(); + public void sendJmsMessage(String msg) throws Exception { + sendJmsMessage(msg, queue); } - protected ConnectionFactory createConnectionFactory() { - return new ActiveMQJMSConnectionFactory(false, new TransportConfiguration(InVMConnectorFactory.class.getName())); + 
public void sendJmsMessage(String msg, Destination destination) throws Exception { + MessageProducer producer = session.createProducer(destination); + TextMessage message = session.createTextMessage(msg); + producer.send(message); } - protected Socket createSocket() throws IOException { - return new Socket("localhost", port); + public void sendJmsMessage(byte[] data, Destination destination) throws Exception { + sendJmsMessage(data, "foo", "xyz", destination); } - protected String getQueueName() { - return "test"; + public void sendJmsMessage(String msg, String propertyName, String propertyValue) throws Exception { + sendJmsMessage(msg.getBytes(StandardCharsets.UTF_8), propertyName, propertyValue, queue); } - protected String getQueuePrefix() { - return ""; + public void sendJmsMessage(byte[] data, + String propertyName, + String propertyValue, + Destination destination) throws Exception { + MessageProducer producer = session.createProducer(destination); + BytesMessage message = session.createBytesMessage(); + message.setStringProperty(propertyName, propertyValue); + message.writeBytes(data); + producer.send(message); } - protected String getTopicName() { - return "testtopic"; + public void abortTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException { + ClientStompFrame abortFrame = conn.createFrame(Stomp.Commands.ABORT) + .addHeader(Stomp.Headers.TRANSACTION, txID); + + conn.sendFrame(abortFrame); } - protected String getTopicPrefix() { - return ""; + public void beginTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException { + ClientStompFrame beginFrame = conn.createFrame(Stomp.Commands.BEGIN) + .addHeader(Stomp.Headers.TRANSACTION, txID); + + conn.sendFrame(beginFrame); } - protected void assertChannelClosed() throws InterruptedException { - assertChannelClosed(0); + public void commitTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException { + 
commitTransaction(conn, txID, false); } - protected void assertChannelClosed(int index) throws InterruptedException { - boolean closed = channels.get(index).closeFuture().await(5000); - assertTrue("channel not closed", closed); + public void commitTransaction(StompClientConnection conn, + String txID, + boolean receipt) throws IOException, InterruptedException { + ClientStompFrame beginFrame = conn.createFrame(Stomp.Commands.COMMIT) + .addHeader(Stomp.Headers.TRANSACTION, txID); + String uuid = UUID.randomUUID().toString(); + if (receipt) { + beginFrame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid); + } + ClientStompFrame resp = conn.sendFrame(beginFrame); + if (receipt) { + assertEquals(uuid, resp.getHeader(Stomp.Headers.Response.RECEIPT_ID)); + } } - public void sendFrame(String data) throws Exception { - IntegrationTestLogger.LOGGER.info("Sending: " + data); - sendFrame(0, data); + public void ack(StompClientConnection conn, + String subscriptionId, + ClientStompFrame messageIdFrame) throws IOException, InterruptedException { + String messageID = messageIdFrame.getHeader(Stomp.Headers.Message.MESSAGE_ID); + + ClientStompFrame frame = conn.createFrame(Stomp.Commands.ACK) + .addHeader(Stomp.Headers.Message.MESSAGE_ID, messageID); + + if (subscriptionId != null) { + frame.addHeader(Stomp.Headers.Ack.SUBSCRIPTION, subscriptionId); + } + + ClientStompFrame response = conn.sendFrame(frame); + if (response != null) { + throw new IOException("failed to ack " + response); + } } - public void sendFrame(int index, String data) throws Exception { - channels.get(index).writeAndFlush(data); + public void ack(StompClientConnection conn, + String subscriptionId, + String mid, + String txID) throws IOException, InterruptedException { + ClientStompFrame frame = conn.createFrame(Stomp.Commands.ACK) + .addHeader(Stomp.Headers.Ack.SUBSCRIPTION, subscriptionId) + .addHeader(Stomp.Headers.Message.MESSAGE_ID, mid); + if (txID != null) { + frame.addHeader(Stomp.Headers.TRANSACTION, 
txID); + } + + conn.sendFrame(frame); } - public void sendFrame(byte[] data) throws Exception { - sendFrame(0, data); + public void nack(StompClientConnection conn, String subscriptionId, String messageId) throws IOException, InterruptedException { + ClientStompFrame frame = conn.createFrame(Stomp.Commands.NACK) + .addHeader(Stomp.Headers.Ack.SUBSCRIPTION, subscriptionId) + .addHeader(Stomp.Headers.Message.MESSAGE_ID, messageId); + + conn.sendFrame(frame); } - public void sendFrame(int index, byte[] data) throws Exception { - ByteBuf buffer = Unpooled.buffer(data.length); - buffer.writeBytes(data); - channels.get(index).writeAndFlush(buffer); + public ClientStompFrame subscribe(StompClientConnection conn, + String subscriptionId) throws IOException, InterruptedException { + return subscribe(conn, subscriptionId, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, null); } - public String receiveFrame(long timeOut) throws Exception { - return receiveFrame(0, timeOut); + public ClientStompFrame subscribe(StompClientConnection conn, + String subscriptionId, + String ack) throws IOException, InterruptedException { + return subscribe(conn, subscriptionId, ack, null, null); } - public String receiveFrame(int index, long timeOut) throws Exception { - String msg = priorityQueues.get(index).poll(timeOut, TimeUnit.MILLISECONDS); - return msg; + public ClientStompFrame subscribe(StompClientConnection conn, + String subscriptionId, + String ack, + String durableId) throws IOException, InterruptedException { + return subscribe(conn, subscriptionId, ack, durableId, null); } - public void sendMessage(String msg) throws Exception { - sendMessage(msg, queue); + public ClientStompFrame subscribe(StompClientConnection conn, + String subscriptionId, + String ack, + String durableId, + boolean receipt) throws IOException, InterruptedException { + return subscribe(conn, subscriptionId, ack, durableId, null, receipt); } - public void sendMessage(String msg, Destination destination) throws 
Exception { - MessageProducer producer = session.createProducer(destination); - TextMessage message = session.createTextMessage(msg); - producer.send(message); + public ClientStompFrame subscribe(StompClientConnection conn, + String subscriptionId, + String ack, + String durableId, + String selector) throws IOException, InterruptedException { + return subscribe(conn, subscriptionId, ack, durableId, selector, false); } - public void sendMessage(byte[] data, Destination destination) throws Exception { - sendMessage(data, "foo", "xyz", destination); + public ClientStompFrame subscribe(StompClientConnection conn, + String subscriptionId, + String ack, + String durableId, + String selector, + boolean receipt) throws IOException, InterruptedException { + return subscribe(conn, subscriptionId, ack, durableId, selector, getQueuePrefix() + getQueueName(), receipt); } - public void sendMessage(String msg, String propertyName, String propertyValue) throws Exception { - sendMessage(msg.getBytes(StandardCharsets.UTF_8), propertyName, propertyValue, queue); + public ClientStompFrame subscribe(StompClientConnection conn, + String subscriptionId, + String ack, + String durableId, + String selector, + String destination, + boolean receipt) throws IOException, InterruptedException { + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, AddressInfo.RoutingType.ANYCAST.toString()) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, destination); + if (subscriptionId != null) { + frame.addHeader(Stomp.Headers.Subscribe.ID, subscriptionId); + } + if (ack != null) { + frame.addHeader(Stomp.Headers.Subscribe.ACK_MODE, ack); + } + if (durableId != null) { + frame.addHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, durableId); + } + if (selector != null) { + frame.addHeader(Stomp.Headers.Subscribe.SELECTOR, selector); + } + String uuid = UUID.randomUUID().toString(); + if (receipt) { + 
frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid); + } + + frame = conn.sendFrame(frame); + + if (receipt) { + assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); + } + + return frame; } - public void sendMessage(byte[] data, - String propertyName, - String propertyValue, - Destination destination) throws Exception { - MessageProducer producer = session.createProducer(destination); - BytesMessage message = session.createBytesMessage(); - message.setStringProperty(propertyName, propertyValue); - message.writeBytes(data); - producer.send(message); + public ClientStompFrame subscribeTopic(StompClientConnection conn, + String subscriptionId, + String ack, + String durableId) throws IOException, InterruptedException { + return subscribeTopic(conn, subscriptionId, ack, durableId, false); } - protected void waitForReceipt() throws Exception { - String frame = receiveFrame(50000); - assertNotNull(frame); - assertTrue(frame.indexOf("RECEIPT") > -1); + public ClientStompFrame subscribeTopic(StompClientConnection conn, + String subscriptionId, + String ack, + String durableId, + boolean receipt) throws IOException, InterruptedException { + return subscribeTopic(conn, subscriptionId, ack, durableId, receipt, false); } - protected void waitForFrameToTakeEffect() throws InterruptedException { - // bit of a dirty hack :) - // another option would be to force some kind of receipt to be returned - // from the frame - Thread.sleep(500); + public ClientStompFrame subscribeTopic(StompClientConnection conn, + String subscriptionId, + String ack, + String durableId, + boolean receipt, + boolean noLocal) throws IOException, InterruptedException { + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) + .addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, AddressInfo.RoutingType.MULTICAST.toString()) + .addHeader(Stomp.Headers.Subscribe.DESTINATION, getTopicPrefix() + getTopicName()); + if (subscriptionId != null) { + 
frame.addHeader(Stomp.Headers.Subscribe.ID, subscriptionId); + } + if (ack != null) { + frame.addHeader(Stomp.Headers.Subscribe.ACK_MODE, ack); + } + if (durableId != null) { + frame.addHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, durableId); + } + String uuid = UUID.randomUUID().toString(); + if (receipt) { + frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid); + } + if (noLocal) { + frame.addHeader(Stomp.Headers.Subscribe.NO_LOCAL, "true"); + } + + frame = conn.sendFrame(frame); + + if (receipt) { + assertNotNull("Requested receipt, but response is null", frame); + assertTrue(frame.getHeader(Stomp.Headers.Response.RECEIPT_ID).equals(uuid)); + } + + return frame; } - public boolean isSecurityEnabled() { - return false; + public ClientStompFrame unsubscribe(StompClientConnection conn, String subscriptionId) throws IOException, InterruptedException { + return unsubscribe(conn, subscriptionId, null, false, false); } - class StompClientHandler extends SimpleChannelInboundHandler<String> { + public ClientStompFrame unsubscribe(StompClientConnection conn, + String subscriptionId, + boolean receipt) throws IOException, InterruptedException { + return unsubscribe(conn, subscriptionId, null, receipt, false); + } - int index = 0; + public ClientStompFrame unsubscribe(StompClientConnection conn, + String subscriptionId, + String destination, + boolean receipt, + boolean durable) throws IOException, InterruptedException { + ClientStompFrame frame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE); + if (durable && subscriptionId != null) { + frame.addHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME, subscriptionId); + } else if (!durable && subscriptionId != null) { + frame.addHeader(Stomp.Headers.Unsubscribe.ID, subscriptionId); + } - StompClientHandler(int index) { - this.index = index; + if (destination != null) { + frame.addHeader(Stomp.Headers.Unsubscribe.DESTINATION, destination); } - StringBuffer currentMessage = new StringBuffer(""); + String 
uuid = UUID.randomUUID().toString(); - @Override - protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { - currentMessage.append(msg); - String fullMessage = currentMessage.toString(); - if (fullMessage.contains("\0\n")) { - int messageEnd = fullMessage.indexOf("\0\n"); - String actualMessage = fullMessage.substring(0, messageEnd); - fullMessage = fullMessage.substring(messageEnd + 2); - currentMessage = new StringBuffer(""); - BlockingQueue queue = priorityQueues.get(index); - if (queue == null) { - queue = new ArrayBlockingQueue(1000); - priorityQueues.add(index, queue); - } - queue.add(actualMessage); - if (fullMessage.length() > 0) { - channelRead(ctx, fullMessage); - } - } + if (receipt) { + frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid); } - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - cause.printStackTrace(); - ctx.close(); + frame = conn.sendFrame(frame); + + if (receipt) { + assertEquals(Stomp.Responses.RECEIPT, frame.getCommand()); + assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); } + + return frame; + } + + public ClientStompFrame send(StompClientConnection conn, String destination, String contentType, String body) throws IOException, InterruptedException { + return send(conn, destination, contentType, body, false); + } + + public ClientStompFrame send(StompClientConnection conn, String destination, String contentType, String body, boolean receipt) throws IOException, InterruptedException { + return send(conn, destination, contentType, body, receipt, null); + } + + public ClientStompFrame send(StompClientConnection conn, String destination, String contentType, String body, boolean receipt, AddressInfo.RoutingType destinationType) throws IOException, InterruptedException { + return send(conn, destination, contentType, body, receipt, destinationType, null); } + public ClientStompFrame send(StompClientConnection conn, String 
destination, String contentType, String body, boolean receipt, AddressInfo.RoutingType destinationType, String txId) throws IOException, InterruptedException { + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, destination) + .setBody(body); + + if (contentType != null) { + frame.addHeader(Stomp.Headers.CONTENT_TYPE, contentType); + } + + if (destinationType != null) { + frame.addHeader(Stomp.Headers.Send.DESTINATION_TYPE, destinationType.toString()); + } + + if (txId != null) { + frame.addHeader(Stomp.Headers.TRANSACTION, txId); + } + + String uuid = UUID.randomUUID().toString(); + + if (receipt) { + frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid); + } + frame = conn.sendFrame(frame); + + if (receipt) { + assertEquals(Stomp.Responses.RECEIPT, frame.getCommand()); + assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID)); + } + + IntegrationTestLogger.LOGGER.info("Received: " + frame); + + return frame; + } }
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithInterceptors.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithInterceptors.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithInterceptors.java
new file mode 100644
index 0000000..9bb9bf2
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithInterceptors.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.stomp;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.stomp.StompFrame;
+import org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that the core and STOMP frame interceptors registered on the broker see
+ * the traffic produced by a STOMP client and by a JMS producer.
+ */
+public class StompTestWithInterceptors extends StompTestBase {
+
+   /** Registers the incoming STOMP frame interceptor and the core interceptor by class name. */
+   @Override
+   public List<String> getIncomingInterceptors() {
+      List<String> stompIncomingInterceptor = new ArrayList<>();
+      stompIncomingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompTestWithInterceptors$MyIncomingStompFrameInterceptor");
+      stompIncomingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompTestWithInterceptors$MyCoreInterceptor");
+
+      return stompIncomingInterceptor;
+   }
+
+   /** Registers the outgoing STOMP frame interceptor by class name. */
+   @Override
+   public List<String> getOutgoingInterceptors() {
+      List<String> stompOutgoingInterceptor = new ArrayList<>();
+      stompOutgoingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompTestWithInterceptors$MyOutgoingStompFrameInterceptor");
+
+      return stompOutgoingInterceptor;
+   }
+
+   /**
+    * Drives a CONNECT/SUBSCRIBE/SEND/DISCONNECT exchange and asserts on the frames
+    * recorded by the interceptors.
+    */
+   @Test
+   public void stompFrameInterceptor() throws Exception {
+      MyIncomingStompFrameInterceptor.incomingInterceptedFrames.clear();
+      MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.clear();
+
+      Thread.sleep(200);
+
+      // So we clear them here
+      MyCoreInterceptor.incomingInterceptedFrames.clear();
+
+      StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+      conn.connect(defUser, defPass);
+
+      ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+      subFrame.addHeader("subscription-type", "ANYCAST");
+      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      subFrame.addHeader("ack", "auto");
+      conn.sendFrame(subFrame);
+
+      assertEquals(0, MyCoreInterceptor.incomingInterceptedFrames.size());
+      sendJmsMessage(getName());
+
+      // Something was supposed to be called on sendMessages
+      assertTrue("core interceptor is not working", MyCoreInterceptor.incomingInterceptedFrames.size() > 0);
+
+      conn.receiveFrame(10000);
+
+      ClientStompFrame frame = conn.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.setBody("Hello World");
+      conn.sendFrame(frame);
+
+      conn.disconnect();
+
+      List<String> incomingCommands = new ArrayList<>(4);
+      incomingCommands.add("CONNECT");
+      incomingCommands.add("SUBSCRIBE");
+      incomingCommands.add("SEND");
+      incomingCommands.add("DISCONNECT");
+
+      List<String> outgoingCommands = new ArrayList<>(3);
+      outgoingCommands.add("CONNECTED");
+      outgoingCommands.add("MESSAGE");
+      outgoingCommands.add("MESSAGE");
+
+      long timeout = System.currentTimeMillis() + 1000;
+
+      // Things are async, so keep waiting until BOTH interceptors have seen everything
+      // (or the timeout expires); stopping when only one list is complete makes the test flaky.
+      while ((MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size() < incomingCommands.size() ||
+         MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size() < outgoingCommands.size()) &&
+         timeout > System.currentTimeMillis()) {
+         Thread.sleep(10);
+      }
+
+      Assert.assertEquals(incomingCommands.size(), MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size());
+      Assert.assertEquals(outgoingCommands.size(), MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size());
+
+      for (int i = 0; i < MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size(); i++) {
+         Assert.assertEquals(incomingCommands.get(i), MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getCommand());
+         Assert.assertEquals("incomingInterceptedVal", MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getHeader("incomingInterceptedProp"));
+      }
+
+      for (int i = 0; i < MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size(); i++) {
+         Assert.assertEquals(outgoingCommands.get(i), MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(i).getCommand());
+      }
+
+      Assert.assertEquals("incomingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("incomingInterceptedProp"));
+      Assert.assertEquals("outgoingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("outgoingInterceptedProp"));
+   }
+
+   /** Records every core packet passed to it and lets the packet through. */
+   public static class MyCoreInterceptor implements Interceptor {
+
+      // Populated asynchronously (see the wait loop in the test), so use a thread-safe list.
+      static final List<Packet> incomingInterceptedFrames = new CopyOnWriteArrayList<>();
+
+      @Override
+      public boolean intercept(Packet packet, RemotingConnection connection) {
+         IntegrationTestLogger.LOGGER.info("Core intercepted: " + packet);
+         incomingInterceptedFrames.add(packet);
+         return true;
+      }
+   }
+
+   /** Records each incoming STOMP frame and tags it with {@code incomingInterceptedProp}. */
+   public static class MyIncomingStompFrameInterceptor implements StompFrameInterceptor {
+
+      // Populated asynchronously (see the wait loop in the test), so use a thread-safe list.
+      static final List<StompFrame> incomingInterceptedFrames = new CopyOnWriteArrayList<>();
+
+      @Override
+      public boolean intercept(StompFrame stompFrame, RemotingConnection connection) {
+         incomingInterceptedFrames.add(stompFrame);
+         stompFrame.addHeader("incomingInterceptedProp", "incomingInterceptedVal");
+         return true;
+      }
+   }
+
+   /** Records each outgoing STOMP frame and tags it with {@code outgoingInterceptedProp}. */
+   public static class MyOutgoingStompFrameInterceptor implements StompFrameInterceptor {
+
+      // Populated asynchronously (see the wait loop in the test), so use a thread-safe list.
+      static final List<StompFrame> outgoingInterceptedFrames = new CopyOnWriteArrayList<>();
+
+      @Override
+      public boolean intercept(StompFrame stompFrame, RemotingConnection connection) {
+         outgoingInterceptedFrames.add(stompFrame);
+         stompFrame.addHeader("outgoingInterceptedProp", "outgoingInterceptedVal");
+         return true;
+      }
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithLargeMessages.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithLargeMessages.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithLargeMessages.java new file mode 100644 index 0000000..18410be --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithLargeMessages.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.tests.integration.stomp; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase; +import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class StompTestWithLargeMessages extends StompTestBase { + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + } + + @Override + public boolean isCompressLargeMessages() { + return true; + } + + @Override + public boolean isPersistenceEnabled() { + return true; + } + + @Override + public Integer getStompMinLargeMessageSize() { + return 2048; + } + + //stomp sender -> large -> stomp receiver + @Test + public void testSendReceiveLargePersistentMessages() throws Exception { + StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); + conn.connect(defUser, defPass); + + int count = 10; + int msgSize = 1024 * 1024; + char[] contents = new char[msgSize]; + for (int i = 0; i < msgSize; i++) { + contents[i] = 'A'; + } + String body = new String(contents); + + for (int i = 0; i < count; i++) { + ClientStompFrame frame = conn.createFrame("SEND"); + frame.addHeader("destination", getQueuePrefix() + getQueueName()); + frame.addHeader("persistent", "true"); + frame.setBody(body); + conn.sendFrame(frame); + } + + ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE"); + subFrame.addHeader("subscription-type", "ANYCAST"); + subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); + 
subFrame.addHeader("ack", "auto"); + conn.sendFrame(subFrame); + + for (int i = 0; i < count; i++) { + ClientStompFrame frame = conn.receiveFrame(60000); + Assert.assertNotNull(frame); + System.out.println("part of frame: " + frame.getBody().substring(0, 200)); + Assert.assertTrue(frame.getCommand().equals("MESSAGE")); + Assert.assertTrue(frame.getHeader("destination").equals(getQueuePrefix() + getQueueName())); + int index = frame.getBody().indexOf("AAAA"); + assertEquals(msgSize, (frame.getBody().length() - index)); + } + + ClientStompFrame unsubFrame = conn.createFrame("UNSUBSCRIBE"); + unsubFrame.addHeader("destination", getQueuePrefix() + getQueueName()); + unsubFrame.addHeader("receipt", "567"); + ClientStompFrame response = conn.sendFrame(unsubFrame); + assertNotNull(response); + assertNotNull(response.getCommand().equals("RECEIPT")); + + conn.disconnect(); + } + + //core sender -> large -> stomp receiver + @Test + public void testReceiveLargePersistentMessagesFromCore() throws Exception { + StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); + conn.connect(defUser, defPass); + + int msgSize = 3 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; + char[] contents = new char[msgSize]; + for (int i = 0; i < msgSize; i++) { + contents[i] = 'B'; + } + String msg = new String(contents); + + int count = 10; + for (int i = 0; i < count; i++) { + this.sendJmsMessage(msg); + } + + ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE"); + subFrame.addHeader("subscription-type", "ANYCAST"); + subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); + subFrame.addHeader("ack", "auto"); + conn.sendFrame(subFrame); + + for (int i = 0; i < count; i++) { + ClientStompFrame frame = conn.receiveFrame(60000); + Assert.assertNotNull(frame); + System.out.println("part of frame: " + frame.getBody().substring(0, 200)); + Assert.assertTrue(frame.getCommand().equals("MESSAGE")); + 
Assert.assertTrue(frame.getHeader("destination").equals(getQueuePrefix() + getQueueName())); + int index = frame.getBody().indexOf("BBB"); + assertEquals(msgSize, (frame.getBody().length() - index)); + } + + ClientStompFrame unsubFrame = conn.createFrame("UNSUBSCRIBE"); + unsubFrame.addHeader("destination", getQueuePrefix() + getQueueName()); + unsubFrame.addHeader("receipt", "567"); + ClientStompFrame response = conn.sendFrame(unsubFrame); + assertNotNull(response); + assertNotNull(response.getCommand().equals("RECEIPT")); + + conn.disconnect(); + } + + //stomp v12 sender -> large -> stomp v12 receiver + @Test + public void testSendReceiveLargePersistentMessagesV12() throws Exception { + StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port); + connV12.connect(defUser, defPass); + + int count = 10; + int szBody = 1024 * 1024; + char[] contents = new char[szBody]; + for (int i = 0; i < szBody; i++) { + contents[i] = 'A'; + } + String body = new String(contents); + + ClientStompFrame frame = connV12.createFrame("SEND"); + frame.addHeader("destination-type", "ANYCAST"); + frame.addHeader("destination", getQueuePrefix() + getQueueName()); + frame.addHeader("persistent", "true"); + frame.setBody(body); + + for (int i = 0; i < count; i++) { + connV12.sendFrame(frame); + } + + ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE"); + subFrame.addHeader("id", "a-sub"); + subFrame.addHeader("subscription-type", "ANYCAST"); + subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); + subFrame.addHeader("ack", "auto"); + + connV12.sendFrame(subFrame); + + for (int i = 0; i < count; i++) { + ClientStompFrame receiveFrame = connV12.receiveFrame(30000); + + Assert.assertNotNull(receiveFrame); + System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20)); + Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE")); + Assert.assertEquals(receiveFrame.getHeader("destination"), 
getQueuePrefix() + getQueueName()); + assertEquals(szBody, receiveFrame.getBody().length()); + } + + // remove susbcription + ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE"); + unsubFrame.addHeader("id", "a-sub"); + connV12.sendFrame(unsubFrame); + + connV12.disconnect(); + } + + //core sender -> large -> stomp v12 receiver + @Test + public void testReceiveLargePersistentMessagesFromCoreV12() throws Exception { + int msgSize = 3 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; + char[] contents = new char[msgSize]; + for (int i = 0; i < msgSize; i++) { + contents[i] = 'B'; + } + String msg = new String(contents); + + int count = 10; + for (int i = 0; i < count; i++) { + this.sendJmsMessage(msg); + } + + StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port); + connV12.connect(defUser, defPass); + + ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE"); + subFrame.addHeader("id", "a-sub"); + subFrame.addHeader("subscription-type", "ANYCAST"); + subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); + subFrame.addHeader("ack", "auto"); + connV12.sendFrame(subFrame); + + for (int i = 0; i < count; i++) { + ClientStompFrame receiveFrame = connV12.receiveFrame(30000); + + Assert.assertNotNull(receiveFrame); + System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20)); + Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE")); + Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName()); + assertEquals(msgSize, receiveFrame.getBody().length()); + } + + // remove susbcription + ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE"); + unsubFrame.addHeader("id", "a-sub"); + connV12.sendFrame(unsubFrame); + + connV12.disconnect(); + } + + //core sender -> large (compressed regular) -> stomp v10 receiver + @Test + public void testReceiveLargeCompressedToRegularPersistentMessagesFromCore() throws Exception { + 
StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); + conn.connect(defUser, defPass); + + LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true); + LargeMessageTestBase.adjustLargeCompression(true, input, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); + + char[] contents = input.toArray(); + String msg = new String(contents); + + String leadingPart = msg.substring(0, 100); + + int count = 10; + for (int i = 0; i < count; i++) { + this.sendJmsMessage(msg); + } + + ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE"); + subFrame.addHeader("subscription-type", "ANYCAST"); + subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); + subFrame.addHeader("ack", "auto"); + conn.sendFrame(subFrame); + + for (int i = 0; i < count; i++) { + ClientStompFrame receiveFrame = conn.receiveFrame(30000); + Assert.assertNotNull(receiveFrame); + System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 250)); + Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE")); + Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName()); + int index = receiveFrame.getBody().indexOf(leadingPart); + assertEquals(msg.length(), (receiveFrame.getBody().length() - index)); + } + + // remove suscription + ClientStompFrame unsubFrame = conn.createFrame("UNSUBSCRIBE"); + unsubFrame.addHeader("destination", getQueuePrefix() + getQueueName()); + unsubFrame.addHeader("receipt", "567"); + ClientStompFrame response = conn.sendFrame(unsubFrame); + assertNotNull(response); + assertNotNull(response.getCommand().equals("RECEIPT")); + + conn.disconnect(); + } + + //core sender -> large (compressed regular) -> stomp v12 receiver + @Test + public void testReceiveLargeCompressedToRegularPersistentMessagesFromCoreV12() throws Exception { + 
LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true); + LargeMessageTestBase.adjustLargeCompression(true, input, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); + + char[] contents = input.toArray(); + String msg = new String(contents); + + int count = 10; + for (int i = 0; i < count; i++) { + this.sendJmsMessage(msg); + } + + StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port); + connV12.connect(defUser, defPass); + + ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE"); + subFrame.addHeader("id", "a-sub"); + subFrame.addHeader("subscription-type", "ANYCAST"); + subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); + subFrame.addHeader("ack", "auto"); + + connV12.sendFrame(subFrame); + + for (int i = 0; i < count; i++) { + ClientStompFrame receiveFrame = connV12.receiveFrame(30000); + + Assert.assertNotNull(receiveFrame); + System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20)); + Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE")); + Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName()); + assertEquals(contents.length, receiveFrame.getBody().length()); + } + + // remove susbcription + ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE"); + unsubFrame.addHeader("id", "a-sub"); + connV12.sendFrame(unsubFrame); + + connV12.disconnect(); + } + + //core sender -> large (compressed large) -> stomp v12 receiver + @Test + public void testReceiveLargeCompressedToLargePersistentMessagesFromCoreV12() throws Exception { + LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true); + input.setSize(10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); + 
LargeMessageTestBase.adjustLargeCompression(false, input, 10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); + + char[] contents = input.toArray(); + String msg = new String(contents); + + int count = 10; + for (int i = 0; i < count; i++) { + this.sendJmsMessage(msg); + } + + IntegrationTestLogger.LOGGER.info("Message count for " + getQueueName() + ": " + server.getActiveMQServer().locateQueue(SimpleString.toSimpleString(getQueueName())).getMessageCount()); + + StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", hostname, port); + connV12.connect(defUser, defPass); + + ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE"); + subFrame.addHeader("id", "a-sub"); + subFrame.addHeader("subscription-type", "ANYCAST"); + subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); + subFrame.addHeader("ack", "auto"); + + connV12.sendFrame(subFrame); + + for (int i = 0; i < count; i++) { + ClientStompFrame receiveFrame = connV12.receiveFrame(30000); + + Assert.assertNotNull(receiveFrame); + System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20)); + Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE")); + Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName()); + assertEquals(contents.length, receiveFrame.getBody().length()); + } + + // remove susbcription + ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE"); + unsubFrame.addHeader("id", "a-sub"); + connV12.sendFrame(unsubFrame); + + connV12.disconnect(); + } + + //core sender -> large (compressed large) -> stomp v10 receiver + @Test + public void testReceiveLargeCompressedToLargePersistentMessagesFromCore() throws Exception { + LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true); + input.setSize(10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); + 
LargeMessageTestBase.adjustLargeCompression(false, input, 10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); + + char[] contents = input.toArray(); + String msg = new String(contents); + + String leadingPart = msg.substring(0, 100); + + int count = 10; + for (int i = 0; i < count; i++) { + this.sendJmsMessage(msg); + } + + StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); + conn.connect(defUser, defPass); + + ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE"); + subFrame.addHeader("subscription-type", "ANYCAST"); + subFrame.addHeader("destination", getQueuePrefix() + getQueueName()); + subFrame.addHeader("ack", "auto"); + conn.sendFrame(subFrame); + + for (int i = 0; i < count; i++) { + ClientStompFrame frame = conn.receiveFrame(60000); + Assert.assertNotNull(frame); + System.out.println("part of frame: " + frame.getBody().substring(0, 250)); + Assert.assertTrue(frame.getCommand().equals("MESSAGE")); + Assert.assertTrue(frame.getHeader("destination").equals(getQueuePrefix() + getQueueName())); + int index = frame.getBody().toString().indexOf(leadingPart); + assertEquals(msg.length(), (frame.getBody().toString().length() - index)); + } + + ClientStompFrame unsubFrame = conn.createFrame("UNSUBSCRIBE"); + unsubFrame.addHeader("destination", getQueuePrefix() + getQueueName()); + unsubFrame.addHeader("receipt", "567"); + conn.sendFrame(unsubFrame); + + conn.disconnect(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithMessageID.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithMessageID.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithMessageID.java new file mode 100644 index 
0000000..69c214b --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithMessageID.java @@ -0,0 +1,78 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.artemis.tests.integration.stomp;

import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.QueueBrowser;
import javax.jms.TextMessage;
import java.util.Enumeration;

import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.junit.Assert;
import org.junit.Test;

/**
 * Checks that messages sent over STOMP carry an {@code amqMessageId} property starting with
 * {@code "STOMP"} when {@link #isEnableStompMessageId()} returns {@code true}.
 */
public class StompTestWithMessageID extends StompTestBase {

   /** Name of the message property inspected by this test. */
   private static final String MESSAGE_ID_PROPERTY = "amqMessageId";

   @Override
   public boolean isEnableStompMessageId() {
      return true;
   }

   @Test
   public void testEnableMessageID() throws Exception {
      StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
      conn.connect(defUser, defPass);

      ClientStompFrame frame = conn.createFrame("SEND");
      frame.addHeader("destination", getQueuePrefix() + getQueueName());
      frame.setBody("Hello World 1");
      conn.sendFrame(frame);

      frame = conn.createFrame("SEND");
      frame.addHeader("destination", getQueuePrefix() + getQueueName());
      frame.setBody("Hello World 2");
      conn.sendFrame(frame);

      // Release the STOMP connection; it is not needed for the JMS-side verification below.
      conn.disconnect();

      QueueBrowser browser = session.createBrowser(queue);

      Enumeration<?> enu = browser.getEnumeration();

      while (enu.hasMoreElements()) {
         assertStompMessageId((Message) enu.nextElement());
      }

      browser.close();

      MessageConsumer consumer = session.createConsumer(queue);

      // The SENDs above carry no receipt, so the browser may have run before they arrived and
      // checked nothing; verify the property on the messages that are actually consumed too.
      TextMessage message = (TextMessage) consumer.receive(1000);
      Assert.assertNotNull(message);
      assertStompMessageId(message);

      message = (TextMessage) consumer.receive(1000);
      Assert.assertNotNull(message);
      assertStompMessageId(message);

      message = (TextMessage) consumer.receive(2000);
      Assert.assertNull(message);
   }

   /** Asserts that {@code msg} has an {@code amqMessageId} property beginning with "STOMP". */
   private static void assertStompMessageId(Message msg) throws Exception {
      String msgId = msg.getStringProperty(MESSAGE_ID_PROPERTY);
      Assert.assertNotNull(msgId);
      Assert.assertTrue(msgId.startsWith("STOMP"));
   }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithSecurity.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithSecurity.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithSecurity.java index e9d5550..a6ce6c9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithSecurity.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithSecurity.java @@ -19,27 +19,34 @@ package org.apache.activemq.artemis.tests.integration.stomp; import javax.jms.MessageConsumer; import javax.jms.TextMessage; -import org.apache.activemq.artemis.core.protocol.stomp.Stomp; +import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; +import
org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection; +import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory; import org.junit.Assert; import org.junit.Test; public class StompTestWithSecurity extends StompTestBase { + @Override + public boolean isSecurityEnabled() { + return true; + } + @Test public void testJMSXUserID() throws Exception { server.getActiveMQServer().getConfiguration().setPopulateValidatedUser(true); MessageConsumer consumer = session.createConsumer(queue); - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("CONNECTED")); + StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); + conn.connect(defUser, defPass); - frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL; + ClientStompFrame frame = conn.createFrame("SEND"); + frame.addHeader("destination", getQueuePrefix() + getQueueName()); + frame.setBody("Hello World"); + conn.sendFrame(frame); - sendFrame(frame); + conn.disconnect(); TextMessage message = (TextMessage) consumer.receive(1000); Assert.assertNotNull(message); @@ -54,9 +61,4 @@ public class StompTestWithSecurity extends StompTestBase { long tmsg = message.getJMSTimestamp(); Assert.assertTrue(Math.abs(tnow - tmsg) < 1000); } - - @Override - public boolean isSecurityEnabled() { - return true; - } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java index 06771bb..c48fd8d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java @@ -24,9 +24,9 @@ import java.util.Iterator; import java.util.List; import java.util.Set; -public abstract class AbstractClientStompFrame implements ClientStompFrame { +import org.apache.activemq.artemis.core.protocol.stomp.Stomp; - protected static final String HEADER_RECEIPT = "receipt"; +public abstract class AbstractClientStompFrame implements ClientStompFrame { protected static final Set<String> validCommands = new HashSet<>(); protected String command; @@ -36,19 +36,19 @@ public abstract class AbstractClientStompFrame implements ClientStompFrame { protected String EOL = "\n"; static { - validCommands.add("CONNECT"); - validCommands.add("CONNECTED"); - validCommands.add("SEND"); - validCommands.add("RECEIPT"); - validCommands.add("SUBSCRIBE"); - validCommands.add("UNSUBSCRIBE"); - validCommands.add("MESSAGE"); - validCommands.add("BEGIN"); - validCommands.add("COMMIT"); - validCommands.add("ABORT"); - validCommands.add("ACK"); - validCommands.add("DISCONNECT"); - validCommands.add("ERROR"); + validCommands.add(Stomp.Commands.CONNECT); + validCommands.add(Stomp.Responses.CONNECTED); + validCommands.add(Stomp.Commands.SEND); + validCommands.add(Stomp.Responses.RECEIPT); + validCommands.add(Stomp.Commands.SUBSCRIBE); + validCommands.add(Stomp.Commands.UNSUBSCRIBE); + validCommands.add(Stomp.Responses.MESSAGE); + validCommands.add(Stomp.Commands.BEGIN); + validCommands.add(Stomp.Commands.COMMIT); + validCommands.add(Stomp.Commands.ABORT); + validCommands.add(Stomp.Commands.ACK); + validCommands.add(Stomp.Commands.DISCONNECT); + 
validCommands.add(Stomp.Responses.ERROR); } public AbstractClientStompFrame(String command) { @@ -80,37 +80,15 @@ public abstract class AbstractClientStompFrame implements ClientStompFrame { @Override public ByteBuffer toByteBuffer() { - if (isPing()) { - ByteBuffer buffer = ByteBuffer.allocateDirect(1); - buffer.put((byte) 0x0A); - buffer.rewind(); - return buffer; - } - StringBuffer sb = new StringBuffer(); - sb.append(command + EOL); - int n = headers.size(); - for (int i = 0; i < n; i++) { - sb.append(headers.get(i).key + ":" + headers.get(i).val + EOL); - } - sb.append(EOL); - if (body != null) { - sb.append(body); - } - sb.append((char) 0); - - String data = sb.toString(); - - byte[] byteValue = data.getBytes(StandardCharsets.UTF_8); - - ByteBuffer buffer = ByteBuffer.allocateDirect(byteValue.length); - buffer.put(byteValue); - - buffer.rewind(); - return buffer; + return toByteBufferInternal(null); } @Override public ByteBuffer toByteBufferWithExtra(String str) { + return toByteBufferInternal(str); + } + + public ByteBuffer toByteBufferInternal(String str) { StringBuffer sb = new StringBuffer(); sb.append(command + EOL); int n = headers.size(); @@ -122,7 +100,9 @@ public abstract class AbstractClientStompFrame implements ClientStompFrame { sb.append(body); } sb.append((char) 0); - sb.append(str); + if (str != null) { + sb.append(str); + } String data = sb.toString(); @@ -137,26 +117,29 @@ public abstract class AbstractClientStompFrame implements ClientStompFrame { @Override public boolean needsReply() { - if ("CONNECT".equals(command) || headerKeys.contains(HEADER_RECEIPT)) { + if (Stomp.Commands.CONNECT.equals(command) || headerKeys.contains(Stomp.Headers.RECEIPT_REQUESTED)) { return true; } return false; } @Override - public void setCommand(String command) { + public ClientStompFrame setCommand(String command) { this.command = command; + return this; } @Override - public void addHeader(String key, String val) { + public ClientStompFrame addHeader(String 
key, String val) { headers.add(new Header(key, val)); headerKeys.add(key); + return this; } @Override - public void setBody(String body) { + public ClientStompFrame setBody(String body) { this.body = body; + return this; } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java index ce94ec3..d8a487e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java @@ -27,29 +27,12 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; public abstract class AbstractStompClientConnection implements StompClientConnection { - public static final String STOMP_COMMAND = "STOMP"; - - public static final String ACCEPT_HEADER = "accept-version"; - public static final String HOST_HEADER = "host"; - public static final String VERSION_HEADER = "version"; - public static final String RECEIPT_HEADER = "receipt"; - - protected static final String CONNECT_COMMAND = "CONNECT"; - protected static final String CONNECTED_COMMAND = "CONNECTED"; - protected static final String DISCONNECT_COMMAND = "DISCONNECT"; - - protected static final String LOGIN_HEADER = "login"; - 
protected static final String PASSCODE_HEADER = "passcode"; - - //ext - protected static final String CLIENT_ID_HEADER = "client-id"; - protected Pinger pinger; - protected String version; protected String host; protected int port; @@ -58,13 +41,10 @@ public abstract class AbstractStompClientConnection implements StompClientConnec protected StompFrameFactory factory; protected final SocketChannel socketChannel; protected ByteBuffer readBuffer; - protected List<Byte> receiveList; - protected BlockingQueue<ClientStompFrame> frameQueue = new LinkedBlockingQueue<>(); - protected boolean connected = false; - private int serverPingCounter; + protected int serverPingCounter; public AbstractStompClientConnection(String version, String host, int port) throws IOException { this.version = version; @@ -90,11 +70,15 @@ public abstract class AbstractStompClientConnection implements StompClientConnec new ReaderThread().start(); } - @Override - public ClientStompFrame sendFrame(ClientStompFrame frame) throws IOException, InterruptedException { + private ClientStompFrame sendFrameInternal(ClientStompFrame frame, boolean wicked) throws IOException, InterruptedException { ClientStompFrame response = null; - IntegrationTestLogger.LOGGER.info("Sending frame:\n" + frame); - ByteBuffer buffer = frame.toByteBuffer(); + IntegrationTestLogger.LOGGER.info("Sending " + (wicked ? 
"*wicked* " : "") + "frame:\n" + frame); + ByteBuffer buffer; + if (wicked) { + buffer = frame.toByteBufferWithExtra("\n"); + } else { + buffer = frame.toByteBuffer(); + } while (buffer.remaining() > 0) { socketChannel.write(buffer); } @@ -105,7 +89,7 @@ public abstract class AbstractStompClientConnection implements StompClientConnec //filter out server ping while (response != null) { - if (response.getCommand().equals("STOMP")) { + if (response.getCommand().equals(Stomp.Commands.STOMP)) { response = receiveFrame(); } else { break; @@ -113,32 +97,19 @@ public abstract class AbstractStompClientConnection implements StompClientConnec } } + IntegrationTestLogger.LOGGER.info("Received:\n" + response); + return response; } @Override - public ClientStompFrame sendWickedFrame(ClientStompFrame frame) throws IOException, InterruptedException { - ClientStompFrame response = null; - ByteBuffer buffer = frame.toByteBufferWithExtra("\n"); - - while (buffer.remaining() > 0) { - socketChannel.write(buffer); - } - - //now response - if (frame.needsReply()) { - response = receiveFrame(); + public ClientStompFrame sendFrame(ClientStompFrame frame) throws IOException, InterruptedException { + return sendFrameInternal(frame, false); + } - //filter out server ping - while (response != null) { - if (response.getCommand().equals("STOMP")) { - response = receiveFrame(); - } else { - break; - } - } - } - return response; + @Override + public ClientStompFrame sendWickedFrame(ClientStompFrame frame) throws IOException, InterruptedException { + return sendFrameInternal(frame, true); } @Override @@ -186,17 +157,12 @@ public abstract class AbstractStompClientConnection implements StompClientConnec readBuffer.rewind(); } - @Override - public int getServerPingNumber() { - return serverPingCounter; - } - protected void incrementServerPing() { serverPingCounter++; } private boolean validateFrame(ClientStompFrame f) { - String h = f.getHeader("content-length"); + String h = 
f.getHeader(Stomp.Headers.CONTENT_LENGTH); if (h != null) { int len = Integer.valueOf(h); if (f.getBody().getBytes(StandardCharsets.UTF_8).length < len) { @@ -271,34 +237,15 @@ public abstract class AbstractStompClientConnection implements StompClientConnec return this.frameQueue.size(); } - @Override - public void startPinger(long interval) { - pinger = new Pinger(interval); - pinger.startPing(); - } - - @Override - public void stopPinger() { - if (pinger != null) { - pinger.stopPing(); - try { - pinger.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - pinger = null; - } - } - - private class Pinger extends Thread { + protected class Pinger extends Thread { long pingInterval; ClientStompFrame pingFrame; volatile boolean stop = false; - private Pinger(long interval) { + Pinger(long interval) { this.pingInterval = interval; - pingFrame = createFrame("STOMP"); + pingFrame = createFrame(Stomp.Commands.STOMP); pingFrame.setBody("\n"); pingFrame.setForceOneway(); pingFrame.setPing(true); @@ -329,5 +276,4 @@ public abstract class AbstractStompClientConnection implements StompClientConnec } } } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java index 53bced4..93801f9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java @@ -27,11 +27,11 @@ public interface ClientStompFrame { boolean needsReply(); - 
void setCommand(String command); + ClientStompFrame setCommand(String command); - void addHeader(String string, String string2); + ClientStompFrame addHeader(String string, String string2); - void setBody(String string); + ClientStompFrame setBody(String string); String getCommand(); @@ -43,8 +43,8 @@ public interface ClientStompFrame { boolean isPing(); - void setForceOneway(); + ClientStompFrame setForceOneway(); - void setPing(boolean b); + ClientStompFrame setPing(boolean b); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV10.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV10.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV10.java index 5273236..92629ab 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV10.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV10.java @@ -30,18 +30,18 @@ public class ClientStompFrameV10 extends AbstractClientStompFrame { } @Override - public boolean isPing() { - return false; + public ClientStompFrame setForceOneway() { + throw new IllegalStateException("Doesn't apply with V1.0!"); } @Override - public void setForceOneway() { + public ClientStompFrame setPing(boolean b) { throw new IllegalStateException("Doesn't apply with V1.0!"); } @Override - public void setPing(boolean b) { - throw new IllegalStateException("Doesn't apply with V1.0!"); + public boolean isPing() { + return false; } } 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV11.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV11.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV11.java index 22d7146..4d8d1e0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV11.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV11.java @@ -16,14 +16,13 @@ */ package org.apache.activemq.artemis.tests.integration.stomp.util; -/** - * pls use factory to create frames. - */ -public class ClientStompFrameV11 extends AbstractClientStompFrame { +import org.apache.activemq.artemis.core.protocol.stomp.Stomp; + +public class ClientStompFrameV11 extends ClientStompFrameV10 { static { - validCommands.add("NACK"); - validCommands.add("STOMP"); + validCommands.add(Stomp.Commands.NACK); + validCommands.add(Stomp.Commands.STOMP); } boolean forceOneway = false; @@ -38,8 +37,9 @@ public class ClientStompFrameV11 extends AbstractClientStompFrame { } @Override - public void setForceOneway() { + public ClientStompFrame setForceOneway() { forceOneway = true; + return this; } @Override @@ -47,15 +47,17 @@ public class ClientStompFrameV11 extends AbstractClientStompFrame { if (forceOneway) return false; - if ("CONNECT".equals(command) || "STOMP".equals(command) || headerKeys.contains(HEADER_RECEIPT)) { + if (Stomp.Commands.STOMP.equals(command)) { return true; } - return false; + + return super.needsReply(); } @Override - public void setPing(boolean b) { + public ClientStompFrame setPing(boolean b) { isPing = b; + return this; } 
@Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV12.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV12.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV12.java index 5ca530e..eaffd1c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV12.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV12.java @@ -16,17 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.stomp.util; -/** - */ -public class ClientStompFrameV12 extends AbstractClientStompFrame { - - static { - validCommands.add("NACK"); - validCommands.add("STOMP"); - } - - boolean forceOneway = false; - boolean isPing = false; +public class ClientStompFrameV12 extends ClientStompFrameV11 { public ClientStompFrameV12(String command) { this(command, true, true); @@ -45,32 +35,6 @@ public class ClientStompFrameV12 extends AbstractClientStompFrame { } @Override - public void setForceOneway() { - forceOneway = true; - } - - @Override - public boolean needsReply() { - if (forceOneway) - return false; - - if ("CONNECT".equals(command) || "STOMP".equals(command) || headerKeys.contains(HEADER_RECEIPT)) { - return true; - } - return false; - } - - @Override - public void setPing(boolean b) { - isPing = b; - } - - @Override - public boolean isPing() { - return isPing; - } - - @Override public String toString() { return "[1.2]" + super.toString(); } 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java index 12f52d0..7be09a5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java @@ -18,9 +18,6 @@ package org.apache.activemq.artemis.tests.integration.stomp.util; import java.io.IOException; -/** - * pls use factory to create frames. - */ public interface StompClientConnection { ClientStompFrame sendFrame(ClientStompFrame frame) throws IOException, InterruptedException; @@ -35,7 +32,7 @@ public interface StompClientConnection { ClientStompFrame connect(String defUser, String defPass) throws Exception; - void connect(String defUser, String defPass, String clientId) throws Exception; + ClientStompFrame connect(String defUser, String defPass, String clientId) throws Exception; boolean isConnected(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a88853fe/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java index 7a1a529..d32823b 100644 --- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java @@ -18,52 +18,47 @@ package org.apache.activemq.artemis.tests.integration.stomp.util; import java.io.IOException; -/** - * pls use factory to create frames. - */ +import org.apache.activemq.artemis.core.protocol.stomp.Stomp; +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; + public class StompClientConnectionV10 extends AbstractStompClientConnection { public StompClientConnectionV10(String host, int port) throws IOException { super("1.0", host, port); } + public StompClientConnectionV10(String version, String host, int port) throws IOException { + super(version, host, port); + } + @Override public ClientStompFrame connect(String username, String passcode) throws IOException, InterruptedException { - ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND); - frame.addHeader(LOGIN_HEADER, username); - frame.addHeader(PASSCODE_HEADER, passcode); - - ClientStompFrame response = this.sendFrame(frame); - - if (response.getCommand().equals(CONNECTED_COMMAND)) { - connected = true; - } else { - System.out.println("Connection failed with: " + response); - connected = false; - } - return response; + return connect(username, passcode, null); } @Override - public void connect(String username, String passcode, String clientID) throws IOException, InterruptedException { - ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND); - frame.addHeader(LOGIN_HEADER, username); - frame.addHeader(PASSCODE_HEADER, passcode); - frame.addHeader(CLIENT_ID_HEADER, clientID); + public ClientStompFrame connect(String username, String passcode, String clientID) throws IOException, InterruptedException { + ClientStompFrame frame = factory.newFrame(Stomp.Commands.CONNECT); + 
frame.addHeader(Stomp.Headers.Connect.LOGIN, username); + frame.addHeader(Stomp.Headers.Connect.PASSCODE, passcode); + if (clientID != null) { + frame.addHeader(Stomp.Headers.Connect.CLIENT_ID, clientID); + } ClientStompFrame response = this.sendFrame(frame); - if (response.getCommand().equals(CONNECTED_COMMAND)) { + if (response.getCommand().equals(Stomp.Responses.CONNECTED)) { connected = true; } else { - System.out.println("Connection failed with: " + response); + IntegrationTestLogger.LOGGER.warn("Connection failed with: " + response); connected = false; } + return response; } @Override public void disconnect() throws IOException, InterruptedException { - ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND); + ClientStompFrame frame = factory.newFrame(Stomp.Commands.DISCONNECT); this.sendFrame(frame); close();
