This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit ad60a0b02b391a75eba2a876effd9e6e8d765b5d
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Apr 15 11:48:22 2020 -0400

    ARTEMIS-1975 Dealing with connection drops on AMQP Large Message
---
 .../protocol/amqp/broker/AMQPLargeMessage.java     |   7 +-
 .../amqp/proton/ProtonServerReceiverContext.java   |   7 +-
 .../artemis/core/persistence/StorageManager.java   |   2 +
 .../impl/journal/JournalStorageManager.java        |  42 +++---
 .../impl/journal/LargeServerMessageImpl.java       |   1 +
 .../impl/nullpm/NullStorageLargeServerMessage.java |   6 +
 .../impl/nullpm/NullStorageManager.java            |   5 +
 .../artemis/core/server/ActiveMQServer.java        |   4 +
 .../artemis/core/server/LargeServerMessage.java    |   2 +
 .../core/transaction/impl/TransactionImplTest.java |   5 +
 .../largemessages/InterruptedAMQPLargeMessage.java | 167 +++++++++++++++++++++
 .../SimpleStreamingLargeMessageTest.java           |  33 ++++
 .../client/LargeMessageAvoidLargeMessagesTest.java |   4 +
 .../client/LargeMessageCompressTest.java           |   4 +
 .../tests/integration/client/LargeMessageTest.java |  73 +++++++++
 .../tests/integration/client/SendAckFailTest.java  |   5 +
 16 files changed, 348 insertions(+), 19 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
index fe51f15..5db786d 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
@@ -174,6 +174,11 @@ public class AMQPLargeMessage extends AMQPMessage 
implements LargeServerMessage
       }
    }
 
+   @Override
+   public void validateFile() throws ActiveMQException {
+      largeBody.validateFile();
+   }
+
    public void setFileDurable(boolean value) {
       this.fileDurable = value;
    }
@@ -207,7 +212,7 @@ public class AMQPLargeMessage extends AMQPMessage 
implements LargeServerMessage
       return parsingData;
    }
 
