ARTEMIS-331 support 0-length large msg
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/978f8eed Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/978f8eed Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/978f8eed Branch: refs/heads/master Commit: 978f8eeda8c1a597402363e62c1f26e2a5cce9b7 Parents: 2838128 Author: jbertram <[email protected]> Authored: Mon Jan 4 14:44:09 2016 -0600 Committer: Clebert Suconic <[email protected]> Committed: Tue Jan 5 10:44:54 2016 -0500 ---------------------------------------------------------------------- .../core/server/impl/ServerConsumerImpl.java | 9 +- .../integration/client/LargeMessageTest.java | 150 +++++++++++++++++++ 2 files changed, 158 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/978f8eed/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 7d54d31..7936c76 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -1036,7 +1036,14 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { context.encode(bodyBuffer, localChunkLen); - byte[] body = bodyBuffer.toByteBuffer().array(); + byte[] body; + + if (bodyBuffer.toByteBuffer().hasArray()) { + body = bodyBuffer.toByteBuffer().array(); + } + else { + body = new byte[0]; + } int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/978f8eed/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java index d99ba63..bc94ab3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java @@ -2104,6 +2104,156 @@ public class LargeMessageTest extends LargeMessageTestBase { } } + // https://issues.apache.org/jira/browse/ARTEMIS-331 + @Test + public void testSendStreamingSingleEmptyMessage() throws Exception { + final String propertyName = "myStringPropertyName"; + final String propertyValue = "myStringPropertyValue"; + ClientSession session = null; + ActiveMQServer server = null; + + final int SIZE = 0; + try { + + server = createServer(true, isNetty()); + + server.start(); + + locator.setMinLargeMessageSize(100 * 1024); + + ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); + + session = sf.createSession(null, null, false, true, true, false, 0); + + session.createQueue(ADDRESS, ADDRESS, null, true); + + ClientMessage clientFile = session.createMessage(true); + clientFile.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(SIZE)); + clientFile.putStringProperty(propertyName, propertyValue); + + ClientProducer producer = session.createProducer(ADDRESS); + + session.start(); + + log.debug("Sending"); + producer.send(clientFile); + + producer.close(); + + log.debug("Waiting"); + + ClientConsumer consumer = session.createConsumer(ADDRESS); + + ClientMessage msg2 = consumer.receive(10000); + + msg2.acknowledge(); + + msg2.setOutputStream(createFakeOutputStream()); + Assert.assertTrue(msg2.waitOutputStreamCompletion(60000)); + Assert.assertEquals(propertyValue, msg2.getStringProperty(propertyName)); + + session.commit(); + + Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount()); + Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()))); + + } + finally { + try { + session.close(); + } + catch (Throwable ignored) { + } + + try { + server.stop(); + } + catch (Throwable ignored) { + } + } + } + + // https://issues.apache.org/jira/browse/ARTEMIS-331 + @Test + public void testSendStreamingEmptyMessagesWithRestart() throws Exception { + final String propertyName = "myStringPropertyName"; + final String propertyValue = "myStringPropertyValue"; + ClientSession session = null; + ActiveMQServer server = null; + + final int SIZE = 0; + try { + + server = createServer(true, isNetty()); + + server.start(); + + locator.setMinLargeMessageSize(100 * 1024); + + ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); + + session = sf.createSession(null, null, false, true, true, false, 0); + + session.createQueue(ADDRESS, ADDRESS, null, true); + + ClientProducer producer = session.createProducer(ADDRESS); + + for (int i = 0; i < 10; i++) { + ClientMessage clientFile = session.createMessage(true); + clientFile.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(SIZE)); + clientFile.putStringProperty(propertyName, propertyValue + i); + producer.send(clientFile); + } + + producer.close(); + + session.close(); + + sf.close(); + + server.stop(); + + server.start(); + + sf = addSessionFactory(createSessionFactory(locator)); + + session = sf.createSession(null, null, false, true, true, false, 0); + + ClientConsumer consumer = session.createConsumer(ADDRESS); + + session.start(); + + for (int i = 0; i < 10; i++) { + ClientMessage msg2 = consumer.receive(10000); + + msg2.acknowledge(); + + msg2.setOutputStream(createFakeOutputStream()); + Assert.assertTrue(msg2.waitOutputStreamCompletion(60000)); + Assert.assertEquals(propertyValue + i, msg2.getStringProperty(propertyName)); + + session.commit(); + } + + Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount()); + Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()))); + + } + finally { + try { + session.close(); + } + catch (Throwable ignored) { + } + + try { + server.stop(); + } + catch (Throwable ignored) { + } + } + } + /** * Receive messages but never reads them, leaving the buffer pending */
