http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java deleted file mode 100644 index 7933fec..0000000 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java +++ /dev/null @@ -1,1788 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.tests.integration.amqp; - -import java.io.IOException; -import java.io.Serializable; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Enumeration; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.InvalidClientIDException; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.QueueBrowser; -import javax.jms.ResourceAllocationException; -import javax.jms.Session; -import javax.jms.StreamMessage; -import javax.jms.TemporaryQueue; -import javax.jms.TextMessage; -import javax.jms.Topic; -import javax.jms.TopicPublisher; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; -import javax.management.MBeanServer; -import javax.management.MBeanServerFactory; - -import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.management.AddressControl; -import org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.postoffice.Bindings; -import org.apache.activemq.artemis.core.remoting.CloseListener; -import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; -import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; -import org.apache.activemq.artemis.core.server.ActiveMQServer; 
-import org.apache.activemq.artemis.core.server.Queue; -import org.apache.activemq.artemis.core.server.impl.AddressInfo; -import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; -import org.apache.activemq.artemis.core.settings.impl.AddressSettings; -import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory; -import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory; -import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager; -import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager; -import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper; -import org.apache.activemq.artemis.tests.util.Wait; -import org.apache.activemq.artemis.utils.Base64; -import org.apache.activemq.artemis.utils.ByteUtil; -import org.apache.activemq.artemis.utils.RandomUtil; -import org.apache.activemq.artemis.utils.TimeUtils; -import org.apache.activemq.artemis.utils.UUIDGenerator; -import org.apache.activemq.transport.amqp.client.AmqpClient; -import org.apache.activemq.transport.amqp.client.AmqpConnection; -import org.apache.activemq.transport.amqp.client.AmqpMessage; -import org.apache.activemq.transport.amqp.client.AmqpReceiver; -import org.apache.activemq.transport.amqp.client.AmqpSender; -import org.apache.activemq.transport.amqp.client.AmqpSession; -import org.apache.qpid.jms.JmsConnectionFactory; -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.messaging.Source; -import org.apache.qpid.proton.amqp.messaging.TerminusDurability; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class 
ProtonTest extends ProtonTestBase { - - private static final String amqpConnectionUri = "amqp://localhost:5672"; - - private static final String tcpAmqpConnectionUri = "tcp://localhost:5672"; - - private static final long maxSizeBytes = 1 * 1024 * 1024; - - private static final long maxSizeBytesRejectThreshold = 2 * 1024 * 1024; - - private MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer(); - - private int messagesSent = 0; - - // this will ensure that all tests in this class are run twice, - // once with "true" passed to the class' constructor and once with "false" - @Parameterized.Parameters(name = "{0}") - public static Collection getParameters() { - - // these 3 are for comparison - return Arrays.asList(new Object[][]{{"AMQP", 0}, {"AMQP_ANONYMOUS", 3}}); - } - - - ConnectionFactory factory; - - private final int protocol; - - public ProtonTest(String name, int protocol) { - this.coreAddress = "exampleQueue"; - this.protocol = protocol; - if (protocol == 0 || protocol == 3) { - this.address = coreAddress; - } else { - this.address = "exampleQueue"; - } - } - - private final String coreAddress; - private final String address; - private Connection connection; - - - @Override - protected ActiveMQServer createAMQPServer(int port) throws Exception { - ActiveMQServer server = super.createAMQPServer(port); - server.getConfiguration().addAcceptorConfiguration("flow", "tcp://localhost:" + (8 + port) + "?protocols=AMQP;useEpoll=false;amqpCredits=1;amqpMinCredits=1"); - server.setMBeanServer(mBeanServer); - server.getConfiguration().setJMXManagementEnabled(true); - return server; - } - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - - Configuration serverConfig = server.getConfiguration(); - Map<String, AddressSettings> settings = serverConfig.getAddressesSettings(); - assertNotNull(settings); - AddressSettings addressSetting = settings.get("#"); - if (addressSetting == null) { - addressSetting = new AddressSettings(); - 
settings.put("#", addressSetting); - } - addressSetting.setAutoCreateQueues(false); - server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress), RoutingType.ANYCAST)); - server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "1"), RoutingType.ANYCAST)); - server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "2"), RoutingType.ANYCAST)); - server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "3"), RoutingType.ANYCAST)); - server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "4"), RoutingType.ANYCAST)); - server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "5"), RoutingType.ANYCAST)); - server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "6"), RoutingType.ANYCAST)); - server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "7"), RoutingType.ANYCAST)); - server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "8"), RoutingType.ANYCAST)); - server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "9"), RoutingType.ANYCAST)); - server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(coreAddress + "10"), RoutingType.ANYCAST)); - server.createQueue(new SimpleString(coreAddress), RoutingType.ANYCAST, new SimpleString(coreAddress), null, true, false); - server.createQueue(new SimpleString(coreAddress + "1"), RoutingType.ANYCAST, new SimpleString(coreAddress + "1"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "2"), RoutingType.ANYCAST, new SimpleString(coreAddress + "2"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "3"), RoutingType.ANYCAST, new SimpleString(coreAddress + "3"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "4"), RoutingType.ANYCAST, new SimpleString(coreAddress + "4"), null, true, false); - server.createQueue(new 
SimpleString(coreAddress + "5"), RoutingType.ANYCAST, new SimpleString(coreAddress + "5"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "6"), RoutingType.ANYCAST, new SimpleString(coreAddress + "6"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "7"), RoutingType.ANYCAST, new SimpleString(coreAddress + "7"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "8"), RoutingType.ANYCAST, new SimpleString(coreAddress + "8"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "9"), RoutingType.ANYCAST, new SimpleString(coreAddress + "9"), null, true, false); - server.createQueue(new SimpleString(coreAddress + "10"), RoutingType.ANYCAST, new SimpleString(coreAddress + "10"), null, true, false); - server.addAddressInfo(new AddressInfo(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST)); - server.createQueue(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST, new SimpleString("amqp_testtopic"), null, true, false); - /* server.createQueue(new SimpleString("amqp_testtopic" + "1"), new SimpleString("amqp_testtopic" + "1"), null, true, false); - server.createQueue(new SimpleString("amqp_testtopic" + "2"), new SimpleString("amqp_testtopic" + "2"), null, true, false); - server.createQueue(new SimpleString("amqp_testtopic" + "3"), new SimpleString("amqp_testtopic" + "3"), null, true, false); - server.createQueue(new SimpleString("amqp_testtopic" + "4"), new SimpleString("amqp_testtopic" + "4"), null, true, false); - server.createQueue(new SimpleString("amqp_testtopic" + "5"), new SimpleString("amqp_testtopic" + "5"), null, true, false); - server.createQueue(new SimpleString("amqp_testtopic" + "6"), new SimpleString("amqp_testtopic" + "6"), null, true, false); - server.createQueue(new SimpleString("amqp_testtopic" + "7"), new SimpleString("amqp_testtopic" + "7"), null, true, false); - server.createQueue(new SimpleString("amqp_testtopic" + "8"), new 
SimpleString("amqp_testtopic" + "8"), null, true, false); - server.createQueue(new SimpleString("amqp_testtopic" + "9"), new SimpleString("amqp_testtopic" + "9"), null, true, false); - server.createQueue(new SimpleString("amqp_testtopic" + "10"), new SimpleString("amqp_testtopic" + "10"), null, true, false);*/ - - connection = createConnection(); - - } - - @Override - @After - public void tearDown() throws Exception { - try { - Thread.sleep(250); - if (connection != null) { - connection.close(); - } - } finally { - super.tearDown(); - } - } - - @Test - public void testSendAndReceiveOnTopic() throws Exception { - Connection connection = createConnection("myClientId"); - try { - TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic("amqp_testtopic"); - TopicSubscriber consumer = session.createSubscriber(topic); - TopicPublisher producer = session.createPublisher(topic); - - TextMessage message = session.createTextMessage("test-message"); - producer.send(message); - - producer.close(); - - connection.start(); - - message = (TextMessage) consumer.receive(1000); - assertNotNull(message); - assertNotNull(message.getText()); - } finally { - if (connection != null) { - connection.close(); - } - } - } - - @Test - public void testAddressControlSendMessage() throws Exception { - SimpleString address = RandomUtil.randomSimpleString(); - server.createQueue(address, RoutingType.ANYCAST, address, null, true, false); - - AddressControl addressControl = ManagementControlHelper.createAddressControl(address, mBeanServer); - Assert.assertEquals(1, addressControl.getQueueNames().length); - addressControl.sendMessage(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE, Base64.encodeBytes("test".getBytes()), false, userName, password); - - Wait.waitFor(() -> addressControl.getMessageCount() == 1); - - Assert.assertEquals(1, addressControl.getMessageCount()); - - Connection connection = 
createConnection("myClientId"); - try { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - javax.jms.Queue queue = session.createQueue(address.toString()); - MessageConsumer consumer = session.createConsumer(queue); - Message message = consumer.receive(500); - assertNotNull(message); - byte[] buffer = new byte[(int)((BytesMessage)message).getBodyLength()]; - ((BytesMessage)message).readBytes(buffer); - assertEquals("test", new String(buffer)); - session.close(); - connection.close(); - } finally { - if (connection != null) { - connection.close(); - } - } - } - - @Test - public void testAddressControlSendMessageWithText() throws Exception { - SimpleString address = RandomUtil.randomSimpleString(); - server.createQueue(address, RoutingType.ANYCAST, address, null, true, false); - - AddressControl addressControl = ManagementControlHelper.createAddressControl(address, mBeanServer); - Assert.assertEquals(1, addressControl.getQueueNames().length); - addressControl.sendMessage(null, org.apache.activemq.artemis.api.core.Message.TEXT_TYPE, "test", false, userName, password); - - Wait.waitFor(() -> addressControl.getMessageCount() == 1); - - Assert.assertEquals(1, addressControl.getMessageCount()); - - Connection connection = createConnection("myClientId"); - try { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - javax.jms.Queue queue = session.createQueue(address.toString()); - MessageConsumer consumer = session.createConsumer(queue); - Message message = consumer.receive(500); - assertNotNull(message); - String text = ((TextMessage) message).getText(); - assertEquals("test", text); - session.close(); - connection.close(); - } finally { - if (connection != null) { - connection.close(); - } - } - } - - @Test - public void testDurableSubscriptionUnsubscribe() throws Exception { - Connection connection = createConnection("myClientId"); - try { - Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic("amqp_testtopic"); - TopicSubscriber myDurSub = session.createDurableSubscriber(topic, "myDurSub"); - session.close(); - connection.close(); - connection = createConnection("myClientId"); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - myDurSub = session.createDurableSubscriber(topic, "myDurSub"); - myDurSub.close(); - Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub"))); - session.unsubscribe("myDurSub"); - Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub"))); - session.close(); - connection.close(); - } finally { - if (connection != null) { - connection.close(); - } - } - } - - @Test - public void testTemporarySubscriptionDeleted() throws Exception { - try { - TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic("amqp_testtopic"); - TopicSubscriber myDurSub = session.createSubscriber(topic); - Bindings bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString("amqp_testtopic")); - Assert.assertEquals(2, bindingsForAddress.getBindings().size()); - session.close(); - final CountDownLatch latch = new CountDownLatch(1); - server.getRemotingService().getConnections().iterator().next().addCloseListener(new CloseListener() { - @Override - public void connectionClosed() { - latch.countDown(); - } - }); - connection.close(); - latch.await(5, TimeUnit.SECONDS); - bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString("amqp_testtopic")); - Assert.assertEquals(1, bindingsForAddress.getBindings().size()); - } finally { - if (connection != null) { - connection.close(); - } - } - } - - @Test - public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception { - - String destinationAddress = address + 1; - AmqpClient client = new AmqpClient(new 
URI("tcp://localhost:5680"), userName, password); - AmqpConnection amqpConnection = client.connect(); - try { - AmqpSession session = amqpConnection.createSession(); - AmqpSender sender = session.createSender(destinationAddress); - assertTrue(sender.getSender().getCredit() == 1); - } finally { - amqpConnection.close(); - } - } - - @Test - public void testTemporaryQueue() throws Throwable { - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - TemporaryQueue queue = session.createTemporaryQueue(); - System.out.println("queue:" + queue.getQueueName()); - MessageProducer p = session.createProducer(queue); - - TextMessage message = session.createTextMessage(); - message.setText("Message temporary"); - p.send(message); - - MessageConsumer cons = session.createConsumer(queue); - connection.start(); - - message = (TextMessage) cons.receive(5000); - Assert.assertNotNull(message); - } - - @Test - public void testCommitProducer() throws Throwable { - - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - javax.jms.Queue queue = createQueue(address); - System.out.println("queue:" + queue.getQueueName()); - MessageProducer p = session.createProducer(queue); - for (int i = 0; i < 10; i++) { - TextMessage message = session.createTextMessage(); - message.setText("Message:" + i); - p.send(message); - } - session.commit(); - session.close(); - Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); - //because tx commit is executed async on broker, we use a timed wait. 
- assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> q.getMessageCount() == 10)); - } - - @Test - public void testRollbackProducer() throws Throwable { - - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - javax.jms.Queue queue = createQueue(address); - System.out.println("queue:" + queue.getQueueName()); - MessageProducer p = session.createProducer(queue); - for (int i = 0; i < 10; i++) { - TextMessage message = session.createTextMessage(); - message.setText("Message:" + i); - p.send(message); - } - session.rollback(); - session.close(); - Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); - Assert.assertEquals(q.getMessageCount(), 0); - } - - @Test - public void testCommitConsumer() throws Throwable { - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - javax.jms.Queue queue = createQueue(address); - System.out.println("queue:" + queue.getQueueName()); - MessageProducer p = session.createProducer(queue); - for (int i = 0; i < 10; i++) { - TextMessage message = session.createTextMessage(); - message.setText("Message:" + i); - p.send(message); - } - session.close(); - - session = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer cons = session.createConsumer(queue); - connection.start(); - - for (int i = 0; i < 10; i++) { - TextMessage message = (TextMessage) cons.receive(5000); - Assert.assertNotNull(message); - Assert.assertEquals("Message:" + i, message.getText()); - } - session.commit(); - session.close(); - Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); - Assert.assertEquals(q.getMessageCount(), 0); - } - - @Test - public void testRollbackConsumer() throws Throwable { - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - javax.jms.Queue queue = createQueue(address); - System.out.println("queue:" + queue.getQueueName()); - MessageProducer p = 
session.createProducer(queue); - for (int i = 0; i < 10; i++) { - TextMessage message = session.createTextMessage(); - message.setText("Message:" + i); - p.send(message); - } - session.close(); - - session = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer cons = session.createConsumer(queue); - connection.start(); - - for (int i = 0; i < 10; i++) { - TextMessage message = (TextMessage) cons.receive(5000); - Assert.assertNotNull(message); - Assert.assertEquals("Message:" + i, message.getText()); - } - session.rollback(); - Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); - //because tx rollback is executed async on broker, we use a timed wait. - assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> q.getMessageCount() == 10)); - - } - - @Test - public void testObjectMessage() throws Throwable { - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - javax.jms.Queue queue = createQueue(address); - MessageProducer p = session.createProducer(queue); - ArrayList<String> list = new ArrayList<>(); - list.add("aString"); - ObjectMessage objectMessage = session.createObjectMessage(list); - p.send(objectMessage); - session.close(); - - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer cons = session.createConsumer(queue); - connection.start(); - - objectMessage = (ObjectMessage) cons.receive(5000); - assertNotNull(objectMessage); - list = (ArrayList<String>) objectMessage.getObject(); - assertEquals(list.get(0), "aString"); - connection.close(); - } - - @Test - public void testResourceLimitExceptionOnAddressFull() throws Exception { - setAddressFullBlockPolicy(); - String destinationAddress = address + 1; - fillAddress(destinationAddress); - - long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize(); - assertTrue(addressSize >= maxSizeBytesRejectThreshold); - } - - @Test - public 
void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception { - setAddressFullBlockPolicy(); - - String destinationAddress = address + 1; - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination d = session.createQueue(destinationAddress); - MessageProducer p = session.createProducer(d); - - fillAddress(destinationAddress); - - Exception e = null; - try { - p.send(session.createBytesMessage()); - } catch (ResourceAllocationException rae) { - e = rae; - } - assertTrue(e instanceof ResourceAllocationException); - assertTrue(e.getMessage().contains("resource-limit-exceeded")); - - long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize(); - assertTrue(addressSize >= maxSizeBytesRejectThreshold); - } - - @Test(timeout = 10000) - public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception { - setAddressFullBlockPolicy(); - - - String destinationAddress = address + 1; - AmqpClient client = new AmqpClient(new URI("tcp://localhost:5680"), userName, password); - AmqpConnection amqpConnection = client.connect(); - try { - AmqpSession session = amqpConnection.createSession(); - AmqpSender sender = session.createSender(destinationAddress); - - // Use blocking send to ensure buffered messages do not interfere with credit. - sender.setSendTimeout(-1); - sendUntilFull(sender); - - // This should be -1. A single message is buffered in the client, and 0 credit has been allocated. 
- assertTrue(sender.getSender().getCredit() == -1); - - long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize(); - assertTrue(addressSize >= maxSizeBytes && addressSize <= maxSizeBytesRejectThreshold); - } finally { - amqpConnection.close(); - } - } - - @Test - public void testCreditsAreRefreshedWhenAddressIsUnblocked() throws Exception { - setAddressFullBlockPolicy(); - - String destinationAddress = address + 1; - fillAddress(destinationAddress); - - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection amqpConnection = client.connect(); - try { - AmqpSession session = amqpConnection.createSession(); - AmqpSender sender = session.createSender(destinationAddress); - - // Wait for a potential flow frame. - Thread.sleep(500); - assertEquals(0, sender.getSender().getCredit()); - - // Empty Address except for 1 message used later. - AmqpReceiver receiver = session.createReceiver(destinationAddress); - receiver.flow(100); - - AmqpMessage m; - for (int i = 0; i < messagesSent - 1; i++) { - m = receiver.receive(5000, TimeUnit.MILLISECONDS); - m.accept(); - } - - // Wait for address to unblock and flow frame to arrive - Thread.sleep(500); - - assertTrue(sender.getSender().getCredit() >= 0); - } finally { - amqpConnection.close(); - } - } - - @Test - public void testNewLinkAttachAreNotAllocatedCreditsWhenAddressIsBlocked() throws Exception { - setAddressFullBlockPolicy(); - - fillAddress(address + 1); - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection amqpConnection = client.connect(); - try { - AmqpSession session = amqpConnection.createSession(); - AmqpSender sender = session.createSender(address + 1); - // Wait for a potential flow frame. 
- Thread.sleep(1000); - assertEquals(0, sender.getSender().getCredit()); - } finally { - amqpConnection.close(); - } - } - - @Test - public void testTxIsRolledBackOnRejectedPreSettledMessage() throws Throwable { - setAddressFullBlockPolicy(); - - // Create the link attach before filling the address to ensure the link is allocated credit. - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection amqpConnection = client.connect(); - - AmqpSession session = amqpConnection.createSession(); - AmqpSender sender = session.createSender(address); - sender.setPresettle(true); - - fillAddress(address); - - final AmqpMessage message = new AmqpMessage(); - byte[] payload = new byte[50 * 1024]; - message.setBytes(payload); - - Exception expectedException = null; - try { - session.begin(); - sender.send(message); - session.commit(); - } catch (Exception e) { - expectedException = e; - } finally { - amqpConnection.close(); - } - - assertNotNull(expectedException); - assertTrue(expectedException.getMessage().contains("resource-limit-exceeded")); - assertTrue(expectedException.getMessage().contains("Address is full: " + address)); - } - - /** - * Fills an address. Careful when using this method. Only use when rejected messages are switched on. 
- * - * @param address - * @return - * @throws Exception - */ - private void fillAddress(String address) throws Exception { - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection amqpConnection = client.connect(); - Exception exception = null; - try { - AmqpSession session = amqpConnection.createSession(); - AmqpSender sender = session.createSender(address); - sendUntilFull(sender); - } catch (Exception e) { - exception = e; - } finally { - amqpConnection.close(); - } - - // Should receive a rejected error - assertNotNull(exception); - assertTrue(exception.getMessage().contains("amqp:resource-limit-exceeded")); - } - - private void sendUntilFull(final AmqpSender sender) throws Exception { - final AmqpMessage message = new AmqpMessage(); - byte[] payload = new byte[50 * 1024]; - message.setBytes(payload); - - final int maxMessages = 50; - final AtomicInteger sentMessages = new AtomicInteger(0); - final Exception[] errors = new Exception[1]; - final CountDownLatch timeout = new CountDownLatch(1); - - Runnable sendMessages = new Runnable() { - @Override - public void run() { - try { - for (int i = 0; i < maxMessages; i++) { - sender.send(message); - System.out.println("Sent " + i); - sentMessages.getAndIncrement(); - } - timeout.countDown(); - } catch (IOException e) { - errors[0] = e; - } - } - }; - - Thread t = new Thread(sendMessages); - - try { - t.start(); - - timeout.await(1, TimeUnit.SECONDS); - - messagesSent = sentMessages.get(); - if (errors[0] != null) { - throw errors[0]; - } - } finally { - t.interrupt(); - t.join(1000); - Assert.assertFalse(t.isAlive()); - } - } - - @Test - public void testLinkDetatchErrorIsCorrectWhenQueueDoesNotExists() throws Exception { - AddressSettings value = new AddressSettings(); - value.setAutoCreateJmsQueues(false); - value.setAutoCreateQueues(false); - value.setAutoCreateAddresses(false); - value.setAutoCreateJmsTopics(false); - 
server.getAddressSettingsRepository().addMatch("AnAddressThatDoesNotExist", value); - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection amqpConnection = client.connect(); - AmqpSession session = amqpConnection.createSession(); - - Exception expectedException = null; - try { - session.createSender("AnAddressThatDoesNotExist"); - fail("Creating a sender here on an address that doesn't exist should fail"); - } catch (Exception e) { - expectedException = e; - } - - assertNotNull(expectedException); - assertTrue(expectedException.getMessage().contains("amqp:not-found")); - assertTrue(expectedException.getMessage().contains("target address does not exist")); - amqpConnection.close(); - } - - @Test - public void testLinkDetachSentWhenQueueDeleted() throws Exception { - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - final AmqpConnection amqpConnection = client.connect(); - try { - AmqpSession session = amqpConnection.createSession(); - - AmqpReceiver receiver = session.createReceiver(coreAddress); - server.destroyQueue(new SimpleString(coreAddress), null, false, true); - - Wait.waitFor(receiver::isClosed); - assertTrue(receiver.isClosed()); - } finally { - amqpConnection.close(); - } - } - - @Test - public void testCloseIsSentOnConnectionClose() throws Exception { - connection.close(); - - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - final AmqpConnection amqpConnection = client.connect(); - try { - for (RemotingConnection connection : server.getRemotingService().getConnections()) { - server.getRemotingService().removeConnection(connection); - connection.disconnect(true); - } - - Wait.waitFor(amqpConnection::isClosed); - - assertTrue(amqpConnection.isClosed()); - assertEquals(AmqpSupport.CONNECTION_FORCED, amqpConnection.getConnection().getRemoteCondition().getCondition()); - } finally { - amqpConnection.close(); - } - } - - - @Test - 
public void testClientIdIsSetInSubscriptionList() throws Exception { - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("mytopic"), RoutingType.ANYCAST)); - AmqpConnection amqpConnection = client.createConnection(); - amqpConnection.setContainerId("testClient"); - amqpConnection.setOfferedCapabilities(Arrays.asList(Symbol.getSymbol("topic"))); - amqpConnection.connect(); - try { - AmqpSession session = amqpConnection.createSession(); - - Source source = new Source(); - source.setDurable(TerminusDurability.UNSETTLED_STATE); - source.setCapabilities(Symbol.getSymbol("topic")); - source.setAddress("mytopic"); - AmqpReceiver receiver = session.createReceiver(source, "testSub"); - - SimpleString fo = new SimpleString("testClient.testSub:mytopic"); - assertNotNull(server.locateQueue(fo)); - - } catch (Exception e) { - e.printStackTrace(); - } finally { - amqpConnection.close(); - } - } - - @Test - public void testSendingAndReceivingToQueueWithDifferentAddressAndQueueName() throws Exception { - - String queueName = "TestQueueName"; - String address = "TestAddress"; - server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.ANYCAST)); - server.createQueue(new SimpleString(address), RoutingType.ANYCAST, new SimpleString(queueName), null, true, false); - - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection amqpConnection = client.connect(); - AmqpSession session = amqpConnection.createSession(); - AmqpSender sender = session.createSender(address); - AmqpReceiver receiver = session.createReceiver(address); - receiver.flow(1); - - AmqpMessage message = new AmqpMessage(); - message.setText("TestPayload"); - sender.send(message); - - AmqpMessage receivedMessage = receiver.receive(5000, TimeUnit.MILLISECONDS); - assertNotNull(receivedMessage); - amqpConnection.close(); - } - - @Test - public 
void testReplyTo() throws Throwable { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - TemporaryQueue queue = session.createTemporaryQueue(); - MessageProducer p = session.createProducer(queue); - - TextMessage message = session.createTextMessage(); - message.setText("Message temporary"); - message.setJMSReplyTo(createQueue(address)); - p.send(message); - - MessageConsumer cons = session.createConsumer(queue); - connection.start(); - - message = (TextMessage) cons.receive(5000); - assertNotNull(message); - Destination jmsReplyTo = message.getJMSReplyTo(); - Assert.assertNotNull(jmsReplyTo); - Assert.assertNotNull(message); - } - - @Test - public void testReplyToNonJMS() throws Throwable { - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - TemporaryQueue queue = session.createTemporaryQueue(); - System.out.println("queue:" + queue.getQueueName()); - MessageProducer p = session.createProducer(queue); - - TextMessage message = session.createTextMessage(); - message.setText("Message temporary"); - message.setJMSReplyTo(createQueue("someaddress")); - p.send(message); - - MessageConsumer cons = session.createConsumer(queue); - connection.start(); - - message = (TextMessage) cons.receive(5000); - Destination jmsReplyTo = message.getJMSReplyTo(); - Assert.assertNotNull(jmsReplyTo); - Assert.assertNotNull(message); - - } - - @Test - public void testOutboundConnection() throws Throwable { - final ActiveMQServer remote = createAMQPServer(5673); - remote.start(); - try { - Wait.waitFor(remote::isActive); - } catch (Exception e) { - remote.stop(); - throw e; - } - - final Map<String, Object> config = new LinkedHashMap<>(); - config.put(TransportConstants.HOST_PROP_NAME, "localhost"); - config.put(TransportConstants.PORT_PROP_NAME, "5673"); - ProtonClientConnectionManager lifeCycleListener = new ProtonClientConnectionManager(new AMQPClientConnectionFactory(server, "myid", 
Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000), Optional.empty()); - ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server); - NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager); - connector.start(); - connector.createConnection(); - - try { - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisfied() throws Exception { - return remote.getConnectionCount() > 0; - } - }); - assertEquals(1, remote.getConnectionCount()); - - lifeCycleListener.stop(); - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisfied() throws Exception { - return remote.getConnectionCount() == 0; - } - }); - assertEquals(0, remote.getConnectionCount()); - } finally { - lifeCycleListener.stop(); - remote.stop(); - } - } - - /* - // Uncomment testLoopBrowser to validate the hunging on the test - @Test - public void testLoopBrowser() throws Throwable { - for (int i = 0 ; i < 1000; i++) { - System.out.println("#test " + i); - testBrowser(); - tearDown(); - setUp(); - } - } */ - - /** - * This test eventually fails because of: https://issues.apache.org/jira/browse/QPID-4901 - * - * @throws Throwable - */ - //@Test // TODO: re-enable this when we can get a version free of QPID-4901 bug - public void testBrowser() throws Throwable { - - boolean success = false; - - for (int i = 0; i < 10; i++) { - // As this test was hunging, we added a protection here to fail it instead. 
- // it seems something on the qpid client, so this failure belongs to them and we can ignore it on - // our side (ActiveMQ) - success = runWithTimeout(new RunnerWithEX() { - @Override - public void run() throws Throwable { - int numMessages = 50; - javax.jms.Queue queue = createQueue(address); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer p = session.createProducer(queue); - for (int i = 0; i < numMessages; i++) { - TextMessage message = session.createTextMessage(); - message.setText("msg:" + i); - p.send(message); - } - - connection.close(); - Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); - - connection = createConnection(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - QueueBrowser browser = session.createBrowser(queue); - Enumeration enumeration = browser.getEnumeration(); - int count = 0; - while (enumeration.hasMoreElements()) { - Message msg = (Message) enumeration.nextElement(); - Assert.assertNotNull("" + count, msg); - Assert.assertTrue("" + msg, msg instanceof TextMessage); - String text = ((TextMessage) msg).getText(); - Assert.assertEquals(text, "msg:" + count++); - } - Assert.assertEquals(count, numMessages); - connection.close(); - Assert.assertEquals(getMessageCount(q), numMessages); - } - }, 5000); - - if (success) { - break; - } else { - System.err.println("Had to make it fail!!!"); - tearDown(); - setUp(); - } - } - - // There is a bug on the qpid client library currently, we can expect having to interrupt the thread on browsers. - // but we can't have it on 10 iterations... something must be broken if that's the case - Assert.assertTrue("Test had to interrupt on all occasions.. 
this is beyond the expected for the test", success); - } - - @Test - public void testConnection() throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageConsumer cons = session.createConsumer(createQueue(address)); - - org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(SimpleString.toSimpleString(coreAddress)); - - assertEquals(1, serverQueue.getConsumerCount()); - - cons.close(); - - for (int i = 0; i < 100 && serverQueue.getConsumerCount() != 0; i++) { - Thread.sleep(500); - } - - assertEquals(0, serverQueue.getConsumerCount()); - - session.close(); - - } - - @Test - public void testMessagesSentTransactional() throws Exception { - int numMessages = 1000; - javax.jms.Queue queue = createQueue(address); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageProducer p = session.createProducer(queue); - byte[] bytes = new byte[2048]; - new Random().nextBytes(bytes); - for (int i = 0; i < numMessages; i++) { - TextMessage message = session.createTextMessage(); - message.setText("msg:" + i); - p.send(message); - } - session.commit(); - connection.close(); - Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); - for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis() && getMessageCount(q) != numMessages; ) { - Thread.sleep(1); - } - Assert.assertEquals(numMessages, getMessageCount(q)); - } - - @Test - public void testMessagesSentTransactionalRolledBack() throws Exception { - int numMessages = 1; - javax.jms.Queue queue = createQueue(address); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageProducer p = session.createProducer(queue); - byte[] bytes = new byte[2048]; - new Random().nextBytes(bytes); - for (int i = 0; i < numMessages; i++) { - TextMessage message = session.createTextMessage(); - message.setText("msg:" + i); - 
p.send(message); - } - session.close(); - connection.close(); - Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); - Assert.assertEquals(getMessageCount(q), 0); - } - - @Test - public void testCancelMessages() throws Exception { - int numMessages = 10; - long time = System.currentTimeMillis(); - javax.jms.Queue queue = createQueue(address); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer p = session.createProducer(queue); - byte[] bytes = new byte[2048]; - new Random().nextBytes(bytes); - for (int i = 0; i < numMessages; i++) { - TextMessage message = session.createTextMessage(); - message.setText("msg:" + i); - p.send(message); - } - connection.close(); - Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); - - for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis() && getMessageCount(q) != numMessages; ) { - Thread.sleep(1); - } - - Assert.assertEquals(numMessages, getMessageCount(q)); - //now create a new connection and receive - connection = createConnection(); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(queue); - Thread.sleep(100); - consumer.close(); - connection.close(); - Assert.assertEquals(numMessages, getMessageCount(q)); - long taken = (System.currentTimeMillis() - time) / 1000; - System.out.println("taken = " + taken); - } - - @Test - public void testClientAckMessages() throws Exception { - int numMessages = 10; - long time = System.currentTimeMillis(); - javax.jms.Queue queue = createQueue(address); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer p = session.createProducer(queue); - byte[] bytes = new byte[2048]; - new Random().nextBytes(bytes); - for (int i = 0; i < numMessages; i++) { - TextMessage message = session.createTextMessage(); - 
message.setText("msg:" + i); - p.send(message); - } - connection.close(); - Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); - - for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis() && getMessageCount(q) != numMessages; ) { - Thread.sleep(1); - } - Assert.assertEquals(numMessages, getMessageCount(q)); - //now create a new connection and receive - connection = createConnection(); - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(queue); - for (int i = 0; i < numMessages; i++) { - Message msg = consumer.receive(5000); - if (msg == null) { - System.out.println("ProtonTest.testManyMessages"); - } - Assert.assertNotNull("" + i, msg); - Assert.assertTrue("" + msg, msg instanceof TextMessage); - String text = ((TextMessage) msg).getText(); - //System.out.println("text = " + text); - Assert.assertEquals(text, "msg:" + i); - msg.acknowledge(); - } - - consumer.close(); - connection.close(); - - // Wait for Acks to be processed and message removed from queue. - Thread.sleep(500); - - Assert.assertEquals(0, getMessageCount(q)); - long taken = (System.currentTimeMillis() - time) / 1000; - System.out.println("taken = " + taken); - } - - @Test - public void testMessagesReceivedInParallel() throws Throwable { - final int numMessages = 50000; - long time = System.currentTimeMillis(); - final javax.jms.Queue queue = createQueue(address); - - final ArrayList<Throwable> exceptions = new ArrayList<>(); - - Thread t = new Thread(new Runnable() { - @Override - public void run() { - Connection connectionConsumer = null; - try { - // TODO the test may starve if using the same connection (dead lock maybe?) 
- connectionConsumer = createConnection(); - // connectionConsumer = connection; - connectionConsumer.start(); - Session sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = sessionConsumer.createConsumer(queue); - - long n = 0; - int count = numMessages; - while (count > 0) { - try { - if (++n % 1000 == 0) { - System.out.println("received " + n + " messages"); - } - - Message m = consumer.receive(5000); - Assert.assertNotNull("Could not receive message count=" + count + " on consumer", m); - count--; - } catch (JMSException e) { - e.printStackTrace(); - break; - } - } - } catch (Throwable e) { - exceptions.add(e); - e.printStackTrace(); - } finally { - try { - // if the createconnecion wasn't commented out - if (connectionConsumer != connection) { - connectionConsumer.close(); - } - } catch (Throwable ignored) { - // NO OP - } - } - } - }); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - t.start(); - - MessageProducer p = session.createProducer(queue); - p.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - for (int i = 0; i < numMessages; i++) { - BytesMessage message = session.createBytesMessage(); - message.writeUTF("Hello world!!!!" 
+ i); - message.setIntProperty("count", i); - p.send(message); - } - t.join(); - - for (Throwable e : exceptions) { - throw e; - } - Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable(); - - connection.close(); - Assert.assertEquals(0, getMessageCount(q)); - - long taken = (System.currentTimeMillis() - time); - System.out.println("Microbenchamrk ran in " + taken + " milliseconds, sending/receiving " + numMessages); - - double messagesPerSecond = ((double) numMessages / (double) taken) * 1000; - - System.out.println(((int) messagesPerSecond) + " messages per second"); - - } - - @Test - public void testSimpleBinary() throws Throwable { - final int numMessages = 500; - long time = System.currentTimeMillis(); - final javax.jms.Queue queue = createQueue(address); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - byte[] bytes = new byte[0xf + 1]; - for (int i = 0; i <= 0xf; i++) { - bytes[i] = (byte) i; - } - - MessageProducer p = session.createProducer(queue); - for (int i = 0; i < numMessages; i++) { - System.out.println("Sending " + i); - BytesMessage message = session.createBytesMessage(); - - message.writeBytes(bytes); - message.setIntProperty("count", i); - p.send(message); - } - - Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = sessionConsumer.createConsumer(queue); - - for (int i = 0; i < numMessages; i++) { - BytesMessage m = (BytesMessage) consumer.receive(5000); - Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m); - - m.reset(); - - long size = m.getBodyLength(); - byte[] bytesReceived = new byte[(int) size]; - m.readBytes(bytesReceived); - - System.out.println("Received " + ByteUtil.bytesToHex(bytesReceived, 1) + " count - " + m.getIntProperty("count")); - - Assert.assertArrayEquals(bytes, bytesReceived); - } - - // assertEquals(0, q.getMessageCount()); - long taken = 
(System.currentTimeMillis() - time) / 1000; - System.out.println("taken = " + taken); - } - - @Test - public void testSimpleDefault() throws Throwable { - final int numMessages = 500; - long time = System.currentTimeMillis(); - final javax.jms.Queue queue = createQueue(address); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - byte[] bytes = new byte[0xf + 1]; - for (int i = 0; i <= 0xf; i++) { - bytes[i] = (byte) i; - } - - MessageProducer p = session.createProducer(queue); - for (int i = 0; i < numMessages; i++) { - System.out.println("Sending " + i); - Message message = session.createMessage(); - - message.setIntProperty("count", i); - p.send(message); - } - - Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = sessionConsumer.createConsumer(queue); - - for (int i = 0; i < numMessages; i++) { - Message m = consumer.receive(5000); - Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m); - } - - // assertEquals(0, q.getMessageCount()); - long taken = (System.currentTimeMillis() - time) / 1000; - System.out.println("taken = " + taken); - } - - @Test - public void testSimpleMap() throws Throwable { - final int numMessages = 100; - long time = System.currentTimeMillis(); - final javax.jms.Queue queue = createQueue(address); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer p = session.createProducer(queue); - for (int i = 0; i < numMessages; i++) { - System.out.println("Sending " + i); - MapMessage message = session.createMapMessage(); - - message.setInt("i", i); - message.setIntProperty("count", i); - p.send(message); - } - - Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = sessionConsumer.createConsumer(queue); - - for (int i = 0; i < numMessages; i++) { - MapMessage m = (MapMessage) consumer.receive(5000); - 
Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m); - - Assert.assertEquals(i, m.getInt("i")); - Assert.assertEquals(i, m.getIntProperty("count")); - } - - // assertEquals(0, q.getMessageCount()); - long taken = (System.currentTimeMillis() - time) / 1000; - System.out.println("taken = " + taken); - } - - @Test - public void testSimpleStream() throws Throwable { - final int numMessages = 100; - final javax.jms.Queue queue = createQueue(address); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer p = session.createProducer(queue); - for (int i = 0; i < numMessages; i++) { - StreamMessage message = session.createStreamMessage(); - message.writeInt(i); - message.writeBoolean(true); - message.writeString("test"); - p.send(message); - } - - Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = sessionConsumer.createConsumer(queue); - - for (int i = 0; i < numMessages; i++) { - StreamMessage m = (StreamMessage) consumer.receive(5000); - Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m); - - Assert.assertEquals(i, m.readInt()); - Assert.assertEquals(true, m.readBoolean()); - Assert.assertEquals("test", m.readString()); - } - - } - - @Test - public void testSimpleText() throws Throwable { - final int numMessages = 100; - long time = System.currentTimeMillis(); - final javax.jms.Queue queue = createQueue(address); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer p = session.createProducer(queue); - for (int i = 0; i < numMessages; i++) { - System.out.println("Sending " + i); - TextMessage message = session.createTextMessage("text" + i); - message.setStringProperty("text", "text" + i); - p.send(message); - } - - Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = 
sessionConsumer.createConsumer(queue); - - for (int i = 0; i < numMessages; i++) { - TextMessage m = (TextMessage) consumer.receive(5000); - Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m); - Assert.assertEquals("text" + i, m.getText()); - } - - // assertEquals(0, q.getMessageCount()); - long taken = (System.currentTimeMillis() - time) / 1000; - System.out.println("taken = " + taken); - } - - @Test - public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable { - String name = "exampleQueue1"; - - int numMessages = 50; - - System.out.println("1. Send messages into queue"); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - javax.jms.Queue queue = session.createQueue(name); - MessageProducer p = session.createProducer(queue); - for (int i = 0; i < numMessages; i++) { - TextMessage message = session.createTextMessage(); - message.setText("Message temporary"); - p.send(message); - } - p.close(); - session.close(); - - System.out.println("2. Receive one by one, each in its own session"); - for (int i = 0; i < numMessages; i++) { - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - queue = session.createQueue(name); - MessageConsumer c = session.createConsumer(queue); - Message m = c.receive(1000); - p.close(); - session.close(); - } - - System.out.println("3. 
Try to receive 10 in the same session"); - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - queue = session.createQueue(name); - MessageConsumer c = session.createConsumer(queue); - for (int i = 0; i < numMessages; i++) { - Message m = c.receive(1000); - } - p.close(); - session.close(); - } - - @Test - public void testSimpleObject() throws Throwable { - final int numMessages = 1; - long time = System.currentTimeMillis(); - final javax.jms.Queue queue = createQueue(address); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer p = session.createProducer(queue); - for (int i = 0; i < numMessages; i++) { - System.out.println("Sending " + i); - ObjectMessage message = session.createObjectMessage(new AnythingSerializable(i)); - p.send(message); - } - - Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final MessageConsumer consumer = sessionConsumer.createConsumer(queue); - - for (int i = 0; i < numMessages; i++) { - ObjectMessage msg = (ObjectMessage) consumer.receive(5000); - Assert.assertNotNull("Could not receive message count=" + i + " on consumer", msg); - - AnythingSerializable someSerialThing = (AnythingSerializable) msg.getObject(); - Assert.assertEquals(i, someSerialThing.getCount()); - } - - // assertEquals(0, q.getMessageCount()); - long taken = (System.currentTimeMillis() - time) / 1000; - System.out.println("taken = " + taken); - } - - @Test - public void testSelector() throws Exception { - javax.jms.Queue queue = createQueue(address); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer p = session.createProducer(queue); - TextMessage message = session.createTextMessage(); - message.setText("msg:0"); - p.send(message); - message = session.createTextMessage(); - message.setText("msg:1"); - message.setStringProperty("color", "RED"); - p.send(message); - connection.start(); - MessageConsumer messageConsumer = 
session.createConsumer(queue, "color = 'RED'"); - TextMessage m = (TextMessage) messageConsumer.receive(5000); - Assert.assertNotNull(m); - Assert.assertEquals("msg:1", m.getText()); - Assert.assertEquals(m.getStringProperty("color"), "RED"); - connection.close(); - } - - @Test - public void testProperties() throws Exception { - javax.jms.Queue queue = createQueue(address); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer p = session.createProducer(queue); - TextMessage message = session.createTextMessage(); - message.setText("msg:0"); - message.setBooleanProperty("true", true); - message.setBooleanProperty("false", false); - message.setStringProperty("foo", "bar"); - message.setDoubleProperty("double", 66.6); - message.setFloatProperty("float", 56.789f); - message.setIntProperty("int", 8); - message.setByteProperty("byte", (byte) 10); - p.send(message); - p.send(message); - connection.start(); - MessageConsumer messageConsumer = session.createConsumer(queue); - TextMessage m = (TextMessage) messageConsumer.receive(5000); - Assert.assertNotNull(m); - Assert.assertEquals("msg:0", m.getText()); - Assert.assertEquals(m.getBooleanProperty("true"), true); - Assert.assertEquals(m.getBooleanProperty("false"), false); - Assert.assertEquals(m.getStringProperty("foo"), "bar"); - Assert.assertEquals(m.getDoubleProperty("double"), 66.6, 0.0001); - Assert.assertEquals(m.getFloatProperty("float"), 56.789f, 0.0001); - Assert.assertEquals(m.getIntProperty("int"), 8); - Assert.assertEquals(m.getByteProperty("byte"), (byte) 10); - m = (TextMessage) messageConsumer.receive(5000); - Assert.assertNotNull(m); - connection.close(); - } - - @Test - public void testClientID() throws Exception { - Connection testConn1 = createConnection(false); - Connection testConn2 = createConnection(false); - try { - testConn1.setClientID("client-id1"); - try { - testConn1.setClientID("client-id2"); - fail("didn't get expected exception"); - } catch 
(javax.jms.IllegalStateException e) { - //expected - } - - try { - testConn2.setClientID("client-id1"); - fail("didn't get expected exception"); - } catch (InvalidClientIDException e) { - //expected - } - } finally { - testConn1.close(); - testConn2.close(); - } - - try { - testConn1 = createConnection(false); - testConn2 = createConnection(false); - testConn1.setClientID("client-id1"); - testConn2.setClientID("client-id2"); - } finally { - testConn1.close(); - testConn2.close(); - } - } - - @Test - public void testFilterJMSMessageID() throws Exception { - javax.jms.Queue queue = createQueue(address); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer p = session.createProducer(queue); - TextMessage message = session.createTextMessage(); - p.send(message); - System.out.println("get mid: " + message.getJMSMessageID()); - connection.start(); - MessageConsumer messageConsumer = session.createConsumer(queue, "JMSMessageID = '" + message.getJMSMessageID() + "'"); - TextMessage m = (TextMessage) messageConsumer.receive(5000); - Assert.assertNotNull(m); - assertEquals(message.getJMSMessageID(), m.getJMSMessageID()); - connection.close(); - } - - @Test - public void testProducerWithoutUsingDefaultDestination() throws Exception { - - try { - javax.jms.Queue queue = createQueue(coreAddress); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer p = session.createProducer(null); - - for (int i = 1; i <= 10; i++) { - String targetName = coreAddress + i; - javax.jms.Queue target = createQueue(targetName); - TextMessage message = session.createTextMessage("message for " + targetName); - p.send(target, message); - } - connection.start(); - MessageConsumer messageConsumer = session.createConsumer(queue); - Message m = messageConsumer.receive(200); - Assert.assertNull(m); - - for (int i = 1; i <= 10; i++) { - String targetName = coreAddress + i; - javax.jms.Queue target = 
createQueue(targetName); - MessageConsumer consumer = session.createConsumer(target); - TextMessage tm = (TextMessage) consumer.receive(2000); - assertNotNull(tm); - assertEquals("message for " + targetName, tm.getText()); - consumer.close(); - } - } finally { - connection.close(); - } - } - - private javax.jms.Queue createQueue(String address) throws Exception { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - try { - return session.createQueue(address); - } finally { - session.close(); - } - } - - private Connection createConnection() throws JMSException { - return this.createConnection(true); - } - - private javax.jms.Connection createConnection(boolean isStart) throws JMSException { - Connection connection; - if (protocol == 3) { - factory = new JmsConnectionFactory(amqpConnectionUri); - connection = factory.createConnection(); - } else if (protocol == 0) { - factory = new JmsConnectionFactory(userName, password, amqpConnectionUri); - connection = factory.createConnection(); - } else { - Assert.fail("protocol = " + protocol + " not supported"); - return null; // just to compile, the previous statement will throw an exception - } - if (isStart) { - connection.setExceptionListener(new ExceptionListener() { - @Override - public void onException(JMSException exception) { - exception.printStackTrace(); - } - }); - connection.start(); - } - - return connection; - } - - private javax.jms.Connection createConnection(String clientId) throws JMSException { - Connection connection; - if (protocol == 3) { - factory = new JmsConnectionFactory(amqpConnectionUri); - connection = factory.createConnection(); - connection.setExceptionListener(new ExceptionListener() { - @Override - public void onException(JMSException exception) { - exception.printStackTrace(); - } - }); - connection.setClientID(clientId); - connection.start(); - } else if (protocol == 0) { - factory = new JmsConnectionFactory(userName, password, amqpConnectionUri); - connection = 
factory.createConnection(); - connection.setExceptionListener(new ExceptionListener() { - @Override - public void onException(JMSException exception) { - exception.printStackTrace(); - } - }); - connection.setClientID(clientId); - connection.start(); - } else { - Assert.fail("protocol = " + protocol + " not supported"); - return null; // just to compile, the previous statement will throw an exception - } - - return connection; - } - - private void setAddressFullBlockPolicy() { - // For BLOCK tests - AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#"); - addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); - addressSettings.setMaxSizeBytes(maxSizeBytes); - addressSettings.setMaxSizeBytesRejectThreshold(maxSizeBytesRejectThreshold); - server.getAddressSettingsRepository().addMatch("#", addressSettings); - } - - public static class AnythingSerializable implements Serializable { - - private int count; - - public AnythingSerializable(int count) { - this.count = count; - } - - public int getCount() { - return count; - } - } - - /** - * If we have an address configured with both ANYCAST and MULTICAST routing types enabled, we must ensure that any - * messages sent specifically to MULTICAST (e.g. JMS TopicProducer) are only delivered to MULTICAST queues (e.g. - * i.e. subscription queues) and **NOT** to ANYCAST queues (e.g. JMS Queue). 
- * - * @throws Exception - */ - @Test - public void testRoutingExclusivity() throws Exception { - - // Create Address with both ANYCAST and MULTICAST enabled - String testAddress = "testRoutingExclusivity"; - SimpleString ssTestAddress = new SimpleString(testAddress); - - AddressInfo addressInfo = new AddressInfo(ssTestAddress); - addressInfo.addRoutingType(RoutingType.MULTICAST); - addressInfo.addRoutingType(RoutingType.ANYCAST); - - server.addAddressInfo(addressInfo); - server.createQueue(ssTestAddress, RoutingType.ANYCAST, ssTestAddress, null, true, false); - - Connection connection = createConnection(UUIDGenerator.getInstance().generateStringUUID()); - - try { - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Topic topic = session.createTopic(testAddress); - javax.jms.Queue queue = session.createQueue(testAddress); - - MessageProducer producer = session.createProducer(topic); - - MessageConsumer queueConsumer = session.createConsumer(queue); - MessageConsumer topicConsumer = session.createConsumer(topic); - - producer.send(session.createTextMessage("testMessage")); - - assertNotNull(topicConsumer.receive(1000)); - assertNull(queueConsumer.receive(1000)); - } finally { - connection.close(); - } - } - - @Test - public void testReleaseDisposition() throws Exception { - AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); - AmqpConnection connection = client.connect(); - try { - AmqpSession session = connection.createSession(); - - AmqpSender sender = session.createSender(address); - AmqpMessage message = new AmqpMessage(); - message.setText("Test-Message"); - sender.send(message); - - AmqpReceiver receiver = session.createReceiver(address); - receiver.flow(10); - - AmqpMessage m1 = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull(m1); - m1.release(); - - //receiver.flow(10); - AmqpMessage m2 = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull(m2); - m2.accept(); - } finally { - 
connection.close(); - } - } -}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java deleted file mode 100644 index ab8d2d3..0000000 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.artemis.tests.integration.amqp; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.activemq.artemis.api.core.TransportConfiguration; -import org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; -import org.apache.activemq.artemis.core.settings.impl.AddressSettings; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; -import org.junit.After; -import org.junit.Before; -
/**
 * Base class for AMQP integration tests: starts a broker with a single AMQP
 * acceptor before each test and stops it afterwards.
 *
 * <p>Subclasses customise the broker through {@link #configureServer(Configuration)}
 * and the acceptor's extra properties through {@link #configureAmqp(Map)}.
 */
