This is an automated email from the ASF dual-hosted git repository.

arnold pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5a89c19d0 FINERACT-1724: Added JMS connection pooling to the external event producer
5a89c19d0 is described below

commit 5a89c19d0a392d61513560dec6fb565e2d733d85
Author: Arnold Galovics <[email protected]>
AuthorDate: Fri Jan 20 13:36:11 2023 +0100

    FINERACT-1724: Added JMS connection pooling to the external event producer
---
 .../config/ExternalEventJMSConfiguration.java      | 19 ++++--
 .../jms/JMSMultiExternalEventProducer.java         | 68 +++++++++----------
 .../jms/JMSMultiExternalEventProducerTest.java     | 76 ++++++++--------------
 3 files changed, 68 insertions(+), 95 deletions(-)

diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java
index b38f90e58..3b384b672 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java
@@ -28,6 +28,7 @@ import 
org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Conditional;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.jms.connection.CachingConnectionFactory;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 @Configuration
@@ -37,27 +38,31 @@ public class ExternalEventJMSConfiguration {
     @Autowired
     private FineractProperties fineractProperties;
 
-    @Bean
-    public ActiveMQConnectionFactory connectionFactory() {
+    @Bean(name = "externalEventConnectionFactory")
+    public CachingConnectionFactory connectionFactory() {
+        FineractExternalEventsProducerJmsProperties jmsProps = 
fineractProperties.getEvents().getExternal().getProducer().getJms();
         ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory();
-        
connectionFactory.setBrokerURL(fineractProperties.getEvents().getExternal().getProducer().getJms().getBrokerUrl());
+        connectionFactory.setBrokerURL(jmsProps.getBrokerUrl());
         connectionFactory.setTrustAllPackages(true);
-        FineractExternalEventsProducerJmsProperties jmsProps = 
fineractProperties.getEvents().getExternal().getProducer().getJms();
         if (jmsProps.isBrokerPasswordProtected()) {
             connectionFactory.setUserName(jmsProps.getBrokerUsername());
             connectionFactory.setPassword(jmsProps.getBrokerPassword());
         }
-        return connectionFactory;
+        CachingConnectionFactory cachingConnectionFactory = new 
CachingConnectionFactory();
+        
cachingConnectionFactory.setSessionCacheSize(jmsProps.getProducerCount());
+        cachingConnectionFactory.setReconnectOnException(true);
+        cachingConnectionFactory.setTargetConnectionFactory(connectionFactory);
+        return cachingConnectionFactory;
     }
 
     @Conditional(EnableExternalEventTopicCondition.class)
-    @Bean(name = "eventDestination")
+    @Bean(name = "externalEventDestination")
     public ActiveMQTopic activeMqTopic() {
         return new 
ActiveMQTopic(fineractProperties.getEvents().getExternal().getProducer().getJms().getEventTopicName());
     }
 
     @Conditional(EnableExternalEventQueueCondition.class)
-    @Bean(name = "eventDestination")
+    @Bean(name = "externalEventDestination")
     public ActiveMQQueue activeMqQueue() {
         return new 
ActiveMQQueue(fineractProperties.getEvents().getExternal().getProducer().getJms().getEventQueueName());
     }
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
index 794c45cc3..3511780d8 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
@@ -39,8 +39,6 @@ import 
org.apache.fineract.infrastructure.core.messaging.jms.MessageFactory;
 import org.apache.fineract.infrastructure.core.service.HashingService;
 import 
org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
 import 
org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
-import org.springframework.beans.factory.DisposableBean;
-import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.core.task.AsyncTaskExecutor;
@@ -50,11 +48,12 @@ import org.springframework.stereotype.Component;
 @Slf4j
 @RequiredArgsConstructor
 @ConditionalOnProperty(value = 
"fineract.events.external.producer.jms.enabled", havingValue = "true")
-public class JMSMultiExternalEventProducer implements ExternalEventProducer, 
InitializingBean, DisposableBean {
+public class JMSMultiExternalEventProducer implements ExternalEventProducer {
 
-    @Qualifier("eventDestination")
+    @Qualifier("externalEventDestination")
     private final Destination destination;
 
+    @Qualifier("externalEventConnectionFactory")
     private final ConnectionFactory connectionFactory;
 
     private final MessageFactory messageFactory;
@@ -66,32 +65,6 @@ public class JMSMultiExternalEventProducer implements 
ExternalEventProducer, Ini
 
     private final FineractProperties fineractProperties;
 
-    private final List<MessageProducer> producers = new ArrayList<>();
-
-    private Connection connection;
-
-    @Override
-    public void afterPropertiesSet() throws Exception {
-        int producerCount = getProducerCount();
-        connection = connectionFactory.createConnection();
-        for (int i = 0; i < producerCount; i++) {
-            // It's crucial to create the session within the loop, otherwise 
the producers won't be handled as
-            // parallel
-            // producers
-            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-            MessageProducer producer = session.createProducer(destination);
-            producers.add(producer);
-        }
-        log.info("Initialized JMS multi producer for external events with {} 
parallel producers", producerCount);
-    }
-
-    @Override
-    public void destroy() throws Exception {
-        if (connection != null) {
-            connection.close();
-        }
-    }
-
     private int getProducerCount() {
         return 
fineractProperties.getEvents().getExternal().getProducer().getJms().getProducerCount();
     }
@@ -104,29 +77,48 @@ public class JMSMultiExternalEventProducer implements 
ExternalEventProducer, Ini
             waitForSendingCompletion(tasks);
         }, timeTaken -> {
             if (log.isDebugEnabled()) {
-                // in case execution is faster than 1sec
-                long seconds = Math.max(timeTaken.toSeconds(), 1L);
-                Integer eventCount = 
partitions.values().stream().map(Collection::size).reduce(0, Integer::sum);
-                log.debug("Sent messages with {} msg/s", (eventCount / 
seconds));
+                int eventCount = 
partitions.values().stream().map(Collection::size).reduce(0, Integer::sum);
+                int msgPerSec = (int) (((double) eventCount / 
timeTaken.toMillis()) * 1000);
+                log.debug("Sent messages with {} msg/s", msgPerSec);
             }
         });
     }
 
+    private List<MessageProducer> obtainProducers() {
+        List<MessageProducer> result = new ArrayList<>();
+        int producerCount = getProducerCount();
+        try {
+            // No need to close the connection since it's a pooled one
+            Connection connection = connectionFactory.createConnection();
+            for (int i = 0; i < producerCount; i++) {
+                // It's crucial to create the session within the loop, 
otherwise the producers won't be handled as
+                // parallel
+                // producers
+                Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                MessageProducer producer = session.createProducer(destination);
+                result.add(producer);
+            }
+        } catch (JMSException e) {
+            throw new RuntimeException("Error while obtaining message 
producers", e);
+        }
+        return result;
+    }
+
     private List<Future<?>> sendPartitions(Map<Integer, List<byte[]>> 
indexedPartitions) {
+        List<MessageProducer> producers = obtainProducers();
         List<Future<?>> tasks = new ArrayList<>();
         for (Map.Entry<Integer, List<byte[]>> entry : 
indexedPartitions.entrySet()) {
             Integer producerIndex = entry.getKey();
+            MessageProducer producer = producers.get(producerIndex);
             List<byte[]> messages = entry.getValue();
-            Future<?> future = createSendingTask(producerIndex, messages);
+            Future<?> future = createSendingTask(producer, messages);
             tasks.add(future);
         }
         return tasks;
     }
 
-    private Future<?> createSendingTask(Integer producerIndex, List<byte[]> 
messages) {
+    private Future<?> createSendingTask(MessageProducer messageProducer, 
List<byte[]> messages) {
         return taskExecutor.submit(() -> {
-            MessageProducer messageProducer = producers.get(producerIndex);
-
             for (byte[] message : messages) {
                 try {
                     messageProducer.send(destination, 
messageFactory.createByteMessage(message));
diff --git 
a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java
 
b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java
index 2f28dca2a..8cd0dd431 100644
--- 
a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java
+++ 
b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java
@@ -19,7 +19,6 @@
 package org.apache.fineract.infrastructure.event.external.producer.jms;
 
 import static org.mockito.BDDMockito.given;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
@@ -30,15 +29,15 @@ import java.util.Map;
 import java.util.Random;
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
-import javax.jms.JMSException;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.fineract.avro.MessageV1;
 import org.apache.fineract.infrastructure.core.config.FineractProperties;
 import org.apache.fineract.infrastructure.core.messaging.jms.MessageFactory;
 import org.apache.fineract.infrastructure.core.service.HashingService;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
@@ -59,7 +58,7 @@ class JMSMultiExternalEventProducerTest {
     @Mock
     private Destination destination;
     @Mock
-    private ActiveMQConnectionFactory connectionFactory;
+    private ConnectionFactory connectionFactory;
     @Mock
     private MessageFactory messageFactory;
 
@@ -88,27 +87,31 @@ class JMSMultiExternalEventProducerTest {
 
     private FineractProperties fineractProperties;
 
-    private List<MessageProducer> producers = new ArrayList<>();
+    @BeforeEach
+    public void setUp() throws Exception {
+        FineractProperties.FineractExternalEventsProducerJmsProperties jms = 
new FineractProperties.FineractExternalEventsProducerJmsProperties();
+        jms.setProducerCount(PRODUCER_COUNT);
+        FineractProperties.FineractExternalEventsProducerProperties producer = 
new FineractProperties.FineractExternalEventsProducerProperties();
+        producer.setJms(jms);
+        FineractProperties.FineractExternalEventsProperties external = new 
FineractProperties.FineractExternalEventsProperties();
+        external.setProducer(producer);
+        FineractProperties.FineractEventsProperties events = new 
FineractProperties.FineractEventsProperties();
+        events.setExternal(external);
+        fineractProperties = new FineractProperties();
+        fineractProperties.setEvents(events);
+        underTest = new JMSMultiExternalEventProducer(destination, 
connectionFactory, messageFactory, taskExecutor, hashingService,
+                fineractProperties);
 
-    @Test
-    public void testAfterPropertiesShouldCreateMultipleSessions() throws 
Exception {
-        // given
-        initializeMocks();
-        // when
-        underTest.afterPropertiesSet();
-        // then
-        verify(connection, times(3)).createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-        verify(session1).createProducer(destination);
-        verify(session2).createProducer(destination);
-        verify(session3).createProducer(destination);
+        given(connectionFactory.createConnection()).willReturn(connection);
+        given(connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE)).willReturn(session1, session2, session3);
+        given(session1.createProducer(destination)).willReturn(producer1);
+        given(session2.createProducer(destination)).willReturn(producer2);
+        given(session3.createProducer(destination)).willReturn(producer3);
     }
 
     @Test
     public void testSendEventsShouldWork() throws Exception {
         // given
-        initializeMocks();
-        underTest.afterPropertiesSet();
-
         byte[] msg1 = createMessage();
         List<byte[]> messages = new ArrayList<>();
         messages.add(msg1);
@@ -116,7 +119,7 @@ class JMSMultiExternalEventProducerTest {
 
         BytesMessage bytesMsg1 = Mockito.mock(BytesMessage.class);
         given(messageFactory.createByteMessage(msg1)).willReturn(bytesMsg1);
-        given(hashingService.consistentHash(1L, 
producers.size())).willReturn(0);
+        given(hashingService.consistentHash(1L, PRODUCER_COUNT)).willReturn(0);
         // when
         underTest.sendEvents(partitions);
         // then
@@ -126,9 +129,6 @@ class JMSMultiExternalEventProducerTest {
     @Test
     public void testSendEventsBalancesBetweenProducers() throws Exception {
         // given
-        initializeMocks();
-        underTest.afterPropertiesSet();
-
         byte[] msg1 = createMessage();
         byte[] msg2 = createMessage();
         byte[] msg3 = createMessage();
@@ -146,9 +146,9 @@ class JMSMultiExternalEventProducerTest {
         given(messageFactory.createByteMessage(msg1)).willReturn(bytesMsg1);
         given(messageFactory.createByteMessage(msg2)).willReturn(bytesMsg2);
         given(messageFactory.createByteMessage(msg3)).willReturn(bytesMsg3);
-        given(hashingService.consistentHash(1L, 
producers.size())).willReturn(0);
-        given(hashingService.consistentHash(2L, 
producers.size())).willReturn(1);
-        given(hashingService.consistentHash(3L, 
producers.size())).willReturn(2);
+        given(hashingService.consistentHash(1L, PRODUCER_COUNT)).willReturn(0);
+        given(hashingService.consistentHash(2L, PRODUCER_COUNT)).willReturn(1);
+        given(hashingService.consistentHash(3L, PRODUCER_COUNT)).willReturn(2);
         // when
         underTest.sendEvents(partitions);
         // then
@@ -172,28 +172,4 @@ class JMSMultiExternalEventProducerTest {
         return messageV1.toByteBuffer().array();
     }
 
-    private void initializeMocks() throws JMSException {
-        FineractProperties.FineractExternalEventsProducerJmsProperties jms = 
new FineractProperties.FineractExternalEventsProducerJmsProperties();
-        jms.setProducerCount(PRODUCER_COUNT);
-        FineractProperties.FineractExternalEventsProducerProperties producer = 
new FineractProperties.FineractExternalEventsProducerProperties();
-        producer.setJms(jms);
-        FineractProperties.FineractExternalEventsProperties external = new 
FineractProperties.FineractExternalEventsProperties();
-        external.setProducer(producer);
-        FineractProperties.FineractEventsProperties events = new 
FineractProperties.FineractEventsProperties();
-        events.setExternal(external);
-        fineractProperties = new FineractProperties();
-        fineractProperties.setEvents(events);
-        underTest = new JMSMultiExternalEventProducer(destination, 
connectionFactory, messageFactory, taskExecutor, hashingService,
-                fineractProperties);
-
-        given(connectionFactory.createConnection()).willReturn(connection);
-        given(connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE)).willReturn(session1, session2, session3);
-        given(session1.createProducer(destination)).willReturn(producer1);
-        given(session2.createProducer(destination)).willReturn(producer2);
-        given(session3.createProducer(destination)).willReturn(producer3);
-
-        producers.add(producer1);
-        producers.add(producer2);
-        producers.add(producer3);
-    }
 }

Reply via email to