This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 03fb630  ARTEMIS-1975 Fixing LargeMessage encoding for replication
     new e01ba09  This closes #3048
03fb630 is described below

commit 03fb630f73451bf770906f927595a0f951f09678
Author: Clebert Suconic <clebertsuco...@apache.org>
AuthorDate: Tue Mar 24 22:57:24 2020 -0400

    ARTEMIS-1975 Fixing LargeMessage encoding for replication
---
 .../artemis/core/journal/impl/JournalImpl.java     |  4 +-
 .../protocol/amqp/broker/AMQPLargeMessage.java     | 38 ++++++++++-
 .../amqp/broker/AMQPLargeMessagePersister.java     | 27 ++++----
 .../amqp/AmqpBridgeClusterRedistributionTest.java  | 13 ++++
 .../largemessages/AMQPLargeMessagesTestUtil.java   | 55 ++++++++++++++++
 .../AmqpReplicatedLargeMessageTest.java            | 10 +--
 .../SimpleStreamingLargeMessageTest.java           | 77 ++++++++++++++++++++++
 7 files changed, 202 insertions(+), 22 deletions(-)

diff --git 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index c274301..1161756 100644
--- 
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ 
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -1029,6 +1029,8 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
                   tx.checkErrorCondition();
                }
                JournalInternalRecord addRecord = new JournalAddRecordTX(true, 
txID, id, recordType, persister, record);
+               // we need to calculate the encodeSize here, as it may use 
caches that are eliminated once the record is written
+               int encodeSize = addRecord.getEncodeSize();
                JournalFile usedFile = appendRecord(addRecord, false, false, 
tx, null);
 
                if (logger.isTraceEnabled()) {
@@ -1042,7 +1044,7 @@ public class JournalImpl extends JournalBase implements 
TestableJournal, Journal
                                   usedFile);
                }
 
