This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 52d236d  ARTEMIS-2188 fix address size leak caused by large page message
     new a04f805  This closes #2446
52d236d is described below

commit 52d236d850aeca10a49aa54833141df2f51722a2
Author: yang wei <wy96...@gmail.com>
AuthorDate: Thu Nov 29 22:52:16 2018 +0800

    ARTEMIS-2188 fix address size leak caused by large page message
---
 .../impl/journal/LargeServerMessageImpl.java       | 23 ++++--
 .../tests/integration/client/LargeMessageTest.java | 81 +++++++++++++++++++++-
 2 files changed, 99 insertions(+), 5 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 2824ff7..431c19d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
 import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.RefCountMessageListener;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
@@ -187,7 +188,15 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
    public synchronized void incrementDelayDeletionCount() {
       delayDeletionCount.incrementAndGet();
       try {
-         incrementRefCount();
+         if (paged) {
+            RefCountMessageListener tmpContext = super.getContext();
+            setContext(null);
+            incrementRefCount();
+            setContext(tmpContext);
+         } else {
+            incrementRefCount();
+         }
+
       } catch (Exception e) {
          ActiveMQServerLogger.LOGGER.errorIncrementDelayDeletionCount(e);
       }
@@ -226,7 +235,15 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
 
    @Override
    public synchronized int decrementRefCount() throws Exception {
-      int currentRefCount = super.decrementRefCount();
+      int currentRefCount;
+      if (paged) {
+         RefCountMessageListener tmpContext = super.getContext();
+         setContext(null);
+         currentRefCount = super.decrementRefCount();
+         setContext(tmpContext);
+      } else {
+         currentRefCount = super.decrementRefCount();
+      }
 
       // We use <= as this could be used by load.
       // because of a failure, no references were loaded, so we have 0... and 
we still need to delete the associated
@@ -234,7 +251,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
       if (delayDeletionCount.get() <= 0) {
          checkDelete();
       }
-
       return currentRefCount;
    }
 
@@ -534,5 +550,4 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
       }
    }
 
-
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index 1d9075d..eac724c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.StoreConfiguration;
+import org.apache.activemq.artemis.core.paging.PagingStore;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -2475,8 +2476,86 @@ public class LargeMessageTest extends LargeMessageTestBase {
       session.close();
    }
 
+   @Test
+   public void testGlobalSizeBytesAndAddressSizeOnPage() throws Exception {
+      testGlobalSizeBytesAndAddressSize(true);
+   }
+
+   @Test
+   public void testGlobalSizeBytesAndAddressSize() throws Exception {
+      testGlobalSizeBytesAndAddressSize(false);
+   }
+
+   public void testGlobalSizeBytesAndAddressSize(boolean isPage) throws 
Exception {
+      ActiveMQServer server = createServer(true, isNetty(), storeType);
+
+      server.start();
+
+      ClientSessionFactory sf = 
addSessionFactory(createSessionFactory(locator));
+
+      ClientSession session = sf.createSession(false, false);
+
+      LargeServerMessageImpl fileMessage = new 
LargeServerMessageImpl((JournalStorageManager) server.getStorageManager());
+
+      fileMessage.setMessageID(1005);
+
+      for (int i = 0; i < largeMessageSize; i++) {
+         fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
+      }
+
+      fileMessage.releaseResources();
+
+      session.createQueue(ADDRESS, ADDRESS, true);
+
+      PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
+
+      if (isPage) {
+         store.startPaging();
+      }
+
+      ClientProducer prod = session.createProducer(ADDRESS);
+
+      prod.send(fileMessage);
+
+      fileMessage.deleteFile();
+
+      session.commit();
+
+      if (isPage) {
+         
server.getPagingManager().getPageStore(ADDRESS).getCursorProvider().clearCache();
+      }
+
+      if (isPage) {
+         Assert.assertEquals(0, 
server.getPagingManager().getPageStore(ADDRESS).getAddressSize());
+         Assert.assertEquals(0, server.getPagingManager().getGlobalSize());
+      } else {
+         Assert.assertNotEquals(0, 
server.getPagingManager().getPageStore(ADDRESS).getAddressSize());
+         Assert.assertNotEquals(0, server.getPagingManager().getGlobalSize());
+      }
+
+      session.start();
+
+      ClientConsumer cons = session.createConsumer(ADDRESS);
+
+      ClientMessage msg = cons.receive(5000);
+
+      Assert.assertNotNull(msg);
+
+      msg.acknowledge();
+
+      session.commit();
+
+      Assert.assertEquals(0, 
server.getPagingManager().getPageStore(ADDRESS).getAddressSize());
+
+      Assert.assertEquals(0, server.getPagingManager().getGlobalSize());
+
+      session.close();
+
+      cons.close();
+   }
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
 
-}
+}
\ No newline at end of file

Reply via email to