This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit ad60a0b02b391a75eba2a876effd9e6e8d765b5d Author: Clebert Suconic <[email protected]> AuthorDate: Wed Apr 15 11:48:22 2020 -0400 ARTEMIS-1975 Dealing with connection drops on AMQP Large Message --- .../protocol/amqp/broker/AMQPLargeMessage.java | 7 +- .../amqp/proton/ProtonServerReceiverContext.java | 7 +- .../artemis/core/persistence/StorageManager.java | 2 + .../impl/journal/JournalStorageManager.java | 42 +++--- .../impl/journal/LargeServerMessageImpl.java | 1 + .../impl/nullpm/NullStorageLargeServerMessage.java | 6 + .../impl/nullpm/NullStorageManager.java | 5 + .../artemis/core/server/ActiveMQServer.java | 4 + .../artemis/core/server/LargeServerMessage.java | 2 + .../core/transaction/impl/TransactionImplTest.java | 5 + .../largemessages/InterruptedAMQPLargeMessage.java | 167 +++++++++++++++++++++ .../SimpleStreamingLargeMessageTest.java | 33 ++++ .../client/LargeMessageAvoidLargeMessagesTest.java | 4 + .../client/LargeMessageCompressTest.java | 4 + .../tests/integration/client/LargeMessageTest.java | 73 +++++++++ .../tests/integration/client/SendAckFailTest.java | 5 + 16 files changed, 348 insertions(+), 19 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index fe51f15..5db786d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -174,6 +174,11 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage } } + @Override + public void validateFile() throws ActiveMQException { + largeBody.validateFile(); + } + public void setFileDurable(boolean value) { this.fileDurable = value; } @@ -207,7 +212,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage return parsingData; } - protected void parseHeader(ReadableBuffer buffer) { + public void parseHeader(ReadableBuffer buffer) { DecoderImpl decoder = TLSEncode.getDecoder(); decoder.setBuffer(buffer); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index f2d12f5..62f4e39 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -373,7 +373,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements private void initializeCurrentLargeMessage(Delivery delivery, Receiver receiver) throws Exception { long id = sessionSPI.getStorageManager().generateID(); currentLargeMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager()); - currentLargeMessage.addBytes(receiver.recv()); + + ReadableBuffer dataBuffer = receiver.recv(); + currentLargeMessage.parseHeader(dataBuffer); + + sessionSPI.getStorageManager().largeMessageCreated(id, currentLargeMessage); + currentLargeMessage.addBytes(dataBuffer); } private void actualDelivery(AMQPMessage message, Delivery delivery, Receiver receiver, Transaction tx) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index 5bb91c0..f131c41 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -245,6 +245,8 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { */ LargeServerMessage createLargeMessage(long id, Message message) throws Exception; + LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception; + enum LargeMessageExtension { DURABLE(".msg"), TEMPORARY(".tmp"), SYNC(".sync"); final String extension; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 71a7f45..94a2dc0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -536,13 +536,23 @@ public class JournalStorageManager extends AbstractJournalStorageManager { largeMessage.moveHeadersAndProperties(message); - largeMessage.setMessageID(id); + return largeMessageCreated(id, largeMessage); + } finally { + readUnLock(); + } + } + @Override + public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception { + largeMessage.setMessageID(id); - // Check durable large massage size before to allocate resources if it can't be stored - if (largeMessage.isDurable()) { - final long maxRecordSize = getMaxRecordSize(); - final int messageEncodeSize = largeMessage.getEncodeSize(); + // Check durable large massage size before to allocate resources if it can't be stored + if (largeMessage.toMessage().isDurable()) { + final long maxRecordSize = getMaxRecordSize(); + if (largeMessage instanceof LargeServerMessageImpl) { + // the following check only applies to Core + LargeServerMessageImpl coreLarge = (LargeServerMessageImpl)largeMessage; + final int messageEncodeSize = coreLarge.getEncodeSize(); if (messageEncodeSize > maxRecordSize) { ActiveMQServerLogger.LOGGER.messageWithHeaderTooLarge(largeMessage.getMessageID(), logger.getName()); @@ -554,22 +564,20 @@ public class JournalStorageManager extends AbstractJournalStorageManager { throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(messageEncodeSize, maxRecordSize); } } + } - // We do this here to avoid a case where the replication gets a list without this file - // to avoid a race - largeMessage.validateFile(); - - if (largeMessage.isDurable()) { - // We store a marker on the journal that the large file is pending - long pendingRecordID = storePendingLargeMessage(id); + // We do this here to avoid a case where the replication gets a list without this file + // to avoid a race + largeMessage.validateFile(); - largeMessage.setPendingRecordID(pendingRecordID); - } + if (largeMessage.toMessage().isDurable()) { + // We store a marker on the journal that the large file is pending + long pendingRecordID = storePendingLargeMessage(id); - return largeMessage; - } finally { - readUnLock(); + largeMessage.setPendingRecordID(pendingRecordID); } + + return largeMessage; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index c8a82be..df5cd67 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -309,6 +309,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar } } + @Override public synchronized void validateFile() throws ActiveMQException { this.ensureFileExists(true); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java index 8252f34..f7684a8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.persistence.impl.nullpm; import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.io.SequentialFile; @@ -71,6 +72,11 @@ class NullStorageLargeServerMessage extends CoreMessage implements CoreLargeServ } @Override + public void validateFile() throws ActiveMQException { + + } + + @Override public void setStorageManager(StorageManager storageManager) { } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index a4a3f8a..bc6488f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -304,6 +304,11 @@ public class NullStorageManager implements StorageManager { } @Override + public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception { + return null; + } + + @Override public long generateID() { long id = idSequence.getAndIncrement(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 603adaf..795fb92 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -609,6 +609,10 @@ public interface ActiveMQServer extends ServiceComponent { Queue locateQueue(SimpleString queueName); + default Queue locateQueue(String queueName) { + return locateQueue(SimpleString.toSimpleString(queueName)); + } + default BindingQueryResult bindingQuery(SimpleString address) throws Exception { return bindingQuery(address, true); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java index 2dcf404..d9eb996 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java @@ -66,5 +66,7 @@ public interface LargeServerMessage extends ReplicatedLargeMessage { void setStorageManager(StorageManager storageManager); + void validateFile() throws ActiveMQException; + void finishParse() throws Exception; } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index f740ba8..defda9d 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -465,6 +465,11 @@ public class TransactionImplTest extends ActiveMQTestBase { } @Override + public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception { + return null; + } + + @Override public SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension) { return null; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/InterruptedAMQPLargeMessage.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/InterruptedAMQPLargeMessage.java new file mode 100644 index 0000000..af7c5bb --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/InterruptedAMQPLargeMessage.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.amqp.largemessages; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.utils.SpawnedVMSupport; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.junit.Assert; +import org.junit.Test; + +public class InterruptedAMQPLargeMessage extends AmqpClientTestSupport { + + private static final int NUMBER_OF_THREADS = 10; + private static final int MINIMAL_SEND = 2; + + private static final int MESSAGE_SIZE = 1024 * 300; + + private static final String smallFrameAcceptor = new String("tcp://localhost:" + (AMQP_PORT + 8)); + + @Override + protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception { + server.getConfiguration().addAcceptorConfiguration("flow", smallFrameAcceptor + "?protocols=AMQP;useEpoll=false;maxFrameSize=" + 512 + ";amqpMinLargeMessageSize=" + 10000); + } + + public static void main(String[] arg) { + // have everybody aligned on sending before we start + CyclicBarrier startFlag = new CyclicBarrier(NUMBER_OF_THREADS); + + CountDownLatch minimalKill = new CountDownLatch(MINIMAL_SEND * NUMBER_OF_THREADS); + Runnable runnable = () -> { + + try { + AmqpClient client = createLocalClient(); + AmqpConnection connection = client.createConnection(); + connection.setMaxFrameSize(2 * 1024); + connection.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(arg[0]); + startFlag.await(); + for (int m = 0; m < 1000; m++) { + AmqpMessage message = new AmqpMessage(); + message.setDurable(true); + byte[] bytes = new byte[MESSAGE_SIZE]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = (byte) 'z'; + } + + message.setBytes(bytes); + sender.send(message); + minimalKill.countDown(); + } + connection.close(); + } catch (Exception e) { + e.printStackTrace(); + } + }; + + + for (int t = 0; t < NUMBER_OF_THREADS; t++) { + Thread thread = new Thread(runnable); + thread.start(); + } + + try { + minimalKill.await(); + } catch (Exception e) { + e.printStackTrace(); + } + System.exit(-1); + } + + private static AmqpClient createLocalClient() throws URISyntaxException { + return new AmqpClient(new URI(smallFrameAcceptor), null, null); + } + + @Test + public void testInterruptedLargeMessage() throws Exception { + Process p = SpawnedVMSupport.spawnVM(InterruptedAMQPLargeMessage.class.getName(), getQueueName()); + p.waitFor(); + + Queue serverQueue = server.locateQueue(getQueueName()); + + Assert.assertTrue(serverQueue.getMessageCount() >= MINIMAL_SEND * NUMBER_OF_THREADS); + + LinkedListIterator<MessageReference> browserIterator = serverQueue.browserIterator(); + + while (browserIterator.hasNext()) { + MessageReference ref = browserIterator.next(); + Message message = ref.getMessage(); + + Assert.assertNotNull(message); + Assert.assertTrue(message instanceof LargeServerMessage); + + Assert.assertFalse(((LargeServerMessage)message).hasPendingRecord()); + } + browserIterator.close(); + + System.out.println("There are " + serverQueue.getMessageCount() + " on the queue"); + int messageCount = (int)serverQueue.getMessageCount(); + + AmqpClient client = createLocalClient(); + AmqpConnection connection = addConnection(client.createConnection()); + connection.setMaxFrameSize(2 * 1024); + connection.connect(); + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createReceiver(getQueueName()); + + int received = 0; + receiver.flow((int) (messageCount + 10)); + for (int m = 0; m < messageCount; m++) { + receiver.flow(1); + AmqpMessage message = receiver.receive(10, TimeUnit.SECONDS); + Assert.assertNotNull(message); + message.accept(true); + received++; + + System.out.println("Received " + received); + Data data = (Data)message.getWrappedMessage().getBody(); + byte[] byteArray = data.getValue().getArray(); + + Assert.assertEquals(MESSAGE_SIZE, byteArray.length); + for (int i = 0; i < byteArray.length; i++) { + Assert.assertEquals((byte)'z', byteArray[i]); + } + } + + + Assert.assertNull(receiver.receiveNoWait()); + + validateNoFilesOnLargeDir(); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java index 81a8a58..58dae39 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java @@ -30,10 +30,14 @@ import java.util.Arrays; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; @@ -322,6 +326,35 @@ public class SimpleStreamingLargeMessageTest extends AmqpClientTestSupport { session.commit(); + Queue queue = server.locateQueue(SimpleString.toSimpleString(getQueueName())); + + Wait.assertEquals(1, queue::getMessageCount); + + LinkedListIterator<MessageReference> browserIterator = queue.browserIterator(); + + while (browserIterator.hasNext()) { + MessageReference ref = browserIterator.next(); + org.apache.activemq.artemis.api.core.Message message = ref.getMessage(); + + Assert.assertNotNull(message); + Assert.assertTrue(message instanceof LargeServerMessage); + + Assert.assertFalse(((LargeServerMessage)message).hasPendingRecord()); + } + browserIterator.close(); + + connection.close(); + + server.stop(); + + server.start(); + + connection = client.createConnection(); + addConnection(connection); + connection.setMaxFrameSize(2 * 1024); + connection.connect(); + session = connection.createSession(); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(1); for (int i = 0; i < 1; i++) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java index 4b46cd1..7c3bc33 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java @@ -45,6 +45,10 @@ public class LargeMessageAvoidLargeMessagesTest extends LargeMessageTest { } @Override + protected void validateLargeMessageComplete(ActiveMQServer server) throws Exception { + } + + @Override protected boolean isNetty() { return false; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java index bce8375..a03ca5d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java @@ -56,6 +56,10 @@ public class LargeMessageCompressTest extends LargeMessageTest { } @Override + protected void validateLargeMessageComplete(ActiveMQServer server) throws Exception { + } + + @Override protected boolean isNetty() { return false; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java index e5cc609..79c7674 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java @@ -45,6 +45,8 @@ import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -53,6 +55,7 @@ import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTe import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -247,6 +250,76 @@ public class LargeMessageTest extends LargeMessageTestBase { validateNoFilesOnLargeDir(); } + + @Test + public void testPendingRecord() throws Exception { + + ActiveMQServer server = createServer(true, isNetty(), storeType); + + server.start(); + + final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); + + ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); + + ClientSession session = addClientSession(sf.createSession(false, true, false)); + + session.createQueue(new QueueConfiguration(ADDRESS)); + + ClientProducer producer = session.createProducer(ADDRESS); + + Message clientFile = createLargeClientMessageStreaming(session, messageSize, true); + + // Send large message which should be dropped and deleted from the filesystem + + producer.send(clientFile); + + validateLargeMessageComplete(server); + + sf.close(); + + server.stop(); + + server = createServer(true, isNetty(), storeType); + + server.start(); + + sf = addSessionFactory(createSessionFactory(locator)); + + session = addClientSession(sf.createSession(false, true, false)); + + ClientConsumer consumer = session.createConsumer(ADDRESS); + session.start(); + + ClientMessage message = consumer.receiveImmediate(); + Assert.assertNotNull(message); + for (int i = 0; i < messageSize; i++) { + assertEquals("position = " + i, getSamplebyte(i), message.getBodyBuffer().readByte()); + } + message.acknowledge(); + + validateNoFilesOnLargeDir(); + } + + protected void validateLargeMessageComplete(ActiveMQServer server) throws Exception { + Queue queue = server.locateQueue(ADDRESS); + + Wait.assertEquals(1, queue::getMessageCount); + + LinkedListIterator<MessageReference> browserIterator = queue.browserIterator(); + + while (browserIterator.hasNext()) { + MessageReference ref = browserIterator.next(); + Message message = ref.getMessage(); + + Assert.assertNotNull(message); + Assert.assertTrue(message instanceof LargeServerMessage); + + Assert.assertFalse(((LargeServerMessage)message).hasPendingRecord()); + } + browserIterator.close(); + } + @Test public void testDeleteOnDrop() throws Exception { fillAddress(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java index 3d77d1a..3e99da8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java @@ -287,6 +287,11 @@ public class SendAckFailTest extends SpawnedTestBase { } @Override + public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception { + return manager.largeMessageCreated(id, largeMessage); + } + + @Override public void stop() throws Exception { manager.stop(); }
