This is an automated email from the ASF dual-hosted git repository.

arnold pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git


The following commit(s) were added to refs/heads/develop by this push:
     new e9ae88d05 FINERACT-1694 Send Asynchronous Events
e9ae88d05 is described below

commit e9ae88d05718131600b7a2f57182abb9753a2ede
Author: Ruchi Dhamankar <[email protected]>
AuthorDate: Thu Sep 15 22:53:13 2022 +0530

    FINERACT-1694 Send Asynchronous Events
---
 .../core/config/FineractProperties.java            |  17 +++
 .../AcknowledgementTimeoutException.java}          |  10 +-
 .../jobs/SendAsynchronousEventsConfig.java         |  52 ++++++++
 .../jobs/SendAsynchronousEventsTasklet.java        |  82 ++++++++++++
 .../ExternalEventProducer.java}                    |  11 +-
 .../ExternalEventProducerImpl.java}                |  16 ++-
 .../repository/ExternalEventRepository.java        |   8 +-
 .../external/service/message/MessageFactory.java   |  31 ++++-
 .../infrastructure/jobs/service/JobName.java       |   3 +-
 .../src/main/resources/application.properties      |   4 +
 .../db/changelog/tenant/changelog-tenant.xml       |   1 +
 .../0049_add_send_asynchronous_events_job.xml      |  45 +++++++
 .../jobs/SendAsynchronousEventsTaskletTest.java    | 144 +++++++++++++++++++++
 .../src/test/resources/application-test.properties |   3 +
 14 files changed, 412 insertions(+), 15 deletions(-)

diff --git 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
index d2aab3cf5..a0639e6aa 100644
--- 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
+++ 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
@@ -134,5 +134,22 @@ public class FineractProperties {
     public static class FineractExternalEventsProperties {
 
         private boolean enabled;
+        private FineractExternalEventsProducerProperties producer;
+    }
+
+    @Getter
+    @Setter
+    public static class FineractExternalEventsProducerProperties {
+
+        private int readBatchSize;
+        private FineractExternalEventsProducerJmsProperties jms;
+    }
+
+    @Getter
+    @Setter
+    public static class FineractExternalEventsProducerJmsProperties {
+
+        private boolean enabled;
+        private String eventQueueName;
     }
 }
diff --git 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/exception/AcknowledgementTimeoutException.java
similarity index 72%
copy from 
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
copy to 
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/exception/AcknowledgementTimeoutException.java
index 0745503a1..f40864f2a 100644
--- 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
+++ 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/exception/AcknowledgementTimeoutException.java
@@ -16,9 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.external.repository;
+package org.apache.fineract.infrastructure.event.external.exception;
 
