This is an automated email from the ASF dual-hosted git repository.
taskain pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git
The following commit(s) were added to refs/heads/develop by this push:
new 174b239c1 FINERACT-2066: Send async event job tuning
174b239c1 is described below
commit 174b239c1078ef54686532aeecf8892146a9c848
Author: taskain7 <[email protected]>
AuthorDate: Wed Mar 13 14:25:54 2024 +0100
FINERACT-2066: Send async event job tuning
---
...a => FineractExternalEventConfigCondition.java} | 30 ++++----
.../condition/FineractValidationCondition.java | 3 +
.../core/config/FineractProperties.java | 1 +
.../jobs/SendAsynchronousEventsTasklet.java | 19 ++---
.../src/main/resources/application.properties | 1 +
.../FineractExternalEventConfigConditionTest.java | 83 ++++++++++++++++++++++
.../jobs/SendAsynchronousEventsTaskletTest.java | 1 +
.../src/test/resources/application-test.properties | 1 +
8 files changed, 117 insertions(+), 22 deletions(-)
diff --git
a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractValidationCondition.java
b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractExternalEventConfigCondition.java
similarity index 52%
copy from
fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractValidationCondition.java
copy to
fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractExternalEventConfigCondition.java
index c4503cdb2..715484bfa 100644
---
a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractValidationCondition.java
+++
b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractExternalEventConfigCondition.java
@@ -18,21 +18,23 @@
*/
package org.apache.fineract.infrastructure.core.condition;
-import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
-import org.springframework.context.annotation.Conditional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
-public class FineractValidationCondition extends AnyNestedCondition {
+@Slf4j
+public class FineractExternalEventConfigCondition extends PropertiesCondition {
- public FineractValidationCondition() {
- super(ConfigurationPhase.PARSE_CONFIGURATION);
+ @Override
+ protected boolean matches(FineractProperties properties) {
+ int partitionSize =
properties.getEvents().getExternal().getPartitionSize();
+ boolean conditionFails = false;
+ if (partitionSize > 25000) {
+ conditionFails = true;
+ log.error("The partition size for external event partitions cannot
be bigger than 25000.");
+ } else if (partitionSize < 1) {
+ conditionFails = true;
+ log.error("The partition size for external event partitions must
be positive.");
+ }
+ return conditionFails;
}
-
- @Conditional(FineractModeValidationCondition.class)
- static class FineractModeValidation {}
-
- @Conditional(FineractPartitionJobConfigValidationCondition.class)
- static class FineractPartitionedJobValidation {}
-
- @Conditional(FineractRemoteJobMessageHandlerCondition.class)
- static class FineractRemoteJobMessageHandlerValidation {}
}
diff --git
a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractValidationCondition.java
b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractValidationCondition.java
index c4503cdb2..afebbf407 100644
---
a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractValidationCondition.java
+++
b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractValidationCondition.java
@@ -35,4 +35,7 @@ public class FineractValidationCondition extends
AnyNestedCondition {
@Conditional(FineractRemoteJobMessageHandlerCondition.class)
static class FineractRemoteJobMessageHandlerValidation {}
+
+ @Conditional(FineractExternalEventConfigCondition.class)
+ static class FineractExternalEventConfigValidation {}
}
diff --git
a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
index 0e0b57de8..7a5d647c8 100644
---
a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
+++
b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
@@ -284,6 +284,7 @@ public class FineractProperties {
private boolean enabled;
private FineractExternalEventsProducerProperties producer;
+ private int partitionSize;
}
@Getter
diff --git
a/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
b/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
index ac9d01a25..7277cfae6 100644
---
a/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
+++
b/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
@@ -102,14 +102,17 @@ public class SendAsynchronousEventsTasklet implements
Tasklet {
OffsetDateTime sentAt = DateUtils.getAuditOffsetDateTime();
// Partitioning dataset to avoid exception: PreparedStatement can have
at most 65,535 parameters
- List<List<Long>> partitions = Lists.partition(eventIds, 5_000);
- partitions.forEach(partitionedEventIds -> {
- measure(() -> {
- repository.markEventsSent(partitionedEventIds, sentAt);
- }, timeTaken -> {
- log.debug("Took {}ms to update {} events",
timeTaken.toMillis(), partitionedEventIds.size());
- });
- });
+ final int partitionSize =
fineractProperties.getEvents().getExternal().getPartitionSize();
+ List<List<Long>> partitions = Lists.partition(eventIds, partitionSize);
+ partitions.stream() //
+ .parallel() //
+ .forEach(partitionedEventIds -> {
+ measure(() -> {
+ repository.markEventsSent(partitionedEventIds, sentAt);
+ }, timeTaken -> {
+ log.debug("Took {}ms to update {} events",
timeTaken.toMillis(), partitionedEventIds.size());
+ });
+ });
}
private Map<Long, List<byte[]>> generatePartitions(List<ExternalEventView>
queuedEvents) {
diff --git a/fineract-provider/src/main/resources/application.properties
b/fineract-provider/src/main/resources/application.properties
index aa3e5f723..77624ba83 100644
--- a/fineract-provider/src/main/resources/application.properties
+++ b/fineract-provider/src/main/resources/application.properties
@@ -98,6 +98,7 @@
fineract.remote-job-message-handler.kafka.admin.extra-properties-key-value-separ
fineract.remote-job-message-handler.kafka.admin.extra-properties=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_KAFKA_ADMIN_EXTRA_PROPERTIES:}
fineract.events.external.enabled=${FINERACT_EXTERNAL_EVENTS_ENABLED:false}
+fineract.events.external.partition-size=${FINERACT_EXTERNAL_EVENTS_PARTITION_SIZE:5000}
fineract.events.external.producer.jms.enabled=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED:false}
fineract.events.external.producer.jms.async-send-enabled=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ASYNC_SEND_ENABLED:false}
fineract.events.external.producer.jms.event-queue-name=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME:}
diff --git
a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/condition/FineractExternalEventConfigConditionTest.java
b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/condition/FineractExternalEventConfigConditionTest.java
new file mode 100644
index 000000000..11ba7c5ed
--- /dev/null
+++
b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/condition/FineractExternalEventConfigConditionTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.core.condition;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.BDDMockito.given;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.context.annotation.ConditionContext;
+import org.springframework.core.type.AnnotatedTypeMetadata;
+import org.springframework.mock.env.MockEnvironment;
+
+@ExtendWith(MockitoExtension.class)
+class FineractExternalEventConfigConditionTest {
+
+ @Mock
+ private ConditionContext conditionContext;
+
+ @Mock
+ private AnnotatedTypeMetadata metadata;
+
+ private MockEnvironment environment;
+
+ @InjectMocks
+ private FineractExternalEventConfigCondition underTest = new
FineractExternalEventConfigCondition();
+
+ @BeforeEach
+ public void setUp() {
+ environment = new MockEnvironment();
+ given(conditionContext.getEnvironment()).willReturn(environment);
+ }
+
+ @Test
+ public void
testMatchesShouldReturnFalseWhenPartitionSizeIsNotBiggerThanLimit() {
+ // given
+ environment.withProperty("fineract.events.external.partition-size",
"25000");
+ // when
+ boolean result = underTest.matches(conditionContext, metadata);
+ // then
+ assertThat(result).isFalse();
+ }
+
+ @Test
+ public void
testMatchesShouldReturnTrueWhenPartitionSizeIsBiggerThanLimit() {
+ // given
+ environment.withProperty("fineract.events.external.partition-size",
"25001");
+ // when
+ boolean result = underTest.matches(conditionContext, metadata);
+ // then
+ assertThat(result).isTrue();
+ }
+
+ @Test
+ public void testMatchesShouldReturnTrueWhenPartitionSizeIsNotPositive() {
+ // given
+ environment.withProperty("fineract.events.external.partition-size",
"0");
+ // when
+ boolean result = underTest.matches(conditionContext, metadata);
+ // then
+ assertThat(result).isTrue();
+ }
+}
diff --git
a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
index df7e3215b..0caaf3c4e 100644
---
a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
+++
b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
@@ -110,6 +110,7 @@ class SendAsynchronousEventsTaskletTest {
FineractProperties.FineractExternalEventsProducerJmsProperties
externalEventsProducerJMSProperties = new
FineractProperties.FineractExternalEventsProducerJmsProperties();
externalEventsProducerJMSProperties.setEnabled(true);
externalProperties.setEnabled(true);
+ externalProperties.setPartitionSize(5000);
externalEventsProducerProperties.setJms(externalEventsProducerJMSProperties);
externalProperties.setProducer(externalEventsProducerProperties);
eventsProperties.setExternal(externalProperties);
diff --git a/fineract-provider/src/test/resources/application-test.properties
b/fineract-provider/src/test/resources/application-test.properties
index f85488881..36c7231ae 100644
--- a/fineract-provider/src/test/resources/application-test.properties
+++ b/fineract-provider/src/test/resources/application-test.properties
@@ -51,6 +51,7 @@
fineract.remote-job-message-handler.spring-events.enabled=${FINERACT_REMOTE_JOB_
fineract.remote-job-message-handler.jms.enabled=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_JMS_ENABLED:false}
fineract.remote-job-message-handler.jms.request-queue-name=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_JMS_QUEUE_NAME:JMS-request-queue}
fineract.events.external.enabled=${FINERACT_EXTERNAL_EVENTS_ENABLED:false}
+fineract.events.external.partition-size=${FINERACT_EXTERNAL_EVENTS_PARTITION_SIZE:5000}
fineract.events.external.producer.read-batch-size=${FINERACT_EXTERNAL_EVENTS_PRODUCER_READ_BATCH_SIZE:1000}
fineract.events.external.producer.jms.enabled=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED:false}
fineract.events.external.producer.jms.event-queue-name=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME:JMS-event-queue}