-   protected void parseHeader(ReadableBuffer buffer) {
+   public void parseHeader(ReadableBuffer buffer) {
 
       DecoderImpl decoder = TLSEncode.getDecoder();
       decoder.setBuffer(buffer);
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index f2d12f5..62f4e39 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -373,7 +373,12 @@ public class ProtonServerReceiverContext extends 
ProtonInitializable implements
    private void initializeCurrentLargeMessage(Delivery delivery, Receiver 
receiver) throws Exception {
       long id = sessionSPI.getStorageManager().generateID();
       currentLargeMessage = new AMQPLargeMessage(id, 
delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), 
sessionSPI.getStorageManager());
-      currentLargeMessage.addBytes(receiver.recv());
+
+      ReadableBuffer dataBuffer = receiver.recv();
+      currentLargeMessage.parseHeader(dataBuffer);
+
+      sessionSPI.getStorageManager().largeMessageCreated(id, 
currentLargeMessage);
+      currentLargeMessage.addBytes(dataBuffer);
    }
 
    private void actualDelivery(AMQPMessage message, Delivery delivery, 
Receiver receiver, Transaction tx) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index 5bb91c0..f131c41 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -245,6 +245,8 @@ public interface StorageManager extends IDGenerator, 
ActiveMQComponent {
     */
    LargeServerMessage createLargeMessage(long id, Message message) throws 
Exception;
 
+   LargeServerMessage largeMessageCreated(long id, LargeServerMessage 
largeMessage) throws Exception;
+
    enum LargeMessageExtension {
       DURABLE(".msg"), TEMPORARY(".tmp"), SYNC(".sync");
       final String extension;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index 71a7f45..94a2dc0 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -536,13 +536,23 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
 
          largeMessage.moveHeadersAndProperties(message);
 
-         largeMessage.setMessageID(id);
+         return largeMessageCreated(id, largeMessage);
+      } finally {
+         readUnLock();
+      }
+   }
 
+   @Override
+   public LargeServerMessage largeMessageCreated(long id, LargeServerMessage 
largeMessage) throws Exception {
+      largeMessage.setMessageID(id);
 
-         // Check durable large massage size before to allocate resources if 
it can't be stored
-         if (largeMessage.isDurable()) {
-            final long maxRecordSize = getMaxRecordSize();
-            final int messageEncodeSize = largeMessage.getEncodeSize();
+      // Check durable large massage size before to allocate resources if it 
can't be stored
+      if (largeMessage.toMessage().isDurable()) {
+         final long maxRecordSize = getMaxRecordSize();
+         if (largeMessage instanceof LargeServerMessageImpl) {
+            // the following check only applies to Core
+            LargeServerMessageImpl coreLarge = 
(LargeServerMessageImpl)largeMessage;
+            final int messageEncodeSize = coreLarge.getEncodeSize();
 
             if (messageEncodeSize > maxRecordSize) {
                
ActiveMQServerLogger.LOGGER.messageWithHeaderTooLarge(largeMessage.getMessageID(),
 logger.getName());
@@ -554,22 +564,20 @@ public class JournalStorageManager extends 
AbstractJournalStorageManager {
                throw 
ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(messageEncodeSize, 
maxRecordSize);
             }
          }
+      }
 
-         // We do this here to avoid a case where the replication gets a list 
without this file
-         // to avoid a race
-         largeMessage.validateFile();
-
-         if (largeMessage.isDurable()) {
-            // We store a marker on the journal that the large file is pending
-            long pendingRecordID = storePendingLargeMessage(id);
+      // We do this here to avoid a case where the replication gets a list 
without this file
+      // to avoid a race
+      largeMessage.validateFile();
 
-            largeMessage.setPendingRecordID(pendingRecordID);
-         }
+      if (largeMessage.toMessage().isDurable()) {
+         // We store a marker on the journal that the large file is pending
+         long pendingRecordID = storePendingLargeMessage(id);
 
-         return largeMessage;
-      } finally {
-         readUnLock();
+         largeMessage.setPendingRecordID(pendingRecordID);
       }
+
+      return largeMessage;
    }
 
    @Override
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index c8a82be..df5cd67 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -309,6 +309,7 @@ public final class LargeServerMessageImpl extends 
CoreMessage implements CoreLar
       }
    }
 
+   @Override
    public synchronized void validateFile() throws ActiveMQException {
       this.ensureFileExists(true);
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
index 8252f34..f7684a8 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
@@ -18,6 +18,7 @@ package 
org.apache.activemq.artemis.core.persistence.impl.nullpm;
 
 import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.io.SequentialFile;
@@ -71,6 +72,11 @@ class NullStorageLargeServerMessage extends CoreMessage 
implements CoreLargeServ
    }
 
    @Override
+   public void validateFile() throws ActiveMQException {
+
+   }
+
+   @Override
    public void setStorageManager(StorageManager storageManager) {
 
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index a4a3f8a..bc6488f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -304,6 +304,11 @@ public class NullStorageManager implements StorageManager {
    }
 
    @Override
+   public LargeServerMessage largeMessageCreated(long id, LargeServerMessage 
largeMessage) throws Exception {
+      return null;
+   }
+
+   @Override
    public long generateID() {
       long id = idSequence.getAndIncrement();
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 603adaf..795fb92 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -609,6 +609,10 @@ public interface ActiveMQServer extends ServiceComponent {
 
    Queue locateQueue(SimpleString queueName);
 
+   default Queue locateQueue(String queueName) {
+      return locateQueue(SimpleString.toSimpleString(queueName));
+   }
+
    default BindingQueryResult bindingQuery(SimpleString address) throws 
Exception {
       return bindingQuery(address, true);
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
index 2dcf404..d9eb996 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java
@@ -66,5 +66,7 @@ public interface LargeServerMessage extends 
ReplicatedLargeMessage {
 
    void setStorageManager(StorageManager storageManager);
 
+   void validateFile() throws ActiveMQException;
+
    void finishParse() throws Exception;
 }
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index f740ba8..defda9d 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -465,6 +465,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
       }
 
       @Override
+      public LargeServerMessage largeMessageCreated(long id, 
LargeServerMessage largeMessage) throws Exception {
+         return null;
+      }
+
+      @Override
       public SequentialFile createFileForLargeMessage(long messageID, 
LargeMessageExtension extension) {
          return null;
       }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/InterruptedAMQPLargeMessage.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/InterruptedAMQPLargeMessage.java
new file mode 100644
index 0000000..af7c5bb
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/InterruptedAMQPLargeMessage.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp.largemessages;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.utils.SpawnedVMSupport;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class InterruptedAMQPLargeMessage extends AmqpClientTestSupport {
+
+   private static final int NUMBER_OF_THREADS = 10;
+   private static final int MINIMAL_SEND = 2;
+
+   private static final int MESSAGE_SIZE = 1024 * 300;
+
+   private static final String smallFrameAcceptor = new 
String("tcp://localhost:" + (AMQP_PORT + 8));
+
+   @Override
+   protected void addAdditionalAcceptors(ActiveMQServer server) throws 
Exception {
+      server.getConfiguration().addAcceptorConfiguration("flow", 
smallFrameAcceptor + "?protocols=AMQP;useEpoll=false;maxFrameSize=" + 512 + 
";amqpMinLargeMessageSize=" + 10000);
+   }
+
+   public static void main(String[] arg) {
+      // have everybody aligned on sending before we start
+      CyclicBarrier startFlag = new CyclicBarrier(NUMBER_OF_THREADS);
+
+      CountDownLatch minimalKill = new CountDownLatch(MINIMAL_SEND * 
NUMBER_OF_THREADS);
+      Runnable runnable = () -> {
+
+         try {
+            AmqpClient client = createLocalClient();
+            AmqpConnection connection = client.createConnection();
+            connection.setMaxFrameSize(2 * 1024);
+            connection.connect();
+            AmqpSession session = connection.createSession();
+
+            AmqpSender sender = session.createSender(arg[0]);
+            startFlag.await();
+            for (int m = 0; m < 1000; m++) {
+               AmqpMessage message = new AmqpMessage();
+               message.setDurable(true);
+               byte[] bytes = new byte[MESSAGE_SIZE];
+               for (int i = 0; i < bytes.length; i++) {
+                  bytes[i] = (byte) 'z';
+               }
+
+               message.setBytes(bytes);
+               sender.send(message);
+               minimalKill.countDown();
+            }
+            connection.close();
+         } catch (Exception e) {
+            e.printStackTrace();
+         }
+      };
+
+
+      for (int t = 0; t < NUMBER_OF_THREADS; t++) {
+         Thread thread = new Thread(runnable);
+         thread.start();
+      }
+
+      try {
+         minimalKill.await();
+      } catch (Exception e) {
+         e.printStackTrace();
+      }
+      System.exit(-1);
+   }
+
+   private static AmqpClient createLocalClient() throws URISyntaxException {
+      return new AmqpClient(new URI(smallFrameAcceptor), null, null);
+   }
+
+   @Test
+   public void testInterruptedLargeMessage() throws Exception {
+      Process p = 
SpawnedVMSupport.spawnVM(InterruptedAMQPLargeMessage.class.getName(), 
getQueueName());
+      p.waitFor();
+
+      Queue serverQueue = server.locateQueue(getQueueName());
+
+      Assert.assertTrue(serverQueue.getMessageCount() >= MINIMAL_SEND * 
NUMBER_OF_THREADS);
+
+      LinkedListIterator<MessageReference> browserIterator = 
serverQueue.browserIterator();
+
+      while (browserIterator.hasNext()) {
+         MessageReference ref = browserIterator.next();
+         Message message = ref.getMessage();
+
+         Assert.assertNotNull(message);
+         Assert.assertTrue(message instanceof LargeServerMessage);
+
+         Assert.assertFalse(((LargeServerMessage)message).hasPendingRecord());
+      }
+      browserIterator.close();
+
+      System.out.println("There are " + serverQueue.getMessageCount() + " on 
the queue");
+      int messageCount = (int)serverQueue.getMessageCount();
+
+      AmqpClient client = createLocalClient();
+      AmqpConnection connection = addConnection(client.createConnection());
+      connection.setMaxFrameSize(2 * 1024);
+      connection.connect();
+      AmqpSession session = connection.createSession();
+      AmqpReceiver receiver = session.createReceiver(getQueueName());
+
+      int received = 0;
+      receiver.flow((int) (messageCount + 10));
+      for (int m = 0; m < messageCount; m++) {
+         receiver.flow(1);
+         AmqpMessage message = receiver.receive(10, TimeUnit.SECONDS);
+         Assert.assertNotNull(message);
+         message.accept(true);
+         received++;
+
+         System.out.println("Received " + received);
+         Data data = (Data)message.getWrappedMessage().getBody();
+         byte[] byteArray = data.getValue().getArray();
+
+         Assert.assertEquals(MESSAGE_SIZE, byteArray.length);
+         for (int i = 0; i < byteArray.length; i++) {
+            Assert.assertEquals((byte)'z', byteArray[i]);
+         }
+      }
+
+
+      Assert.assertNull(receiver.receiveNoWait());
+
+      validateNoFilesOnLargeDir();
+   }
+
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java
index 81a8a58..58dae39 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java
@@ -30,10 +30,14 @@ import java.util.Arrays;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
 import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -322,6 +326,35 @@ public class SimpleStreamingLargeMessageTest extends 
AmqpClientTestSupport {
 
          session.commit();
 
+         Queue queue = 
server.locateQueue(SimpleString.toSimpleString(getQueueName()));
+
+         Wait.assertEquals(1, queue::getMessageCount);
+
+         LinkedListIterator<MessageReference> browserIterator = 
queue.browserIterator();
+
+         while (browserIterator.hasNext()) {
+            MessageReference ref = browserIterator.next();
+            org.apache.activemq.artemis.api.core.Message message = 
ref.getMessage();
+
+            Assert.assertNotNull(message);
+            Assert.assertTrue(message instanceof LargeServerMessage);
+
+            
Assert.assertFalse(((LargeServerMessage)message).hasPendingRecord());
+         }
+         browserIterator.close();
+
+         connection.close();
+
+         server.stop();
+
+         server.start();
+
+         connection = client.createConnection();
+         addConnection(connection);
+         connection.setMaxFrameSize(2 * 1024);
+         connection.connect();
+         session = connection.createSession();
+
          AmqpReceiver receiver = session.createReceiver(getQueueName());
          receiver.flow(1);
          for (int i = 0; i < 1; i++) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java
index 4b46cd1..7c3bc33 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java
@@ -45,6 +45,10 @@ public class LargeMessageAvoidLargeMessagesTest extends 
LargeMessageTest {
    }
 
    @Override
+   protected void validateLargeMessageComplete(ActiveMQServer server) throws 
Exception {
+   }
+
+   @Override
    protected boolean isNetty() {
       return false;
    }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
index bce8375..a03ca5d 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
@@ -56,6 +56,10 @@ public class LargeMessageCompressTest extends 
LargeMessageTest {
    }
 
    @Override
+   protected void validateLargeMessageComplete(ActiveMQServer server) throws 
Exception {
+   }
+
+   @Override
    protected boolean isNetty() {
       return false;
    }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index e5cc609..79c7674 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -45,6 +45,8 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -53,6 +55,7 @@ import 
org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTe
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.tests.util.RandomUtil;
 import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -247,6 +250,76 @@ public class LargeMessageTest extends LargeMessageTestBase 
{
       validateNoFilesOnLargeDir();
    }
 
+
+   @Test
+   public void testPendingRecord() throws Exception {
+
+      ActiveMQServer server = createServer(true, isNetty(), storeType);
+
+      server.start();
+
+      final int messageSize = (int) (3.5 * 
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+      ClientSessionFactory sf = 
addSessionFactory(createSessionFactory(locator));
+
+      ClientSession session = addClientSession(sf.createSession(false, true, 
false));
+
+      session.createQueue(new QueueConfiguration(ADDRESS));
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      Message clientFile = createLargeClientMessageStreaming(session, 
messageSize, true);
+
+      // Send large message which should be dropped and deleted from the 
filesystem
+
+      producer.send(clientFile);
+
+      validateLargeMessageComplete(server);
+
+      sf.close();
+
+      server.stop();
+
+      server = createServer(true, isNetty(), storeType);
+
+      server.start();
+
+      sf = addSessionFactory(createSessionFactory(locator));
+
+      session = addClientSession(sf.createSession(false, true, false));
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+      session.start();
+
+      ClientMessage message = consumer.receiveImmediate();
+      Assert.assertNotNull(message);
+      for (int i = 0; i < messageSize; i++) {
+         assertEquals("position = " + i, getSamplebyte(i), 
message.getBodyBuffer().readByte());
+      }
+      message.acknowledge();
+
+      validateNoFilesOnLargeDir();
+   }
+
+   protected void validateLargeMessageComplete(ActiveMQServer server) throws 
Exception {
+      Queue queue = server.locateQueue(ADDRESS);
+
+      Wait.assertEquals(1, queue::getMessageCount);
+
+      LinkedListIterator<MessageReference> browserIterator = 
queue.browserIterator();
+
+      while (browserIterator.hasNext()) {
+         MessageReference ref = browserIterator.next();
+         Message message = ref.getMessage();
+
+         Assert.assertNotNull(message);
+         Assert.assertTrue(message instanceof LargeServerMessage);
+
+         Assert.assertFalse(((LargeServerMessage)message).hasPendingRecord());
+      }
+      browserIterator.close();
+   }
+
    @Test
    public void testDeleteOnDrop() throws Exception {
       fillAddress();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index 3d77d1a..3e99da8 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -287,6 +287,11 @@ public class SendAckFailTest extends SpawnedTestBase {
       }
 
       @Override
+      public LargeServerMessage largeMessageCreated(long id, 
LargeServerMessage largeMessage) throws Exception {
+         return manager.largeMessageCreated(id, largeMessage);
+      }
+
+      @Override
       public void stop() throws Exception {
          manager.stop();
       }

Reply via email to