This is an automated email from the ASF dual-hosted git repository.
arnold pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git
The following commit(s) were added to refs/heads/develop by this push:
new e9ae88d05 FINERACT-1694 Send Asynchronous Events
e9ae88d05 is described below
commit e9ae88d05718131600b7a2f57182abb9753a2ede
Author: Ruchi Dhamankar <[email protected]>
AuthorDate: Thu Sep 15 22:53:13 2022 +0530
FINERACT-1694 Send Asynchronous Events
---
.../core/config/FineractProperties.java | 17 +++
.../AcknowledgementTimeoutException.java} | 10 +-
.../jobs/SendAsynchronousEventsConfig.java | 52 ++++++++
.../jobs/SendAsynchronousEventsTasklet.java | 82 ++++++++++++
.../ExternalEventProducer.java} | 11 +-
.../ExternalEventProducerImpl.java} | 16 ++-
.../repository/ExternalEventRepository.java | 8 +-
.../external/service/message/MessageFactory.java | 31 ++++-
.../infrastructure/jobs/service/JobName.java | 3 +-
.../src/main/resources/application.properties | 4 +
.../db/changelog/tenant/changelog-tenant.xml | 1 +
.../0049_add_send_asynchronous_events_job.xml | 45 +++++++
.../jobs/SendAsynchronousEventsTaskletTest.java | 144 +++++++++++++++++++++
.../src/test/resources/application-test.properties | 3 +
14 files changed, 412 insertions(+), 15 deletions(-)
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
index d2aab3cf5..a0639e6aa 100644
--- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
@@ -134,5 +134,22 @@ public class FineractProperties {
public static class FineractExternalEventsProperties {
private boolean enabled;
+ private FineractExternalEventsProducerProperties producer;
+ }
+
+    /**
+     * Settings for the external-event producer. The {@code application.properties} hunk of this commit adds the
+     * matching {@code fineract.events.external.producer.*} keys.
+     */
+    @Getter
+    @Setter
+    public static class FineractExternalEventsProducerProperties {
+
+        // Maximum number of queued events read per run; SendAsynchronousEventsTasklet reads it via getReadBatchSize().
+        private int readBatchSize;
+        // Nested JMS-specific settings.
+        private FineractExternalEventsProducerJmsProperties jms;
+    }
+
+    /**
+     * JMS settings of the external-event producer. The {@code application.properties} hunk of this commit adds the
+     * matching {@code fineract.events.external.producer.jms.*} keys.
+     */
+    @Getter
+    @Setter
+    public static class FineractExternalEventsProducerJmsProperties {
+
+        // NOTE(review): nothing in this commit reads these two values yet - confirm the intended consumer.
+        private boolean enabled;
+        private String eventQueueName;
}
}
diff --git
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/exception/AcknowledgementTimeoutException.java
similarity index 72%
copy from
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
copy to
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/exception/AcknowledgementTimeoutException.java
index 0745503a1..f40864f2a 100644
---
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
+++
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/exception/AcknowledgementTimeoutException.java
@@ -16,9 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.event.external.repository;
+package org.apache.fineract.infrastructure.event.external.exception;
-import
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
-import org.springframework.data.jpa.repository.JpaRepository;
+public class AcknowledgementTimeoutException extends RuntimeException {
-public interface ExternalEventRepository extends JpaRepository<ExternalEvent,
Long> {}
+ public AcknowledgementTimeoutException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsConfig.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsConfig.java
new file mode 100644
index 000000000..30810773a
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsConfig.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.jobs;
+
+import org.apache.fineract.infrastructure.jobs.service.JobName;
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.Step;
+import
org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
+import
org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
+import org.springframework.batch.core.launch.support.RunIdIncrementer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Spring Batch wiring for the job that runs {@link SendAsynchronousEventsTasklet}. Both the step and the job are
+ * registered under the enum constant name of {@link JobName#SEND_ASYNCHRONOUS_EVENTS}.
+ */
+@Configuration
+public class SendAsynchronousEventsConfig {
+
+    @Autowired
+    private JobBuilderFactory jobs;
+    @Autowired
+    private StepBuilderFactory steps;
+    @Autowired
+    private SendAsynchronousEventsTasklet tasklet;
+
+    /** Builds the single tasklet step; all work is delegated to the injected tasklet. */
+    @Bean
+    protected Step sendAsynchronousEventsStep() {
+        final String stepName = JobName.SEND_ASYNCHRONOUS_EVENTS.name();
+        return steps.get(stepName) //
+                .tasklet(tasklet) //
+                .build();
+    }
+
+    /** Builds the job: it starts with the step above and uses a {@link RunIdIncrementer} as its incrementer. */
+    @Bean
+    public Job sendAsynchronousEventsJob() {
+        final String jobName = JobName.SEND_ASYNCHRONOUS_EVENTS.name();
+        final Step onlyStep = sendAsynchronousEventsStep();
+        return jobs.get(jobName) //
+                .start(onlyStep) //
+                .incrementer(new RunIdIncrementer()) //
+                .build();
+    }
+
+}
diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
new file mode 100644
index 000000000..27ad7b9ac
--- /dev/null
+++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.jobs;
+
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.fineract.avro.MessageV1;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.service.DateUtils;
+import
org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
+import
org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
+import
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
+import
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import
org.apache.fineract.infrastructure.event.external.service.message.MessageFactory;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.data.domain.Pageable;
+import org.springframework.stereotype.Component;
+
+/**
+ * Batch tasklet that reads one batch of external events in {@code TO_BE_SENT} status, hands each one to the
+ * {@link ExternalEventProducer} and then stores it with status {@code SENT}.
+ */
+@Slf4j
+@RequiredArgsConstructor
+@Component
+public class SendAsynchronousEventsTasklet implements Tasklet {
+
+    private final FineractProperties fineractProperties;
+    private final ExternalEventRepository repository;
+    private final ExternalEventProducer eventProducer;
+    private final MessageFactory messageFactory;
+
+    /**
+     * Processes a single batch of queued events. Any exception raised while reading or sending is logged and not
+     * rethrown, so this method always returns {@link RepeatStatus#FINISHED}.
+     */
+    @Override
+    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
+        try {
+            processEvents(getQueuedEventsBatch());
+        } catch (Exception e) {
+            log.error("Error occurred while processing events: ", e);
+        }
+        return RepeatStatus.FINISHED;
+    }
+
+    /** Queries the repository for {@code TO_BE_SENT} events using a page request sized by {@link #getBatchSize()}. */
+    private List<ExternalEvent> getQueuedEventsBatch() {
+        Pageable firstPage = PageRequest.ofSize(getBatchSize());
+        return repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, firstPage);
+    }
+
+    /**
+     * Sends the events in list order. An exception from any event propagates immediately, so later events in the list
+     * are neither sent nor saved.
+     */
+    private void processEvents(List<ExternalEvent> queuedEvents) {
+        queuedEvents.forEach(this::sendAndMarkAsSent);
+    }
+
+    /** Sends one event; only after the send returns normally is the event stamped as {@code SENT} and saved. */
+    private void sendAndMarkAsSent(ExternalEvent event) {
+        MessageV1 outgoing = messageFactory.createMessage(event);
+        eventProducer.sendEvent(outgoing);
+        event.setStatus(ExternalEventStatus.SENT);
+        event.setSentAt(DateUtils.getOffsetDateTimeOfTenant());
+        repository.save(event);
+    }
+
+    /** Reads the configured producer read batch size from {@link FineractProperties}. */
+    private int getBatchSize() {
+        return fineractProperties.getEvents().getExternal().getProducer().getReadBatchSize();
+    }
+
+}
diff --git
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java
similarity index 70%
copy from
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
copy to
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java
index 0745503a1..355f57bee 100644
---
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
+++
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java
@@ -16,9 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.event.external.repository;
+package org.apache.fineract.infrastructure.event.external.producer;
-import
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
-import org.springframework.data.jpa.repository.JpaRepository;
+import org.apache.fineract.avro.MessageV1;
+import
org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
-public interface ExternalEventRepository extends JpaRepository<ExternalEvent,
Long> {}
+/**
+ * Outbound port for external events: accepts an already-built {@link MessageV1}.
+ */
+public interface ExternalEventProducer {
+
+    /**
+     * Sends a single message.
+     *
+     * @param message
+     *            the message to send
+     * @throws AcknowledgementTimeoutException
+     *             declared by this contract; NOTE(review): the exact condition is not defined in this commit -
+     *             presumably a missing acknowledgement from the receiving side, confirm with the implementation
+     */
+    void sendEvent(MessageV1 message) throws AcknowledgementTimeoutException;
+}
diff --git
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducerImpl.java
similarity index 63%
copy from
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
copy to
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducerImpl.java
index 0745503a1..067307b6e 100644
---
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
+++
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducerImpl.java
@@ -16,9 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.event.external.repository;
+package org.apache.fineract.infrastructure.event.external.producer;
-import
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
-import org.springframework.data.jpa.repository.JpaRepository;
+import org.apache.fineract.avro.MessageV1;
+import
org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
+import org.springframework.stereotype.Service;
-public interface ExternalEventRepository extends JpaRepository<ExternalEvent,
Long> {}
+/**
+ * {@link ExternalEventProducer} bean whose {@link #sendEvent(MessageV1)} currently performs no work.
+ */
+@Service
+public class ExternalEventProducerImpl implements ExternalEventProducer {
+
+    @Override
+    public void sendEvent(MessageV1 message) throws AcknowledgementTimeoutException {
+        // Deliberately empty: this implementation does nothing with the message and never throws.
+    }
+}
diff --git
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
index 0745503a1..8282bc854 100644
---
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
+++
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
@@ -18,7 +18,13 @@
*/
package org.apache.fineract.infrastructure.event.external.repository;
+import java.util.List;
import
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
+import
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
-public interface ExternalEventRepository extends JpaRepository<ExternalEvent,
Long> {}
+/**
+ * Spring Data JPA repository for {@link ExternalEvent} entities with a {@code Long} identifier.
+ */
+public interface ExternalEventRepository extends JpaRepository<ExternalEvent, Long> {
+
+    /**
+     * Query derived from the method name: events with the given status, ordered by id.
+     *
+     * @param status
+     *            the status to match
+     * @param batchSize
+     *            paging information; the tasklet added in this commit passes {@code PageRequest.ofSize(n)} here
+     * @return the matching events for the requested page
+     */
+    List<ExternalEvent> findByStatusOrderById(ExternalEventStatus status, Pageable batchSize);
+}
diff --git
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java
index 0701dcb24..8e727515d 100644
---
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java
+++
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java
@@ -21,10 +21,14 @@ package
org.apache.fineract.infrastructure.event.external.service.message;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
+import java.util.UUID;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import org.apache.fineract.avro.BulkMessageV1;
import org.apache.fineract.avro.MessageV1;
import org.apache.fineract.infrastructure.core.service.DateUtils;
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
+import
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
import
org.apache.fineract.infrastructure.event.external.service.message.domain.BulkMessageData;
import
org.apache.fineract.infrastructure.event.external.service.message.domain.MessageCategory;
import
org.apache.fineract.infrastructure.event.external.service.message.domain.MessageData;
@@ -33,10 +37,17 @@ import
org.apache.fineract.infrastructure.event.external.service.message.domain.
import
org.apache.fineract.infrastructure.event.external.service.message.domain.MessageIdempotencyKey;
import
org.apache.fineract.infrastructure.event.external.service.message.domain.MessageSource;
import
org.apache.fineract.infrastructure.event.external.service.message.domain.MessageType;
+import
org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
+import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
+@Slf4j
@Component
-public class MessageFactory {
+@RequiredArgsConstructor
+public class MessageFactory implements InitializingBean {
+
+ private final ByteBufferConverter byteBufferConverter;
+ private static final String SOURCE_UUID = UUID.randomUUID().toString();
public MessageV1 createMessage(MessageId id, MessageSource source,
MessageType type, MessageCategory category,
MessageIdempotencyKey idempotencyKey, MessageDataSchema
dataSchema, MessageData data) {
@@ -65,6 +76,19 @@ public class MessageFactory {
return result;
}
+    /**
+     * Builds a {@link MessageV1} from a stored {@link ExternalEvent} by wrapping its fields and delegating to the
+     * seven-argument {@code createMessage} overload. The source is the per-process {@code SOURCE_UUID} and the category
+     * is the fixed string {@code "nocategory"}.
+     *
+     * @param event
+     *            the stored event; its id, type, idempotency key, schema and data are read
+     * @return the message produced by the seven-argument overload
+     */
+    public MessageV1 createMessage(ExternalEvent event) {
+        // NOTE(review): intValue() narrows the event id to int, so ids above Integer.MAX_VALUE would wrap - confirm
+        // whether MessageId can carry a long instead.
+        return createMessage( //
+                new MessageId(event.getId().intValue()), //
+                new MessageSource(SOURCE_UUID), //
+                new MessageType(event.getType()), //
+                new MessageCategory("nocategory"), //
+                new MessageIdempotencyKey(event.getIdempotencyKey()), //
+                new MessageDataSchema(event.getSchema()), //
+                new MessageData(byteBufferConverter.convert(event.getData())));
+    }
+
private String getTenantId() {
return ThreadLocalContextUtil.getTenant().getName();
}
@@ -73,4 +97,9 @@ public class MessageFactory {
OffsetDateTime createdAt = DateUtils.getOffsetDateTimeOfTenant();
return
createdAt.withOffsetSameInstant(ZoneOffset.UTC).toLocalDateTime().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
}
+
+    /** {@code InitializingBean} callback: logs, at info level, the source UUID this class stamps on messages. */
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        log.info("Message source set to {}", SOURCE_UUID);
+    }
}
diff --git
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java
index 5c02a6a21..152fe5cad 100644
---
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java
+++
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java
@@ -56,7 +56,8 @@ public enum JobName {
INCREASE_BUSINESS_DATE_BY_1_DAY("Increase Business Date by 1 day"), //
INCREASE_COB_DATE_BY_1_DAY("Increase COB Date by 1 day"), //
LOAN_COB("Loan COB"), //
- LOAN_DELINQUENCY_CLASSIFICATION("Loan Delinquency Classification");
+ LOAN_DELINQUENCY_CLASSIFICATION("Loan Delinquency Classification"), //
+ SEND_ASYNCHRONOUS_EVENTS("Send Asynchronous Events");
private final String name;
diff --git a/fineract-provider/src/main/resources/application.properties
b/fineract-provider/src/main/resources/application.properties
index 9e2f46bef..44a6c83f8 100644
--- a/fineract-provider/src/main/resources/application.properties
+++ b/fineract-provider/src/main/resources/application.properties
@@ -54,6 +54,10 @@
fineract.remote-job-message-handler.jms.request-queue-name=${FINERACT_REMOTE_JOB
fineract.remote-job-message-handler.jms.broker-url=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_JMS_BROKER_URL:tcp://127.0.0.1:61616}
fineract.events.external.enabled=${FINERACT_EXTERNAL_EVENTS_ENABLED:false}
+fineract.events.external.producer.read-batch-size=${FINERACT_EXTERNAL_EVENTS_PRODUCER_READ_BATCH_SIZE:1000}
+fineract.events.external.producer.jms.enabled=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED:false}
+fineract.events.external.producer.jms.event-queue-name=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME:JMS-event-queue}
+
# Logging pattern for the console
logging.pattern.console=${CONSOLE_LOG_PATTERN:%clr(%d{yyyy-MM-dd
HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta}
%clr(%replace([%X{correlationId}]){'\\[\\]', ''}) %clr(---){faint}
%clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint}
%m%n${LOG_EXCEPTION_CONVERSION_WORD:%wEx}}
diff --git
a/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml
b/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml
index fe802d965..179aaf743 100644
---
a/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml
+++
b/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml
@@ -68,4 +68,5 @@
<include file="parts/0046_external_event_table_schema_info.xml"
relativeToChangelogFile="true"/>
<include file="parts/0047_add_loan_delinquency_tags_business_step.xml"
relativeToChangelogFile="true"/>
<include file="parts/0048_rework_loan_account_locks.xml"
relativeToChangelogFile="true"/>
+ <include file="parts/0049_add_send_asynchronous_events_job.xml"
relativeToChangelogFile="true"/>
</databaseChangeLog>
diff --git a/fineract-provider/src/main/resources/db/changelog/tenant/parts/0049_add_send_asynchronous_events_job.xml b/fineract-provider/src/main/resources/db/changelog/tenant/parts/0049_add_send_asynchronous_events_job.xml
new file mode 100644
index 000000000..c0d8dce2b
--- /dev/null
+++ b/fineract-provider/src/main/resources/db/changelog/tenant/parts/0049_add_send_asynchronous_events_job.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.3.xsd">
+ <!-- Inserts the "Send Asynchronous Events" row into the job table, active by default. The cron expression "0 0/1 * * * ?" fires every minute, assuming Quartz cron syntax (TODO confirm). -->
+ <changeSet author="fineract" id="send_async_events_job">
+ <insert tableName="job">
+ <column name="name" value="Send Asynchronous Events"/>
+ <column name="display_name" value="Send Asynchronous Events"/>
+ <column name="cron_expression" value="0 0/1 * * * ?"/>
+ <column name="create_time" valueDate="${current_datetime}"/>
+ <column name="task_priority" valueNumeric="5"/>
+ <column name="group_name"/>
+ <column name="previous_run_start_time"/>
+ <column name="job_key" value="Send Asynchronous Events _ DEFAULT"/>
+ <column name="initializing_errorlog"/>
+ <column name="is_active" valueBoolean="true"/>
+ <column name="currently_running" valueBoolean="false"/>
+ <column name="updates_allowed" valueBoolean="true"/>
+ <column name="scheduler_group" valueNumeric="0"/>
+ <column name="is_misfired" valueBoolean="false"/>
+ <column name="node_id" valueNumeric="1"/>
+ <column name="is_mismatched_job" valueBoolean="true"/>
+ </insert>
+ </changeSet>
+</databaseChangeLog>
diff --git a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
new file mode 100644
index 000000000..8550fbb5b
--- /dev/null
+++ b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.jobs;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.fineract.avro.MessageV1;
+import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.domain.FineractPlatformTenant;
+import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
+import
org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
+import
org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
+import
org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
+import
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
+import
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import
org.apache.fineract.infrastructure.event.external.service.message.MessageFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.repeat.RepeatStatus;
+
+/**
+ * Unit tests for {@link SendAsynchronousEventsTasklet}. The properties, repository, producer and message factory are
+ * Mockito mocks, so only the tasklet's own orchestration is exercised.
+ */
+@ExtendWith(MockitoExtension.class)
+class SendAsynchronousEventsTaskletTest {
+
+    @Mock
+    private FineractProperties fineractProperties;
+    @Mock
+    private ExternalEventRepository repository;
+    @Mock
+    private ExternalEventProducer eventProducer;
+    @Mock
+    private MessageFactory messageFactory;
+    @Mock
+    private StepContribution stepContribution;
+    @Mock
+    private ChunkContext chunkContext;
+    private SendAsynchronousEventsTasklet underTest;
+    private RepeatStatus resultStatus;
+
+    @BeforeEach
+    public void setUp() {
+        // Tenant and business date go into the thread-local context first; presumably required by the
+        // DateUtils.getOffsetDateTimeOfTenant() call inside the tasklet - TODO confirm.
+        FineractPlatformTenant tenant = new FineractPlatformTenant(1L, "default", "Default", "Asia/Kolkata", null);
+        ThreadLocalContextUtil.setTenant(tenant);
+        LocalDate today = LocalDate.now(ZoneId.systemDefault());
+        ThreadLocalContextUtil.setBusinessDates(new HashMap<>(Map.of(BusinessDateType.BUSINESS_DATE, today)));
+        configureExternalEventsProducerReadBatchSizeProperty(1000);
+        underTest = new SendAsynchronousEventsTasklet(fineractProperties, repository, eventProducer, messageFactory);
+    }
+
+    /** Builds a real properties tree with the given batch size and makes the mocked root return it. */
+    private void configureExternalEventsProducerReadBatchSizeProperty(int readBatchSize) {
+        FineractProperties.FineractExternalEventsProducerProperties producer = new FineractProperties.FineractExternalEventsProducerProperties();
+        producer.setReadBatchSize(readBatchSize);
+
+        FineractProperties.FineractExternalEventsProperties external = new FineractProperties.FineractExternalEventsProperties();
+        external.setEnabled(true);
+        external.setProducer(producer);
+
+        FineractProperties.FineractEventsProperties eventsProperties = new FineractProperties.FineractEventsProperties();
+        eventsProperties.setExternal(external);
+
+        when(fineractProperties.getEvents()).thenReturn(eventsProperties);
+    }
+
+    /** Creates the fixture event used by every test. */
+    private static ExternalEvent newEvent() {
+        return new ExternalEvent("aType", "aSchema", new byte[0], "aIdemtpotencyKey");
+    }
+
+    /** Stubs the repository to return the given events and the factory to return an empty message. */
+    private void givenQueuedEvents(ExternalEvent... queued) {
+        List<ExternalEvent> events = Arrays.asList(queued);
+        when(repository.findByStatusOrderById(Mockito.any(), Mockito.any())).thenReturn(events);
+        when(messageFactory.createMessage(Mockito.any())).thenReturn(new MessageV1());
+    }
+
+    @Test
+    public void givenBatchSize2WhenTaskExecutionThenSend2Events() throws Exception {
+        // given
+        givenQueuedEvents(newEvent(), newEvent());
+        doNothing().when(eventProducer).sendEvent(Mockito.any(MessageV1.class));
+        // when
+        resultStatus = this.underTest.execute(stepContribution, chunkContext);
+        // then
+        verify(eventProducer, times(2)).sendEvent(Mockito.any(MessageV1.class));
+        verify(repository, times(2)).save(Mockito.any(ExternalEvent.class));
+        assertEquals(RepeatStatus.FINISHED, resultStatus);
+    }
+
+    @Test
+    public void givenBatchSize2WhenEventSendFailsThenExecutionStops() throws Exception {
+        // given
+        givenQueuedEvents(newEvent(), newEvent());
+        AcknowledgementTimeoutException sendFailure = new AcknowledgementTimeoutException("Event Send Exception",
+                new RuntimeException());
+        doThrow(sendFailure).when(eventProducer).sendEvent(Mockito.any(MessageV1.class));
+        // when
+        resultStatus = this.underTest.execute(stepContribution, chunkContext);
+        // then: nothing is saved, yet the step still reports FINISHED
+        verify(repository, times(0)).save(Mockito.any(ExternalEvent.class));
+        assertEquals(RepeatStatus.FINISHED, resultStatus);
+    }
+
+    @Test
+    public void givenOneEventWhenEventSentThenEventStatusUpdates() throws Exception {
+        // given
+        ArgumentCaptor<ExternalEvent> savedEventCaptor = ArgumentCaptor.forClass(ExternalEvent.class);
+        givenQueuedEvents(newEvent());
+        doNothing().when(eventProducer).sendEvent(Mockito.any(MessageV1.class));
+        // when
+        resultStatus = this.underTest.execute(stepContribution, chunkContext);
+        // then
+        verify(repository).save(savedEventCaptor.capture());
+        ExternalEvent savedEvent = savedEventCaptor.getValue();
+        assertThat(savedEvent.getStatus()).isEqualTo(ExternalEventStatus.SENT);
+        assertEquals(RepeatStatus.FINISHED, resultStatus);
+    }
+
+}
diff --git a/fineract-provider/src/test/resources/application-test.properties
b/fineract-provider/src/test/resources/application-test.properties
index 425a5d45b..3de58a747 100644
--- a/fineract-provider/src/test/resources/application-test.properties
+++ b/fineract-provider/src/test/resources/application-test.properties
@@ -46,6 +46,9 @@
fineract.remote-job-message-handler.spring-events.enabled=${FINERACT_REMOTE_JOB_
fineract.remote-job-message-handler.jms.enabled=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_JMS_ENABLED:false}
fineract.remote-job-message-handler.jms.request-queue-name=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_JMS_QUEUE_NAME:JMS-request-queue}
fineract.events.external.enabled=${FINERACT_EXTERNAL_EVENTS_ENABLED:false}
+fineract.events.external.producer.read-batch-size=${FINERACT_EXTERNAL_EVENTS_PRODUCER_READ_BATCH_SIZE:1000}
+fineract.events.external.producer.jms.enabled=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED:false}
+fineract.events.external.producer.jms.event-queue-name=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME:JMS-event-queue}
management.health.jms.enabled=false