This is an automated email from the ASF dual-hosted git repository.

arnold pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git


The following commit(s) were added to refs/heads/develop by this push:
     new f0d2e9326 FINERACT-1724: Closing JMS sessions explicitly to avoid memory leaking for multi external event producer
f0d2e9326 is described below

commit f0d2e9326d241db8c49d86f85c79d6e3ce0d8b8a
Author: Arnold Galovics <[email protected]>
AuthorDate: Tue Mar 28 11:28:44 2023 +0200

    FINERACT-1724: Closing JMS sessions explicitly to avoid memory leaking for multi external event producer
---
 .../jms/JMSMultiExternalEventProducer.java         | 30 +++++++++++++++++-----
 .../jms/JMSMultiExternalEventProducerTest.java     |  8 ++++++
 2 files changed, 32 insertions(+), 6 deletions(-)

diff --git 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
index 3511780d8..15b2edcb2 100644
--- 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
+++ 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
@@ -34,6 +35,8 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.fineract.infrastructure.core.config.FineractProperties;
 import org.apache.fineract.infrastructure.core.messaging.jms.MessageFactory;
 import org.apache.fineract.infrastructure.core.service.HashingService;
@@ -73,8 +76,12 @@ public class JMSMultiExternalEventProducer implements 
ExternalEventProducer {
     public void sendEvents(Map<Long, List<byte[]>> partitions) throws 
AcknowledgementTimeoutException {
         Map<Integer, List<byte[]>> indexedPartitions = 
mapPartitionsToProducers(partitions);
         measure(() -> {
-            List<Future<?>> tasks = sendPartitions(indexedPartitions);
+            List<Pair<Session, MessageProducer>> producersWithSessions = 
obtainProducers();
+            List<MessageProducer> producers = 
producersWithSessions.stream().map(Pair::getRight).collect(Collectors.toList());
+            List<Session> sessions = 
producersWithSessions.stream().map(Pair::getLeft).collect(Collectors.toList());
+            List<Future<?>> tasks = sendPartitions(indexedPartitions, 
producers);
             waitForSendingCompletion(tasks);
+            closeSessions(sessions);
         }, timeTaken -> {
             if (log.isDebugEnabled()) {
                 int eventCount = 
partitions.values().stream().map(Collection::size).reduce(0, Integer::sum);
@@ -84,8 +91,20 @@ public class JMSMultiExternalEventProducer implements 
ExternalEventProducer {
         });
     }
 
-    private List<MessageProducer> obtainProducers() {
-        List<MessageProducer> result = new ArrayList<>();
+    private void closeSessions(List<Session> sessions) {
+        // The sessions retrieved from a CachingConnectionFactory need to be explicitly closed, otherwise we're making
+        // orphan sessions, leaking memory
+        for (Session session : sessions) {
+            try {
+                session.close();
+            } catch (JMSException e) {
+                log.error("Exception while trying to close sessions", e);
+            }
+        }
+    }
+
+    private List<Pair<Session, MessageProducer>> obtainProducers() {
+        List<Pair<Session, MessageProducer>> result = new ArrayList<>();
         int producerCount = getProducerCount();
         try {
             // No need to close the connection since it's a pooled one
@@ -96,7 +115,7 @@ public class JMSMultiExternalEventProducer implements 
ExternalEventProducer {
                 // producers
                 Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
                 MessageProducer producer = session.createProducer(destination);
-                result.add(producer);
+                result.add(new ImmutablePair<>(session, producer));
             }
         } catch (JMSException e) {
             throw new RuntimeException("Error while obtaining message 
producers", e);
@@ -104,8 +123,7 @@ public class JMSMultiExternalEventProducer implements 
ExternalEventProducer {
         return result;
     }
 
-    private List<Future<?>> sendPartitions(Map<Integer, List<byte[]>> 
indexedPartitions) {
-        List<MessageProducer> producers = obtainProducers();
+    private List<Future<?>> sendPartitions(Map<Integer, List<byte[]>> 
indexedPartitions, List<MessageProducer> producers) {
         List<Future<?>> tasks = new ArrayList<>();
         for (Map.Entry<Integer, List<byte[]>> entry : 
indexedPartitions.entrySet()) {
             Integer producerIndex = entry.getKey();
diff --git 
a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java
 
b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java
index 8cd0dd431..79fc715fe 100644
--- 
a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java
+++ 
b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java
@@ -37,6 +37,7 @@ import org.apache.fineract.avro.MessageV1;
 import org.apache.fineract.infrastructure.core.config.FineractProperties;
 import org.apache.fineract.infrastructure.core.messaging.jms.MessageFactory;
 import org.apache.fineract.infrastructure.core.service.HashingService;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -109,6 +110,13 @@ class JMSMultiExternalEventProducerTest {
         given(session3.createProducer(destination)).willReturn(producer3);
     }
 
+    @AfterEach
+    public void tearDown() throws Exception {
+        verify(session1).close();
+        verify(session2).close();
+        verify(session3).close();
+    }
+
     @Test
     public void testSendEventsShouldWork() throws Exception {
         // given

Reply via email to