This is an automated email from the ASF dual-hosted git repository.
arnold pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git
The following commit(s) were added to refs/heads/develop by this push:
new f0d2e9326 FINERACT-1724: Closing JMS sessions explicitly to avoid
memory leaking for multi external event producer
f0d2e9326 is described below
commit f0d2e9326d241db8c49d86f85c79d6e3ce0d8b8a
Author: Arnold Galovics <[email protected]>
AuthorDate: Tue Mar 28 11:28:44 2023 +0200
FINERACT-1724: Closing JMS sessions explicitly to avoid memory leaking for
multi external event producer
---
.../jms/JMSMultiExternalEventProducer.java | 30 +++++++++++++++++-----
.../jms/JMSMultiExternalEventProducerTest.java | 8 ++++++
2 files changed, 32 insertions(+), 6 deletions(-)
diff --git
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
index 3511780d8..15b2edcb2 100644
---
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
+++
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
+import java.util.stream.Collectors;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
@@ -34,6 +35,8 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.fineract.infrastructure.core.config.FineractProperties;
import org.apache.fineract.infrastructure.core.messaging.jms.MessageFactory;
import org.apache.fineract.infrastructure.core.service.HashingService;
@@ -73,8 +76,12 @@ public class JMSMultiExternalEventProducer implements
ExternalEventProducer {
public void sendEvents(Map<Long, List<byte[]>> partitions) throws
AcknowledgementTimeoutException {
Map<Integer, List<byte[]>> indexedPartitions =
mapPartitionsToProducers(partitions);
measure(() -> {
- List<Future<?>> tasks = sendPartitions(indexedPartitions);
+ List<Pair<Session, MessageProducer>> producersWithSessions =
obtainProducers();
+ List<MessageProducer> producers =
producersWithSessions.stream().map(Pair::getRight).collect(Collectors.toList());
+ List<Session> sessions =
producersWithSessions.stream().map(Pair::getLeft).collect(Collectors.toList());
+ List<Future<?>> tasks = sendPartitions(indexedPartitions,
producers);
waitForSendingCompletion(tasks);
+ closeSessions(sessions);
}, timeTaken -> {
if (log.isDebugEnabled()) {
int eventCount =
partitions.values().stream().map(Collection::size).reduce(0, Integer::sum);
@@ -84,8 +91,20 @@ public class JMSMultiExternalEventProducer implements
ExternalEventProducer {
});
}
- private List<MessageProducer> obtainProducers() {
- List<MessageProducer> result = new ArrayList<>();
+ private void closeSessions(List<Session> sessions) {
+ // The sessions retrieved from a CachingConnectionFactory need to be
explicitly closed; otherwise we're creating
+ // orphaned sessions and leaking memory
+ for (Session session : sessions) {
+ try {
+ session.close();
+ } catch (JMSException e) {
+ log.error("Exception while trying to close sessions", e);
+ }
+ }
+ }
+
+ private List<Pair<Session, MessageProducer>> obtainProducers() {
+ List<Pair<Session, MessageProducer>> result = new ArrayList<>();
int producerCount = getProducerCount();
try {
// No need to close the connection since it's a pooled one
@@ -96,7 +115,7 @@ public class JMSMultiExternalEventProducer implements
ExternalEventProducer {
// producers
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
- result.add(producer);
+ result.add(new ImmutablePair<>(session, producer));
}
} catch (JMSException e) {
throw new RuntimeException("Error while obtaining message
producers", e);
@@ -104,8 +123,7 @@ public class JMSMultiExternalEventProducer implements
ExternalEventProducer {
return result;
}
- private List<Future<?>> sendPartitions(Map<Integer, List<byte[]>>
indexedPartitions) {
- List<MessageProducer> producers = obtainProducers();
+ private List<Future<?>> sendPartitions(Map<Integer, List<byte[]>>
indexedPartitions, List<MessageProducer> producers) {
List<Future<?>> tasks = new ArrayList<>();
for (Map.Entry<Integer, List<byte[]>> entry :
indexedPartitions.entrySet()) {
Integer producerIndex = entry.getKey();
diff --git
a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java
b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java
index 8cd0dd431..79fc715fe 100644
---
a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java
+++
b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java
@@ -37,6 +37,7 @@ import org.apache.fineract.avro.MessageV1;
import org.apache.fineract.infrastructure.core.config.FineractProperties;
import org.apache.fineract.infrastructure.core.messaging.jms.MessageFactory;
import org.apache.fineract.infrastructure.core.service.HashingService;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -109,6 +110,13 @@ class JMSMultiExternalEventProducerTest {
given(session3.createProducer(destination)).willReturn(producer3);
}
+ @AfterEach
+ public void tearDown() throws Exception {
+ verify(session1).close();
+ verify(session2).close();
+ verify(session3).close();
+ }
+
@Test
public void testSendEventsShouldWork() throws Exception {
// given