This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ed12b5  ARTEMIS-3467 FD leak on receiving AMQP large messages
     new 6f2d58f  This closes #3732
9ed12b5 is described below

commit 9ed12b53d63fe63f886b8dfa513c051e7e19e134
Author: Domenico Francesco Bruscino <[email protected]>
AuthorDate: Thu Sep 9 16:25:35 2021 +0200

    ARTEMIS-3467 FD leak on receiving AMQP large messages
---
 .../amqp/proton/ProtonAbstractReceiver.java        |  1 +
 .../tests/integration/client/LargeMessageTest.java | 57 ++++++++++++++++++++++
 2 files changed, 58 insertions(+)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
index e0146c0..4a5675c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
@@ -252,6 +252,7 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
             currentLargeMessage.addBytes(receiver.recv());
             receiver.advance();
             message = currentLargeMessage;
+            currentLargeMessage.releaseResources(true, true);
             currentLargeMessage = null;
          } else {
             ReadableBuffer data = receiver.recv();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index d98876b..b1b8bb0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -16,9 +16,16 @@
  */
 package org.apache.activemq.artemis.tests.integration.client;
 
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.io.ByteArrayOutputStream;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.UUID;
@@ -26,9 +33,11 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.sun.management.UnixOperatingSystemMXBean;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -53,11 +62,13 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.tests.util.RandomUtil;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.jboss.logging.Logger;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -2738,6 +2749,52 @@ public class LargeMessageTest extends LargeMessageTestBase {
       cons.close();
    }
 
+   @Test
+   public void testAMQPLargeMessageFDs() throws Exception {
+      OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
+
+      Assume.assumeTrue(os instanceof UnixOperatingSystemMXBean);
+
+      final SimpleString MY_QUEUE = new SimpleString("MY-QUEUE");
+      final int numberOfMessages = 30;
+      ActiveMQServer server = createServer(true, true);
+
+      server.start();
+
+      long fdBefore = ((UnixOperatingSystemMXBean)os).getOpenFileDescriptorCount();
+
+      server.createQueue(new QueueConfiguration(MY_QUEUE).setRoutingType(RoutingType.ANYCAST));
+
+      ConnectionFactory connectionFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61616");
+      Connection connection = connectionFactory.createConnection();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      byte[] bufferSample = new byte[300 * 1024];
+
+      for (int i = 0; i < bufferSample.length; i++) {
+         bufferSample[i] = getSamplebyte(i);
+      }
+
+      javax.jms.Queue jmsQueue = session.createQueue(MY_QUEUE.toString());
+
+      MessageProducer producer = session.createProducer(jmsQueue);
+      producer.setTimeToLive(300);
+
+      for (int i = 0; i < numberOfMessages; i++) {
+         BytesMessage message = session.createBytesMessage();
+         message.writeBytes(bufferSample);
+
+         message.setIntProperty("count", i);
+
+         producer.send(message);
+      }
+
+      session.close();
+      connection.close();
+
+      Wait.assertTrue(() -> ((UnixOperatingSystemMXBean)os).getOpenFileDescriptorCount() - fdBefore < 3);
+   }
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------

Reply via email to