This is an automated email from the ASF dual-hosted git repository.

arnold pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git


The following commit(s) were added to refs/heads/develop by this push:
     new 900d0c282 FINERACT-1678  - ThreadPoolTaskExecutor task executor is 
added to COB workers which allows concurrent processing of chunks.
900d0c282 is described below

commit 900d0c2827f9bb1a96d9d1b29853e1c6b1c3a8f4
Author: Peter Bagrij <[email protected]>
AuthorDate: Tue Jul 25 16:17:47 2023 +0200

    FINERACT-1678
     - ThreadPoolTaskExecutor task executor is added to COB workers which 
allows concurrent processing of chunks.
---
 ...eractPartitionJobConfigValidationCondition.java |   9 +-
 .../core/config/FineractProperties.java            |   5 +-
 .../fineract/cob/loan/AbstractLoanItemReader.java  |  16 +--
 .../loan/ContextAwareTaskDecorator.java}           |  19 ++-
 .../fineract/cob/loan/InlineCOBLoanItemReader.java |   4 +-
 .../cob/loan/LoanCOBWorkerConfiguration.java       |  47 ++++++-
 .../apache/fineract/cob/loan/LoanItemReader.java   |   3 +-
 .../springbatch/PropertyService.java               |   6 +
 .../springbatch/PropertyServiceImpl.java           |  38 +++---
 .../src/main/resources/application.properties      |   4 +-
 .../fineract/cob/loan/LoanItemReaderTest.java      | 146 +++++++++++++++++++++
 ...tPartitionJobConfigValidationConditionTest.java |  31 +----
 .../src/test/resources/application-test.properties |   4 +-
 13 files changed, 261 insertions(+), 71 deletions(-)

diff --git 
a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractPartitionJobConfigValidationCondition.java
 
b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractPartitionJobConfigValidationCondition.java
index 6a71e76e7..4f54c0266 100644
--- 
a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractPartitionJobConfigValidationCondition.java
+++ 
b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractPartitionJobConfigValidationCondition.java
@@ -36,7 +36,7 @@ public class FineractPartitionJobConfigValidationCondition 
implements Condition
                 "fineract.partitioned-job", 
