http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java new file mode 100644 index 0000000..778cd40 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.message.impl.MessageImpl; +import org.junit.Test; + +public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport { + + private static final int FRAME_SIZE = 512; + + @Override + protected void configureAMQPAcceptorParameters(Map<String, Object> params) { + params.put("maxFrameSize", FRAME_SIZE); + } + + @Test(timeout = 60000) + public void testMultipleTransfers() throws Exception { + + String testQueueName = "ConnectionFrameSize"; + int nMsgs = 200; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + try { + connection.connect(); + + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(testQueueName); + + final int payload = FRAME_SIZE * 16; + + for (int i = 0; i < nMsgs; ++i) { + AmqpMessage message = createAmqpMessage((byte) 'A', payload); + sender.send(message); + } + + int count = getMessageCount(server.getPostOffice(), testQueueName); + assertEquals(nMsgs, count); + + AmqpReceiver receiver = session.createReceiver(testQueueName); + receiver.flow(nMsgs); + + for (int i = 0; i < nMsgs; ++i) { + AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("failed at " + i, message); + MessageImpl wrapped = (MessageImpl) message.getWrappedMessage(); + Data data = (Data) wrapped.getBody(); + System.out.println("received : message: " + data.getValue().getLength()); + assertEquals(payload, data.getValue().getLength()); + message.accept(); + } + + } finally { + connection.close(); + } + } + + private AmqpMessage createAmqpMessage(byte value, int payloadSize) { + AmqpMessage message = new AmqpMessage(); + byte[] payload = new byte[payloadSize]; + for (int i = 0; i < payload.length; i++) { + payload[i] = value; + } + message.setBytes(payload); + return message; + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java new file mode 100644 index 0000000..f895c86 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; + +public class AmqpMessageDivertsTest extends AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testQueueReceiverReadMessageWithDivert() throws Exception { + final String forwardingAddress = getQueueName() + "Divert"; + final SimpleString simpleForwardingAddress = SimpleString.toSimpleString(forwardingAddress); + server.createQueue(simpleForwardingAddress, RoutingType.ANYCAST, simpleForwardingAddress, null, true, false); + server.getActiveMQServerControl().createDivert("name", "routingName", getQueueName(), forwardingAddress, true, null, null, DivertConfigurationRoutingType.ANYCAST.toString()); + + sendMessages(getQueueName(), 1); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(forwardingAddress); + + Queue queueView = getProxyToQueue(forwardingAddress); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + assertNotNull(receiver.receive(5, TimeUnit.SECONDS)); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java new file mode 100644 index 0000000..fcce0ab --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.junit.Test; + +public class AmqpMessageRoutingTest extends JMSClientTestSupport { + + @Override + protected boolean isAutoCreateQueues() { + return false; + } + + @Override + protected boolean isAutoCreateAddresses() { + return false; + } + + @Test(timeout = 60000) + public void testAnycastMessageRoutingExclusivityUsingPrefix() throws Exception { + final String addressA = "addressA"; + final String queueA = "queueA"; + final String queueB = "queueB"; + final String queueC = "queueC"; + + ActiveMQServerControl serverControl = server.getActiveMQServerControl(); + serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString()); + serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString()); + serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString()); + + sendMessages("anycast://" + addressA, 1); + + assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); + assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount()); + } + + @Test(timeout = 60000) + public void testAnycastMessageRoutingExclusivityUsingProperty() throws Exception { + final String addressA = "addressA"; + final String queueA = "queueA"; + final String queueB = "queueB"; + final String queueC = "queueC"; + + ActiveMQServerControl serverControl = server.getActiveMQServerControl(); + serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString()); + serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString()); + serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString()); + + sendMessages(addressA, 1, RoutingType.ANYCAST); + + assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); + assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount()); + } + + @Test(timeout = 60000) + public void testMulticastMessageRoutingExclusivityUsingPrefix() throws Exception { + final String addressA = "addressA"; + final String queueA = "queueA"; + final String queueB = "queueB"; + final String queueC = "queueC"; + + ActiveMQServerControl serverControl = server.getActiveMQServerControl(); + serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString()); + serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString()); + + sendMessages("multicast://" + addressA, 1); + + assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount()); + assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); + } + + @Test(timeout = 60000) + public void testMulticastMessageRoutingExclusivityUsingProperty() throws Exception { + final String addressA = "addressA"; + final String queueA = "queueA"; + final String queueB = "queueB"; + final String queueC = "queueC"; + + ActiveMQServerControl serverControl = server.getActiveMQServerControl(); + serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString()); + serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString()); + + sendMessages(addressA, 1, RoutingType.MULTICAST); + + assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount()); + assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); + } + + /** + * If we have an address configured with both ANYCAST and MULTICAST routing types enabled, we must ensure that any + * messages sent specifically to MULTICAST (e.g. JMS TopicProducer) are only delivered to MULTICAST queues (e.g. + * i.e. subscription queues) and **NOT** to ANYCAST queues (e.g. JMS Queue). + * + * @throws Exception + */ + @Test(timeout = 60000) + public void testRoutingExclusivity() throws Exception { + + // Create Address with both ANYCAST and MULTICAST enabled + String testAddress = "testRoutingExclusivity-mixed-mode"; + SimpleString ssTestAddress = new SimpleString(testAddress); + + AddressInfo addressInfo = new AddressInfo(ssTestAddress); + addressInfo.addRoutingType(RoutingType.MULTICAST); + addressInfo.addRoutingType(RoutingType.ANYCAST); + + server.addAddressInfo(addressInfo); + server.createQueue(ssTestAddress, RoutingType.ANYCAST, ssTestAddress, null, true, false); + + Connection connection = createConnection(UUIDGenerator.getInstance().generateStringUUID()); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic topic = session.createTopic(testAddress); + javax.jms.Queue queue = session.createQueue(testAddress); + + MessageProducer producer = session.createProducer(topic); + + MessageConsumer queueConsumer = session.createConsumer(queue); + MessageConsumer topicConsumer = session.createConsumer(topic); + + producer.send(session.createTextMessage("testMessage")); + + assertNotNull(topicConsumer.receive(1000)); + assertNull(queueConsumer.receive(1000)); + } finally { + connection.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java new file mode 100644 index 0000000..3d8be49 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory; +import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory; +import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager; +import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.qpid.proton.amqp.Symbol; +import org.junit.Test; + +public class AmqpOutboundConnectionTest extends AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testOutboundConnection() throws Throwable { + final ActiveMQServer remote = createServer(AMQP_PORT + 1); + remote.start(); + try { + Wait.waitFor(remote::isActive); + } catch (Exception e) { + remote.stop(); + throw e; + } + + final Map<String, Object> config = new LinkedHashMap<>(); + config.put(TransportConstants.HOST_PROP_NAME, "localhost"); + config.put(TransportConstants.PORT_PROP_NAME, String.valueOf(AMQP_PORT + 1)); + ProtonClientConnectionManager lifeCycleListener = new ProtonClientConnectionManager(new AMQPClientConnectionFactory(server, "myid", Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000), Optional.empty()); + ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server); + NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager); + connector.start(); + connector.createConnection(); + + try { + Wait.waitFor(() -> remote.getConnectionCount() > 0); + assertEquals(1, remote.getConnectionCount()); + + lifeCycleListener.stop(); + + Wait.waitFor(() -> remote.getConnectionCount() == 0); + assertEquals(0, remote.getConnectionCount()); + } finally { + lifeCycleListener.stop(); + remote.stop(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java index 422e23e..3fd21b1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.amqp; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; @@ -192,7 +193,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport { message.setText("Test-Message"); sender.send(message); - assertEquals(1, queue.getMessageCount()); + assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1)); AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true); @@ -228,7 +229,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport { message.setText("Test-Message"); sender.send(message); - assertEquals(1, queue.getMessageCount()); + assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1)); AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true); @@ -250,21 +251,4 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport { sender.close(); connection.close(); } - - public void sendMessages(String destinationName, int count) throws Exception { - AmqpClient client = createAmqpClient(); - AmqpConnection connection = addConnection(client.connect()); - try { - AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(destinationName); - - for (int i = 0; i < count; ++i) { - AmqpMessage message = new AmqpMessage(); - message.setMessageId("MessageID:" + i); - sender.send(message); - } - } finally { - connection.close(); - } - } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpProtocolHeaderHandlingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpProtocolHeaderHandlingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpProtocolHeaderHandlingTest.java new file mode 100644 index 0000000..e16fd46 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpProtocolHeaderHandlingTest.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.util.Wait; +import org.fusesource.hawtbuf.Buffer; +import org.junit.Test; + +public class AmqpProtocolHeaderHandlingTest extends AmqpClientTestSupport { + + @Override + protected boolean isSecurityEnabled() { + return true; + } + + @Test(timeout = 60000) + public void testNonSaslHeaderRejectedOnConnect() throws Exception { + final AmqpHeader header = new AmqpHeader(); + + header.setProtocolId(0); + header.setMajor(1); + header.setMinor(0); + header.setRevision(0); + + final ClientConnection connection = new ClientConnection(); + connection.open("localhost", AMQP_PORT); + connection.send(header); + + AmqpHeader response = connection.readAmqpHeader(); + assertNotNull(response); + assertEquals(3, response.getProtocolId()); + IntegrationTestLogger.LOGGER.info("Broker responded with: " + response); + + // pump some bytes down the wire until broker closes the connection + assertTrue("Broker should have closed client connection", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisfied() throws Exception { + try { + connection.send(header); + return false; + } catch (Exception e) { + return true; + } + } + }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(250))); + } + + private class ClientConnection { + + protected static final long RECEIVE_TIMEOUT = 10000; + protected Socket clientSocket; + + public void open(String host, int port) throws IOException { + clientSocket = new Socket(host, port); + clientSocket.setTcpNoDelay(true); + } + + public void send(AmqpHeader header) throws Exception { + IntegrationTestLogger.LOGGER.info("Client sending header: " + header); + OutputStream outputStream = clientSocket.getOutputStream(); + header.getBuffer().writeTo(outputStream); + outputStream.flush(); + } + + public AmqpHeader readAmqpHeader() throws Exception { + clientSocket.setSoTimeout((int) RECEIVE_TIMEOUT); + InputStream is = clientSocket.getInputStream(); + + byte[] header = new byte[8]; + int read = is.read(header); + if (read == header.length) { + return new AmqpHeader(new Buffer(header)); + } else { + return null; + } + } + } + + @SuppressWarnings("unused") + private class AmqpHeader { + + final Buffer PREFIX = new Buffer(new byte[]{'A', 'M', 'Q', 'P'}); + + private Buffer buffer; + + AmqpHeader() { + this(new Buffer(new byte[]{'A', 'M', 'Q', 'P', 0, 1, 0, 0})); + } + + AmqpHeader(Buffer buffer) { + this(buffer, true); + } + + AmqpHeader(Buffer buffer, boolean validate) { + setBuffer(buffer, validate); + } + + public int getProtocolId() { + return buffer.get(4) & 0xFF; + } + + public void setProtocolId(int value) { + buffer.data[buffer.offset + 4] = (byte) value; + } + + public int getMajor() { + return buffer.get(5) & 0xFF; + } + + public void setMajor(int value) { + buffer.data[buffer.offset + 5] = (byte) value; + } + + public int getMinor() { + return buffer.get(6) & 0xFF; + } + + public void setMinor(int value) { + buffer.data[buffer.offset + 6] = (byte) value; + } + + public int getRevision() { + return buffer.get(7) & 0xFF; + } + + public void setRevision(int value) { + buffer.data[buffer.offset + 7] = (byte) value; + } + + public Buffer getBuffer() { + return buffer; + } + + public void setBuffer(Buffer value) { + setBuffer(value, true); + } + + public void setBuffer(Buffer value, boolean validate) { + if (validate && !value.startsWith(PREFIX) || value.length() != 8) { + throw new IllegalArgumentException("Not an AMQP header buffer"); + } + buffer = value.buffer(); + } + + public boolean hasValidPrefix() { + return buffer.startsWith(PREFIX); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < buffer.length(); ++i) { + char value = (char) buffer.get(i); + if (Character.isLetter(value)) { + builder.append(value); + } else { + builder.append(","); + builder.append((int) value); + } + } + return builder.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java index 9cd8f50..e636d83 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java @@ -22,7 +22,6 @@ import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; -import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.qpid.proton.message.Message; import org.junit.Test; @@ -57,7 +56,6 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport { receiver2.flow(1); message.release(); - // Read the message again and validate its state message = receiver2.receive(10, TimeUnit.SECONDS); assertNotNull("did not receive message again", message); @@ -172,21 +170,4 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport { connection.close(); } - - public void sendMessages(String destinationName, int count) throws Exception { - AmqpClient client = createAmqpClient(); - AmqpConnection connection = addConnection(client.connect()); - try { - AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(destinationName); - - for (int i = 0; i < count; ++i) { - AmqpMessage message = new AmqpMessage(); - message.setMessageId("MessageID:" + i); - sender.send(message); - } - } finally { - connection.close(); - } - } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java index 681ffbd..edf9459 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java @@ -19,11 +19,11 @@ package org.apache.activemq.artemis.tests.integration.amqp; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; -import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.junit.Test; @@ -33,46 +33,81 @@ import org.junit.Test; public class AmqpReceiverDrainTest extends AmqpClientTestSupport { @Test(timeout = 60000) - public void testReceiverCanDrainMessages() throws Exception { + public void testReceiverCanDrainMessagesQueue() throws Exception { + doTestReceiverCanDrainMessages(false); + } + + @Test(timeout = 60000) + public void testReceiverCanDrainMessagesTopic() throws Exception { + doTestReceiverCanDrainMessages(true); + } + + private void doTestReceiverCanDrainMessages(boolean topic) throws Exception { + final String destinationName; + if (topic) { + destinationName = getTopicName(); + } else { + destinationName = getQueueName(); + } + int MSG_COUNT = 20; - sendMessages(getQueueName(), MSG_COUNT); AmqpClient client = createAmqpClient(); - AmqpConnection connection = client.connect(); + AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getQueueName()); + AmqpReceiver receiver = session.createReceiver(destinationName); + + sendMessages(destinationName, MSG_COUNT); + + Queue queueView = getProxyToQueue(destinationName); - Queue queueView = getProxyToQueue(getQueueName()); assertEquals(MSG_COUNT, queueView.getMessageCount()); + assertEquals(0, queueView.getDeliveringCount()); receiver.drain(MSG_COUNT); for (int i = 0; i < MSG_COUNT; ++i) { AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull(message); + assertNotNull("Failed to read message: " + (i + 1), message); + IntegrationTestLogger.LOGGER.info("Read message: " + message.getMessageId()); message.accept(); } receiver.close(); - assertEquals(0, queueView.getMessageCount()); - connection.close(); } @Test(timeout = 60000) - public void testPullWithNoMessageGetDrained() throws Exception { + public void testPullWithNoMessageGetDrainedQueue() throws Exception { + doTestPullWithNoMessageGetDrained(false); + } + + @Test(timeout = 60000) + public void testPullWithNoMessageGetDrainedTopic() throws Exception { + doTestPullWithNoMessageGetDrained(true); + } + + private void doTestPullWithNoMessageGetDrained(boolean topic) throws Exception { + + final String destinationName; + if (topic) { + destinationName = getTopicName(); + } else { + destinationName = getQueueName(); + } AmqpClient client = createAmqpClient(); - AmqpConnection connection = client.connect(); + AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getQueueName()); + AmqpReceiver receiver = session.createReceiver(destinationName); receiver.flow(10); - Queue queueView = getProxyToQueue(getQueueName()); + Queue queueView = getProxyToQueue(destinationName); + assertEquals(0, queueView.getMessageCount()); - assertEquals(0, queueView.getDeliveringCount()); + assertEquals(0, queueView.getMessagesAcknowledged()); assertEquals(10, receiver.getReceiver().getRemoteCredit()); @@ -84,18 +119,36 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport { } @Test(timeout = 60000) - public void testPullOneFromRemote() throws Exception { - int MSG_COUNT = 20; - sendMessages(getQueueName(), MSG_COUNT); + public void testPullOneFromRemoteQueue() throws Exception { + doTestPullOneFromRemote(false); + } + + @Test(timeout = 60000) + public void testPullOneFromRemoteTopic() throws Exception { + doTestPullOneFromRemote(true); + } + + private void doTestPullOneFromRemote(boolean topic) throws Exception { AmqpClient client = createAmqpClient(); - AmqpConnection connection = client.connect(); + AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getQueueName()); + final String destinationName; + if (topic) { + destinationName = getTopicName(); + } else { + destinationName = getQueueName(); + } + + AmqpReceiver receiver = session.createReceiver(destinationName); - Queue queueView = getProxyToQueue(getQueueName()); + int MSG_COUNT = 20; + sendMessages(destinationName, MSG_COUNT); + + Queue queueView = getProxyToQueue(destinationName); assertEquals(MSG_COUNT, queueView.getMessageCount()); + assertEquals(0, queueView.getDeliveringCount()); assertEquals(0, receiver.getReceiver().getRemoteCredit()); @@ -107,24 +160,39 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport { receiver.close(); - assertEquals(MSG_COUNT - 1, queueView.getMessageCount()); - assertEquals(1, queueView.getMessagesAcknowledged()); - connection.close(); } @Test(timeout = 60000) - public void testMultipleZeroResultPulls() throws Exception { + public void testMultipleZeroResultPullsQueue() throws Exception { + doTestMultipleZeroResultPulls(false); + } + + @Test(timeout = 60000) + public void testMultipleZeroResultPullsTopic() throws Exception { + doTestMultipleZeroResultPulls(true); + } + + private void doTestMultipleZeroResultPulls(boolean topic) throws Exception { + AmqpClient client = createAmqpClient(); - AmqpConnection connection = client.connect(); + AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpReceiver receiver = session.createReceiver(getQueueName()); + final String destinationName; + if (topic) { + destinationName = getTopicName(); + } else { + destinationName = getQueueName(); + } + + AmqpReceiver receiver = session.createReceiver(destinationName); receiver.flow(10); - Queue queueView = getProxyToQueue(getQueueName()); + Queue queueView = getProxyToQueue(destinationName); assertEquals(0, queueView.getMessageCount()); + assertEquals(0, queueView.getDeliveringCount()); assertEquals(10, receiver.getReceiver().getRemoteCredit()); @@ -139,27 +207,4 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport { connection.close(); } - - public void sendMessages(String destinationName, int count) throws Exception { - AmqpClient client = createAmqpClient(); - AmqpConnection connection = null; - - try { - connection = client.connect(); - AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(destinationName); - - for (int i = 0; i < count; ++i) { - AmqpMessage message = new AmqpMessage(); - message.setText("Test-Message-" + i); - sender.send(message); - } - - sender.close(); - } finally { - if (connection != null) { - connection.close(); - } - } - } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java index b47ad50..3aff030 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java @@ -16,13 +16,38 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS; +import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS; +import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.JMSException; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.transport.amqp.client.AmqpUnknownFilterType; +import org.apache.activemq.transport.amqp.client.AmqpValidator; +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.TerminusDurability; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Session; import org.junit.Test; /** @@ -31,6 +56,119 @@ import org.junit.Test; public class AmqpReceiverTest extends AmqpClientTestSupport { @Test(timeout = 60000) + public void testCreateQueueReceiver() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getQueueName()); + + Queue queue = getProxyToQueue(getQueueName()); + assertNotNull(queue); + + receiver.close(); + connection.close(); + } + + @Test(timeout = 60000) + public void testCreateTopicReceiver() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTopicName()); + + Queue queue = getProxyToQueue(getQueueName()); + assertNotNull(queue); + + receiver.close(); + + connection.close(); + } + + @Test(timeout = 60000) + public void testCreateQueueReceiverWithNoLocalSet() throws Exception { + AmqpClient client = createAmqpClient(); + + client.setValidator(new AmqpValidator() { + + @SuppressWarnings("unchecked") + @Override + public void inspectOpenedResource(Receiver receiver) { + + if (receiver.getRemoteSource() == null) { + markAsInvalid("Link opened with null source."); + } + + Source source = (Source) receiver.getRemoteSource(); + Map<Symbol, Object> filters = source.getFilter(); + + // Currently don't support noLocal on a Queue + if (findFilter(filters, NO_LOCAL_FILTER_IDS) != null) { + markAsInvalid("Broker did not return the NoLocal Filter on Attach"); + } + } + }); + + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + session.createReceiver(getQueueName(), null, true); + + connection.getStateInspector().assertValid(); + connection.close(); + } + + @Test(timeout = 60000) + public void testCreateQueueReceiverWithJMSSelector() throws Exception { + AmqpClient client = createAmqpClient(); + + client.setValidator(new AmqpValidator() { + + @SuppressWarnings("unchecked") + @Override + public void inspectOpenedResource(Receiver receiver) { + + if (receiver.getRemoteSource() == null) { + markAsInvalid("Link opened with null source."); + } + + Source source = (Source) receiver.getRemoteSource(); + Map<Symbol, Object> filters = source.getFilter(); + + if (findFilter(filters, JMS_SELECTOR_FILTER_IDS) == null) { + markAsInvalid("Broker did not return the JMS Filter on Attach"); + } + } + }); + + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + session.createReceiver(getQueueName(), "JMSPriority > 8"); + + connection.getStateInspector().assertValid(); + connection.close(); + } + + @Test(timeout = 60000) + public void testInvalidFilter() throws Exception { + AmqpClient client = createAmqpClient(); + + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + try { + session.createReceiver(getQueueName(), "null = 'f''", true); + fail("should throw exception"); + } catch (Exception e) { + assertTrue(e.getCause() instanceof JMSException); + } + + connection.close(); + } + + @Test(timeout = 60000) public void testSenderSettlementModeSettledIsHonored() throws Exception { doTestSenderSettlementModeIsHonored(SenderSettleMode.SETTLED); } @@ -96,4 +234,164 @@ public class AmqpReceiverTest extends AmqpClientTestSupport { receiver.close(); connection.close(); } + + @Test(timeout = 60000) + public void testClientIdIsSetInSubscriptionList() throws Exception { + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("mytopic"), RoutingType.ANYCAST)); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + connection.setContainerId("testClient"); + connection.connect(); + + try { + AmqpSession session = connection.createSession(); + + Source source = new Source(); + source.setDurable(TerminusDurability.UNSETTLED_STATE); + source.setCapabilities(Symbol.getSymbol("topic")); + source.setAddress("mytopic"); + session.createReceiver(source, "testSub"); + + SimpleString fo = new SimpleString("testClient.testSub:mytopic"); + assertNotNull(server.locateQueue(fo)); + + } catch (Exception e) { + e.printStackTrace(); + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testLinkDetachSentWhenQueueDeleted() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + try { + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createReceiver(getQueueName()); + + server.destroyQueue(new SimpleString(getQueueName()), null, false, true); + + assertTrue("Receiver should have closed", Wait.waitFor(receiver::isClosed)); + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testLinkDetatchErrorIsCorrectWhenQueueDoesNotExists() throws Exception { + AddressSettings value = new AddressSettings(); + value.setAutoCreateQueues(false); + value.setAutoCreateAddresses(false); + server.getAddressSettingsRepository().addMatch("AnAddressThatDoesNotExist", value); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + try { + AmqpSession session = connection.createSession(); + + Exception expectedException = null; + try { + session.createSender("AnAddressThatDoesNotExist"); + fail("Creating a sender here on an address that doesn't exist should fail"); + } catch (Exception e) { + expectedException = e; + } + + assertNotNull(expectedException); + assertTrue(expectedException.getMessage().contains("amqp:not-found")); + assertTrue(expectedException.getMessage().contains("target address does not exist")); + } finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testUnsupportedFiltersAreNotListedAsSupported() throws Exception { + AmqpClient client = createAmqpClient(); + + client.setValidator(new AmqpValidator() { + + @SuppressWarnings("unchecked") + @Override + public void inspectOpenedResource(Receiver receiver) { + + if (receiver.getRemoteSource() == null) { + markAsInvalid("Link opened with null source."); + } + + Source source = (Source) receiver.getRemoteSource(); + Map<Symbol, Object> filters = source.getFilter(); + + if (findFilter(filters, AmqpUnknownFilterType.UNKNOWN_FILTER_IDS) != null) { + markAsInvalid("Broker should not return unsupported filter on attach."); + } + } + }); + + Map<Symbol, DescribedType> filters = new HashMap<>(); + filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKNOWN_FILTER); + + Source source = new Source(); + source.setAddress(getQueueName()); + source.setFilter(filters); + source.setDurable(TerminusDurability.NONE); + source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); + + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + assertEquals(0, server.getTotalConsumerCount()); + + session.createReceiver(source); + + assertEquals(1, server.getTotalConsumerCount()); + + connection.getStateInspector().assertValid(); + connection.close(); + } + + @Test(timeout = 60000) + public void testReceiverCloseSendsRemoteClose() throws Exception { + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + final AtomicBoolean closed = new AtomicBoolean(); + + client.setValidator(new AmqpValidator() { + + @Override + public void inspectClosedResource(Session session) { + IntegrationTestLogger.LOGGER.info("Session closed: " + session.getContext()); + } + + @Override + public void inspectDetachedResource(Receiver receiver) { + markAsInvalid("Broker should not detach receiver linked to closed session."); + } + + @Override + public void inspectClosedResource(Receiver receiver) { + IntegrationTestLogger.LOGGER.info("Receiver closed: " + receiver.getContext()); + closed.set(true); + } + }); + + AmqpConnection connection = addConnection(client.connect()); + assertNotNull(connection); + AmqpSession session = connection.createSession(); + assertNotNull(session); + AmqpReceiver receiver = session.createReceiver(getQueueName()); + assertNotNull(receiver); + + receiver.close(); + + assertTrue("Did not process remote close as expected", closed.get()); + connection.getStateInspector().assertValid(); + + connection.close(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java index 748f10a..6459e76 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java @@ -104,7 +104,7 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport { } } - @Test + @Test(timeout = 60000) public void testScheduleWithDelay() throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java index 8e41d71..f99fc14 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java @@ -17,21 +17,9 @@ package org.apache.activemq.artemis.tests.integration.amqp; import java.io.IOException; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.security.Role; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; -import org.apache.activemq.artemis.core.settings.HierarchicalRepository; -import org.apache.activemq.artemis.core.settings.impl.AddressSettings; -import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl; -import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; @@ -39,44 +27,26 @@ import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Sender; import org.junit.Test; public class AmqpSecurityTest extends AmqpClientTestSupport { - private String user1 = "user1"; - private String password1 = "password1"; - @Override - protected ActiveMQServer createServer() throws Exception { - ActiveMQServer server = createServer(true, true); - ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager(); - securityManager.getConfiguration().addUser("foo", "bar"); - securityManager.getConfiguration().addRole("foo", "none"); - securityManager.getConfiguration().addUser(user1, password1); - securityManager.getConfiguration().addRole(user1, "none"); - HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository(); - HashSet<Role> value = new HashSet<>(); - value.add(new Role("none", false, true, true, true, true, true, true, true)); - securityRepository.addMatch(getQueueName(), value); - - serverManager = new JMSServerManagerImpl(server); - Configuration serverConfig = server.getConfiguration(); - serverConfig.getAddressesSettings().put("jms.queue.#", new AddressSettings().setAutoCreateJmsQueues(true).setDeadLetterAddress(new SimpleString("jms.queue.ActiveMQ.DLQ"))); - serverConfig.setSecurityEnabled(true); - serverManager.start(); - server.start(); - return server; + protected boolean isSecurityEnabled() { + return true; } @Test(timeout = 60000) public void testSaslAuthWithInvalidCredentials() throws Exception { AmqpConnection connection = null; - AmqpClient client = createAmqpClient("foo", "foo"); + AmqpClient client = createAmqpClient(fullUser, guestUser); try { connection = client.connect(); - fail("Should authenticate even with authzid set"); + fail("Should not authenticate when invalid credentials provided"); } catch (Exception ex) { + // Expected } finally { if (connection != null) { connection.close(); @@ -87,8 +57,8 @@ public class AmqpSecurityTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testSaslAuthWithAuthzid() throws Exception { AmqpConnection connection = null; - AmqpClient client = createAmqpClient("foo", "bar"); - client.setAuthzid("foo"); + AmqpClient client = createAmqpClient(guestUser, guestPass); + client.setAuthzid(guestUser); try { connection = client.connect(); @@ -104,7 +74,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testSaslAuthWithoutAuthzid() throws Exception { AmqpConnection connection = null; - AmqpClient client = createAmqpClient("foo", "bar"); + AmqpClient client = createAmqpClient(guestUser, guestPass); try { connection = client.connect(); @@ -119,20 +89,22 @@ public class AmqpSecurityTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testSendAndRejected() throws Exception { - AmqpConnection connection = null; - AmqpClient client = createAmqpClient("foo", "bar"); CountDownLatch latch = new CountDownLatch(1); + + AmqpClient client = createAmqpClient(guestUser, guestPass); client.setValidator(new AmqpValidator() { + @Override - public void inspectDeliveryUpdate(Delivery delivery) { - super.inspectDeliveryUpdate(delivery); + public void inspectDeliveryUpdate(Sender sender, Delivery delivery) { if (!delivery.remotelySettled()) { markAsInvalid("delivery is not remotely settled"); } + latch.countDown(); } }); - connection = addConnection(client.connect()); + + AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); AmqpSender sender = session.createSender(getQueueName()); @@ -145,8 +117,8 @@ public class AmqpSecurityTest extends AmqpClientTestSupport { try { sender.send(message); } catch (IOException e) { - // } + assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); connection.getStateInspector().assertValid(); connection.close(); @@ -154,11 +126,9 @@ public class AmqpSecurityTest extends AmqpClientTestSupport { @Test(timeout = 60000) public void testSendMessageFailsOnAnonymousRelayWhenNotAuthorizedToSendToAddress() throws Exception { - server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST)); - server.createQueue(new SimpleString(getQueueName()), RoutingType.ANYCAST, new SimpleString(getQueueName()), null, true, false); - - AmqpClient client = createAmqpClient(user1, password1); + AmqpClient client = createAmqpClient(guestUser, guestPass); AmqpConnection connection = client.connect(); + try { AmqpSession session = connection.createSession(); @@ -181,5 +151,4 @@ public class AmqpSecurityTest extends AmqpClientTestSupport { connection.close(); } } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index 9cf256a..0cae79f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -16,28 +16,23 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; -import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS; -import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS; import static org.apache.activemq.transport.amqp.AmqpSupport.contains; -import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; -import javax.jms.JMSException; +import javax.jms.Topic; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; -import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType; import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; @@ -48,8 +43,6 @@ import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.messaging.Source; -import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sender; import org.jgroups.util.UUID; import org.junit.Test; @@ -63,19 +56,14 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class); - @Test(timeout = 60000) - public void testCreateQueueReceiver() throws Exception { - AmqpClient client = createAmqpClient(); - AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - - AmqpReceiver receiver = session.createReceiver(getQueueName()); - - Queue queue = getProxyToQueue(getQueueName()); - assertNotNull(queue); + @Override + protected boolean isAutoCreateQueues() { + return false; + } - receiver.close(); - connection.close(); + @Override + protected boolean isAutoCreateAddresses() { + return false; } @Test(timeout = 60000) @@ -103,90 +91,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { assertEquals(0, queue.getMessageCount()); } - - @Test(timeout = 60000) - public void testCreateQueueReceiverWithJMSSelector() throws Exception { - AmqpClient client = createAmqpClient(); - - client.setValidator(new AmqpValidator() { - - @SuppressWarnings("unchecked") - @Override - public void inspectOpenedResource(Receiver receiver) { - - if (receiver.getRemoteSource() == null) { - markAsInvalid("Link opened with null source."); - } - - Source source = (Source) receiver.getRemoteSource(); - Map<Symbol, Object> filters = source.getFilter(); - - if (findFilter(filters, JMS_SELECTOR_FILTER_IDS) == null) { - markAsInvalid("Broker did not return the JMS Filter on Attach"); - } - } - }); - - AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - - session.createReceiver(getQueueName(), "JMSPriority > 8"); - - connection.getStateInspector().assertValid(); - connection.close(); - } - - @Test(timeout = 60000) - public void testCreateQueueReceiverWithNoLocalSet() throws Exception { - AmqpClient client = createAmqpClient(); - - client.setValidator(new AmqpValidator() { - - @SuppressWarnings("unchecked") - @Override - public void inspectOpenedResource(Receiver receiver) { - - if (receiver.getRemoteSource() == null) { - markAsInvalid("Link opened with null source."); - } - - Source source = (Source) receiver.getRemoteSource(); - Map<Symbol, Object> filters = source.getFilter(); - - // Currently don't support noLocal on a Queue - if (findFilter(filters, NO_LOCAL_FILTER_IDS) != null) { - markAsInvalid("Broker did not return the NoLocal Filter on Attach"); - } - } - }); - - AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - - session.createReceiver(getQueueName(), null, true); - - connection.getStateInspector().assertValid(); - connection.close(); - } - - @Test(timeout = 60000) - public void testInvalidFilter() throws Exception { - AmqpClient client = createAmqpClient(); - - AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - - try { - session.createReceiver(getQueueName(), "null = 'f''", true); - fail("should throw exception"); - } catch (Exception e) { - assertTrue(e.getCause() instanceof JMSException); - //passed - } - - connection.close(); - } - @Test(timeout = 60000) public void testQueueReceiverReadMessage() throws Exception { sendMessages(getQueueName(), 1); @@ -210,108 +114,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { } @Test(timeout = 60000) - public void testQueueReceiverReadMessageWithDivert() throws Exception { - final String forwardingAddress = getQueueName() + "Divert"; - final SimpleString simpleForwardingAddress = SimpleString.toSimpleString(forwardingAddress); - server.createQueue(simpleForwardingAddress, RoutingType.ANYCAST, simpleForwardingAddress, null, true, false); - server.getActiveMQServerControl().createDivert("name", "routingName", getQueueName(), forwardingAddress, true, null, null, DivertConfigurationRoutingType.ANYCAST.toString()); - sendMessages(getQueueName(), 1); - - AmqpClient client = createAmqpClient(); - AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - - AmqpReceiver receiver = session.createReceiver(forwardingAddress); - - Queue queueView = getProxyToQueue(forwardingAddress); - assertEquals(1, queueView.getMessageCount()); - - receiver.flow(1); - assertNotNull(receiver.receive(5, TimeUnit.SECONDS)); - receiver.close(); - - assertEquals(1, queueView.getMessageCount()); - - connection.close(); - } - - @Test(timeout = 60000) - public void testAnycastMessageRoutingExclusivityUsingPrefix() throws Exception { - final String addressA = "addressA"; - final String queueA = "queueA"; - final String queueB = "queueB"; - final String queueC = "queueC"; - - ActiveMQServerControl serverControl = server.getActiveMQServerControl(); - serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString()); - serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString()); - serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString()); - serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString()); - - sendMessages("anycast://" + addressA, 1); - - assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); - assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount()); - } - - @Test(timeout = 60000) - public void testAnycastMessageRoutingExclusivityUsingProperty() throws Exception { - final String addressA = "addressA"; - final String queueA = "queueA"; - final String queueB = "queueB"; - final String queueC = "queueC"; - - ActiveMQServerControl serverControl = server.getActiveMQServerControl(); - serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString()); - serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString()); - serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString()); - serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString()); - - sendMessages(addressA, 1, RoutingType.ANYCAST); - - assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); - assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount()); - } - - @Test - public void testMulticastMessageRoutingExclusivityUsingPrefix() throws Exception { - final String addressA = "addressA"; - final String queueA = "queueA"; - final String queueB = "queueB"; - final String queueC = "queueC"; - - ActiveMQServerControl serverControl = server.getActiveMQServerControl(); - serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString()); - serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString()); - serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString()); - serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString()); - - sendMessages("multicast://" + addressA, 1); - - assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount()); - assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); - } - - @Test - public void testMulticastMessageRoutingExclusivityUsingProperty() throws Exception { - final String addressA = "addressA"; - final String queueA = "queueA"; - final String queueB = "queueB"; - final String queueC = "queueC"; - - ActiveMQServerControl serverControl = server.getActiveMQServerControl(); - serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString()); - serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString()); - serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString()); - serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString()); - - sendMessages(addressA, 1, RoutingType.MULTICAST); - - assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount()); - assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); - } - - @Test(timeout = 60000) public void testMessageDurableFalse() throws Exception { sendMessages(getQueueName(), 1, false); @@ -870,7 +672,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { message1.setMessageId("ID:Message:1"); sender.send(message1); - assertEquals(1, queue.getMessageCount()); + assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1)); receiver1.flow(1); message1 = receiver1.receive(50, TimeUnit.SECONDS); assertNotNull("Should have read a message", message1); @@ -884,7 +686,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { message2.setMessageId("ID:Message:2"); sender.send(message2); - assertEquals(1, queue.getMessageCount()); + assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1)); receiver1.flow(1); message2 = receiver1.receive(50, TimeUnit.SECONDS); assertNotNull("Should have read a message", message2); @@ -1018,7 +820,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { connection.close(); } - @Test + @Test(timeout = 60000) public void testDeliveryDelayOfferedWhenRequested() throws Exception { AmqpClient client = createAmqpClient(); client.setValidator(new AmqpValidator() { @@ -1036,7 +838,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender("queue://" + getQueueName(), new Symbol[] {AmqpSupport.DELAYED_DELIVERY}); + AmqpSender sender = session.createSender(getQueueName(), new Symbol[] {AmqpSupport.DELAYED_DELIVERY}); assertNotNull(sender); connection.getStateInspector().assertValid(); @@ -1100,45 +902,119 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { connection.close(); } - public void sendMessages(String destinationName, int count) throws Exception { - sendMessages(destinationName, count, null); + @Test(timeout = 60000) + public void testLinkDetatchErrorIsCorrectWhenQueueDoesNotExists() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + Exception expectedException = null; + try { + session.createSender("AnAddressThatDoesNotExist"); + fail("Creating a sender here on an address that doesn't exist should fail"); + } catch (Exception e) { + expectedException = e; + } + + assertNotNull(expectedException); + assertTrue(expectedException.getMessage().contains("amqp:not-found")); + assertTrue(expectedException.getMessage().contains("target address does not exist")); + + connection.close(); } - public void sendMessages(String destinationName, int count, RoutingType routingType) throws Exception { + @Test(timeout = 60000) + public void testSendingAndReceivingToQueueWithDifferentAddressAndQueueName() throws Exception { + String queueName = "TestQueueName"; + String address = "TestAddress"; + server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.ANYCAST)); + server.createQueue(new SimpleString(address), RoutingType.ANYCAST, new SimpleString(queueName), null, true, false); + AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); + try { AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(destinationName); + AmqpSender sender = session.createSender(address); + AmqpReceiver receiver = session.createReceiver(address); + receiver.flow(1); - for (int i = 0; i < count; ++i) { - AmqpMessage message = new AmqpMessage(); - message.setMessageId("MessageID:" + i); - if (routingType != null) { - message.setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE.toString(), routingType.getType()); - } - sender.send(message); - } + AmqpMessage message = new AmqpMessage(); + message.setText("TestPayload"); + sender.send(message); + + AmqpMessage receivedMessage = receiver.receive(5000, TimeUnit.MILLISECONDS); + assertNotNull(receivedMessage); } finally { connection.close(); } } - public void sendMessages(String destinationName, int count, boolean durable) throws Exception { + @Test(timeout = 60000) + public void testSendReceiveLotsOfDurableMessagesOnQueue() throws Exception { + doTestSendReceiveLotsOfDurableMessages(Queue.class); + } + + @Test(timeout = 60000) + public void testSendReceiveLotsOfDurableMessagesOnTopic() throws Exception { + doTestSendReceiveLotsOfDurableMessages(Topic.class); + } + + private void doTestSendReceiveLotsOfDurableMessages(Class<?> destType) throws Exception { + final int MSG_COUNT = 1000; + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); - try { - AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(destinationName); + AmqpSession session = connection.createSession(); + + final CountDownLatch done = new CountDownLatch(MSG_COUNT); + final AtomicBoolean error = new AtomicBoolean(false); + final ExecutorService executor = Executors.newSingleThreadExecutor(); + + final String address; + if (Queue.class.equals(destType)) { + address = getQueueName(); + } else { + address = getTopicName(); + } + + final AmqpReceiver receiver = session.createReceiver(address); + receiver.flow(MSG_COUNT); + + AmqpSender sender = session.createSender(address); - for (int i = 0; i < count; ++i) { - AmqpMessage message = new AmqpMessage(); - message.setMessageId("MessageID:" + i); - message.setDurable(durable); - sender.send(message); + Queue queueView = getProxyToQueue(address); + + executor.execute(new Runnable() { + + @Override + public void run() { + for (int i = 0; i < MSG_COUNT; i++) { + try { + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + received.accept(); + done.countDown(); + } catch (Exception ex) { + LOG.info("Caught error: {}", ex.getClass().getSimpleName()); + error.set(true); + } + } } - } finally { - connection.close(); + }); + + for (int i = 0; i < MSG_COUNT; i++) { + AmqpMessage message = new AmqpMessage(); + message.setMessageId("msg" + i); + sender.send(message); } + + assertTrue("did not read all messages, waiting on: " + done.getCount(), done.await(10, TimeUnit.SECONDS)); + assertFalse("should not be any errors on receive", error.get()); + assertTrue("Should be no inflight messages.", Wait.waitFor(() -> queueView.getDeliveringCount() == 0)); + + sender.close(); + receiver.close(); + connection.close(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java index 7b8cbef..8c95064 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java @@ -16,14 +16,22 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Sender; import org.junit.Test; /** @@ -101,4 +109,74 @@ public class AmqpSenderTest extends AmqpClientTestSupport { connection.close(); } + + @Test(timeout = 60000) + public void testUnsettledSender() throws Exception { + final int MSG_COUNT = 1000; + + final CountDownLatch settled = new CountDownLatch(MSG_COUNT); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + connection.setStateInspector(new AmqpValidator() { + + @Override + public void inspectDeliveryUpdate(Sender sender, Delivery delivery) { + if (delivery.remotelySettled()) { + IntegrationTestLogger.LOGGER.trace("Remote settled message for sender: " + sender.getName()); + settled.countDown(); + } + } + }); + + AmqpSession session = connection.createSession(); + AmqpSender sender = session.createSender(getQueueName(), false); + + for (int i = 1; i <= MSG_COUNT; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message: " + i); + sender.send(message); + + if (i % 1000 == 0) { + IntegrationTestLogger.LOGGER.info("Sent message: " + i); + } + } + + Queue queueView = getProxyToQueue(getQueueName()); + assertTrue("All messages should arrive", Wait.waitFor(() -> queueView.getMessageCount() == MSG_COUNT)); + + sender.close(); + + assertTrue("Remote should have settled all deliveries", settled.await(5, TimeUnit.MINUTES)); + + connection.close(); + } + + @Test(timeout = 60000) + public void testPresettledSender() throws Exception { + final int MSG_COUNT = 1000; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getQueueName(), true); + + for (int i = 1; i <= MSG_COUNT; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message: " + i); + sender.send(message); + + if (i % 1000 == 0) { + IntegrationTestLogger.LOGGER.info("Sent message: " + i); + } + } + + Queue queueView = getProxyToQueue(getQueueName()); + assertTrue("All messages should arrive", Wait.waitFor(() -> queueView.getMessageCount() == MSG_COUNT)); + + sender.close(); + connection.close(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java new file mode 100644 index 0000000..0048be5 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.transport.amqp.client.AmqpValidator; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Session; +import org.junit.Test; + +public class AmqpSessionTest extends AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testCreateSession() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + assertNotNull(session); + connection.close(); + } + + @Test(timeout = 60000) + public void testSessionClosedDoesNotGetReceiverDetachFromRemote() throws Exception { + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + client.setValidator(new AmqpValidator() { + + @Override + public void inspectClosedResource(Session session) { + IntegrationTestLogger.LOGGER.info("Session closed: " + session.getContext()); + } + + @Override + public void inspectDetachedResource(Receiver receiver) { + markAsInvalid("Broker should not detach receiver linked to closed session."); + } + + @Override + public void inspectClosedResource(Receiver receiver) { + markAsInvalid("Broker should not close receiver linked to closed session."); + } + }); + + AmqpConnection connection = addConnection(client.connect()); + assertNotNull(connection); + AmqpSession session = connection.createSession(); + assertNotNull(session); + AmqpReceiver receiver = session.createReceiver(getQueueName()); + assertNotNull(receiver); + + session.close(); + + connection.getStateInspector().assertValid(); + connection.close(); + } +}
