This is an automated email from the ASF dual-hosted git repository.
arnold pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git
The following commit(s) were added to refs/heads/develop by this push:
new 5a89c19d0 FINERACT-1724: Added JMS connection pooling to the external event producer
5a89c19d0 is described below
commit 5a89c19d0a392d61513560dec6fb565e2d733d85
Author: Arnold Galovics <[email protected]>
AuthorDate: Fri Jan 20 13:36:11 2023 +0100
FINERACT-1724: Added JMS connection pooling to the external event producer
---
.../config/ExternalEventJMSConfiguration.java | 19 ++++--
.../jms/JMSMultiExternalEventProducer.java | 68 +++++++++----------
.../jms/JMSMultiExternalEventProducerTest.java | 76 ++++++++--------------
3 files changed, 68 insertions(+), 95 deletions(-)
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java
index b38f90e58..3b384b672 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/config/ExternalEventJMSConfiguration.java
@@ -28,6 +28,7 @@ import
org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
+import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@@ -37,27 +38,31 @@ public class ExternalEventJMSConfiguration {
@Autowired
private FineractProperties fineractProperties;
- @Bean
- public ActiveMQConnectionFactory connectionFactory() {
+ @Bean(name = "externalEventConnectionFactory")
+ public CachingConnectionFactory connectionFactory() {
+ FineractExternalEventsProducerJmsProperties jmsProps =
fineractProperties.getEvents().getExternal().getProducer().getJms();
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory();
-
connectionFactory.setBrokerURL(fineractProperties.getEvents().getExternal().getProducer().getJms().getBrokerUrl());
+ connectionFactory.setBrokerURL(jmsProps.getBrokerUrl());
connectionFactory.setTrustAllPackages(true);
- FineractExternalEventsProducerJmsProperties jmsProps =
fineractProperties.getEvents().getExternal().getProducer().getJms();
if (jmsProps.isBrokerPasswordProtected()) {
connectionFactory.setUserName(jmsProps.getBrokerUsername());
connectionFactory.setPassword(jmsProps.getBrokerPassword());
}
- return connectionFactory;
+ CachingConnectionFactory cachingConnectionFactory = new
CachingConnectionFactory();
+
cachingConnectionFactory.setSessionCacheSize(jmsProps.getProducerCount());
+ cachingConnectionFactory.setReconnectOnException(true);
+ cachingConnectionFactory.setTargetConnectionFactory(connectionFactory);
+ return cachingConnectionFactory;
}
@Conditional(EnableExternalEventTopicCondition.class)
- @Bean(name = "eventDestination")
+ @Bean(name = "externalEventDestination")
public ActiveMQTopic activeMqTopic() {
return new
ActiveMQTopic(fineractProperties.getEvents().getExternal().getProducer().getJms().getEventTopicName());
}
@Conditional(EnableExternalEventQueueCondition.class)
- @Bean(name = "eventDestination")
+ @Bean(name = "externalEventDestination")
public ActiveMQQueue activeMqQueue() {
return new
ActiveMQQueue(fineractProperties.getEvents().getExternal().getProducer().getJms().getEventQueueName());
}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
index 794c45cc3..3511780d8 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducer.java
@@ -39,8 +39,6 @@ import
org.apache.fineract.infrastructure.core.messaging.jms.MessageFactory;
import org.apache.fineract.infrastructure.core.service.HashingService;
import
org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
import
org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
-import org.springframework.beans.factory.DisposableBean;
-import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.core.task.AsyncTaskExecutor;
@@ -50,11 +48,12 @@ import org.springframework.stereotype.Component;
@Slf4j
@RequiredArgsConstructor
@ConditionalOnProperty(value =
"fineract.events.external.producer.jms.enabled", havingValue = "true")
-public class JMSMultiExternalEventProducer implements ExternalEventProducer,
InitializingBean, DisposableBean {
+public class JMSMultiExternalEventProducer implements ExternalEventProducer {
- @Qualifier("eventDestination")
+ @Qualifier("externalEventDestination")
private final Destination destination;
+ @Qualifier("externalEventConnectionFactory")
private final ConnectionFactory connectionFactory;
private final MessageFactory messageFactory;
@@ -66,32 +65,6 @@ public class JMSMultiExternalEventProducer implements
ExternalEventProducer, Ini
private final FineractProperties fineractProperties;
- private final List<MessageProducer> producers = new ArrayList<>();
-
- private Connection connection;
-
- @Override
- public void afterPropertiesSet() throws Exception {
- int producerCount = getProducerCount();
- connection = connectionFactory.createConnection();
- for (int i = 0; i < producerCount; i++) {
- // It's crucial to create the session within the loop, otherwise
the producers won't be handled as
- // parallel
- // producers
- Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(destination);
- producers.add(producer);
- }
- log.info("Initialized JMS multi producer for external events with {}
parallel producers", producerCount);
- }
-
- @Override
- public void destroy() throws Exception {
- if (connection != null) {
- connection.close();
- }
- }
-
private int getProducerCount() {
return
fineractProperties.getEvents().getExternal().getProducer().getJms().getProducerCount();
}
@@ -104,29 +77,48 @@ public class JMSMultiExternalEventProducer implements
ExternalEventProducer, Ini
waitForSendingCompletion(tasks);
}, timeTaken -> {
if (log.isDebugEnabled()) {
- // in case execution is faster than 1sec
- long seconds = Math.max(timeTaken.toSeconds(), 1L);
- Integer eventCount =
partitions.values().stream().map(Collection::size).reduce(0, Integer::sum);
- log.debug("Sent messages with {} msg/s", (eventCount /
seconds));
+ int eventCount =
partitions.values().stream().map(Collection::size).reduce(0, Integer::sum);
+ int msgPerSec = (int) (((double) eventCount /
timeTaken.toMillis()) * 1000);
+ log.debug("Sent messages with {} msg/s", msgPerSec);
}
});
}
+ private List<MessageProducer> obtainProducers() {
+ List<MessageProducer> result = new ArrayList<>();
+ int producerCount = getProducerCount();
+ try {
+ // No need to close the connection since it's a pooled one
+ Connection connection = connectionFactory.createConnection();
+ for (int i = 0; i < producerCount; i++) {
+ // It's crucial to create the session within the loop,
otherwise the producers won't be handled as
+ // parallel
+ // producers
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
+ result.add(producer);
+ }
+ } catch (JMSException e) {
+ throw new RuntimeException("Error while obtaining message
producers", e);
+ }
+ return result;
+ }
+
private List<Future<?>> sendPartitions(Map<Integer, List<byte[]>>
indexedPartitions) {
+ List<MessageProducer> producers = obtainProducers();
List<Future<?>> tasks = new ArrayList<>();
for (Map.Entry<Integer, List<byte[]>> entry :
indexedPartitions.entrySet()) {
Integer producerIndex = entry.getKey();
+ MessageProducer producer = producers.get(producerIndex);
List<byte[]> messages = entry.getValue();
- Future<?> future = createSendingTask(producerIndex, messages);
+ Future<?> future = createSendingTask(producer, messages);
tasks.add(future);
}
return tasks;
}
- private Future<?> createSendingTask(Integer producerIndex, List<byte[]>
messages) {
+ private Future<?> createSendingTask(MessageProducer messageProducer,
List<byte[]> messages) {
return taskExecutor.submit(() -> {
- MessageProducer messageProducer = producers.get(producerIndex);
-
for (byte[] message : messages) {
try {
messageProducer.send(destination,
messageFactory.createByteMessage(message));
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java
index 2f28dca2a..8cd0dd431 100644
--- a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/producer/jms/JMSMultiExternalEventProducerTest.java
@@ -19,7 +19,6 @@
package org.apache.fineract.infrastructure.event.external.producer.jms;
import static org.mockito.BDDMockito.given;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException;
@@ -30,15 +29,15 @@ import java.util.Map;
import java.util.Random;
import javax.jms.BytesMessage;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.Destination;
-import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
-import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.fineract.avro.MessageV1;
import org.apache.fineract.infrastructure.core.config.FineractProperties;
import org.apache.fineract.infrastructure.core.messaging.jms.MessageFactory;
import org.apache.fineract.infrastructure.core.service.HashingService;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@@ -59,7 +58,7 @@ class JMSMultiExternalEventProducerTest {
@Mock
private Destination destination;
@Mock
- private ActiveMQConnectionFactory connectionFactory;
+ private ConnectionFactory connectionFactory;
@Mock
private MessageFactory messageFactory;
@@ -88,27 +87,31 @@ class JMSMultiExternalEventProducerTest {
private FineractProperties fineractProperties;
- private List<MessageProducer> producers = new ArrayList<>();
+ @BeforeEach
+ public void setUp() throws Exception {
+ FineractProperties.FineractExternalEventsProducerJmsProperties jms =
new FineractProperties.FineractExternalEventsProducerJmsProperties();
+ jms.setProducerCount(PRODUCER_COUNT);
+ FineractProperties.FineractExternalEventsProducerProperties producer =
new FineractProperties.FineractExternalEventsProducerProperties();
+ producer.setJms(jms);
+ FineractProperties.FineractExternalEventsProperties external = new
FineractProperties.FineractExternalEventsProperties();
+ external.setProducer(producer);
+ FineractProperties.FineractEventsProperties events = new
FineractProperties.FineractEventsProperties();
+ events.setExternal(external);
+ fineractProperties = new FineractProperties();
+ fineractProperties.setEvents(events);
+ underTest = new JMSMultiExternalEventProducer(destination,
connectionFactory, messageFactory, taskExecutor, hashingService,
+ fineractProperties);
- @Test
- public void testAfterPropertiesShouldCreateMultipleSessions() throws
Exception {
- // given
- initializeMocks();
- // when
- underTest.afterPropertiesSet();
- // then
- verify(connection, times(3)).createSession(false,
Session.AUTO_ACKNOWLEDGE);
- verify(session1).createProducer(destination);
- verify(session2).createProducer(destination);
- verify(session3).createProducer(destination);
+ given(connectionFactory.createConnection()).willReturn(connection);
+ given(connection.createSession(false,
Session.AUTO_ACKNOWLEDGE)).willReturn(session1, session2, session3);
+ given(session1.createProducer(destination)).willReturn(producer1);
+ given(session2.createProducer(destination)).willReturn(producer2);
+ given(session3.createProducer(destination)).willReturn(producer3);
}
@Test
public void testSendEventsShouldWork() throws Exception {
// given
- initializeMocks();
- underTest.afterPropertiesSet();
-
byte[] msg1 = createMessage();
List<byte[]> messages = new ArrayList<>();
messages.add(msg1);
@@ -116,7 +119,7 @@ class JMSMultiExternalEventProducerTest {
BytesMessage bytesMsg1 = Mockito.mock(BytesMessage.class);
given(messageFactory.createByteMessage(msg1)).willReturn(bytesMsg1);
- given(hashingService.consistentHash(1L,
producers.size())).willReturn(0);
+ given(hashingService.consistentHash(1L, PRODUCER_COUNT)).willReturn(0);
// when
underTest.sendEvents(partitions);
// then
@@ -126,9 +129,6 @@ class JMSMultiExternalEventProducerTest {
@Test
public void testSendEventsBalancesBetweenProducers() throws Exception {
// given
- initializeMocks();
- underTest.afterPropertiesSet();
-
byte[] msg1 = createMessage();
byte[] msg2 = createMessage();
byte[] msg3 = createMessage();
@@ -146,9 +146,9 @@ class JMSMultiExternalEventProducerTest {
given(messageFactory.createByteMessage(msg1)).willReturn(bytesMsg1);
given(messageFactory.createByteMessage(msg2)).willReturn(bytesMsg2);
given(messageFactory.createByteMessage(msg3)).willReturn(bytesMsg3);
- given(hashingService.consistentHash(1L,
producers.size())).willReturn(0);
- given(hashingService.consistentHash(2L,
producers.size())).willReturn(1);
- given(hashingService.consistentHash(3L,
producers.size())).willReturn(2);
+ given(hashingService.consistentHash(1L, PRODUCER_COUNT)).willReturn(0);
+ given(hashingService.consistentHash(2L, PRODUCER_COUNT)).willReturn(1);
+ given(hashingService.consistentHash(3L, PRODUCER_COUNT)).willReturn(2);
// when
underTest.sendEvents(partitions);
// then
@@ -172,28 +172,4 @@ class JMSMultiExternalEventProducerTest {
return messageV1.toByteBuffer().array();
}
- private void initializeMocks() throws JMSException {
- FineractProperties.FineractExternalEventsProducerJmsProperties jms =
new FineractProperties.FineractExternalEventsProducerJmsProperties();
- jms.setProducerCount(PRODUCER_COUNT);
- FineractProperties.FineractExternalEventsProducerProperties producer =
new FineractProperties.FineractExternalEventsProducerProperties();
- producer.setJms(jms);
- FineractProperties.FineractExternalEventsProperties external = new
FineractProperties.FineractExternalEventsProperties();
- external.setProducer(producer);
- FineractProperties.FineractEventsProperties events = new
FineractProperties.FineractEventsProperties();
- events.setExternal(external);
- fineractProperties = new FineractProperties();
- fineractProperties.setEvents(events);
- underTest = new JMSMultiExternalEventProducer(destination,
connectionFactory, messageFactory, taskExecutor, hashingService,
- fineractProperties);
-
- given(connectionFactory.createConnection()).willReturn(connection);
- given(connection.createSession(false,
Session.AUTO_ACKNOWLEDGE)).willReturn(session1, session2, session3);
- given(session1.createProducer(destination)).willReturn(producer1);
- given(session2.createProducer(destination)).willReturn(producer2);
- given(session3.createProducer(destination)).willReturn(producer3);
-
- producers.add(producer1);
- producers.add(producer2);
- producers.add(producer3);
- }
}