FineractProperties.FineractPartitionedJob.class);
         if (partitionedJobProperties != null) {
             List<FineractProperties.PartitionedJobProperty> invalidConfigs = 
partitionedJobProperties.getPartitionedJobProperties().stream()
-                    
.filter(isAnyConfigBelowOne().or(FineractPartitionJobConfigValidationCondition::isPartitionRatioInvalid)).toList();
+                    
.filter(isAnyConfigBelowOne().or(FineractPartitionJobConfigValidationCondition::invalidMaxPoolSize)).toList();
             if (!invalidConfigs.isEmpty()) {
                 for (FineractProperties.PartitionedJobProperty invalidConfig : 
invalidConfigs) {
                     log.error(
@@ -52,10 +52,11 @@ public class FineractPartitionJobConfigValidationCondition 
implements Condition
 
     private static Predicate<FineractProperties.PartitionedJobProperty> 
isAnyConfigBelowOne() {
         return partitionedJobProperty -> 
!(partitionedJobProperty.getPartitionSize() > 0 && 
partitionedJobProperty.getChunkSize() > 0
-                && partitionedJobProperty.getThreadCount() > 0);
+                && partitionedJobProperty.getThreadPoolCorePoolSize() > 0 && 
partitionedJobProperty.getThreadPoolMaxPoolSize() > 0
+                && partitionedJobProperty.getThreadPoolQueueCapacity() > 0);
     }
 
-    private static boolean 
isPartitionRatioInvalid(FineractProperties.PartitionedJobProperty 
partitionedJobProperty) {
-        return partitionedJobProperty.getPartitionSize() < 
partitionedJobProperty.getThreadCount() * partitionedJobProperty.getChunkSize();
+    private static boolean 
invalidMaxPoolSize(FineractProperties.PartitionedJobProperty 
partitionedJobProperty) {
+        return partitionedJobProperty.getThreadPoolMaxPoolSize() < 
partitionedJobProperty.getThreadPoolCorePoolSize();
     }
 }
diff --git 
a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
 
b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
index bd149d4eb..f37d27326 100644
--- 
a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
+++ 
b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
@@ -155,8 +155,11 @@ public class FineractProperties {
         private String jobName;
         private Integer chunkSize;
         private Integer partitionSize;
-        private Integer threadCount;
+        private Integer threadPoolCorePoolSize;
+        private Integer threadPoolMaxPoolSize;
+        private Integer threadPoolQueueCapacity;
         private Integer retryLimit;
+
     }
 
     @Getter
diff --git 
a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/AbstractLoanItemReader.java
 
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/AbstractLoanItemReader.java
index 40f39a6d3..8fd736fdd 100644
--- 
a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/AbstractLoanItemReader.java
+++ 
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/AbstractLoanItemReader.java
@@ -18,7 +18,7 @@
  */
 package org.apache.fineract.cob.loan;
 
-import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
 import lombok.AccessLevel;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
@@ -40,21 +40,19 @@ public abstract class AbstractLoanItemReader implements 
ItemReader<Loan> {
     protected final LoanRepository loanRepository;
 
     @Setter(AccessLevel.PROTECTED)
-    private List<Long> remainingData;
-    private Long loanId;
+    private LinkedBlockingQueue<Long> remainingData;
 
     @Override
     public Loan read() throws Exception {
-        try {
-            if (remainingData.size() > 0) {
-                loanId = remainingData.remove(0);
+        final Long loanId = remainingData.poll();
+        if (loanId != null) {
+            try {
                 return loanRepository.findById(loanId).orElseThrow(() -> new 
LoanNotFoundException(loanId));
+            } catch (Exception e) {
+                throw new LoanReadException(loanId, e);
             }
-        } catch (Exception e) {
-            throw new LoanReadException(loanId, e);
         }
         return null;
-
     }
 
     @AfterStep
diff --git 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyService.java
 
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/ContextAwareTaskDecorator.java
similarity index 56%
copy from 
fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyService.java
copy to 
fineract-provider/src/main/java/org/apache/fineract/cob/loan/ContextAwareTaskDecorator.java
index 053b2536b..c6bccf0ed 100644
--- 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyService.java
+++ 
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/ContextAwareTaskDecorator.java
@@ -16,13 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.fineract.infrastructure.springbatch;
+package org.apache.fineract.cob.loan;
 
-public interface PropertyService {
+import org.apache.fineract.infrastructure.core.domain.FineractContext;
+import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
+import org.jetbrains.annotations.NotNull;
+import org.springframework.core.task.TaskDecorator;
 
-    Integer getPartitionSize(String jobName);
+public class ContextAwareTaskDecorator implements TaskDecorator {
 
-    Integer getChunkSize(String jobName);
+    @Override
+    public Runnable decorate(@NotNull Runnable runnable) {
+        final FineractContext context = ThreadLocalContextUtil.getContext();
+        return () -> {
+            ThreadLocalContextUtil.init(context);
+            runnable.run();
+        };
+    }
 
-    Integer getRetryLimit(String jobName);
 }
diff --git 
a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/InlineCOBLoanItemReader.java
 
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/InlineCOBLoanItemReader.java
index deaa9aaee..f1030f01e 100644
--- 
a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/InlineCOBLoanItemReader.java
+++ 
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/InlineCOBLoanItemReader.java
@@ -18,8 +18,8 @@
  */
 package org.apache.fineract.cob.loan;
 
-import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.fineract.portfolio.loanaccount.domain.LoanRepository;
 import org.jetbrains.annotations.NotNull;
 import org.springframework.batch.core.StepExecution;
@@ -37,6 +37,6 @@ public class InlineCOBLoanItemReader extends 
AbstractLoanItemReader {
     public void beforeStep(@NotNull StepExecution stepExecution) {
         ExecutionContext executionContext = 
stepExecution.getJobExecution().getExecutionContext();
         List<Long> loanIds = (List<Long>) 
executionContext.get(LoanCOBConstant.LOAN_COB_PARAMETER);
-        setRemainingData(new ArrayList<>(loanIds));
+        setRemainingData(new LinkedBlockingQueue<>(loanIds));
     }
 }
diff --git 
a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBWorkerConfiguration.java
 
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBWorkerConfiguration.java
index c9e417c53..46c6c9736 100644
--- 
a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBWorkerConfiguration.java
+++ 
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBWorkerConfiguration.java
@@ -34,6 +34,7 @@ import 
org.springframework.batch.core.configuration.annotation.StepScope;
 import org.springframework.batch.core.job.builder.FlowBuilder;
 import org.springframework.batch.core.job.flow.Flow;
 import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.batch.core.step.builder.SimpleStepBuilder;
 import org.springframework.batch.core.step.builder.StepBuilder;
 import 
org.springframework.batch.integration.partition.RemotePartitioningWorkerStepBuilderFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -41,7 +42,10 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.core.task.SyncTaskExecutor;
+import org.springframework.core.task.TaskExecutor;
 import org.springframework.integration.channel.QueueChannel;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.transaction.PlatformTransactionManager;
 import org.springframework.transaction.support.TransactionTemplate;
 
@@ -86,7 +90,7 @@ public class LoanCOBWorkerConfiguration {
 
     @Bean
     public Flow flow() {
-        return new 
FlowBuilder<Flow>("cobFlow").start(initialisationStep(null)).next(applyLockStep(null)).next(loanBusinessStep(null))
+        return new 
FlowBuilder<Flow>("cobFlow").start(initialisationStep(null)).next(applyLockStep(null)).next(loanBusinessStep(null,
 null))
                 .next(resetContextStep(null)).build();
     }
 
@@ -97,14 +101,43 @@ public class LoanCOBWorkerConfiguration {
                 .build();
     }
 
+    @Bean
+    public TaskExecutor cobTaskExecutor() {
+        if (propertyService.getThreadPoolMaxPoolSize(LoanCOBConstant.JOB_NAME) 
== 1) {
+            return new SyncTaskExecutor();
+        }
+        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
+        taskExecutor.setThreadNamePrefix("COB-Thread-");
+        taskExecutor.setThreadGroupName("COB-Thread");
+        
taskExecutor.setCorePoolSize(propertyService.getThreadPoolCorePoolSize(JobName.LOAN_COB.name()));
+        
taskExecutor.setMaxPoolSize(propertyService.getThreadPoolMaxPoolSize(JobName.LOAN_COB.name()));
+        
taskExecutor.setQueueCapacity(propertyService.getThreadPoolQueueCapacity(JobName.LOAN_COB.name()));
+        taskExecutor.setAllowCoreThreadTimeOut(true);
+        taskExecutor.setTaskDecorator(new ContextAwareTaskDecorator());
+        return taskExecutor;
+    }
+
     @Bean
     @StepScope
-    public Step 
loanBusinessStep(@Value("#{stepExecutionContext['partition']}") String 
partitionName) {
-        return new StepBuilder("Loan Business - Step:" + partitionName, 
jobRepository)
-                .<Loan, 
Loan>chunk(propertyService.getChunkSize(JobName.LOAN_COB.name()), 
transactionManager).reader(cobWorkerItemReader())
-                
.processor(cobWorkerItemProcessor()).writer(cobWorkerItemWriter()).faultTolerant().retry(Exception.class)
-                
.retryLimit(propertyService.getRetryLimit(LoanCOBConstant.JOB_NAME)).skip(Exception.class)
-                
.skipLimit(propertyService.getChunkSize(JobName.LOAN_COB.name()) + 
1).listener(loanItemListener()).build();
+    public Step 
loanBusinessStep(@Value("#{stepExecutionContext['partition']}") String 
partitionName, TaskExecutor cobTaskExecutor) {
+        SimpleStepBuilder<Loan, Loan> stepBuilder = new StepBuilder("Loan 
Business - Step:" + partitionName, jobRepository)
+                .<Loan, 
Loan>chunk(propertyService.getChunkSize(JobName.LOAN_COB.name()), 
transactionManager) //
+                .reader(cobWorkerItemReader()) //
+                .processor(cobWorkerItemProcessor()) //
+                .writer(cobWorkerItemWriter()) //
+                .faultTolerant() //
+                .retry(Exception.class) //
+                
.retryLimit(propertyService.getRetryLimit(LoanCOBConstant.JOB_NAME)) //
+                .skip(Exception.class) //
+                
.skipLimit(propertyService.getChunkSize(LoanCOBConstant.JOB_NAME) + 1) //
+                .listener(loanItemListener()) //
+                .transactionManager(transactionManager);
+
+        if (propertyService.getThreadPoolMaxPoolSize(LoanCOBConstant.JOB_NAME) 
> 1) {
+            stepBuilder.taskExecutor(cobTaskExecutor);
+        }
+
+        return stepBuilder.build();
     }
 
     @Bean
diff --git 
a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanItemReader.java
 
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanItemReader.java
index 8e5c0a3ec..c236257f7 100644
--- 
a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanItemReader.java
+++ 
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanItemReader.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.fineract.cob.common.CustomJobParameterResolver;
 import org.apache.fineract.cob.data.LoanCOBParameter;
 import org.apache.fineract.cob.domain.LoanAccountLock;
@@ -64,7 +65,7 @@ public class LoanItemReader extends AbstractLoanItemReader {
             List<Long> lockedByCOBChunkProcessingAccountIds = 
getLoanIdsLockedWithChunkProcessingLock(loanIds);
             loanIds.retainAll(lockedByCOBChunkProcessingAccountIds);
         }
-        setRemainingData(new ArrayList<>(loanIds));
+        setRemainingData(new LinkedBlockingQueue<>(loanIds));
     }
 
     private List<Long> getLoanIdsLockedWithChunkProcessingLock(List<Long> 
loanIds) {
diff --git 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyService.java
 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyService.java
index 053b2536b..6e234a29a 100644
--- 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyService.java
+++ 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyService.java
@@ -25,4 +25,10 @@ public interface PropertyService {
     Integer getChunkSize(String jobName);
 
     Integer getRetryLimit(String jobName);
+
+    Integer getThreadPoolCorePoolSize(String jobName);
+
+    Integer getThreadPoolMaxPoolSize(String jobName);
+
+    Integer getThreadPoolQueueCapacity(String jobName);
 }
diff --git 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyServiceImpl.java
 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyServiceImpl.java
index d8cc1ba52..1eab3979c 100644
--- 
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyServiceImpl.java
+++ 
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyServiceImpl.java
@@ -19,6 +19,7 @@
 package org.apache.fineract.infrastructure.springbatch;
 
 import java.util.List;
+import java.util.function.Function;
 import lombok.RequiredArgsConstructor;
 import org.apache.fineract.infrastructure.core.config.FineractProperties;
 import org.springframework.stereotype.Service;
@@ -31,34 +32,41 @@ public class PropertyServiceImpl implements PropertyService 
{
 
     @Override
     public Integer getPartitionSize(String jobName) {
-        List<FineractProperties.PartitionedJobProperty> jobProperties = 
fineractProperties.getPartitionedJob()
-                .getPartitionedJobProperties();
-        return jobProperties.stream() //
-                .filter(jobProperty -> 
jobName.equals(jobProperty.getJobName())) //
-                .findFirst() //
-                
.map(FineractProperties.PartitionedJobProperty::getPartitionSize) //
-                .orElse(1);
+        return getProperty(jobName, 
FineractProperties.PartitionedJobProperty::getPartitionSize);
     }
 
     @Override
     public Integer getChunkSize(String jobName) {
-        List<FineractProperties.PartitionedJobProperty> jobProperties = 
fineractProperties.getPartitionedJob()
-                .getPartitionedJobProperties();
-        return jobProperties.stream() //
-                .filter(jobProperty -> 
jobName.equals(jobProperty.getJobName())) //
-                .findFirst() //
-                .map(FineractProperties.PartitionedJobProperty::getChunkSize) 
//
-                .orElse(1);
+        return getProperty(jobName, 
FineractProperties.PartitionedJobProperty::getChunkSize);
     }
 
     @Override
     public Integer getRetryLimit(String jobName) {
+        return getProperty(jobName, 
FineractProperties.PartitionedJobProperty::getRetryLimit);
+    }
+
+    @Override
+    public Integer getThreadPoolCorePoolSize(String jobName) {
+        return getProperty(jobName, 
FineractProperties.PartitionedJobProperty::getThreadPoolCorePoolSize);
+    }
+
+    @Override
+    public Integer getThreadPoolMaxPoolSize(String jobName) {
+        return getProperty(jobName, 
FineractProperties.PartitionedJobProperty::getThreadPoolMaxPoolSize);
+    }
+
+    @Override
+    public Integer getThreadPoolQueueCapacity(String jobName) {
+        return getProperty(jobName, 
FineractProperties.PartitionedJobProperty::getThreadPoolQueueCapacity);
+    }
+
+    private Integer getProperty(String jobName, Function<? super 
FineractProperties.PartitionedJobProperty, Integer> function) {
         List<FineractProperties.PartitionedJobProperty> jobProperties = 
fineractProperties.getPartitionedJob()
                 .getPartitionedJobProperties();
         return jobProperties.stream() //
                 .filter(jobProperty -> 
jobName.equals(jobProperty.getJobName())) //
                 .findFirst() //
-                .map(FineractProperties.PartitionedJobProperty::getRetryLimit) 
//
+                .map(function) //
                 .orElse(1);
     }
 }
diff --git a/fineract-provider/src/main/resources/application.properties 
b/fineract-provider/src/main/resources/application.properties
index 333e094d2..77b59ad1a 100644
--- a/fineract-provider/src/main/resources/application.properties
+++ b/fineract-provider/src/main/resources/application.properties
@@ -64,7 +64,9 @@ 
fineract.job.stuck-retry-threshold=${FINERACT_JOB_STUCK_RETRY_THRESHOLD:5}
 fineract.partitioned-job.partitioned-job-properties[0].job-name=LOAN_COB
 
fineract.partitioned-job.partitioned-job-properties[0].chunk-size=${LOAN_COB_CHUNK_SIZE:100}
 
fineract.partitioned-job.partitioned-job-properties[0].partition-size=${LOAN_COB_PARTITION_SIZE:100}
-fineract.partitioned-job.partitioned-job-properties[0].thread-count=${LOAN_COB_THREAD_COUNT:1}
+fineract.partitioned-job.partitioned-job-properties[0].thread-pool-core-pool-size=${LOAN_COB_THREAD_POOL_CORE_POOL_SIZE:5}
+fineract.partitioned-job.partitioned-job-properties[0].thread-pool-max-pool-size=${LOAN_COB_THREAD_POOL_MAX_POOL_SIZE:5}
+fineract.partitioned-job.partitioned-job-properties[0].thread-pool-queue-capacity=${LOAN_COB_THREAD_POOL_QUEUE_CAPACITY:20}
 
fineract.partitioned-job.partitioned-job-properties[0].retry-limit=${LOAN_COB_RETRY_LIMIT:5}
 
 
fineract.remote-job-message-handler.spring-events.enabled=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_SPRING_EVENTS_ENABLED:true}
diff --git 
a/fineract-provider/src/test/java/org/apache/fineract/cob/loan/LoanItemReaderTest.java
 
b/fineract-provider/src/test/java/org/apache/fineract/cob/loan/LoanItemReaderTest.java
new file mode 100644
index 000000000..9fe36b1dc
--- /dev/null
+++ 
b/fineract-provider/src/test/java/org/apache/fineract/cob/loan/LoanItemReaderTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.cob.loan;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.fineract.cob.common.CustomJobParameterResolver;
+import org.apache.fineract.cob.data.LoanCOBParameter;
+import org.apache.fineract.cob.domain.LoanAccountLock;
+import org.apache.fineract.cob.domain.LockOwner;
+import org.apache.fineract.infrastructure.core.domain.FineractPlatformTenant;
+import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
+import org.apache.fineract.portfolio.loanaccount.domain.Loan;
+import org.apache.fineract.portfolio.loanaccount.domain.LoanRepository;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.item.ExecutionContext;
+
+@ExtendWith(MockitoExtension.class)
+class LoanItemReaderTest {
+
+    @Mock
+    private LoanRepository loanRepository;
+
+    @Mock
+    private RetrieveLoanIdService retrieveLoanIdService;
+
+    @Mock
+    private CustomJobParameterResolver customJobParameterResolver;
+
+    @Mock
+    private LoanLockingService loanLockingService;
+
+    @Mock
+    private StepExecution stepExecution;
+
+    @Mock
+    private ExecutionContext executionContext;
+
+    @Mock
+    private Loan loan;
+
+    @Test
+    public void testLoanItemReaderSimple() throws Exception {
+        // given
+        ThreadLocalContextUtil.setTenant(new FineractPlatformTenant(1L, 
"test", "test", "UTC", null));
+        LoanItemReader loanItemReader = new LoanItemReader(loanRepository, 
retrieveLoanIdService, customJobParameterResolver,
+                loanLockingService);
+        when(stepExecution.getExecutionContext()).thenReturn(executionContext);
+        LoanCOBParameter loanCOBParameter = new LoanCOBParameter(1L, 5L);
+        
when(executionContext.get(LoanCOBConstant.LOAN_COB_PARAMETER)).thenReturn(loanCOBParameter);
+        
when(retrieveLoanIdService.retrieveAllNonClosedLoansByLastClosedBusinessDateAndMinAndMaxLoanId(loanCOBParameter,
 false))
+                .thenReturn(new ArrayList<>(List.of(1L, 2L, 3L, 4L, 5L)));
+        List<LoanAccountLock> accountLocks = List.of(1L, 2L, 3L, 4L, 
5L).stream()
+                .map(l -> new LoanAccountLock(l, 
LockOwner.LOAN_COB_CHUNK_PROCESSING, LocalDate.of(2023, 7, 25))).toList();
+        when(loanLockingService.findAllByLoanIdInAndLockOwner(List.of(1L, 2L, 
3L, 4L, 5L), LockOwner.LOAN_COB_CHUNK_PROCESSING))
+                .thenReturn(accountLocks);
+        when(loanRepository.findById(anyLong())).thenReturn(Optional.of(loan));
+
+        // when + then
+        loanItemReader.beforeStep(stepExecution);
+        for (long i = 1; i <= 5; i++) {
+            Loan myLoan = loanItemReader.read();
+            Assertions.assertEquals(loan, myLoan);
+            verify(loanRepository, times(1)).findById(i);
+        }
+
+        Mockito.verifyNoMoreInteractions(loanRepository);
+    }
+
+    @Test
+    public void testLoanItemReaderMultiThreadRead() throws Exception {
+        // given
+        ThreadLocalContextUtil.setTenant(new FineractPlatformTenant(1L, 
"test", "test", "UTC", null));
+        LoanItemReader loanItemReader = new LoanItemReader(loanRepository, 
retrieveLoanIdService, customJobParameterResolver,
+                loanLockingService);
+        when(stepExecution.getExecutionContext()).thenReturn(executionContext);
+        LoanCOBParameter loanCOBParameter = new LoanCOBParameter(1L, 100L);
+        
when(executionContext.get(LoanCOBConstant.LOAN_COB_PARAMETER)).thenReturn(loanCOBParameter);
+        
when(retrieveLoanIdService.retrieveAllNonClosedLoansByLastClosedBusinessDateAndMinAndMaxLoanId(loanCOBParameter,
 false))
+                .thenReturn(new ArrayList<>(IntStream.rangeClosed(1, 
100).boxed().map(Long::valueOf).toList()));
+        List<LoanAccountLock> accountLocks = IntStream.rangeClosed(1, 
100).boxed().map(Long::valueOf)
+                .map(l -> new LoanAccountLock(l, 
LockOwner.LOAN_COB_CHUNK_PROCESSING, LocalDate.of(2023, 7, 25))).toList();
+        
when(loanLockingService.findAllByLoanIdInAndLockOwner(IntStream.rangeClosed(1, 
100).boxed().map(Long::valueOf).toList(),
+                LockOwner.LOAN_COB_CHUNK_PROCESSING)).thenReturn(accountLocks);
+        when(loanRepository.findById(anyLong())).thenReturn(Optional.of(loan));
+
+        // when + then
+        loanItemReader.beforeStep(stepExecution);
+        ExecutorService executorService = Executors.newFixedThreadPool(10);
+        for (int i = 1; i <= 100; i++) {
+            Future<?> notUsed = executorService.submit(() -> {
+                try {
+                    Loan myLoan = loanItemReader.read();
+                    Assertions.assertEquals(loan, myLoan);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        }
+        executorService.shutdown();
+        boolean b = executorService.awaitTermination(5L, TimeUnit.SECONDS);
+        Assertions.assertTrue(b, "Executor did not terminate successfully");
+
+        // verify that this was called 100 times, and for each loan it was 
called exactly once
+        for (long i = 1; i <= 100; i++) {
+            verify(loanRepository, times(1)).findById(i);
+        }
+
+        Mockito.verifyNoMoreInteractions(loanRepository);
+    }
+
+}
diff --git 
a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/config/FineractPartitionJobConfigValidationConditionTest.java
 
b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/config/FineractPartitionJobConfigValidationConditionTest.java
index 6342a7a21..a2f0413e9 100644
--- 
a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/config/FineractPartitionJobConfigValidationConditionTest.java
+++ 
b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/config/FineractPartitionJobConfigValidationConditionTest.java
@@ -65,16 +65,6 @@ class FineractPartitionJobConfigValidationConditionTest {
         }
     }
 
-    @Test
-    public void 
testApplicationStartup_ShouldApplicationStartupFails_WhenPartitionSizeInvalid() 
{
-        try (MockedStatic<ExplicitConfigurationPropertiesFactory> 
propertyFactory = Mockito
-                .mockStatic(ExplicitConfigurationPropertiesFactory.class)) {
-            propertyFactory.when(() -> 
ExplicitConfigurationPropertiesFactory.getProperty(context, 
"fineract.partitioned-job",
-                    
FineractProperties.FineractPartitionedJob.class)).thenReturn(getInvalidPartitionSize());
-            assertTrue(testObj.matches(context, metadata));
-        }
-    }
-
     @Test
     public void 
testApplicationStartup_ShouldApplicationStart_WhenConfigValid() {
         try (MockedStatic<ExplicitConfigurationPropertiesFactory> 
propertyFactory = Mockito
@@ -92,20 +82,9 @@ class FineractPartitionJobConfigValidationConditionTest {
         partitionedJobProperty.setJobName("LOAN_COB");
         partitionedJobProperty.setPartitionSize(100);
         partitionedJobProperty.setChunkSize(10);
-        partitionedJobProperty.setThreadCount(10);
-        jobProperties.add(partitionedJobProperty);
-        partitionedJob.setPartitionedJobProperties(jobProperties);
-        return partitionedJob;
-    }
-
-    private FineractProperties.FineractPartitionedJob 
getInvalidPartitionSize() {
-        FineractProperties.FineractPartitionedJob partitionedJob = new 
FineractProperties.FineractPartitionedJob();
-        List<FineractProperties.PartitionedJobProperty> jobProperties = new 
ArrayList<>();
-        FineractProperties.PartitionedJobProperty partitionedJobProperty = new 
FineractProperties.PartitionedJobProperty();
-        partitionedJobProperty.setJobName("LOAN_COB");
-        partitionedJobProperty.setPartitionSize(99);
-        partitionedJobProperty.setChunkSize(10);
-        partitionedJobProperty.setThreadCount(10);
+        partitionedJobProperty.setThreadPoolCorePoolSize(10);
+        partitionedJobProperty.setThreadPoolMaxPoolSize(20);
+        partitionedJobProperty.setThreadPoolQueueCapacity(10);
         jobProperties.add(partitionedJobProperty);
         partitionedJob.setPartitionedJobProperties(jobProperties);
         return partitionedJob;
@@ -118,7 +97,9 @@ class FineractPartitionJobConfigValidationConditionTest {
         partitionedJobProperty.setJobName("LOAN_COB");
         partitionedJobProperty.setPartitionSize(0);
         partitionedJobProperty.setChunkSize(1);
-        partitionedJobProperty.setThreadCount(1);
+        partitionedJobProperty.setThreadPoolCorePoolSize(1);
+        partitionedJobProperty.setThreadPoolMaxPoolSize(1);
+        partitionedJobProperty.setThreadPoolQueueCapacity(1);
         jobProperties.add(partitionedJobProperty);
         partitionedJob.setPartitionedJobProperties(jobProperties);
         return partitionedJob;
diff --git a/fineract-provider/src/test/resources/application-test.properties 
b/fineract-provider/src/test/resources/application-test.properties
index 584ff5a36..caa5ff6b4 100644
--- a/fineract-provider/src/test/resources/application-test.properties
+++ b/fineract-provider/src/test/resources/application-test.properties
@@ -42,7 +42,9 @@ fineract.mode.batch-enabled=true
 fineract.partitioned-job.partitioned-job-properties[0].job-name=LOAN_COB
 fineract.partitioned-job.partitioned-job-properties[0].chunk-size=100
 fineract.partitioned-job.partitioned-job-properties[0].partition-size=100
-fineract.partitioned-job.partitioned-job-properties[0].thread-count=1
+fineract.partitioned-job.partitioned-job-properties[0].thread-pool-core-pool-size=1
+fineract.partitioned-job.partitioned-job-properties[0].thread-pool-max-pool-size=1
+fineract.partitioned-job.partitioned-job-properties[0].thread-pool-queue-capacity=1
 fineract.partitioned-job.partitioned-job-properties[0].retry-limit=5
 
 
fineract.remote-job-message-handler.spring-events.enabled=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_SPRING_EVENTS_ENABLED:true}

Reply via email to