This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 6db63ac  ARTEMIS-2785 test Netty direct ByteBuf memory leak due to compression
     new 86360a2  This closes #3155
6db63ac is described below

commit 6db63acee2355a54bd17dc9f63bd9d65f12a3f3b
Author: Francesco Nigro <[email protected]>
AuthorDate: Mon Jun 1 12:50:25 2020 +0200

    ARTEMIS-2785 test Netty direct ByteBuf memory leak due to compression
---
 .../client/LargeMessageCompressTest.java           | 45 ++++++++++++++++++++++
 1 file changed, 45 insertions(+)

diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
index 573ac1b..e2eae05 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.concurrent.atomic.AtomicLong;
 
+import io.netty.util.internal.PlatformDependent;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -38,8 +39,12 @@ import org.apache.activemq.artemis.core.management.impl.QueueControlImpl;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.not;
+
 import javax.management.openmbean.CompositeData;
 
 /**
@@ -130,6 +135,46 @@ public class LargeMessageCompressTest extends LargeMessageTest {
    }
 
    @Test
+   public void testNoDirectByteBufLeaksOnLargeMessageCompression() throws 
Exception {
+      Assume.assumeThat(PlatformDependent.usedDirectMemory(), 
not(equalTo(Long.valueOf(-1))));
+      final int messageSize = (int) (3.5 * 
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+      ActiveMQServer server = createServer(true, isNetty());
+
+      server.start();
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = addClientSession(sf.createSession(false, false, 
false));
+
+      session.createQueue(new 
QueueConfiguration(ADDRESS).setAddress(ADDRESS).setDurable(false).setTemporary(true));
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      Message clientFile = createLargeClientMessageStreaming(session, 
messageSize, true);
+
+      producer.send(clientFile);
+
+      session.commit();
+
+      session.start();
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+      final long usedDirectMemoryBeforeReceive = 
PlatformDependent.usedDirectMemory();
+      ClientMessage msg1 = consumer.receive(1000);
+      Assert.assertNotNull(msg1);
+      final long usedDirectMemoryAfterReceive = 
PlatformDependent.usedDirectMemory();
+      Assert.assertEquals("large message compression is leaking some Netty 
direct ByteBuff",
+                          usedDirectMemoryBeforeReceive, 
usedDirectMemoryAfterReceive);
+      msg1.acknowledge();
+      session.commit();
+
+      consumer.close();
+
+      session.close();
+   }
+
+   @Test
    public void testLargeMessageCompression() throws Exception {
       final int messageSize = (int) (3.5 * 
ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
 

Reply via email to