-               tx.addPositive(usedFile, id, addRecord.getEncodeSize());
+               tx.addPositive(usedFile, id, encodeSize);
             } catch (Throwable e) {
                logger.error("appendAddRecordTransactional:" + e, e);
                setErrorCondition(null, tx, e);
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
index 7937f7f..f310fb3 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
@@ -20,6 +20,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
 import java.nio.ByteBuffer;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
@@ -79,7 +80,7 @@ public class AMQPLargeMessage extends AMQPMessage implements 
LargeServerMessage
    /**
     * AMQPLargeMessagePersister will save the buffer here.
     * */
-   volatile ByteBuf temporaryBuffer;
+   private ByteBuf temporaryBuffer;
 
    private final LargeBody largeBody;
    /**
@@ -126,6 +127,41 @@ public class AMQPLargeMessage extends AMQPMessage 
implements LargeServerMessage
       parsingData = null;
    }
 
+   public void releaseEncodedBuffer() {
+      internalReleaseBuffer(1);
+   }
+
+   /** {@link #getSavedEncodeBuffer()} will retain two counters from the 
buffer, one meant for the call,
+    * and one that must be released only after encoding.
+    *
+    * This method is meant to be called when the buffer is actually encoded on 
the journal, meaning both refs are gone,
+    * and the actual buffer can be released.
+    */
+   public void releaseEncodedBufferAfterWrite() {
+      internalReleaseBuffer(2);
+   }
+
+   private synchronized void internalReleaseBuffer(int releases) {
+      for (int i = 0; i < releases; i++) {
+         if (temporaryBuffer != null && temporaryBuffer.release()) {
+            temporaryBuffer = null;
+         }
+      }
+   }
+
+   /** This is used on test assertions to make sure the buffers are released 
correctly */
+   public ByteBuf inspectTemporaryBuffer() {
+      return temporaryBuffer;
+   }
+
+   public synchronized ByteBuf getSavedEncodeBuffer() {
+      if (temporaryBuffer == null) {
+         temporaryBuffer = 
PooledByteBufAllocator.DEFAULT.buffer(getEstimateSavedEncode());
+         saveEncoding(temporaryBuffer);
+      }
+      return temporaryBuffer.retain(1);
+   }
+
    @Override
    public void finishParse() throws Exception {
       openLargeMessage();
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java
index cffda5d..f573e6b 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java
@@ -18,7 +18,6 @@
 package org.apache.activemq.artemis.protocol.amqp.broker;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -57,22 +56,18 @@ public class AMQPLargeMessagePersister extends 
MessagePersister {
 
    @Override
    public int getEncodeSize(Message record) {
-      ByteBuf buf = getSavedEncodeBuffer(record);
-
-      int encodeSize = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + 
DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + 
SimpleString.sizeofNullableString(record.getAddressSimpleString()) + 
DataConstants.SIZE_BOOLEAN + buf.writerIndex();
+      AMQPLargeMessage msgEncode = (AMQPLargeMessage) record;
+      ByteBuf buf = msgEncode.getSavedEncodeBuffer();
 
-      TypedProperties properties = ((AMQPMessage) record).getExtraProperties();
+      try {
+         int encodeSize = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + 
DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + 
SimpleString.sizeofNullableString(record.getAddressSimpleString()) + 
DataConstants.SIZE_BOOLEAN + buf.writerIndex();
 
-      return encodeSize + (properties != null ? properties.getEncodeSize() : 
0);
-   }
+         TypedProperties properties = ((AMQPMessage) 
record).getExtraProperties();
 
-   private ByteBuf getSavedEncodeBuffer(Message record) {
-      AMQPLargeMessage largeMessage = (AMQPLargeMessage)record;
-      if (largeMessage.temporaryBuffer == null) {
-         largeMessage.temporaryBuffer = 
PooledByteBufAllocator.DEFAULT.buffer(largeMessage.getEstimateSavedEncode());
-         largeMessage.saveEncoding(largeMessage.temporaryBuffer);
+         return encodeSize + (properties != null ? properties.getEncodeSize() 
: 0);
+      } finally {
+         msgEncode.releaseEncodedBuffer();
       }
-      return largeMessage.temporaryBuffer;
    }
 
    /**
@@ -96,10 +91,10 @@ public class AMQPLargeMessagePersister extends 
MessagePersister {
          properties.encode(buffer.byteBuf());
       }
 
-      ByteBuf savedEncodeBuffer = getSavedEncodeBuffer(record);
+      ByteBuf savedEncodeBuffer = msgEncode.getSavedEncodeBuffer();
       buffer.writeBytes(savedEncodeBuffer, 0, savedEncodeBuffer.writerIndex());
-      savedEncodeBuffer.release();
-      msgEncode.temporaryBuffer = null;
+      msgEncode.releaseEncodedBufferAfterWrite(); // we need two releases, as 
getSavedEncodeBuffer will keep 1 for itself until encoding has happened
+                                                  // which is the 
expected event where we need to release the extra refCounter
    }
 
    @Override
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java
index 1ba439c..328f7f9 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java
@@ -36,6 +36,7 @@ import 
org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType
 import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import 
org.apache.activemq.artemis.tests.integration.amqp.largemessages.AMQPLargeMessagesTestUtil;
 import 
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
 import org.junit.After;
 import org.junit.Before;
@@ -180,12 +181,18 @@ public class AmqpBridgeClusterRedistributionTest extends 
AmqpClientTestSupport {
 
          sendMessages("uswest.Provider.AMC.Agent.DIVERTED.CustomNotification", 
1, RoutingType.ANYCAST, true);
 
+         AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server0);
+         AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server1);
+
          ClientMessage message = consumer.receive(5000);
          assertNotNull(message);
 
          message = consumer.receiveImmediate();
          assertNull(message);
       }
+
+      AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server0);
+      AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server1);
    }
 
    @Test
@@ -196,12 +203,18 @@ public class AmqpBridgeClusterRedistributionTest extends 
AmqpClientTestSupport {
 
          sendMessages("uswest.Provider.AMC.Agent.DIVERTED.CustomNotification", 
1, RoutingType.ANYCAST, true);
 
+         AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server0);
+         AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server1);
+
          ClientMessage message = consumer.receive(5000);
          assertNotNull(message);
 
          message = consumer.receiveImmediate();
          assertNull(message);
       }
+
+      AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server0);
+      AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server1);
    }
 
    protected void setupClusterConnection(final String name,
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessagesTestUtil.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessagesTestUtil.java
new file mode 100644
index 0000000..769172f
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessagesTestUtil.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp.largemessages;
+
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.junit.Assert;
+
+public class AMQPLargeMessagesTestUtil {
+
+
+   public static void validateAllTemporaryBuffers(ActiveMQServer server) {
+      for (Binding binding : server.getPostOffice().getAllBindings().values()) 
{
+         if (binding instanceof QueueBinding) {
+            validateTemporaryBuffers(((QueueBinding)binding).getQueue());
+         }
+      }
+   }
+
+   public static void validateTemporaryBuffers(Queue serverQueue) {
+      LinkedListIterator<MessageReference> totalIterator = 
serverQueue.browserIterator();
+      while (totalIterator.hasNext()) {
+         MessageReference ref = totalIterator.next();
+         if (ref.getMessage() instanceof AMQPLargeMessage) {
+            AMQPLargeMessage amqpLargeMessage = (AMQPLargeMessage) 
ref.getMessage();
+            // Using a Wait.waitFor here as we may have something working with 
the buffer in parallel
+            Wait.waitFor(() -> amqpLargeMessage.inspectTemporaryBuffer() == 
null, 1000, 10);
+            Assert.assertNull("Temporary buffers are being retained", 
amqpLargeMessage.inspectTemporaryBuffer());
+         }
+      }
+      totalIterator.close();
+   }
+
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedLargeMessageTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedLargeMessageTest.java
index 5fc4e60..a87a82e 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedLargeMessageTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedLargeMessageTest.java
@@ -98,7 +98,7 @@ public class AmqpReplicatedLargeMessageTest extends 
AmqpReplicatedTestSupport {
          assertEquals(0, queueView.getMessageCount());
 
          session.begin();
-         for (int m = 0; m < 10; m++) {
+         for (int m = 0; m < 100; m++) {
             AmqpMessage message = new AmqpMessage();
             message.setDurable(true);
             message.setApplicationProperty("i", "m " + m);
@@ -112,6 +112,8 @@ public class AmqpReplicatedLargeMessageTest extends 
AmqpReplicatedTestSupport {
          }
          session.commit();
 
+         AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server);
+
          if (crashServer) {
             connection.close();
             liveServer.crash();
@@ -129,11 +131,11 @@ public class AmqpReplicatedLargeMessageTest extends 
AmqpReplicatedTestSupport {
          }
 
          queueView = server.locateQueue(getQueueName());
-         Wait.assertEquals(10, queueView::getMessageCount);
+         Wait.assertEquals(100, queueView::getMessageCount);
 
          AmqpReceiver receiver = 
session.createReceiver(getQueueName().toString());
-         receiver.flow(10);
-         for (int i = 0; i < 10; i++) {
+         receiver.flow(100);
+         for (int i = 0; i < 100; i++) {
             AmqpMessage msgReceived = receiver.receive(10, TimeUnit.SECONDS);
             Assert.assertNotNull(msgReceived);
             Data body = (Data)msgReceived.getWrappedMessage().getBody();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java
index 2539838..81a8a58 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java
@@ -125,6 +125,8 @@ public class SimpleStreamingLargeMessageTest extends 
AmqpClientTestSupport {
          }
          session.commit();
 
+         AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server);
+
          if (restartServer) {
             connection.close();
             server.stop();
@@ -230,6 +232,8 @@ public class SimpleStreamingLargeMessageTest extends 
AmqpClientTestSupport {
             }
          }
 
+         AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server);
+
          if (restartServer) {
             connection.close();
             server.stop();
@@ -274,6 +278,79 @@ public class SimpleStreamingLargeMessageTest extends 
AmqpClientTestSupport {
 
    }
 
+
+   @Test
+   public void testSingleMessage() throws Exception {
+      try {
+
+         int size = 100 * 1024;
+         AmqpClient client = createAmqpClient(new URI(smallFrameAcceptor));
+         AmqpConnection connection = client.createConnection();
+         addConnection(connection);
+         connection.setMaxFrameSize(2 * 1024);
+         connection.connect();
+
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender(getQueueName());
+
+         Queue queueView = getProxyToQueue(getQueueName());
+         assertNotNull(queueView);
+         assertEquals(0, queueView.getMessageCount());
+
+         session.begin();
+         int oddID = 0;
+         for (int m = 0; m < 1; m++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setDurable(true);
+            boolean odd = (m % 2 == 0);
+            message.setApplicationProperty("i", m);
+            message.setApplicationProperty("oddString", odd ? "odd" : "even");
+            message.setApplicationProperty("odd", odd);
+            if (odd) {
+               message.setApplicationProperty("oddID", oddID++);
+            }
+
+            byte[] bytes = new byte[size];
+            for (int i = 0; i < bytes.length; i++) {
+               bytes[i] = (byte) 'z';
+            }
+
+            message.setBytes(bytes);
+            sender.send(message);
+         }
+
+         session.commit();
+
+         AmqpReceiver receiver = session.createReceiver(getQueueName());
+         receiver.flow(1);
+         for (int i = 0; i < 1; i++) {
+            AmqpMessage msgReceived = receiver.receive(10, TimeUnit.SECONDS);
+            Assert.assertNotNull(msgReceived);
+            
Assert.assertTrue((boolean)msgReceived.getApplicationProperty("odd"));
+            Assert.assertEquals(i, 
(int)msgReceived.getApplicationProperty("oddID"));
+            Data body = (Data) msgReceived.getWrappedMessage().getBody();
+            byte[] bodyArray = body.getValue().getArray();
+            for (int bI = 0; bI < size; bI++) {
+               Assert.assertEquals((byte) 'z', bodyArray[bI]);
+            }
+            msgReceived.accept(true);
+         }
+
+         receiver.flow(1);
+         Assert.assertNull(receiver.receiveNoWait());
+
+         receiver.close();
+         connection.close();
+
+         validateNoFilesOnLargeDir(getLargeMessagesDir(), 0);
+      } catch (Exception e) {
+         e.printStackTrace();
+         throw e;
+      }
+
+   }
+
    @Test
    public void testJMSPersistentTX() throws Exception {
 

Reply via email to