Fix ManagementWithStompTest
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3a8552df Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3a8552df Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3a8552df Branch: refs/heads/ARTEMIS-780 Commit: 3a8552df192aa683bdd4d29424ac9cc0a0adb1c0 Parents: 1e7b0ed Author: jbertram <[email protected]> Authored: Tue Nov 15 20:55:12 2016 -0600 Committer: jbertram <[email protected]> Committed: Wed Nov 23 09:04:34 2016 -0600 ---------------------------------------------------------------------- .../core/protocol/stomp/StompConnection.java | 2 +- .../management/ManagementWithStompTest.java | 232 ------------------- .../tests/integration/stomp/StompTest.java | 58 +++++ .../tests/integration/stomp/StompTestBase.java | 6 +- 4 files changed, 62 insertions(+), 236 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3a8552df/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index eaeb21d..356aae1 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -262,7 +262,7 @@ public final class StompConnection implements RemotingConnection { try { if (manager.getServer().getAddressInfo(SimpleString.toSimpleString(queue)) == null) { // TODO check here to see if auto-creation is enabled - if (routingType.equals(AddressInfo.RoutingType.MULTICAST)) { + if (routingType != null && routingType.equals(AddressInfo.RoutingType.MULTICAST)) { manager.getServer().createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(queue)).setAutoCreated(true)); } else { manager.getServer().createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(queue)).setRoutingType(AddressInfo.RoutingType.ANYCAST).setAutoCreated(true)); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3a8552df/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithStompTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithStompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithStompTest.java deleted file mode 100644 index 5c94f1d..0000000 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ManagementWithStompTest.java +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.integration.management; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; - -import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.TransportConfiguration; -import org.apache.activemq.artemis.api.core.client.ClientSession; -import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; -import org.apache.activemq.artemis.api.core.client.ServerLocator; -import org.apache.activemq.artemis.api.core.management.ResourceNames; -import org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.protocol.stomp.Stomp; -import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory; -import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory; -import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.ActiveMQServers; -import org.apache.activemq.artemis.jms.server.JMSServerManager; -import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; -import org.apache.activemq.artemis.utils.RandomUtil; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class ManagementWithStompTest extends ManagementTestBase { - - // Constants ----------------------------------------------------- - - // Attributes ---------------------------------------------------- - - protected ActiveMQServer server; - - protected JMSServerManager jmsServer; - - protected ClientSession session; - - private Socket stompSocket; - - private ByteArrayOutputStream inputBuffer; - - // Static -------------------------------------------------------- - - // Constructors -------------------------------------------------- - - // Public -------------------------------------------------------- - - @Test - public void testGetManagementAttributeFromStomp() throws Exception { - SimpleString address = RandomUtil.randomSimpleString(); - SimpleString queue = RandomUtil.randomSimpleString(); - - session.createQueue(address, queue, null, false); - - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + queue + "\n\n" + Stomp.NULL; - sendFrame(frame); - - // retrieve the address of the queue - frame = "\nSEND\n" + "destination:" + ActiveMQDefaultConfiguration.getDefaultManagementAddress() + "\n" + - "reply-to:" + address + "\n" + - "_AMQ_ResourceName:" + ResourceNames.QUEUE + queue + "\n" + - "_AMQ_Attribute: Address\n\n" + - Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(10000); - System.out.println(frame); - Assert.assertTrue(frame.contains("_AMQ_OperationSucceeded:true")); - // the address will be returned in the message body in a JSON array - Assert.assertTrue(frame.contains("[\"" + address + "\"]")); - - frame = "UNSUBSCRIBE\n" + "destination:" + queue + "\n" + - "receipt: 123\n\n" + - Stomp.NULL; - sendFrame(frame); - waitForReceipt(); - - String disconnectFrame = "DISCONNECT\n\n" + Stomp.NULL; - sendFrame(disconnectFrame); - - session.deleteQueue(queue); - } - - @Test - public void testInvokeOperationFromStomp() throws Exception { - SimpleString address = RandomUtil.randomSimpleString(); - SimpleString queue = RandomUtil.randomSimpleString(); - - session.createQueue(address, queue, null, false); - - String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(10000); - Assert.assertTrue(frame.startsWith("CONNECTED")); - - frame = "SUBSCRIBE\n" + "destination:" + queue + "\n\n" + Stomp.NULL; - sendFrame(frame); - - // count number of message with filter "color = 'blue'" - frame = "\nSEND\n" + "destination:" + ActiveMQDefaultConfiguration.getDefaultManagementAddress() + "\n" + - "reply-to:" + address + "\n" + - "_AMQ_ResourceName:" + ResourceNames.QUEUE + queue + "\n" + - "_AMQ_OperationName: countMessages\n\n" + - "[\"color = 'blue'\"]" + - Stomp.NULL; - sendFrame(frame); - - frame = receiveFrame(10000); - System.out.println(frame); - Assert.assertTrue(frame.contains("_AMQ_OperationSucceeded:true")); - // there is no such messages => 0 returned in a JSON array - Assert.assertTrue(frame.contains("[0]")); - - frame = "UNSUBSCRIBE\n" + "destination:" + queue + "\n" + - "receipt: 123\n\n" + - Stomp.NULL; - sendFrame(frame); - waitForReceipt(); - - String disconnectFrame = "DISCONNECT\n\n" + Stomp.NULL; - sendFrame(disconnectFrame); - - session.deleteQueue(queue); - } - - // Package protected --------------------------------------------- - - // Protected ----------------------------------------------------- - - private ServerLocator locator; - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - - Map<String, Object> params = new HashMap<>(); - params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME); - params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT); - TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params); - - Configuration config = createDefaultInVMConfig().addAcceptorConfiguration(stompTransport); - - server = addServer(ActiveMQServers.newActiveMQServer(config, mbeanServer, false, "brianm", "wombats")); - - jmsServer = new JMSServerManagerImpl(server); - - jmsServer.start(); - - locator = createInVMNonHALocator().setBlockOnNonDurableSend(true); - ClientSessionFactory sf = createSessionFactory(locator); - session = sf.createSession(false, true, false); - session.start(); - - stompSocket = new Socket("127.0.0.1", TransportConstants.DEFAULT_STOMP_PORT); - inputBuffer = new ByteArrayOutputStream(); - } - - // Private ------------------------------------------------------- - - public void sendFrame(String data) throws Exception { - byte[] bytes = data.getBytes(StandardCharsets.UTF_8); - OutputStream outputStream = stompSocket.getOutputStream(); - for (int i = 0; i < bytes.length; i++) { - outputStream.write(bytes[i]); - } - outputStream.flush(); - } - - public String receiveFrame(long timeOut) throws Exception { - stompSocket.setSoTimeout((int) timeOut); - InputStream is = stompSocket.getInputStream(); - int c = 0; - for (;;) { - c = is.read(); - if (c < 0) { - throw new IOException("socket closed."); - } else if (c == 0) { - c = is.read(); - if (c != '\n') { - byte[] ba = inputBuffer.toByteArray(); - System.out.println(new String(ba, StandardCharsets.UTF_8)); - } - Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n'); - byte[] ba = inputBuffer.toByteArray(); - inputBuffer.reset(); - return new String(ba, StandardCharsets.UTF_8); - } else { - inputBuffer.write(c); - } - } - } - - protected void waitForReceipt() throws Exception { - String frame = receiveFrame(50000); - Assert.assertNotNull(frame); - Assert.assertTrue(frame.indexOf("RECEIPT") > -1); - } - - // Inner classes ------------------------------------------------- - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3a8552df/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index e7dcc91..f1def68 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientMessage; @@ -38,6 +39,8 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; @@ -1406,4 +1409,59 @@ public class StompTest extends StompTestBase { frame = conn.sendFrame(frame); assertEquals(Stomp.Responses.ERROR, frame.getCommand()); } + + @Test + public void testGetManagementAttributeFromStomp() throws Exception { + conn.connect(defUser, defPass); + + subscribe(conn, null); + + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, ActiveMQDefaultConfiguration.getDefaultManagementAddress().toString()) + .addHeader(Stomp.Headers.Send.REPLY_TO, getQueuePrefix() + getQueueName()) + .addHeader(ManagementHelper.HDR_RESOURCE_NAME.toString(), ResourceNames.QUEUE + getQueuePrefix() + getQueueName()) + .addHeader(ManagementHelper.HDR_ATTRIBUTE.toString(), "Address"); + + conn.sendFrame(frame); + + frame = conn.receiveFrame(10000); + + IntegrationTestLogger.LOGGER.info("Received: " + frame); + + Assert.assertEquals(Boolean.TRUE.toString(), frame.getHeader(ManagementHelper.HDR_OPERATION_SUCCEEDED.toString())); + // the address will be returned in the message body in a JSON array + Assert.assertEquals("[\"" + getQueuePrefix() + getQueueName() + "\"]", frame.getBody()); + + unsubscribe(conn, null); + + conn.disconnect(); + } + + @Test + public void testInvokeOperationFromStomp() throws Exception { + conn.connect(defUser, defPass); + + subscribe(conn, null); + + ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND) + .addHeader(Stomp.Headers.Send.DESTINATION, ActiveMQDefaultConfiguration.getDefaultManagementAddress().toString()) + .addHeader(Stomp.Headers.Send.REPLY_TO, getQueuePrefix() + getQueueName()) + .addHeader(ManagementHelper.HDR_RESOURCE_NAME.toString(), ResourceNames.QUEUE + getQueuePrefix() + getQueueName()) + .addHeader(ManagementHelper.HDR_OPERATION_NAME.toString(), "countMessages") + .setBody("[\"color = 'blue'\"]"); + + conn.sendFrame(frame); + + frame = conn.receiveFrame(10000); + + IntegrationTestLogger.LOGGER.info("Received: " + frame); + + Assert.assertEquals(Boolean.TRUE.toString(), frame.getHeader(ManagementHelper.HDR_OPERATION_SUCCEEDED.toString())); + // there is no such messages => 0 returned in a JSON array + Assert.assertEquals("[0]", frame.getBody()); + + unsubscribe(conn, null); + + conn.disconnect(); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3a8552df/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java index bcac436..dbe2762 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java @@ -314,20 +314,20 @@ public abstract class StompTestBase extends ActiveMQTestBase { public ClientStompFrame subscribe(StompClientConnection conn, String subscriptionId) throws IOException, InterruptedException { - return subscribe(conn, subscriptionId, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, null); + return subscribe(conn, subscriptionId, Stomp.Headers.Subscribe.AckModeValues.AUTO); } public ClientStompFrame subscribe(StompClientConnection conn, String subscriptionId, String ack) throws IOException, InterruptedException { - return subscribe(conn, subscriptionId, ack, null, null); + return subscribe(conn, subscriptionId, ack, null); } public ClientStompFrame subscribe(StompClientConnection conn, String subscriptionId, String ack, String durableId) throws IOException, InterruptedException { - return subscribe(conn, subscriptionId, ack, durableId, null); + return subscribe(conn, subscriptionId, ack, durableId, false); } public ClientStompFrame subscribe(StompClientConnection conn,
