Repository: activemq Updated Branches: refs/heads/master 548403ad9 -> 0752d840b
https://issues.apache.org/jira/browse/AMQ-6638 Adds some additional logging to the connection validation code, adds some additional tests as well. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0752d840 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0752d840 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0752d840 Branch: refs/heads/master Commit: 0752d840b90a5acdb04410362fb16c943f09dc2f Parents: 548403a Author: Timothy Bish <[email protected]> Authored: Wed Apr 5 16:20:28 2017 -0400 Committer: Timothy Bish <[email protected]> Committed: Wed Apr 5 16:20:28 2017 -0400 ---------------------------------------------------------------------- .../activemq/transport/amqp/AmqpWireFormat.java | 8 + .../transport/amqp/AmqpTestSupport.java | 16 +- .../transport/amqp/client/AmqpMessage.java | 78 +++++- .../amqp/interop/AmqpExpiredMessageTest.java | 269 +++++++++++++++++++ 4 files changed, 360 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/0752d840/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java index 2b81ec7..89facbe 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java @@ -32,9 +32,13 @@ import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.wireformat.WireFormat; import org.fusesource.hawtbuf.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AmqpWireFormat implements WireFormat { + private static final Logger LOG = LoggerFactory.getLogger(AmqpWireFormat.class); + public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE; public static final int NO_AMQP_MAX_FRAME_SIZE = -1; public static final int DEFAULT_CONNECTION_TIMEOUT = 30000; @@ -137,18 +141,22 @@ public class AmqpWireFormat implements WireFormat { */ public boolean isHeaderValid(AmqpHeader header, boolean authenticated) { if (!header.hasValidPrefix()) { + LOG.trace("AMQP Header arrived with invalid prefix: {}", header); return false; } if (!(header.getProtocolId() == 0 || header.getProtocolId() == SASL_PROTOCOL)) { + LOG.trace("AMQP Header arrived with invalid protocol ID: {}", header); return false; } if (!authenticated && !isAllowNonSaslConnections() && header.getProtocolId() != SASL_PROTOCOL) { + LOG.trace("AMQP Header arrived without SASL and server requires SASL: {}", header); return false; } if (header.getMajor() != 1 || header.getMinor() != 0 || header.getRevision() != 0) { + LOG.trace("AMQP Header arrived invalid version: {}", header); return false; } http://git-wip-us.apache.org/repos/asf/activemq/blob/0752d840/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java index fd4accb..86402dc 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java @@ -72,7 +72,7 @@ public class AmqpTestSupport { protected ExecutorService testService = Executors.newSingleThreadExecutor(); protected BrokerService brokerService; - protected Vector<Throwable> exceptions = new Vector<Throwable>(); + protected Vector<Throwable> exceptions = new Vector<>(); protected int numberOfMessages; protected URI amqpURI; @@ -150,7 +150,7 @@ public class AmqpTestSupport { System.setProperty("javax.net.ssl.keyStorePassword", "password"); System.setProperty("javax.net.ssl.keyStoreType", "jks"); - ArrayList<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>(); + ArrayList<BrokerPlugin> plugins = new ArrayList<>(); addAdditionalPlugins(plugins); @@ -182,28 +182,28 @@ public class AmqpTestSupport { } if (isUseTcpConnector()) { connector = brokerService.addConnector( - "amqp://0.0.0.0:" + amqpPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); + "amqp://0.0.0.0:" + amqpPort + "?transport.tcpNoDelay=true&transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); amqpPort = connector.getConnectUri().getPort(); amqpURI = connector.getPublishableConnectURI(); LOG.debug("Using amqp port " + amqpPort); } if (isUseSslConnector()) { connector = brokerService.addConnector( - "amqp+ssl://0.0.0.0:" + amqpSslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); + "amqp+ssl://0.0.0.0:" + amqpSslPort + "?transport.tcpNoDelay=true&transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); amqpSslPort = connector.getConnectUri().getPort(); amqpSslURI = connector.getPublishableConnectURI(); LOG.debug("Using amqp+ssl port " + amqpSslPort); } if (isUseNioConnector()) { connector = brokerService.addConnector( - "amqp+nio://0.0.0.0:" + amqpNioPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); + "amqp+nio://0.0.0.0:" + amqpNioPort + "?transport.tcpNoDelay=true&transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); amqpNioPort = connector.getConnectUri().getPort(); amqpNioURI = connector.getPublishableConnectURI(); LOG.debug("Using amqp+nio port " + amqpNioPort); } if (isUseNioPlusSslConnector()) { connector = brokerService.addConnector( - "amqp+nio+ssl://0.0.0.0:" + amqpNioPlusSslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); + "amqp+nio+ssl://0.0.0.0:" + amqpNioPlusSslPort + "?transport.tcpNoDelay=true&transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); amqpNioPlusSslPort = connector.getConnectUri().getPort(); amqpNioPlusSslURI = connector.getPublishableConnectURI(); LOG.debug("Using amqp+nio+ssl port " + amqpNioPlusSslPort); @@ -238,14 +238,14 @@ public class AmqpTestSupport { } if (isUseWsConnector()) { connector = brokerService.addConnector( - "ws://0.0.0.0:" + getProxyPort(amqpWsPort) + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); + "ws://0.0.0.0:" + getProxyPort(amqpWsPort) + "?transport.tcpNoDelay=true&transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); amqpWsPort = connector.getConnectUri().getPort(); amqpWsURI = connector.getPublishableConnectURI(); LOG.debug("Using amqp+ws port " + amqpWsPort); } if (isUseWssConnector()) { connector = brokerService.addConnector( - "wss://0.0.0.0:" + getProxyPort(amqpWssPort) + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); + "wss://0.0.0.0:" + getProxyPort(amqpWssPort) + "?transport.tcpNoDelay=true&transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); amqpWssPort = connector.getConnectUri().getPort(); amqpWssURI = connector.getPublishableConnectURI(); LOG.debug("Using amqp+wss port " + amqpWssPort); http://git-wip-us.apache.org/repos/asf/activemq/blob/0752d840/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index d28ac8e..da0c4e6 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -433,6 +433,78 @@ public class AmqpMessage { } /** + * Sets the priority header on the outgoing message. + * + * @param priority the priority value to set. + */ + public void setPriority(short priority) { + checkReadOnly(); + lazyCreateHeader(); + getWrappedMessage().setPriority(priority); + } + + /** + * Gets the priority header on the message. + */ + public short getPriority() { + return getWrappedMessage().getPriority(); + } + + /** + * Sets the ttl header on the outgoing message. + * + * @param timeToLive the ttl value to set. + */ + public void setTimeToLive(long timeToLive) { + checkReadOnly(); + lazyCreateHeader(); + getWrappedMessage().setTtl(timeToLive); + } + + /** + * Sets the ttl header on the outgoing message. + */ + public long getTimeToLive() { + return getWrappedMessage().getTtl(); + } + + /** + * Sets the absolute expiration time property on the message. + * + * @param absoluteExpiryTime the expiration time value to set. + */ + public void setAbsoluteExpiryTime(long absoluteExpiryTime) { + checkReadOnly(); + lazyCreateProperties(); + getWrappedMessage().setExpiryTime(absoluteExpiryTime); + } + + /** + * Gets the absolute expiration time property on the message. + */ + public long getAbsoluteExpiryTime() { + return getWrappedMessage().getExpiryTime(); + } + + /** + * Sets the creation time property on the message. + * + * @param creationTime the time value to set. + */ + public void setCreationTime(long creationTime) { + checkReadOnly(); + lazyCreateProperties(); + getWrappedMessage().setCreationTime(creationTime); + } + + /** + * Gets the absolute expiration time property on the message. + */ + public long getCreationTime() { + return getWrappedMessage().getCreationTime(); + } + + /** * Sets a given application property on an outbound message. * * @param key @@ -615,21 +687,21 @@ public class AmqpMessage { private void lazyCreateMessageAnnotations() { if (messageAnnotationsMap == null) { - messageAnnotationsMap = new HashMap<Symbol,Object>(); + messageAnnotationsMap = new HashMap<>(); message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap)); } } private void lazyCreateDeliveryAnnotations() { if (deliveryAnnotationsMap == null) { - deliveryAnnotationsMap = new HashMap<Symbol,Object>(); + deliveryAnnotationsMap = new HashMap<>(); message.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap)); } } private void lazyCreateApplicationProperties() { if (applicationPropertiesMap == null) { - applicationPropertiesMap = new HashMap<String, Object>(); + applicationPropertiesMap = new HashMap<>(); message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap)); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/0752d840/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpExpiredMessageTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpExpiredMessageTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpExpiredMessageTest.java new file mode 100644 index 0000000..902ca55 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpExpiredMessageTest.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.interop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; + +public class AmqpExpiredMessageTest extends AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testSendMessageThatIsAlreadyExpiredUsingAbsoluteTime() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = trackConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + // Get the Queue View early to avoid racing the delivery. + final QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + message.setAbsoluteExpiryTime(System.currentTimeMillis() - 5000); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + // Broker doesn't track messages that arrived already expired. + assertEquals(0, queueView.getQueueSize()); + + // Now try and get the message + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS); + assertNull(received); + + // Broker doesn't track messages that arrived already expired. + assertEquals(0, queueView.getExpiredCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testSendMessageThatIsNotExpiredUsingAbsoluteTime() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = trackConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + // Get the Queue View early to avoid racing the delivery. + final QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + message.setAbsoluteExpiryTime(System.currentTimeMillis() + 5000); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + assertEquals(1, queueView.getQueueSize()); + + // Now try and get the message + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + + assertEquals(0, queueView.getExpiredCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testSendMessageThatIsExiredUsingAbsoluteTimeWithLongTTL() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = trackConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + // Get the Queue View early to avoid racing the delivery. + final QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + message.setAbsoluteExpiryTime(System.currentTimeMillis() - 5000); + // AET should override any TTL set + message.setTimeToLive(60000); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + // Broker doesn't track messages that arrived already expired. + assertEquals(0, queueView.getQueueSize()); + + // Now try and get the message + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS); + assertNull(received); + + // Broker doesn't track messages that arrived already expired. + assertEquals(0, queueView.getExpiredCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testSendMessageThatIsExpiredUsingTTLWhenAbsoluteIsZero() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = trackConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + // Get the Queue View early to avoid racing the delivery. + final QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + message.setAbsoluteExpiryTime(0); + // AET should override any TTL set unless it is zero + message.setTimeToLive(1000); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + assertEquals(1, queueView.getQueueSize()); + + Thread.sleep(1000); + + // Now try and get the message + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS); + assertNull(received); + + assertEquals(1, queueView.getExpiredCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testSendMessageThatIsNotExpiredUsingAbsoluteTimeWithElspsedTTL() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = trackConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + // Get the Queue View early to avoid racing the delivery. + final QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + message.setAbsoluteExpiryTime(System.currentTimeMillis() + 5000); + // AET should override any TTL set + message.setTimeToLive(10); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + Thread.sleep(50); + + assertEquals(1, queueView.getQueueSize()); + + // Now try and get the message + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + + assertEquals(0, queueView.getExpiredCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testSendMessageThatIsNotExpiredUsingTimeToLive() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = trackConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + // Get the Queue View early to avoid racing the delivery. + final QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + message.setTimeToLive(5000); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + assertEquals(1, queueView.getQueueSize()); + + // Now try and get the message + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + + assertEquals(0, queueView.getExpiredCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testSendMessageThenAllowToExpiredUsingTimeToLive() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = trackConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + // Get the Queue View early to avoid racing the delivery. + final QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + message.setTimeToLive(10); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + Thread.sleep(50); + + assertEquals(1, queueView.getQueueSize()); + + // Now try and get the message + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS); + assertNull(received); + + assertEquals(1, queueView.getExpiredCount()); + + connection.close(); + } +}
