This is an automated email from the ASF dual-hosted git repository.

taskain pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git


The following commit(s) were added to refs/heads/develop by this push:
     new 174b239c1 FINERACT-2066: Send async event job tuning
174b239c1 is described below

commit 174b239c1078ef54686532aeecf8892146a9c848
Author: taskain7 <[email protected]>
AuthorDate: Wed Mar 13 14:25:54 2024 +0100

    FINERACT-2066: Send async event job tuning
---
 ...a => FineractExternalEventConfigCondition.java} | 30 ++++----
 .../condition/FineractValidationCondition.java     |  3 +
 .../core/config/FineractProperties.java            |  1 +
 .../jobs/SendAsynchronousEventsTasklet.java        | 19 ++---
 .../src/main/resources/application.properties      |  1 +
 .../FineractExternalEventConfigConditionTest.java  | 83 ++++++++++++++++++++++
 .../jobs/SendAsynchronousEventsTaskletTest.java    |  1 +
 .../src/test/resources/application-test.properties |  1 +
 8 files changed, 117 insertions(+), 22 deletions(-)

diff --git a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractValidationCondition.java b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractExternalEventConfigCondition.java
similarity index 52%
copy from fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractValidationCondition.java
copy to fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractExternalEventConfigCondition.java
index c4503cdb2..715484bfa 100644
--- a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractValidationCondition.java
+++ b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractExternalEventConfigCondition.java
@@ -18,21 +18,23 @@
  */
 package org.apache.fineract.infrastructure.core.condition;
 
-import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
-import org.springframework.context.annotation.Conditional;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
 
