QPID-6933: [System Tests] Refactor amqp management tests as JMS 1.1 system test
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a9a275dc Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a9a275dc Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a9a275dc Branch: refs/heads/master Commit: a9a275dc74c499735184b134eb4ae38cd0d6c50b Parents: 208b5ae Author: Alex Rudyy <[email protected]> Authored: Sun Jan 7 00:12:05 2018 +0000 Committer: Alex Rudyy <[email protected]> Committed: Sun Jan 7 00:12:05 2018 +0000 ---------------------------------------------------------------------- .../management/AmqpManagementTest.java | 1030 ++++++++++++++++++ .../jms_1_1/extensions/tls/TlsTest.java | 179 ++- .../management/amqp/AmqpManagementTest.java | 872 --------------- test-profiles/Java10BrokenTestsExcludes | 5 - 4 files changed, 1156 insertions(+), 930 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9a275dc/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/management/AmqpManagementTest.java ---------------------------------------------------------------------- diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/management/AmqpManagementTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/management/AmqpManagementTest.java new file mode 100644 index 0000000..2fbf21f --- /dev/null +++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/management/AmqpManagementTest.java @@ -0,0 +1,1030 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systests.jms_1_1.extensions.management; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.qpid.server.model.Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES; +import static org.apache.qpid.systests.jms_1_1.extensions.tls.TlsTest.TRUSTSTORE; +import static org.apache.qpid.test.utils.TestSSLConstants.TRUSTSTORE_PASSWORD; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.junit.Assume.assumeThat; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import javax.jms.Connection; +import javax.jms.ConnectionMetaData; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.Session; +import javax.naming.NamingException; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Test; + +import org.apache.qpid.server.exchange.ExchangeDefaults; +import 
org.apache.qpid.server.model.Protocol; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.queue.PriorityQueue; +import org.apache.qpid.systests.AmqpManagementFacade; +import org.apache.qpid.systests.JmsTestBase; +import org.apache.qpid.systests.jms_1_1.extensions.tls.TlsTest; +import org.apache.qpid.tests.utils.BrokerAdmin; + +public class AmqpManagementTest extends JmsTestBase +{ + private Session _session; + private Queue _replyAddress; + private MessageConsumer _consumer; + private MessageProducer _producer; + + private void setUp(final Connection connection) throws Exception + { + connection.start(); + _session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + final Queue queue; + final Queue replyConsumer; + if(getProtocol() == Protocol.AMQP_1_0) + { + queue = _session.createQueue("$management"); + _replyAddress = _session.createTemporaryQueue(); + replyConsumer = _replyAddress; + } + else + { + queue = _session.createQueue("ADDR:$management"); + _replyAddress = _session.createQueue("ADDR:!response"); + replyConsumer = _session.createQueue( + "ADDR:$management ; {assert : never, node: { type: queue }, link:{name: \"!response\"}}"); + } + _consumer = _session.createConsumer(replyConsumer); + _producer = _session.createProducer(queue); + } + + // test get types on $management + @Test + public void testGetTypesOnBrokerManagement() throws Exception + { + assumeThat(isSupportedClient(), is(true)); + + Connection connection = getBrokerManagementConnection(); + try + { + setUp(connection); + + Message message = _session.createBytesMessage(); + + message.setStringProperty("identity", "self"); + message.setStringProperty("type", "org.amqp.management"); + message.setStringProperty("operation", "GET-TYPES"); + + message.setJMSReplyTo(_replyAddress); + + _producer.send(message); + + Message responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 200); + 
checkResponseIsMapType(responseMessage); + assertNotNull("The response did not include the org.amqp.Management type", + getValueFromMapResponse(responseMessage, "org.amqp.management")); + assertNotNull("The response did not include the org.apache.qpid.Port type", + getValueFromMapResponse(responseMessage, "org.apache.qpid.Port")); + } + finally + { + connection.close(); + } + } + + // test query on $management + @Test + public void testQueryBrokerManagement() throws Exception + { + assumeThat(isSupportedClient(), is(true)); + + Connection connection = getBrokerManagementConnection(); + try + { + setUp(connection); + MapMessage message = _session.createMapMessage(); + + message.setStringProperty("identity", "self"); + message.setStringProperty("type", "org.amqp.management"); + message.setStringProperty("operation", "QUERY"); + message.setObject("attributeNames", "[]"); + message.setJMSReplyTo(_replyAddress); + + _producer.send(message); + + Message responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 200); + assertEquals("The correlation id does not match the sent message's messageId", + message.getJMSMessageID(), + responseMessage.getJMSCorrelationID()); + checkResponseIsMapType(responseMessage); + List<String> resultMessageKeys = new ArrayList<>(getMapResponseKeys(responseMessage)); + assertEquals("The response map has two entries", 2, resultMessageKeys.size()); + assertTrue("The response map does not contain attribute names", + resultMessageKeys.contains("attributeNames")); + assertTrue("The response map does not contain results ", resultMessageKeys.contains("results")); + Object attributeNames = getValueFromMapResponse(responseMessage, "attributeNames"); + assertTrue("The attribute names are not a list", attributeNames instanceof Collection); + Collection attributeNamesCollection = (Collection) attributeNames; + assertTrue("The attribute names do not contain identity", attributeNamesCollection.contains("identity")); 
+ assertTrue("The attribute names do not contain name", attributeNamesCollection.contains("name")); + + assertTrue("The attribute names do not contain qpid-type", attributeNamesCollection.contains("qpid-type")); + + // Now test filtering by type + message.setStringProperty("identity", "self"); + message.setStringProperty("type", "org.amqp.management"); + message.setStringProperty("operation", "QUERY"); + message.setStringProperty("entityType", "org.apache.qpid.Exchange"); + + message.setObject("attributeNames", "[\"name\", \"identity\", \"type\"]"); + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 200); + checkResponseIsMapType(responseMessage); + + assertEquals("The correlation id does not match the sent message's messageId", + message.getJMSMessageID(), + responseMessage.getJMSCorrelationID()); + resultMessageKeys = new ArrayList<>(getMapResponseKeys(responseMessage)); + assertEquals("The response map has two entries", 2, resultMessageKeys.size()); + assertTrue("The response map does not contain attribute names", + resultMessageKeys.contains("attributeNames")); + assertTrue("The response map does not contain results ", resultMessageKeys.contains("results")); + attributeNames = getValueFromMapResponse(responseMessage, "attributeNames"); + assertTrue("The attribute names are not a list", attributeNames instanceof Collection); + attributeNamesCollection = (Collection) attributeNames; + assertEquals("The attributeNames are no as expected", + Arrays.asList("name", "identity", "type"), + attributeNamesCollection); + Object resultsObject = getValueFromMapResponse(responseMessage, "results"); + assertTrue("results is not a collection", resultsObject instanceof Collection); + Collection results = (Collection) resultsObject; + + final int numberOfExchanges = results.size(); + assertTrue("results should have at least 4 elements", numberOfExchanges >= 4); + + 
message.setStringProperty("identity", "self"); + message.setStringProperty("type", "org.amqp.management"); + message.setStringProperty("operation", "QUERY"); + message.setStringProperty("entityType", "org.apache.qpid.DirectExchange"); + + message.setObject("attributeNames", "[\"name\", \"identity\", \"type\"]"); + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + responseMessage = _consumer.receive(getReceiveTimeout()); + final Collection directExchanges = (Collection) getValueFromMapResponse(responseMessage, "results"); + assertTrue( + "There are the same number of results when searching for direct exchanges as when searching for all exchanges", + directExchanges.size() < numberOfExchanges); + assertTrue("The list of direct exchanges is not a proper subset of the list of all exchanges", + results.containsAll(directExchanges)); + } + finally + { + connection.close(); + } + } + + // test get types on a virtual host + @Test + public void testGetTypesOnVirtualHostManagement() throws Exception + { + assumeThat(isSupportedClient(), is(true)); + + Connection connection = getConnection(); + try + { + setUp(connection); + Message message = _session.createBytesMessage(); + + message.setStringProperty("identity", "self"); + message.setStringProperty("type", "org.amqp.management"); + message.setStringProperty("operation", "GET-TYPES"); + String correlationID = "some correlation id"; + message.setJMSCorrelationIDAsBytes(correlationID.getBytes(UTF_8)); + + message.setJMSReplyTo(_replyAddress); + + _producer.send(message); + + Message responseMessage = _consumer.receive(getReceiveTimeout()); + assertNotNull("A response message was not sent", responseMessage); + assertEquals("The correlation id does not match the sent message's correlationId", + correlationID, responseMessage.getJMSCorrelationID()); + + assertResponseCode(responseMessage, 200); + assertNotNull("The response did not include the org.amqp.Management type", + 
getValueFromMapResponse(responseMessage,"org.amqp.management")); + assertNull("The response included the org.apache.qpid.Port type", + getValueFromMapResponse(responseMessage,"org.apache.qpid.Port")); + } + finally + { + connection.close(); + } + + } + + // create / update / read / delete a queue via $management + @Test + public void testCreateQueueOnBrokerManagement() throws Exception + { + assumeThat(isSupportedClient(), is(true)); + + Connection connection = getBrokerManagementConnection(); + try + { + setUp(connection); + + MapMessage message = _session.createMapMessage(); + + message.setStringProperty("type", "org.apache.qpid.Queue"); + message.setStringProperty("operation", "CREATE"); + message.setString("name", getTestName()); + message.setLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 100L); + String path = getVirtualHostName() + "/" + getVirtualHostName() + "/" + getTestName(); + message.setString("object-path", path); + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + Message responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 201); + checkResponseIsMapType(responseMessage); + assertEquals("The created queue was not a standard queue", + "org.apache.qpid.StandardQueue", + getValueFromMapResponse(responseMessage, "type")); + assertEquals("The created queue was not a standard queue", + "standard", + getValueFromMapResponse(responseMessage, "qpid-type")); + assertEquals("the created queue did not have the correct alerting threshold", + 100L, + getValueFromMapResponse(responseMessage, ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES)); + Object identity = getValueFromMapResponse(responseMessage, "identity"); + + message = _session.createMapMessage(); + + message.setStringProperty("type", "org.apache.qpid.Queue"); + message.setStringProperty("operation", "UPDATE"); + message.setObjectProperty("identity", identity); + message.setLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 250L); + + 
message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 200); + checkResponseIsMapType(responseMessage); + assertEquals("the created queue did not have the correct alerting threshold", + 250L, + getValueFromMapResponse(responseMessage, ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES)); + + message = _session.createMapMessage(); + + message.setStringProperty("type", "org.apache.qpid.Queue"); + message.setStringProperty("operation", "DELETE"); + message.setObjectProperty("index", "object-path"); + message.setObjectProperty("key", path); + + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 204); + + message = _session.createMapMessage(); + + message.setStringProperty("type", "org.apache.qpid.Queue"); + message.setStringProperty("operation", "READ"); + message.setObjectProperty("identity", identity); + + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 404); + } + finally + { + connection.close(); + } + } + + // create / update / read / delete a queue via vhost + @Test + public void testCreateQueueOnVirtualHostManagement() throws Exception + { + assumeThat(isSupportedClient(), is(true)); + + Connection connection = getConnection(); + try + { + setUp(connection); + + MapMessage message = _session.createMapMessage(); + + message.setStringProperty("type", "org.apache.qpid.Queue"); + message.setStringProperty("operation", "CREATE"); + message.setString("name", getTestName()); + message.setInt(PriorityQueue.PRIORITIES, 13); + String path = getTestName(); + message.setString("object-path", path); + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + Message responseMessage = _consumer.receive(getReceiveTimeout()); + 
assertResponseCode(responseMessage, 201); + checkResponseIsMapType(responseMessage); + assertEquals("The created queue was not a priority queue", + "org.apache.qpid.PriorityQueue", + getValueFromMapResponse(responseMessage, "type")); + assertEquals("The created queue was not a standard queue", + "priority", + getValueFromMapResponse(responseMessage, "qpid-type")); + assertEquals("the created queue did not have the correct number of priorities", + 13, + Integer.valueOf(getValueFromMapResponse(responseMessage, PriorityQueue.PRIORITIES).toString()) + .intValue()); + Object identity = getValueFromMapResponse(responseMessage, "identity"); + + // Trying to create a second queue with the same name should cause a conflict + message = _session.createMapMessage(); + + message.setStringProperty("type", "org.apache.qpid.Queue"); + message.setStringProperty("operation", "CREATE"); + message.setString("name", getTestName()); + message.setInt(PriorityQueue.PRIORITIES, 7); + message.setString("object-path", getTestName()); + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 409); + + message.setStringProperty("type", "org.apache.qpid.Queue"); + message.setStringProperty("operation", "READ"); + message.setObjectProperty("identity", identity); + + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 200); + assertEquals("the queue did not have the correct number of priorities", + 13, + Integer.valueOf(getValueFromMapResponse(responseMessage, PriorityQueue.PRIORITIES).toString()) + .intValue()); + assertEquals("the queue did not have the expected path", + getTestName(), + getValueFromMapResponse(responseMessage, "object-path")); + + message = _session.createMapMessage(); + + message.setStringProperty("type", "org.apache.qpid.Queue"); + 
message.setStringProperty("operation", "UPDATE"); + message.setObjectProperty("identity", identity); + message.setLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 250L); + + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 200); + checkResponseIsMapType(responseMessage); + assertEquals("The updated queue did not have the correct alerting threshold", + 250L, + Long.valueOf(getValueFromMapResponse(responseMessage, + ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES).toString()) + .longValue()); + + message = _session.createMapMessage(); + message.setStringProperty("type", "org.apache.qpid.Queue"); + message.setStringProperty("operation", "DELETE"); + message.setObjectProperty("index", "object-path"); + message.setObjectProperty("key", path); + + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 204); + + message = _session.createMapMessage(); + message.setStringProperty("type", "org.apache.qpid.Queue"); + message.setStringProperty("operation", "DELETE"); + message.setObjectProperty("index", "object-path"); + message.setObjectProperty("key", path); + + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 404); + } + finally + { + connection.close(); + } + } + + // read virtual host from virtual host management + @Test + public void testReadVirtualHost() throws Exception + { + assumeThat(isSupportedClient(), is(true)); + + Connection connection = getConnection(); + try + { + setUp(connection); + + MapMessage message = _session.createMapMessage(); + + message.setStringProperty("type", "org.apache.qpid.VirtualHost"); + message.setStringProperty("operation", "READ"); + message.setStringProperty("index", "object-path"); + message.setStringProperty("key", 
""); + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + Message responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 200); + checkResponseIsMapType(responseMessage); + assertEquals("The name of the virtual host is not as expected", + getVirtualHostName(), + getValueFromMapResponse(responseMessage, "name")); + + message.setBooleanProperty("actuals", false); + _producer.send(message); + responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 200); + checkResponseIsMapType(responseMessage); + assertNotNull("Derived attribute (productVersion) should be available", + getValueFromMapResponse(responseMessage, "productVersion")); + } + finally + { + connection.close(); + } + } + + @Test + public void testReadObject_ObjectNotFound() throws Exception + { + assumeThat(isSupportedClient(), is(true)); + + Connection connection = getConnection(); + try + { + setUp(connection); + + MapMessage message = _session.createMapMessage(); + + message.setStringProperty("type", "org.apache.qpid.Exchange"); + message.setStringProperty("operation", "READ"); + message.setStringProperty("index", "object-path"); + message.setStringProperty("key", "not-found-exchange"); + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + Message responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 404); + } + finally + { + connection.close(); + } + } + + @Test + public void testInvokeOperation_ObjectNotFound() throws Exception + { + assumeThat(isSupportedClient(), is(true)); + + Connection connection = getConnection(); + try + { + setUp(connection); + + MapMessage message = _session.createMapMessage(); + + message.setStringProperty("type", "org.apache.qpid.Exchange"); + message.setStringProperty("operation", "getStatistics"); + message.setStringProperty("index", "object-path"); + message.setStringProperty("key", "not-found-exchange"); + 
message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + Message responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 404); + } + finally + { + connection.close(); + } + } + + @Test + public void testInvokeOperationReturningMap() throws Exception + { + assumeThat(isSupportedClient(), is(true)); + + Connection connection = getBrokerManagementConnection(); + try + { + setUp(connection); + + MapMessage message = _session.createMapMessage(); + + message.setStringProperty("type", "org.apache.qpid.Broker"); + message.setStringProperty("operation", "getStatistics"); + message.setStringProperty("index", "object-path"); + message.setStringProperty("key", ""); + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + Message responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 200); + checkResponseIsMapType(responseMessage); + assertNotNull(getValueFromMapResponse(responseMessage, "numberOfLiveThreads")); + } + finally + { + connection.close(); + } + } + + @Test + public void testInvokeOperationReturningManagedAttributeValue() throws Exception + { + assumeThat(isSupportedClient(), is(true)); + + Connection connection = getBrokerManagementConnection(); + try + { + setUp(connection); + + MapMessage message = _session.createMapMessage(); + + message.setStringProperty("type", "org.apache.qpid.Broker"); + message.setStringProperty("operation", "getConnectionMetaData"); + message.setStringProperty("index", "object-path"); + message.setStringProperty("key", ""); + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + Message responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 200); + checkResponseIsMapType(responseMessage); + assertNotNull(getValueFromMapResponse(responseMessage, "port")); + } + finally + { + connection.close(); + } + } + + @Test + public void testInvokeSecureOperation() throws Exception + { + 
assumeThat(isSupportedClient(), is(true)); + + String secureOperation = "publishMessage"; // a secure operation + Map<String, String> operationArg = new HashMap<>(); + operationArg.put("address", ExchangeDefaults.FANOUT_EXCHANGE_NAME); + operationArg.put("content", "Hello, world!"); + + Connection unsecuredConnection = getConnection(); + try + { + setUp(unsecuredConnection); + + MapMessage plainRequest = _session.createMapMessage(); + + plainRequest.setStringProperty("type", "org.apache.qpid.VirtualHost"); + plainRequest.setStringProperty("operation", secureOperation); + plainRequest.setStringProperty("index", "object-path"); + plainRequest.setStringProperty("key", ""); + plainRequest.setStringProperty("message", new ObjectMapper().writeValueAsString(operationArg)); + plainRequest.setJMSReplyTo(_replyAddress); + _producer.send(plainRequest); + + Message responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 403); + } + finally + { + unsecuredConnection.close(); + } + + int tlsPort = TlsTest.createTlsPort(getTestName() + "TlsPort", + false, + false, + false, + getConnectionBuilder(), + new AmqpManagementFacade(getProtocol()), + getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP).getPort()); + + Connection connection = getConnectionBuilder().setTls(true) + .setSslPort(tlsPort) + .setTrustStoreLocation(TRUSTSTORE) + .setTrustStorePassword(TRUSTSTORE_PASSWORD) + .build(); + try + { + setUp(connection); + + MapMessage secureRequest = _session.createMapMessage(); + + secureRequest.setStringProperty("type", "org.apache.qpid.VirtualHost"); + secureRequest.setStringProperty("operation", secureOperation); + secureRequest.setStringProperty("index", "object-path"); + secureRequest.setStringProperty("key", ""); + secureRequest.setStringProperty("message", new ObjectMapper().writeValueAsString(operationArg)); + secureRequest.setJMSReplyTo(_replyAddress); + _producer.send(secureRequest); + + Message responseMessage = 
_consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 200); + } + finally + { + connection.close(); + } + } + + // create a virtual host from $management + @Test + public void testCreateVirtualHost() throws Exception + { + assumeThat(isSupportedClient(), is(true)); + + String virtualHostName = "newMemoryVirtualHost"; + Connection connection = getBrokerManagementConnection(); + try + { + setUp(connection); + + MapMessage message = _session.createMapMessage(); + + message.setStringProperty("type", "org.apache.qpid.JsonVirtualHostNode"); + message.setStringProperty("operation", "CREATE"); + message.setString("name", virtualHostName); + message.setString(VirtualHostNode.VIRTUALHOST_INITIAL_CONFIGURATION, "{ \"type\" : \"Memory\" }"); + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + Message responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 201); + } + finally + { + connection.close(); + } + + Connection virtualHostConnection = getConnectionBuilder().setVirtualHost(virtualHostName).build(); + try + { + setUp(virtualHostConnection); + + MapMessage message = _session.createMapMessage(); + + message.setStringProperty("type", "org.apache.qpid.VirtualHost"); + message.setStringProperty("operation", "READ"); + message.setStringProperty("index", "object-path"); + message.setStringProperty("key", ""); + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + Message responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 200); + checkResponseIsMapType(responseMessage); + assertEquals("The name of the virtual host is not as expected", + virtualHostName, + getValueFromMapResponse(responseMessage, "name")); + assertEquals("The type of the virtual host is not as expected", + "Memory", + getValueFromMapResponse(responseMessage, "qpid-type")); + } + finally + { + virtualHostConnection.close(); + } + + } + + // attempt to delete the virtual 
host via the virtual host + @Test + public void testDeleteVirtualHost() throws Exception + { + assumeThat(isSupportedClient(), is(true)); + + Connection connection = getConnection(); + try + { + setUp(connection); + + MapMessage message = _session.createMapMessage(); + + message.setStringProperty("type", "org.apache.qpid.VirtualHost"); + message.setStringProperty("operation", "DELETE"); + message.setStringProperty("index", "object-path"); + message.setStringProperty("key", ""); + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + Message responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 501); + } + finally + { + connection.close(); + } + } + + // create a queue with the qpid type + @Test + public void testCreateQueueWithQpidType() throws Exception + { + assumeThat(isSupportedClient(), is(true)); + + Connection connection = getConnection(); + try + { + setUp(connection); + + MapMessage message = _session.createMapMessage(); + + message.setStringProperty("type", "org.apache.qpid.Queue"); + message.setStringProperty("operation", "CREATE"); + message.setString("name", getTestName()); + message.setString("qpid-type", "lvq"); + String path = getTestName(); + message.setString("object-path", path); + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + Message responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 201); + checkResponseIsMapType(responseMessage); + assertEquals("The created queue did not have the correct type", + "org.apache.qpid.LastValueQueue", + getValueFromMapResponse(responseMessage, "type")); + } + finally + { + connection.close(); + } + } + + // create a queue using the AMQP type + @Test + public void testCreateQueueWithAmqpType() throws Exception + { + assumeThat(isSupportedClient(), is(true)); + + Connection connection = getConnection(); + try + { + setUp(connection); + + MapMessage message = _session.createMapMessage(); + + 
message.setStringProperty("type", "org.apache.qpid.SortedQueue"); + message.setStringProperty("operation", "CREATE"); + message.setString("name", getTestName()); + String path = getTestName(); + message.setString("object-path", path); + message.setString("sortKey", "foo"); + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + Message responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 201); + checkResponseIsMapType(responseMessage); + assertEquals("The created queue did not have the correct type", + "sorted", + getValueFromMapResponse(responseMessage, "qpid-type")); + } + finally + { + connection.close(); + } + } + + // attempt to create an exchange without a type + @Test + public void testCreateExchangeWithoutType() throws Exception + { + assumeThat(isSupportedClient(), is(true)); + + Connection connection = getConnection(); + try + { + setUp(connection); + + MapMessage message = _session.createMapMessage(); + + message.setStringProperty("type", "org.apache.qpid.Exchange"); + message.setStringProperty("operation", "CREATE"); + message.setString("name", getTestName()); + String path = getTestName(); + message.setString("object-path", path); + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + Message responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 400); + } + finally + { + connection.close(); + } + } + + // attempt to create a connection + @Test + public void testCreateConnectionOnVirtualHostManagement() throws Exception + { + assumeThat(isSupportedClient(), is(true)); + + Connection connection = getConnection(); + try + { + setUp(connection); + + MapMessage message = _session.createMapMessage(); + + message.setStringProperty("type", "org.apache.qpid.Connection"); + message.setStringProperty("operation", "CREATE"); + message.setString("name", getTestName()); + String path = getTestName(); + message.setString("object-path", path); + 
message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + Message responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 501); + } + finally + { + connection.close(); + } + } + + @Test + public void testCreateConnectionOnBrokerManagement() throws Exception + { + assumeThat(isSupportedClient(), is(true)); + + Connection connection = getBrokerManagementConnection(); + try + { + setUp(connection); + + MapMessage message = _session.createMapMessage(); + + message.setStringProperty("type", "org.apache.qpid.Connection"); + message.setStringProperty("operation", "CREATE"); + message.setString("name", getTestName()); + String path = getTestName(); + message.setString("object-path", path); + message.setJMSReplyTo(_replyAddress); + _producer.send(message); + + Message responseMessage = _consumer.receive(getReceiveTimeout()); + assertResponseCode(responseMessage, 501); + } + finally + { + connection.close(); + } + } + + @SuppressWarnings("unchecked") + private void assertResponseCode(final Message responseMessage, final int expectedResponseCode) throws JMSException + { + assertNotNull("A response message was not sent", responseMessage); + assertTrue("The response message does not have a status code", + Collections.list(responseMessage.getPropertyNames()).contains("statusCode")); + assertEquals("The response code did not indicate success", + expectedResponseCode, responseMessage.getIntProperty("statusCode")); + } + + + private Connection getBrokerManagementConnection() throws NamingException, JMSException + { + return getConnectionBuilder().setVirtualHost("$management") + .setClientId(UUID.randomUUID().toString()) + .build(); + } + + private void checkResponseIsMapType(final Message responseMessage) throws JMSException + { + if (getProtocol() == Protocol.AMQP_1_0) + { + if (!(responseMessage instanceof MapMessage) + && !(responseMessage instanceof ObjectMessage + && ((ObjectMessage) responseMessage).getObject() instanceof 
Map)) + { + fail(String.format("The response was neither a Map Message nor an Object Message containing a Map. It was a : %s ", + responseMessage.getClass())); + } + } + else + { + assertTrue(String.format("The response was not a MapMessage. It was a '%s'.", responseMessage.getClass()), responseMessage instanceof MapMessage); + } + } + + private Object getValueFromMapResponse(final Message responseMessage, String name) throws JMSException + { + if (getProtocol() == Protocol.AMQP_1_0 && responseMessage instanceof ObjectMessage) + { + return ((Map)((ObjectMessage)responseMessage).getObject()).get(name); + } + else + { + return ((MapMessage) responseMessage).getObject(name); + } + } + + @SuppressWarnings("unchecked") + private Collection<String> getMapResponseKeys(final Message responseMessage) throws JMSException + { + if (getProtocol() == Protocol.AMQP_1_0 && responseMessage instanceof ObjectMessage) + { + return ((Map)((ObjectMessage)responseMessage).getObject()).keySet(); + } + else + { + return Collections.list(((MapMessage) responseMessage).getMapNames()); + } + } + + private boolean isSupportedClient() throws NamingException, JMSException + { + if (getProtocol() == Protocol.AMQP_1_0) + { + return true; + } + else + { + Connection con = getConnection(); + try + { + final ConnectionMetaData metaData = con.getMetaData(); + // Older Qpid JMS Client 0-x (<=6.1.x) didn't support management addresses. 
+ return !(metaData.getProviderMajorVersion() < 6 || (metaData.getProviderMajorVersion() == 6 + && metaData.getProviderMinorVersion() <= 1)); + } + finally + { + con.close(); + } + } + + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9a275dc/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/tls/TlsTest.java ---------------------------------------------------------------------- diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/tls/TlsTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/tls/TlsTest.java index e10aecb..f79dc61 100644 --- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/tls/TlsTest.java +++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/tls/TlsTest.java @@ -58,6 +58,8 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.security.FileKeyStore; import org.apache.qpid.server.security.FileTrustStore; +import org.apache.qpid.systests.AmqpManagementFacade; +import org.apache.qpid.systests.ConnectionBuilder; import org.apache.qpid.systests.JmsTestBase; import org.apache.qpid.test.utils.TestSSLConstants; import org.apache.qpid.tests.utils.BrokerAdmin; @@ -71,7 +73,7 @@ public class TlsTest extends JmsTestBase TEST_PROFILE_RESOURCE_BASE + org.apache.qpid.test.utils.TestSSLConstants.BROKER_TRUSTSTORE; private static final String KEYSTORE = TEST_PROFILE_RESOURCE_BASE + org.apache.qpid.test.utils.TestSSLConstants.KEYSTORE; - private static final String TRUSTSTORE = + public static final String TRUSTSTORE = TEST_PROFILE_RESOURCE_BASE + org.apache.qpid.test.utils.TestSSLConstants.TRUSTSTORE; @BeforeClass @@ -513,71 +515,142 @@ public class TlsTest extends JmsTestBase final boolean samePort) throws Exception { - Connection connection = 
getConnectionBuilder().setVirtualHost("$management").build(); - try - { - connection.start(); - return createPort(portName, needClientAuth, wantClientAuth, samePort, connection); - } - finally - { - connection.close(); - } + return createTlsPort(portName, + needClientAuth, + wantClientAuth, + samePort, + getConnectionBuilder(), + new AmqpManagementFacade(getProtocol()), + getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP).getPort()); } - private int createPort(final String portName, - final boolean needClientAuth, - final boolean wantClientAuth, - final boolean plainAndSsl, - Connection connection) throws Exception + public static int createTlsPort(final String portName, + final boolean needClientAuth, + final boolean wantClientAuth, + final boolean plainAndSsl, + final ConnectionBuilder connectionBuilder, + final AmqpManagementFacade managementFacade, + final int brokerPort) throws Exception { - String keyStoreName = portName + "KeyStore"; - String trustStoreName = portName + "TrustStore"; - String authenticationProvider = null; - - List<Map<String, Object>> ports = queryEntitiesUsingAmqpManagement("org.apache.qpid.AmqpPort", connection); - for (Map<String, Object> port : ports) + Connection connection = connectionBuilder.setVirtualHost("$management").build(); + try { - String name = String.valueOf(port.get(Port.NAME)); + connection.start(); + String keyStoreName = portName + "KeyStore"; + String trustStoreName = portName + "TrustStore"; + String authenticationProvider = null; - Map<String, Object> attributes = - readEntityUsingAmqpManagement(name, "org.apache.qpid.AmqpPort", false, connection); - if (attributes.get("boundPort") - .equals(getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP).getPort())) + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + try { - authenticationProvider = String.valueOf(attributes.get(Port.AUTHENTICATION_PROVIDER)); - break; + List<Map<String, Object>> ports = + 
managementFacade.managementQueryObjects(session, "org.apache.qpid.AmqpPort"); + for (Map<String, Object> port : ports) + { + String name = String.valueOf(port.get(Port.NAME)); + + Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + try + { + Map<String, Object> attributes = managementFacade.readEntityUsingAmqpManagement(s, + "org.apache.qpid.AmqpPort", + name, + false); + if (attributes.get("boundPort").equals(brokerPort)) + { + authenticationProvider = String.valueOf(attributes.get(Port.AUTHENTICATION_PROVIDER)); + break; + } + } + finally + { + s.close(); + } + } + } + finally + { + session.close(); } - } - final Map<String, Object> keyStoreAttributes = new HashMap<>(); - keyStoreAttributes.put("storeUrl", BROKER_KEYSTORE); - keyStoreAttributes.put("password", BROKER_KEYSTORE_PASSWORD); - createEntity(keyStoreName, FileKeyStore.class.getName(), keyStoreAttributes, connection); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + try + { + final Map<String, Object> keyStoreAttributes = new HashMap<>(); + keyStoreAttributes.put("storeUrl", BROKER_KEYSTORE); + keyStoreAttributes.put("password", BROKER_KEYSTORE_PASSWORD); + managementFacade.createEntityAndAssertResponse(keyStoreName, + FileKeyStore.class.getName(), + keyStoreAttributes, + session); + } + finally + { + session.close(); + } - final Map<String, Object> trustStoreAttributes = new HashMap<>(); - trustStoreAttributes.put("storeUrl", BROKER_TRUSTSTORE); - trustStoreAttributes.put("password", BROKER_TRUSTSTORE_PASSWORD); - createEntity(trustStoreName, FileTrustStore.class.getName(), trustStoreAttributes, connection); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + try + { + final Map<String, Object> trustStoreAttributes = new HashMap<>(); + trustStoreAttributes.put("storeUrl", BROKER_TRUSTSTORE); + trustStoreAttributes.put("password", BROKER_TRUSTSTORE_PASSWORD); + managementFacade.createEntityAndAssertResponse(trustStoreName, + 
FileTrustStore.class.getName(), + trustStoreAttributes, + session); + } + finally + { + session.close(); + } - Map<String, Object> sslPortAttributes = new HashMap<>(); - sslPortAttributes.put(Port.TRANSPORTS, plainAndSsl ? "[\"SSL\",\"TCP\"]" : "[\"SSL\"]"); - sslPortAttributes.put(Port.PORT, 0); - sslPortAttributes.put(Port.AUTHENTICATION_PROVIDER, authenticationProvider); - sslPortAttributes.put(Port.NEED_CLIENT_AUTH, needClientAuth); - sslPortAttributes.put(Port.WANT_CLIENT_AUTH, wantClientAuth); - sslPortAttributes.put(Port.NAME, portName); - sslPortAttributes.put(Port.KEY_STORE, keyStoreName); - sslPortAttributes.put(Port.TRUST_STORES, "[\"" + trustStoreName + "\"]"); - createEntity(portName, "org.apache.qpid.AmqpPort", sslPortAttributes, connection); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + try + { + Map<String, Object> sslPortAttributes = new HashMap<>(); + sslPortAttributes.put(Port.TRANSPORTS, plainAndSsl ? "[\"SSL\",\"TCP\"]" : "[\"SSL\"]"); + sslPortAttributes.put(Port.PORT, 0); + sslPortAttributes.put(Port.AUTHENTICATION_PROVIDER, authenticationProvider); + sslPortAttributes.put(Port.NEED_CLIENT_AUTH, needClientAuth); + sslPortAttributes.put(Port.WANT_CLIENT_AUTH, wantClientAuth); + sslPortAttributes.put(Port.NAME, portName); + sslPortAttributes.put(Port.KEY_STORE, keyStoreName); + sslPortAttributes.put(Port.TRUST_STORES, "[\"" + trustStoreName + "\"]"); + + managementFacade.createEntityAndAssertResponse(portName, + "org.apache.qpid.AmqpPort", + sslPortAttributes, + session); + } + finally + { + session.close(); + } - Map<String, Object> portEffectiveAttributes = - readEntityUsingAmqpManagement(portName, "org.apache.qpid.AmqpPort", false, connection); - if (portEffectiveAttributes.containsKey("boundPort")) + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + try + { + Map<String, Object> portEffectiveAttributes = + managementFacade.readEntityUsingAmqpManagement(session, + "org.apache.qpid.AmqpPort", + 
portName, + false); + if (portEffectiveAttributes.containsKey("boundPort")) + { + return (int) portEffectiveAttributes.get("boundPort"); + } + throw new RuntimeException("Bound port is not found"); + } + finally + { + session.close(); + } + } + finally { - return (int) portEffectiveAttributes.get("boundPort"); + connection.close(); } - throw new RuntimeException("Bound port is not found"); } private void setSslStoreSystemProperties() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