-import 
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
-import org.springframework.data.jpa.repository.JpaRepository;
+public class AcknowledgementTimeoutException extends RuntimeException {
 
-public interface ExternalEventRepository extends JpaRepository<ExternalEvent, 
Long> {}
+    public AcknowledgementTimeoutException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsConfig.java
 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsConfig.java
new file mode 100644
index 000000000..30810773a
--- /dev/null
+++ 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsConfig.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.jobs;
+
+import org.apache.fineract.infrastructure.jobs.service.JobName;
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.Step;
+import 
org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
+import 
org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
+import org.springframework.batch.core.launch.support.RunIdIncrementer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class SendAsynchronousEventsConfig {
+
+    @Autowired
+    private JobBuilderFactory jobs;
+    @Autowired
+    private StepBuilderFactory steps;
+    @Autowired
+    private SendAsynchronousEventsTasklet tasklet;
+
+    @Bean
+    protected Step sendAsynchronousEventsStep() {
+        return 
steps.get(JobName.SEND_ASYNCHRONOUS_EVENTS.name()).tasklet(tasklet).build();
+    }
+
+    @Bean
+    public Job sendAsynchronousEventsJob() {
+        return 
jobs.get(JobName.SEND_ASYNCHRONOUS_EVENTS.name()).start(sendAsynchronousEventsStep()).incrementer(new
 RunIdIncrementer())
+                .build();
+    }
+
+}
diff --git 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
new file mode 100644
index 000000000..27ad7b9ac
--- /dev/null
+++ 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.jobs;
+
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.fineract.avro.MessageV1;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.service.DateUtils;
+import 
org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
+import 
org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
+import 
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
+import 
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import 
org.apache.fineract.infrastructure.event.external.service.message.MessageFactory;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.data.domain.Pageable;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@RequiredArgsConstructor
+@Component
+public class SendAsynchronousEventsTasklet implements Tasklet {
+
+    private final FineractProperties fineractProperties;
+    private final ExternalEventRepository repository;
+    private final ExternalEventProducer eventProducer;
+    private final MessageFactory messageFactory;
+
+    @Override
+    public RepeatStatus execute(StepContribution contribution, ChunkContext 
chunkContext) {
+        try {
+            List<ExternalEvent> events = getQueuedEventsBatch();
+            processEvents(events);
+        } catch (Exception e) {
+            log.error("Error occurred while processing events: ", e);
+        }
+        return RepeatStatus.FINISHED;
+    }
+
+    private List<ExternalEvent> getQueuedEventsBatch() {
+        int readBatchSize = getBatchSize();
+        Pageable batchSize = PageRequest.ofSize(readBatchSize);
+        List<ExternalEvent> events = 
repository.findByStatusOrderById(ExternalEventStatus.TO_BE_SENT, batchSize);
+        return events;
+    }
+
+    private void processEvents(List<ExternalEvent> queuedEvents) {
+        for (ExternalEvent event : queuedEvents) {
+            MessageV1 message = messageFactory.createMessage(event);
+            eventProducer.sendEvent(message);
+            event.setStatus(ExternalEventStatus.SENT);
+            event.setSentAt(DateUtils.getOffsetDateTimeOfTenant());
+            repository.save(event);
+        }
+    }
+
+    private int getBatchSize() {
+        return 
fineractProperties.getEvents().getExternal().getProducer().getReadBatchSize();
+    }
+
+}
diff --git 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java
similarity index 70%
copy from 
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
copy to 
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java
index 0745503a1..355f57bee 100644
--- 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
+++ 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducer.java
@@ -16,9 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.external.repository;
+package org.apache.fineract.infrastructure.event.external.producer;
 
-import 
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
-import org.springframework.data.jpa.repository.JpaRepository;
+import org.apache.fineract.avro.MessageV1;
+import 
org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
 
-public interface ExternalEventRepository extends JpaRepository<ExternalEvent, 
Long> {}
+public interface ExternalEventProducer {
+
+    void sendEvent(MessageV1 message) throws AcknowledgementTimeoutException;
+}
diff --git 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducerImpl.java
similarity index 63%
copy from 
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
copy to 
fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducerImpl.java
index 0745503a1..067307b6e 100644
--- 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
+++ 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/producer/ExternalEventProducerImpl.java
@@ -16,9 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.event.external.repository;
+package org.apache.fineract.infrastructure.event.external.producer;
 
-import 
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
-import org.springframework.data.jpa.repository.JpaRepository;
+import org.apache.fineract.avro.MessageV1;
+import 
org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
+import org.springframework.stereotype.Service;
 
-public interface ExternalEventRepository extends JpaRepository<ExternalEvent, 
Long> {}
+@Service
+public class ExternalEventProducerImpl implements ExternalEventProducer {
+
+    @Override
+    public void sendEvent(MessageV1 message) throws 
AcknowledgementTimeoutException {
+        return;
+    }
+}
diff --git 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
index 0745503a1..8282bc854 100644
--- 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
+++ 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
@@ -18,7 +18,13 @@
  */
 package org.apache.fineract.infrastructure.event.external.repository;
 
+import java.util.List;
 import 
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
+import 
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import org.springframework.data.domain.Pageable;
 import org.springframework.data.jpa.repository.JpaRepository;
 
-public interface ExternalEventRepository extends JpaRepository<ExternalEvent, 
Long> {}
+public interface ExternalEventRepository extends JpaRepository<ExternalEvent, 
Long> {
+
+    List<ExternalEvent> findByStatusOrderById(ExternalEventStatus status, 
Pageable batchSize);
+}
diff --git 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java
 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java
index 0701dcb24..8e727515d 100644
--- 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java
+++ 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/event/external/service/message/MessageFactory.java
@@ -21,10 +21,14 @@ package 
org.apache.fineract.infrastructure.event.external.service.message;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
+import java.util.UUID;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.fineract.avro.BulkMessageV1;
 import org.apache.fineract.avro.MessageV1;
 import org.apache.fineract.infrastructure.core.service.DateUtils;
 import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
+import 
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
 import 
org.apache.fineract.infrastructure.event.external.service.message.domain.BulkMessageData;
 import 
org.apache.fineract.infrastructure.event.external.service.message.domain.MessageCategory;
 import 
org.apache.fineract.infrastructure.event.external.service.message.domain.MessageData;
@@ -33,10 +37,17 @@ import 
org.apache.fineract.infrastructure.event.external.service.message.domain.
 import 
org.apache.fineract.infrastructure.event.external.service.message.domain.MessageIdempotencyKey;
 import 
org.apache.fineract.infrastructure.event.external.service.message.domain.MessageSource;
 import 
org.apache.fineract.infrastructure.event.external.service.message.domain.MessageType;
+import 
org.apache.fineract.infrastructure.event.external.service.support.ByteBufferConverter;
+import org.springframework.beans.factory.InitializingBean;
 import org.springframework.stereotype.Component;
 
+@Slf4j
 @Component
-public class MessageFactory {
+@RequiredArgsConstructor
+public class MessageFactory implements InitializingBean {
+
+    private final ByteBufferConverter byteBufferConverter;
+    private static final String SOURCE_UUID = UUID.randomUUID().toString();
 
     public MessageV1 createMessage(MessageId id, MessageSource source, 
MessageType type, MessageCategory category,
             MessageIdempotencyKey idempotencyKey, MessageDataSchema 
dataSchema, MessageData data) {
@@ -65,6 +76,19 @@ public class MessageFactory {
         return result;
     }
 
+    public MessageV1 createMessage(ExternalEvent event) {
+        MessageId id = new MessageId(event.getId().intValue());
+        MessageSource source = new MessageSource(SOURCE_UUID);
+        MessageType type = new MessageType(event.getType());
+        String messageCategory = "nocategory";
+        MessageCategory category = new MessageCategory(messageCategory);
+        MessageIdempotencyKey idempotencyKey = new 
MessageIdempotencyKey(event.getIdempotencyKey());
+        MessageDataSchema dataSchema = new 
MessageDataSchema(event.getSchema());
+        MessageData data = new 
MessageData(byteBufferConverter.convert(event.getData()));
+        MessageV1 message = createMessage(id, source, type, category, 
idempotencyKey, dataSchema, data);
+        return message;
+    }
+
     private String getTenantId() {
         return ThreadLocalContextUtil.getTenant().getName();
     }
@@ -73,4 +97,9 @@ public class MessageFactory {
         OffsetDateTime createdAt = DateUtils.getOffsetDateTimeOfTenant();
         return 
createdAt.withOffsetSameInstant(ZoneOffset.UTC).toLocalDateTime().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
     }
+
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        log.info("Message source set to {}", SOURCE_UUID);
+    }
 }
diff --git 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java
 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java
index 5c02a6a21..152fe5cad 100644
--- 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java
+++ 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java
@@ -56,7 +56,8 @@ public enum JobName {
     INCREASE_BUSINESS_DATE_BY_1_DAY("Increase Business Date by 1 day"), //
     INCREASE_COB_DATE_BY_1_DAY("Increase COB Date by 1 day"), //
     LOAN_COB("Loan COB"), //
-    LOAN_DELINQUENCY_CLASSIFICATION("Loan Delinquency Classification");
+    LOAN_DELINQUENCY_CLASSIFICATION("Loan Delinquency Classification"), //
+    SEND_ASYNCHRONOUS_EVENTS("Send Asynchronous Events");
 
     private final String name;
 
diff --git a/fineract-provider/src/main/resources/application.properties 
b/fineract-provider/src/main/resources/application.properties
index 9e2f46bef..44a6c83f8 100644
--- a/fineract-provider/src/main/resources/application.properties
+++ b/fineract-provider/src/main/resources/application.properties
@@ -54,6 +54,10 @@ 
fineract.remote-job-message-handler.jms.request-queue-name=${FINERACT_REMOTE_JOB
 
fineract.remote-job-message-handler.jms.broker-url=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_JMS_BROKER_URL:tcp://127.0.0.1:61616}
 
 fineract.events.external.enabled=${FINERACT_EXTERNAL_EVENTS_ENABLED:false}
+fineract.events.external.producer.read-batch-size=${FINERACT_EXTERNAL_EVENTS_PRODUCER_READ_BATCH_SIZE:1000}
+fineract.events.external.producer.jms.enabled=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED:false}
+fineract.events.external.producer.jms.event-queue-name=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME:JMS-event-queue}
+
 
 # Logging pattern for the console
 logging.pattern.console=${CONSOLE_LOG_PATTERN:%clr(%d{yyyy-MM-dd 
HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} 
%clr(%replace([%X{correlationId}]){'\\[\\]', ''}) %clr(---){faint} 
%clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} 
%m%n${LOG_EXCEPTION_CONVERSION_WORD:%wEx}}
diff --git 
a/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml 
b/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml
index fe802d965..179aaf743 100644
--- 
a/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml
+++ 
b/fineract-provider/src/main/resources/db/changelog/tenant/changelog-tenant.xml
@@ -68,4 +68,5 @@
     <include file="parts/0046_external_event_table_schema_info.xml" 
relativeToChangelogFile="true"/>
     <include file="parts/0047_add_loan_delinquency_tags_business_step.xml" 
relativeToChangelogFile="true"/>
     <include file="parts/0048_rework_loan_account_locks.xml" 
relativeToChangelogFile="true"/>
+    <include file="parts/0049_add_send_asynchronous_events_job.xml" 
relativeToChangelogFile="true"/>
 </databaseChangeLog>
diff --git 
a/fineract-provider/src/main/resources/db/changelog/tenant/parts/0049_add_send_asynchronous_events_job.xml
 
b/fineract-provider/src/main/resources/db/changelog/tenant/parts/0049_add_send_asynchronous_events_job.xml
new file mode 100644
index 000000000..c0d8dce2b
--- /dev/null
+++ 
b/fineract-provider/src/main/resources/db/changelog/tenant/parts/0049_add_send_asynchronous_events_job.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
+                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                   
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog 
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-4.3.xsd">
+    <changeSet author="fineract" id="send_async_events_job">
+        <insert tableName="job">
+            <column name="name" value="Send Asynchronous Events"/>
+            <column name="display_name" value="Send Asynchronous Events"/>
+            <column name="cron_expression" value="0 0/1 * * *  ?"/>
+            <column name="create_time" valueDate="${current_datetime}"/>
+            <column name="task_priority" valueNumeric="5"/>
+            <column name="group_name"/>
+            <column name="previous_run_start_time"/>
+            <column name="job_key" value="Send Asynchronous Events _ DEFAULT"/>
+            <column name="initializing_errorlog"/>
+            <column name="is_active" valueBoolean="true"/>
+            <column name="currently_running" valueBoolean="false"/>
+            <column name="updates_allowed" valueBoolean="true"/>
+            <column name="scheduler_group" valueNumeric="0"/>
+            <column name="is_misfired" valueBoolean="false"/>
+            <column name="node_id" valueNumeric="1"/>
+            <column name="is_mismatched_job" valueBoolean="true"/>
+        </insert>
+    </changeSet>
+</databaseChangeLog>
diff --git 
a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
 
b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
new file mode 100644
index 000000000..8550fbb5b
--- /dev/null
+++ 
b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTaskletTest.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.infrastructure.event.external.jobs;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.fineract.avro.MessageV1;
+import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
+import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.domain.FineractPlatformTenant;
+import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
+import 
org.apache.fineract.infrastructure.event.external.exception.AcknowledgementTimeoutException;
+import 
org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
+import 
org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
+import 
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent;
+import 
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
+import 
org.apache.fineract.infrastructure.event.external.service.message.MessageFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.repeat.RepeatStatus;
+
+@ExtendWith(MockitoExtension.class)
+class SendAsynchronousEventsTaskletTest {
+
+    @Mock
+    private FineractProperties fineractProperties;
+    @Mock
+    private ExternalEventRepository repository;
+    @Mock
+    private ExternalEventProducer eventProducer;
+    @Mock
+    private MessageFactory messageFactory;
+    @Mock
+    private StepContribution stepContribution;
+    @Mock
+    private ChunkContext chunkContext;
+    private SendAsynchronousEventsTasklet underTest;
+    private RepeatStatus resultStatus;
+
+    @BeforeEach
+    public void setUp() {
+        ThreadLocalContextUtil.setTenant(new FineractPlatformTenant(1L, 
"default", "Default", "Asia/Kolkata", null));
+        ThreadLocalContextUtil
+                .setBusinessDates(new 
HashMap<>(Map.of(BusinessDateType.BUSINESS_DATE, 
LocalDate.now(ZoneId.systemDefault()))));
+        configureExternalEventsProducerReadBatchSizeProperty(1000);
+        underTest = new SendAsynchronousEventsTasklet(fineractProperties, 
repository, eventProducer, messageFactory);
+    }
+
+    private void configureExternalEventsProducerReadBatchSizeProperty(int 
readBatchSize) {
+        FineractProperties.FineractEventsProperties eventsProperties = new 
FineractProperties.FineractEventsProperties();
+        FineractProperties.FineractExternalEventsProperties externalProperties 
= new FineractProperties.FineractExternalEventsProperties();
+        FineractProperties.FineractExternalEventsProducerProperties 
externalEventsProducerProperties = new 
FineractProperties.FineractExternalEventsProducerProperties();
+        externalProperties.setEnabled(true);
+        externalEventsProducerProperties.setReadBatchSize(readBatchSize);
+        externalProperties.setProducer(externalEventsProducerProperties);
+        eventsProperties.setExternal(externalProperties);
+        when(fineractProperties.getEvents()).thenReturn(eventsProperties);
+    }
+
+    @Test
+    public void givenBatchSize2WhenTaskExecutionThenSend2Events() throws 
Exception {
+        // given
+        List<ExternalEvent> events = Arrays.asList(new ExternalEvent("aType", 
"aSchema", new byte[0], "aIdemtpotencyKey"),
+                new ExternalEvent("aType", "aSchema", new byte[0], 
"aIdemtpotencyKey"));
+        when(repository.findByStatusOrderById(Mockito.any(), 
Mockito.any())).thenReturn(events);
+        when(messageFactory.createMessage(Mockito.any())).thenReturn(new 
MessageV1());
+        
doNothing().when(eventProducer).sendEvent(Mockito.any(MessageV1.class));
+        // when
+        resultStatus = this.underTest.execute(stepContribution, chunkContext);
+        // then
+        verify(eventProducer, 
times(2)).sendEvent(Mockito.any(MessageV1.class));
+        verify(repository, times(2)).save(Mockito.any(ExternalEvent.class));
+        assertEquals(RepeatStatus.FINISHED, resultStatus);
+    }
+
+    @Test
+    public void givenBatchSize2WhenEventSendFailsThenExecutionStops() throws 
Exception {
+        // given
+        List<ExternalEvent> events = Arrays.asList(new ExternalEvent("aType", 
"aSchema", new byte[0], "aIdemtpotencyKey"),
+                new ExternalEvent("aType", "aSchema", new byte[0], 
"aIdemtpotencyKey"));
+        when(repository.findByStatusOrderById(Mockito.any(), 
Mockito.any())).thenReturn(events);
+        when(messageFactory.createMessage(Mockito.any())).thenReturn(new 
MessageV1());
+        doThrow(new AcknowledgementTimeoutException("Event Send Exception", 
new RuntimeException())).when(eventProducer)
+                .sendEvent(Mockito.any(MessageV1.class));
+        // when
+        resultStatus = this.underTest.execute(stepContribution, chunkContext);
+        // then
+        verify(repository, times(0)).save(Mockito.any(ExternalEvent.class));
+        assertEquals(RepeatStatus.FINISHED, resultStatus);
+    }
+
+    @Test
+    public void givenOneEventWhenEventSentThenEventStatusUpdates() throws 
Exception {
+        // given
+        ArgumentCaptor<ExternalEvent> externalEventArgumentCaptor = 
ArgumentCaptor.forClass(ExternalEvent.class);
+        List<ExternalEvent> events = Arrays.asList(new ExternalEvent("aType", 
"aSchema", new byte[0], "aIdemtpotencyKey"));
+        when(repository.findByStatusOrderById(Mockito.any(), 
Mockito.any())).thenReturn(events);
+        when(messageFactory.createMessage(Mockito.any())).thenReturn(new 
MessageV1());
+        
doNothing().when(eventProducer).sendEvent(Mockito.any(MessageV1.class));
+        // when
+        resultStatus = this.underTest.execute(stepContribution, chunkContext);
+        // then
+        verify(repository).save(externalEventArgumentCaptor.capture());
+        ExternalEvent externalEvent = externalEventArgumentCaptor.getValue();
+        
assertThat(externalEvent.getStatus()).isEqualTo(ExternalEventStatus.SENT);
+        assertEquals(RepeatStatus.FINISHED, resultStatus);
+    }
+
+}
diff --git a/fineract-provider/src/test/resources/application-test.properties 
b/fineract-provider/src/test/resources/application-test.properties
index 425a5d45b..3de58a747 100644
--- a/fineract-provider/src/test/resources/application-test.properties
+++ b/fineract-provider/src/test/resources/application-test.properties
@@ -46,6 +46,9 @@ 
fineract.remote-job-message-handler.spring-events.enabled=${FINERACT_REMOTE_JOB_
 
fineract.remote-job-message-handler.jms.enabled=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_JMS_ENABLED:false}
 
fineract.remote-job-message-handler.jms.request-queue-name=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_JMS_QUEUE_NAME:JMS-request-queue}
 fineract.events.external.enabled=${FINERACT_EXTERNAL_EVENTS_ENABLED:false}
+fineract.events.external.producer.read-batch-size=${FINERACT_EXTERNAL_EVENTS_PRODUCER_READ_BATCH_SIZE:1000}
+fineract.events.external.producer.jms.enabled=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_ENABLED:false}
+fineract.events.external.producer.jms.event-queue-name=${FINERACT_EXTERNAL_EVENTS_PRODUCER_JMS_QUEUE_NAME:JMS-event-queue}
 
 management.health.jms.enabled=false
 

Reply via email to