-public class FineractValidationCondition extends AnyNestedCondition {
+@Slf4j
+public class FineractExternalEventConfigCondition extends PropertiesCondition {
 
-    public FineractValidationCondition() {
-        super(ConfigurationPhase.PARSE_CONFIGURATION);
+    @Override
+    protected boolean matches(FineractProperties properties) {
+        int partitionSize = properties.getEvents().getExternal().getPartitionSize();
+        boolean conditionFails = false;
+        if (partitionSize > 25000) {
+            conditionFails = true;
+            log.error("The partition size for external event partitions cannot be bigger than 25000.");
+        } else if (partitionSize < 1) {
+            conditionFails = true;
+            log.error("The partition size for external event partitions must be positive.");
+        }
+        return conditionFails;
     }
-
-    @Conditional(FineractModeValidationCondition.class)
-    static class FineractModeValidation {}
-
-    @Conditional(FineractPartitionJobConfigValidationCondition.class)
-    static class FineractPartitionedJobValidation {}
-
-    @Conditional(FineractRemoteJobMessageHandlerCondition.class)
-    static class FineractRemoteJobMessageHandlerValidation {}
 }
diff --git a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractValidationCondition.java b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractValidationCondition.java
index c4503cdb2..afebbf407 100644
--- a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractValidationCondition.java
+++ b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractValidationCondition.java
@@ -35,4 +35,7 @@ public class FineractValidationCondition extends AnyNestedCondition {
 
     @Conditional(FineractRemoteJobMessageHandlerCondition.class)
     static class FineractRemoteJobMessageHandlerValidation {}
+
+    @Conditional(FineractExternalEventConfigCondition.class)
+    static class FineractExternalEventConfigValidation {}
 }
diff --git a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
index 0e0b57de8..7a5d647c8 100644
--- a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
+++ b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
@@ -284,6 +284,7 @@ public class FineractProperties {
 
         private boolean enabled;
         private FineractExternalEventsProducerProperties producer;
+        private int partitionSize;
     }
 
     @Getter
diff --git a/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java b/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
index ac9d01a25..7277cfae6 100644
--- a/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
+++ b/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
@@ -102,14 +102,17 @@ public class SendAsynchronousEventsTasklet implements Tasklet {
         OffsetDateTime sentAt = DateUtils.getAuditOffsetDateTime();
 
         // Partitioning dataset to avoid exception: PreparedStatement can have at most 65,535 parameters
-        List<List<Long>> partitions = Lists.partition(eventIds, 5_000);
-        partitions.forEach(partitionedEventIds -> {
-            measure(() -> {
-                repository.markEventsSent(partitionedEventIds, sentAt);
-            }, timeTaken -> {
-                log.debug("Took {}ms to update {} events", timeTaken.toMillis(), partitionedEventIds.size());
-            });
-        });
+        final int partitionSize = fineractProperties.getEvents().getExternal().getPartitionSize();
+        List<List<Long>> partitions = Lists.partition(eventIds, partitionSize);
+        partitions.stream() //
+                .parallel() //
+                .forEach(partitionedEventIds -> {
+                    measure(() -> {
+                        repository.markEventsSent(partitionedEventIds, sentAt);
+                    }, timeTaken -> {
+                        log.debug("Took {}ms to update {} events", timeTaken.toMillis(), partitionedEventIds.size());
+                    });
+                });
     }
 
     private Map<Long, List<byte[]>> generatePartitions(List<ExternalEventView> queuedEvents) {
diff --git a/fineract-provider/src/main/resources/application.properties b/fineract-provider/src/main/resources/application.properties
index aa3e5f723..77624ba83 100644
--- a/fineract-provider/src/main/resources/application.properties
+++ b/fineract-provider/src/main/resources/application.properties
@@ -98,6 +98,7 @@ fineract.remote-job-message-handler.kafka.admin.extra-properties-key-value-separ
 fineract.remote-job-message-handler.kafka.admin.extra-properties=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_KAFKA_ADMIN_EXTRA_PROPERTIES:}
 
 fineract.events.external.enabled=${FINERACT_EXTERNAL_EVENTS_ENABLED:false}
+fineract.events.external.partition-size=${FINERACT_EXTERNAL_EVENTS_PARTITION_SIZE:5000}
 fineract.events.external.producer.jms.enabled=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED:false}
 fineract.events.external.producer.jms.async-send-enabled=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ASYNC_SEND_ENABLED:false}
 fineract.events.external.producer.jms.event-queue-name=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME:}
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/condition/FineractExternalEventConfigConditionTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/condition/FineractExternalEventConfigConditionTest.java
new file mode 100644
index 000000000..11ba7c5ed
--- /dev/null
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/condition/FineractExternalEventConfigConditionTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.core.condition;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.BDDMockito.given;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.context.annotation.ConditionContext;
+import org.springframework.core.type.AnnotatedTypeMetadata;
+import org.springframework.mock.env.MockEnvironment;
+
+@ExtendWith(MockitoExtension.class)
+class FineractExternalEventConfigConditionTest {
+
+    @Mock
+    private ConditionContext conditionContext;
+
+    @Mock
+    private AnnotatedTypeMetadata metadata;
+
+    private MockEnvironment environment;
+
+    @InjectMocks
+    private FineractExternalEventConfigCondition underTest = new FineractExternalEventConfigCondition();
+
+    @BeforeEach
+    public void setUp() {
+        environment = new MockEnvironment();
+        given(conditionContext.getEnvironment()).willReturn(environment);
+    }
+
+    @Test
+    public void testMatchesShouldReturnFalseWhenPartitionSizeIsNotBiggerThanLimit() {
+        // given
+        environment.withProperty("fineract.events.external.partition-size", "25000");
+        // when
+        boolean result = underTest.matches(conditionContext, metadata);
+        // then
+        assertThat(result).isFalse();
+    }
+
+    @Test
+    public void testMatchesShouldReturnTrueWhenPartitionSizeIsBiggerThanLimit() {
+        // given
+        environment.withProperty("fineract.events.external.partition-size", "25001");
+        // when
+        boolean result = underTest.matches(conditionContext, metadata);
+        // then
+        assertThat(result).isTrue();
+    }
+
+    @Test
+    public void testMatchesShouldReturnTrueWhenPartitionSizeIsNotPositive() {
+        // given
+        environment.withProperty("fineract.events.external.partition-size", "0");
+        // when
+        boolean result = underTest.matches(conditionContext, metadata);
+        // then
+        assertThat(result).isTrue();
+    }
+}
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
index df7e3215b..0caaf3c4e 100644
--- a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
@@ -110,6 +110,7 @@ class SendAsynchronousEventsTaskletTest {
         FineractProperties.FineractExternalEventsProducerJmsProperties externalEventsProducerJMSProperties = new FineractProperties.FineractExternalEventsProducerJmsProperties();
         externalEventsProducerJMSProperties.setEnabled(true);
         externalProperties.setEnabled(true);
+        externalProperties.setPartitionSize(5000);
         externalEventsProducerProperties.setJms(externalEventsProducerJMSProperties);
         externalProperties.setProducer(externalEventsProducerProperties);
         eventsProperties.setExternal(externalProperties);
diff --git a/fineract-provider/src/test/resources/application-test.properties b/fineract-provider/src/test/resources/application-test.properties
index f85488881..36c7231ae 100644
--- a/fineract-provider/src/test/resources/application-test.properties
+++ b/fineract-provider/src/test/resources/application-test.properties
@@ -51,6 +51,7 @@ fineract.remote-job-message-handler.spring-events.enabled=${FINERACT_REMOTE_JOB_
 fineract.remote-job-message-handler.jms.enabled=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_JMS_ENABLED:false}
 fineract.remote-job-message-handler.jms.request-queue-name=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_JMS_QUEUE_NAME:JMS-request-queue}
 fineract.events.external.enabled=${FINERACT_EXTERNAL_EVENTS_ENABLED:false}
+fineract.events.external.partition-size=${FINERACT_EXTERNAL_EVENTS_PARTITION_SIZE:5000}
 fineract.events.external.producer.read-batch-size=${FINERACT_EXTERNAL_EVENTS_PRODUCER_READ_BATCH_SIZE:1000}
 fineract.events.external.producer.jms.enabled=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED:false}
 fineract.events.external.producer.jms.event-queue-name=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME:JMS-event-queue}

Reply via email to