public class ProtonTestBase extends ActiveMQTestBase {

   /** Port of the AMQP acceptor; single source for both the acceptor and the client URI. */
   protected static final int AMQP_PORT = 5672;

   protected String brokerName = "localhost";
   protected ActiveMQServer server;

   protected String tcpAmqpConnectionUri = "tcp://localhost:" + AMQP_PORT;
   protected String userName = "guest";
   protected String password = "guest";

   /**
    * Creates and starts the AMQP broker on {@link #AMQP_PORT}.
    *
    * @throws Exception if the parent set-up, server creation or start fails
    */
   @Override
   @Before
   public void setUp() throws Exception {
      super.setUp();

      server = this.createAMQPServer(AMQP_PORT);
      server.start();
   }

   /**
    * Builds (but does not start) a server whose only acceptor speaks AMQP on
    * the given port.
    *
    * <p>The journal, bindings and paging directories get the port appended to
    * their configured paths, and a catch-all ("#") address setting with the
    * PAGE full-policy is installed before {@link #configureServer(Configuration)}
    * is invoked.
    *
    * @param port TCP port for the AMQP acceptor
    * @return the configured, not yet started server
    * @throws Exception if the underlying server cannot be created
    */
   protected ActiveMQServer createAMQPServer(int port) throws Exception {
      final ActiveMQServer amqpServer = this.createServer(true, true);
      final Configuration config = amqpServer.getConfiguration();

      final Map<String, Object> params = new HashMap<>();
      params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port));
      params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");

      // Extra acceptor properties are left entirely to the subclass hook.
      final Map<String, Object> amqpParams = new HashMap<>();
      configureAmqp(amqpParams);

      // Drop whatever acceptors createServer() set up so that the AMQP one is the only acceptor.
      config.getAcceptorConfigurations().clear();
      config.getAcceptorConfigurations().add(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "amqp-acceptor", amqpParams));

      config.setName(brokerName);
      config.setJournalDirectory(config.getJournalDirectory() + port);
      config.setBindingsDirectory(config.getBindingsDirectory() + port);
      config.setPagingDirectory(config.getPagingDirectory() + port);

      // Default Page
      final AddressSettings addressSettings = new AddressSettings();
      addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
      config.getAddressesSettings().put("#", addressSettings);

      configureServer(config);
      return amqpServer;
   }

   /**
    * Hook for subclasses to adjust the broker configuration; no-op by default.
    *
    * @param serverConfig configuration of the server being created
    */
   protected void configureServer(Configuration serverConfig) {
   }

   /**
    * Hook for subclasses to add extra properties to the AMQP acceptor; no-op by default.
    *
    * @param params mutable map handed to the acceptor's transport configuration
    */
   protected void configureAmqp(Map<String, Object> params) {
   }

   /**
    * Stops the broker, then always runs the parent tear-down.
    *
    * @throws Exception if stopping the server or the parent tear-down fails
    */
   @Override
   @After
   public void tearDown() throws Exception {
      try {
         // JUnit still runs @After when @Before threw, in which case the
         // server may never have been assigned; avoid a secondary NPE.
         if (server != null) {
            server.stop();
         }
      } finally {
         super.tearDown();
      }
   }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestForHeader.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestForHeader.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestForHeader.java deleted file mode 100644 index dffc12e..0000000 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestForHeader.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.integration.amqp; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.util.HashMap; -import java.util.concurrent.TimeUnit; - -import org.apache.activemq.artemis.api.core.TransportConfiguration; -import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; -import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; -import org.apache.activemq.artemis.tests.util.Wait; -import org.fusesource.hawtbuf.Buffer; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class ProtonTestForHeader extends ActiveMQTestBase { - - private ActiveMQServer server; - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - server = this.createServer(true, true); - HashMap<String, Object> params = new HashMap<>(); - params.put(TransportConstants.PORT_PROP_NAME, "5672"); - params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP"); - TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); - - server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration); - server.getConfiguration().setSecurityEnabled(true); - server.start(); - ActiveMQJAASSecurityManager securityManager = 
(ActiveMQJAASSecurityManager) server.getSecurityManager(); - securityManager.getConfiguration().addUser("auser", "pass"); - } - - @Override - @After - public void tearDown() throws Exception { - try { - server.stop(); - } finally { - super.tearDown(); - } - } - - @Test - public void testSimpleBytes() throws Exception { - final AmqpHeader header = new AmqpHeader(); - - header.setProtocolId(0); - header.setMajor(1); - header.setMinor(0); - header.setRevision(0); - - final ClientConnection connection = new ClientConnection(); - connection.open("localhost", 5672); - connection.send(header); - - AmqpHeader response = connection.readAmqpHeader(); - assertNotNull(response); - IntegrationTestLogger.LOGGER.info("Broker responded with: " + response); - - assertTrue("Broker should have closed client connection", Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisfied() throws Exception { - try { - connection.send(header); - return false; - } catch (Exception e) { - return true; - } - } - }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(250))); - } - - private class ClientConnection { - - protected static final long RECEIVE_TIMEOUT = 10000; - protected Socket clientSocket; - - public void open(String host, int port) throws IOException { - clientSocket = new Socket(host, port); - clientSocket.setTcpNoDelay(true); - } - - public void send(AmqpHeader header) throws Exception { - IntegrationTestLogger.LOGGER.info("Client sending header: " + header); - OutputStream outputStream = clientSocket.getOutputStream(); - header.getBuffer().writeTo(outputStream); - outputStream.flush(); - } - - public AmqpHeader readAmqpHeader() throws Exception { - clientSocket.setSoTimeout((int) RECEIVE_TIMEOUT); - InputStream is = clientSocket.getInputStream(); - - byte[] header = new byte[8]; - int read = is.read(header); - if (read == header.length) { - return new AmqpHeader(new Buffer(header)); - } else { - return null; - } - } - } - - private class AmqpHeader { 
- - final Buffer PREFIX = new Buffer(new byte[]{'A', 'M', 'Q', 'P'}); - - private Buffer buffer; - - AmqpHeader() { - this(new Buffer(new byte[]{'A', 'M', 'Q', 'P', 0, 1, 0, 0})); - } - - AmqpHeader(Buffer buffer) { - this(buffer, true); - } - - AmqpHeader(Buffer buffer, boolean validate) { - setBuffer(buffer, validate); - } - - public int getProtocolId() { - return buffer.get(4) & 0xFF; - } - - public void setProtocolId(int value) { - buffer.data[buffer.offset + 4] = (byte) value; - } - - public int getMajor() { - return buffer.get(5) & 0xFF; - } - - public void setMajor(int value) { - buffer.data[buffer.offset + 5] = (byte) value; - } - - public int getMinor() { - return buffer.get(6) & 0xFF; - } - - public void setMinor(int value) { - buffer.data[buffer.offset + 6] = (byte) value; - } - - public int getRevision() { - return buffer.get(7) & 0xFF; - } - - public void setRevision(int value) { - buffer.data[buffer.offset + 7] = (byte) value; - } - - public Buffer getBuffer() { - return buffer; - } - - public void setBuffer(Buffer value) { - setBuffer(value, true); - } - - public void setBuffer(Buffer value, boolean validate) { - if (validate && !value.startsWith(PREFIX) || value.length() != 8) { - throw new IllegalArgumentException("Not an AMQP header buffer"); - } - buffer = value.buffer(); - } - - public boolean hasValidPrefix() { - return buffer.startsWith(PREFIX); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - for (int i = 0; i < buffer.length(); ++i) { - char value = (char) buffer.get(i); - if (Character.isLetter(value)) { - builder.append(value); - } else { - builder.append(","); - builder.append((int) value); - } - } - return builder.toString(); - } - } -}
