This is an automated email from the ASF dual-hosted git repository.
arnold pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git
The following commit(s) were added to refs/heads/develop by this push:
new 900d0c282 FINERACT-1678 - A ThreadPoolTaskExecutor is added to the
COB workers, which allows chunks to be processed concurrently.
900d0c282 is described below
commit 900d0c2827f9bb1a96d9d1b29853e1c6b1c3a8f4
Author: Peter Bagrij <[email protected]>
AuthorDate: Tue Jul 25 16:17:47 2023 +0200
FINERACT-1678
- A ThreadPoolTaskExecutor is added to the COB workers, which
allows chunks to be processed concurrently.
---
...eractPartitionJobConfigValidationCondition.java | 9 +-
.../core/config/FineractProperties.java | 5 +-
.../fineract/cob/loan/AbstractLoanItemReader.java | 16 +--
.../loan/ContextAwareTaskDecorator.java} | 19 ++-
.../fineract/cob/loan/InlineCOBLoanItemReader.java | 4 +-
.../cob/loan/LoanCOBWorkerConfiguration.java | 47 ++++++-
.../apache/fineract/cob/loan/LoanItemReader.java | 3 +-
.../springbatch/PropertyService.java | 6 +
.../springbatch/PropertyServiceImpl.java | 38 +++---
.../src/main/resources/application.properties | 4 +-
.../fineract/cob/loan/LoanItemReaderTest.java | 146 +++++++++++++++++++++
...tPartitionJobConfigValidationConditionTest.java | 31 +----
.../src/test/resources/application-test.properties | 4 +-
13 files changed, 261 insertions(+), 71 deletions(-)
diff --git
a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractPartitionJobConfigValidationCondition.java
b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractPartitionJobConfigValidationCondition.java
index 6a71e76e7..4f54c0266 100644
---
a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractPartitionJobConfigValidationCondition.java
+++
b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/condition/FineractPartitionJobConfigValidationCondition.java
@@ -36,7 +36,7 @@ public class FineractPartitionJobConfigValidationCondition
implements Condition
"fineract.partitioned-job",
FineractProperties.FineractPartitionedJob.class);
if (partitionedJobProperties != null) {
List<FineractProperties.PartitionedJobProperty> invalidConfigs =
partitionedJobProperties.getPartitionedJobProperties().stream()
-
.filter(isAnyConfigBelowOne().or(FineractPartitionJobConfigValidationCondition::isPartitionRatioInvalid)).toList();
+
.filter(isAnyConfigBelowOne().or(FineractPartitionJobConfigValidationCondition::invalidMaxPoolSize)).toList();
if (!invalidConfigs.isEmpty()) {
for (FineractProperties.PartitionedJobProperty invalidConfig :
invalidConfigs) {
log.error(
@@ -52,10 +52,11 @@ public class FineractPartitionJobConfigValidationCondition
implements Condition
private static Predicate<FineractProperties.PartitionedJobProperty>
isAnyConfigBelowOne() {
return partitionedJobProperty ->
!(partitionedJobProperty.getPartitionSize() > 0 &&
partitionedJobProperty.getChunkSize() > 0
- && partitionedJobProperty.getThreadCount() > 0);
+ && partitionedJobProperty.getThreadPoolCorePoolSize() > 0 &&
partitionedJobProperty.getThreadPoolMaxPoolSize() > 0
+ && partitionedJobProperty.getThreadPoolQueueCapacity() > 0);
}
- private static boolean
isPartitionRatioInvalid(FineractProperties.PartitionedJobProperty
partitionedJobProperty) {
- return partitionedJobProperty.getPartitionSize() <
partitionedJobProperty.getThreadCount() * partitionedJobProperty.getChunkSize();
+ private static boolean
invalidMaxPoolSize(FineractProperties.PartitionedJobProperty
partitionedJobProperty) {
+ return partitionedJobProperty.getThreadPoolMaxPoolSize() <
partitionedJobProperty.getThreadPoolCorePoolSize();
}
}
diff --git
a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
index bd149d4eb..f37d27326 100644
---
a/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
+++
b/fineract-core/src/main/java/org/apache/fineract/infrastructure/core/config/FineractProperties.java
@@ -155,8 +155,11 @@ public class FineractProperties {
private String jobName;
private Integer chunkSize;
private Integer partitionSize;
- private Integer threadCount;
+ private Integer threadPoolCorePoolSize;
+ private Integer threadPoolMaxPoolSize;
+ private Integer threadPoolQueueCapacity;
private Integer retryLimit;
+
}
@Getter
diff --git
a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/AbstractLoanItemReader.java
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/AbstractLoanItemReader.java
index 40f39a6d3..8fd736fdd 100644
---
a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/AbstractLoanItemReader.java
+++
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/AbstractLoanItemReader.java
@@ -18,7 +18,7 @@
*/
package org.apache.fineract.cob.loan;
-import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
@@ -40,21 +40,19 @@ public abstract class AbstractLoanItemReader implements
ItemReader<Loan> {
protected final LoanRepository loanRepository;
@Setter(AccessLevel.PROTECTED)
- private List<Long> remainingData;
- private Long loanId;
+ private LinkedBlockingQueue<Long> remainingData;
@Override
public Loan read() throws Exception {
- try {
- if (remainingData.size() > 0) {
- loanId = remainingData.remove(0);
+ final Long loanId = remainingData.poll();
+ if (loanId != null) {
+ try {
return loanRepository.findById(loanId).orElseThrow(() -> new
LoanNotFoundException(loanId));
+ } catch (Exception e) {
+ throw new LoanReadException(loanId, e);
}
- } catch (Exception e) {
- throw new LoanReadException(loanId, e);
}
return null;
-
}
@AfterStep
diff --git
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyService.java
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/ContextAwareTaskDecorator.java
similarity index 56%
copy from
fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyService.java
copy to
fineract-provider/src/main/java/org/apache/fineract/cob/loan/ContextAwareTaskDecorator.java
index 053b2536b..c6bccf0ed 100644
---
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyService.java
+++
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/ContextAwareTaskDecorator.java
@@ -16,13 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.fineract.infrastructure.springbatch;
+package org.apache.fineract.cob.loan;
-public interface PropertyService {
+import org.apache.fineract.infrastructure.core.domain.FineractContext;
+import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
+import org.jetbrains.annotations.NotNull;
+import org.springframework.core.task.TaskDecorator;
- Integer getPartitionSize(String jobName);
+public class ContextAwareTaskDecorator implements TaskDecorator {
- Integer getChunkSize(String jobName);
+ @Override
+ public Runnable decorate(@NotNull Runnable runnable) {
+ final FineractContext context = ThreadLocalContextUtil.getContext();
+ return () -> {
+ ThreadLocalContextUtil.init(context);
+ runnable.run();
+ };
+ }
- Integer getRetryLimit(String jobName);
}
diff --git
a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/InlineCOBLoanItemReader.java
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/InlineCOBLoanItemReader.java
index deaa9aaee..f1030f01e 100644
---
a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/InlineCOBLoanItemReader.java
+++
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/InlineCOBLoanItemReader.java
@@ -18,8 +18,8 @@
*/
package org.apache.fineract.cob.loan;
-import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.fineract.portfolio.loanaccount.domain.LoanRepository;
import org.jetbrains.annotations.NotNull;
import org.springframework.batch.core.StepExecution;
@@ -37,6 +37,6 @@ public class InlineCOBLoanItemReader extends
AbstractLoanItemReader {
public void beforeStep(@NotNull StepExecution stepExecution) {
ExecutionContext executionContext =
stepExecution.getJobExecution().getExecutionContext();
List<Long> loanIds = (List<Long>)
executionContext.get(LoanCOBConstant.LOAN_COB_PARAMETER);
- setRemainingData(new ArrayList<>(loanIds));
+ setRemainingData(new LinkedBlockingQueue<>(loanIds));
}
}
diff --git
a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBWorkerConfiguration.java
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBWorkerConfiguration.java
index c9e417c53..46c6c9736 100644
---
a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBWorkerConfiguration.java
+++
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanCOBWorkerConfiguration.java
@@ -34,6 +34,7 @@ import
org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import
org.springframework.batch.integration.partition.RemotePartitioningWorkerStepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -41,7 +42,10 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.core.task.SyncTaskExecutor;
+import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.channel.QueueChannel;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
@@ -86,7 +90,7 @@ public class LoanCOBWorkerConfiguration {
@Bean
public Flow flow() {
- return new
FlowBuilder<Flow>("cobFlow").start(initialisationStep(null)).next(applyLockStep(null)).next(loanBusinessStep(null))
+ return new
FlowBuilder<Flow>("cobFlow").start(initialisationStep(null)).next(applyLockStep(null)).next(loanBusinessStep(null,
null))
.next(resetContextStep(null)).build();
}
@@ -97,14 +101,43 @@ public class LoanCOBWorkerConfiguration {
.build();
}
+ @Bean
+ public TaskExecutor cobTaskExecutor() {
+ if (propertyService.getThreadPoolMaxPoolSize(LoanCOBConstant.JOB_NAME)
== 1) {
+ return new SyncTaskExecutor();
+ }
+ ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
+ taskExecutor.setThreadNamePrefix("COB-Thread-");
+ taskExecutor.setThreadGroupName("COB-Thread");
+
taskExecutor.setCorePoolSize(propertyService.getThreadPoolCorePoolSize(JobName.LOAN_COB.name()));
+
taskExecutor.setMaxPoolSize(propertyService.getThreadPoolMaxPoolSize(JobName.LOAN_COB.name()));
+
taskExecutor.setQueueCapacity(propertyService.getThreadPoolQueueCapacity(JobName.LOAN_COB.name()));
+ taskExecutor.setAllowCoreThreadTimeOut(true);
+ taskExecutor.setTaskDecorator(new ContextAwareTaskDecorator());
+ return taskExecutor;
+ }
+
@Bean
@StepScope
- public Step
loanBusinessStep(@Value("#{stepExecutionContext['partition']}") String
partitionName) {
- return new StepBuilder("Loan Business - Step:" + partitionName,
jobRepository)
- .<Loan,
Loan>chunk(propertyService.getChunkSize(JobName.LOAN_COB.name()),
transactionManager).reader(cobWorkerItemReader())
-
.processor(cobWorkerItemProcessor()).writer(cobWorkerItemWriter()).faultTolerant().retry(Exception.class)
-
.retryLimit(propertyService.getRetryLimit(LoanCOBConstant.JOB_NAME)).skip(Exception.class)
-
.skipLimit(propertyService.getChunkSize(JobName.LOAN_COB.name()) +
1).listener(loanItemListener()).build();
+ public Step
loanBusinessStep(@Value("#{stepExecutionContext['partition']}") String
partitionName, TaskExecutor cobTaskExecutor) {
+ SimpleStepBuilder<Loan, Loan> stepBuilder = new StepBuilder("Loan
Business - Step:" + partitionName, jobRepository)
+ .<Loan,
Loan>chunk(propertyService.getChunkSize(JobName.LOAN_COB.name()),
transactionManager) //
+ .reader(cobWorkerItemReader()) //
+ .processor(cobWorkerItemProcessor()) //
+ .writer(cobWorkerItemWriter()) //
+ .faultTolerant() //
+ .retry(Exception.class) //
+
.retryLimit(propertyService.getRetryLimit(LoanCOBConstant.JOB_NAME)) //
+ .skip(Exception.class) //
+
.skipLimit(propertyService.getChunkSize(LoanCOBConstant.JOB_NAME) + 1) //
+ .listener(loanItemListener()) //
+ .transactionManager(transactionManager);
+
+ if (propertyService.getThreadPoolMaxPoolSize(LoanCOBConstant.JOB_NAME)
> 1) {
+ stepBuilder.taskExecutor(cobTaskExecutor);
+ }
+
+ return stepBuilder.build();
}
@Bean
diff --git
a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanItemReader.java
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanItemReader.java
index 8e5c0a3ec..c236257f7 100644
---
a/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanItemReader.java
+++
b/fineract-provider/src/main/java/org/apache/fineract/cob/loan/LoanItemReader.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.fineract.cob.common.CustomJobParameterResolver;
import org.apache.fineract.cob.data.LoanCOBParameter;
import org.apache.fineract.cob.domain.LoanAccountLock;
@@ -64,7 +65,7 @@ public class LoanItemReader extends AbstractLoanItemReader {
List<Long> lockedByCOBChunkProcessingAccountIds =
getLoanIdsLockedWithChunkProcessingLock(loanIds);
loanIds.retainAll(lockedByCOBChunkProcessingAccountIds);
}
- setRemainingData(new ArrayList<>(loanIds));
+ setRemainingData(new LinkedBlockingQueue<>(loanIds));
}
private List<Long> getLoanIdsLockedWithChunkProcessingLock(List<Long>
loanIds) {
diff --git
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyService.java
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyService.java
index 053b2536b..6e234a29a 100644
---
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyService.java
+++
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyService.java
@@ -25,4 +25,10 @@ public interface PropertyService {
Integer getChunkSize(String jobName);
Integer getRetryLimit(String jobName);
+
+ Integer getThreadPoolCorePoolSize(String jobName);
+
+ Integer getThreadPoolMaxPoolSize(String jobName);
+
+ Integer getThreadPoolQueueCapacity(String jobName);
}
diff --git
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyServiceImpl.java
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyServiceImpl.java
index d8cc1ba52..1eab3979c 100644
---
a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyServiceImpl.java
+++
b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/springbatch/PropertyServiceImpl.java
@@ -19,6 +19,7 @@
package org.apache.fineract.infrastructure.springbatch;
import java.util.List;
+import java.util.function.Function;
import lombok.RequiredArgsConstructor;
import org.apache.fineract.infrastructure.core.config.FineractProperties;
import org.springframework.stereotype.Service;
@@ -31,34 +32,41 @@ public class PropertyServiceImpl implements PropertyService
{
@Override
public Integer getPartitionSize(String jobName) {
- List<FineractProperties.PartitionedJobProperty> jobProperties =
fineractProperties.getPartitionedJob()
- .getPartitionedJobProperties();
- return jobProperties.stream() //
- .filter(jobProperty ->
jobName.equals(jobProperty.getJobName())) //
- .findFirst() //
-
.map(FineractProperties.PartitionedJobProperty::getPartitionSize) //
- .orElse(1);
+ return getProperty(jobName,
FineractProperties.PartitionedJobProperty::getPartitionSize);
}
@Override
public Integer getChunkSize(String jobName) {
- List<FineractProperties.PartitionedJobProperty> jobProperties =
fineractProperties.getPartitionedJob()
- .getPartitionedJobProperties();
- return jobProperties.stream() //
- .filter(jobProperty ->
jobName.equals(jobProperty.getJobName())) //
- .findFirst() //
- .map(FineractProperties.PartitionedJobProperty::getChunkSize)
//
- .orElse(1);
+ return getProperty(jobName,
FineractProperties.PartitionedJobProperty::getChunkSize);
}
@Override
public Integer getRetryLimit(String jobName) {
+ return getProperty(jobName,
FineractProperties.PartitionedJobProperty::getRetryLimit);
+ }
+
+ @Override
+ public Integer getThreadPoolCorePoolSize(String jobName) {
+ return getProperty(jobName,
FineractProperties.PartitionedJobProperty::getThreadPoolCorePoolSize);
+ }
+
+ @Override
+ public Integer getThreadPoolMaxPoolSize(String jobName) {
+ return getProperty(jobName,
FineractProperties.PartitionedJobProperty::getThreadPoolMaxPoolSize);
+ }
+
+ @Override
+ public Integer getThreadPoolQueueCapacity(String jobName) {
+ return getProperty(jobName,
FineractProperties.PartitionedJobProperty::getThreadPoolQueueCapacity);
+ }
+
+ private Integer getProperty(String jobName, Function<? super
FineractProperties.PartitionedJobProperty, Integer> function) {
List<FineractProperties.PartitionedJobProperty> jobProperties =
fineractProperties.getPartitionedJob()
.getPartitionedJobProperties();
return jobProperties.stream() //
.filter(jobProperty ->
jobName.equals(jobProperty.getJobName())) //
.findFirst() //
- .map(FineractProperties.PartitionedJobProperty::getRetryLimit)
//
+ .map(function) //
.orElse(1);
}
}
diff --git a/fineract-provider/src/main/resources/application.properties
b/fineract-provider/src/main/resources/application.properties
index 333e094d2..77b59ad1a 100644
--- a/fineract-provider/src/main/resources/application.properties
+++ b/fineract-provider/src/main/resources/application.properties
@@ -64,7 +64,9 @@
fineract.job.stuck-retry-threshold=${FINERACT_JOB_STUCK_RETRY_THRESHOLD:5}
fineract.partitioned-job.partitioned-job-properties[0].job-name=LOAN_COB
fineract.partitioned-job.partitioned-job-properties[0].chunk-size=${LOAN_COB_CHUNK_SIZE:100}
fineract.partitioned-job.partitioned-job-properties[0].partition-size=${LOAN_COB_PARTITION_SIZE:100}
-fineract.partitioned-job.partitioned-job-properties[0].thread-count=${LOAN_COB_THREAD_COUNT:1}
+fineract.partitioned-job.partitioned-job-properties[0].thread-pool-core-pool-size=${LOAN_COB_THREAD_POOL_CORE_POOL_SIZE:5}
+fineract.partitioned-job.partitioned-job-properties[0].thread-pool-max-pool-size=${LOAN_COB_THREAD_POOL_MAX_POOL_SIZE:5}
+fineract.partitioned-job.partitioned-job-properties[0].thread-pool-queue-capacity=${LOAN_COB_THREAD_POOL_QUEUE_CAPACITY:20}
fineract.partitioned-job.partitioned-job-properties[0].retry-limit=${LOAN_COB_RETRY_LIMIT:5}
fineract.remote-job-message-handler.spring-events.enabled=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_SPRING_EVENTS_ENABLED:true}
diff --git
a/fineract-provider/src/test/java/org/apache/fineract/cob/loan/LoanItemReaderTest.java
b/fineract-provider/src/test/java/org/apache/fineract/cob/loan/LoanItemReaderTest.java
new file mode 100644
index 000000000..9fe36b1dc
--- /dev/null
+++
b/fineract-provider/src/test/java/org/apache/fineract/cob/loan/LoanItemReaderTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.fineract.cob.loan;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.fineract.cob.common.CustomJobParameterResolver;
+import org.apache.fineract.cob.data.LoanCOBParameter;
+import org.apache.fineract.cob.domain.LoanAccountLock;
+import org.apache.fineract.cob.domain.LockOwner;
+import org.apache.fineract.infrastructure.core.domain.FineractPlatformTenant;
+import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
+import org.apache.fineract.portfolio.loanaccount.domain.Loan;
+import org.apache.fineract.portfolio.loanaccount.domain.LoanRepository;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.item.ExecutionContext;
+
+@ExtendWith(MockitoExtension.class)
+class LoanItemReaderTest {
+
+ @Mock
+ private LoanRepository loanRepository;
+
+ @Mock
+ private RetrieveLoanIdService retrieveLoanIdService;
+
+ @Mock
+ private CustomJobParameterResolver customJobParameterResolver;
+
+ @Mock
+ private LoanLockingService loanLockingService;
+
+ @Mock
+ private StepExecution stepExecution;
+
+ @Mock
+ private ExecutionContext executionContext;
+
+ @Mock
+ private Loan loan;
+
+ @Test
+ public void testLoanItemReaderSimple() throws Exception {
+ // given
+ ThreadLocalContextUtil.setTenant(new FineractPlatformTenant(1L,
"test", "test", "UTC", null));
+ LoanItemReader loanItemReader = new LoanItemReader(loanRepository,
retrieveLoanIdService, customJobParameterResolver,
+ loanLockingService);
+ when(stepExecution.getExecutionContext()).thenReturn(executionContext);
+ LoanCOBParameter loanCOBParameter = new LoanCOBParameter(1L, 5L);
+
when(executionContext.get(LoanCOBConstant.LOAN_COB_PARAMETER)).thenReturn(loanCOBParameter);
+
when(retrieveLoanIdService.retrieveAllNonClosedLoansByLastClosedBusinessDateAndMinAndMaxLoanId(loanCOBParameter,
false))
+ .thenReturn(new ArrayList<>(List.of(1L, 2L, 3L, 4L, 5L)));
+ List<LoanAccountLock> accountLocks = List.of(1L, 2L, 3L, 4L,
5L).stream()
+ .map(l -> new LoanAccountLock(l,
LockOwner.LOAN_COB_CHUNK_PROCESSING, LocalDate.of(2023, 7, 25))).toList();
+ when(loanLockingService.findAllByLoanIdInAndLockOwner(List.of(1L, 2L,
3L, 4L, 5L), LockOwner.LOAN_COB_CHUNK_PROCESSING))
+ .thenReturn(accountLocks);
+ when(loanRepository.findById(anyLong())).thenReturn(Optional.of(loan));
+
+ // when + then
+ loanItemReader.beforeStep(stepExecution);
+ for (long i = 1; i <= 5; i++) {
+ Loan myLoan = loanItemReader.read();
+ Assertions.assertEquals(loan, myLoan);
+ verify(loanRepository, times(1)).findById(i);
+ }
+
+ Mockito.verifyNoMoreInteractions(loanRepository);
+ }
+
+ @Test
+ public void testLoanItemReaderMultiThreadRead() throws Exception {
+ // given
+ ThreadLocalContextUtil.setTenant(new FineractPlatformTenant(1L,
"test", "test", "UTC", null));
+ LoanItemReader loanItemReader = new LoanItemReader(loanRepository,
retrieveLoanIdService, customJobParameterResolver,
+ loanLockingService);
+ when(stepExecution.getExecutionContext()).thenReturn(executionContext);
+ LoanCOBParameter loanCOBParameter = new LoanCOBParameter(1L, 100L);
+
when(executionContext.get(LoanCOBConstant.LOAN_COB_PARAMETER)).thenReturn(loanCOBParameter);
+
when(retrieveLoanIdService.retrieveAllNonClosedLoansByLastClosedBusinessDateAndMinAndMaxLoanId(loanCOBParameter,
false))
+ .thenReturn(new ArrayList<>(IntStream.rangeClosed(1,
100).boxed().map(Long::valueOf).toList()));
+ List<LoanAccountLock> accountLocks = IntStream.rangeClosed(1,
100).boxed().map(Long::valueOf)
+ .map(l -> new LoanAccountLock(l,
LockOwner.LOAN_COB_CHUNK_PROCESSING, LocalDate.of(2023, 7, 25))).toList();
+
when(loanLockingService.findAllByLoanIdInAndLockOwner(IntStream.rangeClosed(1,
100).boxed().map(Long::valueOf).toList(),
+ LockOwner.LOAN_COB_CHUNK_PROCESSING)).thenReturn(accountLocks);
+ when(loanRepository.findById(anyLong())).thenReturn(Optional.of(loan));
+
+ // when + then
+ loanItemReader.beforeStep(stepExecution);
+ ExecutorService executorService = Executors.newFixedThreadPool(10);
+ for (int i = 1; i <= 100; i++) {
+ Future<?> notUsed = executorService.submit(() -> {
+ try {
+ Loan myLoan = loanItemReader.read();
+ Assertions.assertEquals(loan, myLoan);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ executorService.shutdown();
+ boolean b = executorService.awaitTermination(5L, TimeUnit.SECONDS);
+ Assertions.assertTrue(b, "Executor did not terminate successfully");
+
+ // verify that this was called 100 times, and for each loan it was called exactly once
+ for (long i = 1; i <= 100; i++) {
+ verify(loanRepository, times(1)).findById(i);
+ }
+
+ Mockito.verifyNoMoreInteractions(loanRepository);
+ }
+
+}
diff --git
a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/config/FineractPartitionJobConfigValidationConditionTest.java
b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/config/FineractPartitionJobConfigValidationConditionTest.java
index 6342a7a21..a2f0413e9 100644
---
a/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/config/FineractPartitionJobConfigValidationConditionTest.java
+++
b/fineract-provider/src/test/java/org/apache/fineract/infrastructure/core/config/FineractPartitionJobConfigValidationConditionTest.java
@@ -65,16 +65,6 @@ class FineractPartitionJobConfigValidationConditionTest {
}
}
- @Test
- public void
testApplicationStartup_ShouldApplicationStartupFails_WhenPartitionSizeInvalid()
{
- try (MockedStatic<ExplicitConfigurationPropertiesFactory>
propertyFactory = Mockito
- .mockStatic(ExplicitConfigurationPropertiesFactory.class)) {
- propertyFactory.when(() ->
ExplicitConfigurationPropertiesFactory.getProperty(context,
"fineract.partitioned-job",
-
FineractProperties.FineractPartitionedJob.class)).thenReturn(getInvalidPartitionSize());
- assertTrue(testObj.matches(context, metadata));
- }
- }
-
@Test
public void
testApplicationStartup_ShouldApplicationStart_WhenConfigValid() {
try (MockedStatic<ExplicitConfigurationPropertiesFactory>
propertyFactory = Mockito
@@ -92,20 +82,9 @@ class FineractPartitionJobConfigValidationConditionTest {
partitionedJobProperty.setJobName("LOAN_COB");
partitionedJobProperty.setPartitionSize(100);
partitionedJobProperty.setChunkSize(10);
- partitionedJobProperty.setThreadCount(10);
- jobProperties.add(partitionedJobProperty);
- partitionedJob.setPartitionedJobProperties(jobProperties);
- return partitionedJob;
- }
-
- private FineractProperties.FineractPartitionedJob
getInvalidPartitionSize() {
- FineractProperties.FineractPartitionedJob partitionedJob = new
FineractProperties.FineractPartitionedJob();
- List<FineractProperties.PartitionedJobProperty> jobProperties = new
ArrayList<>();
- FineractProperties.PartitionedJobProperty partitionedJobProperty = new
FineractProperties.PartitionedJobProperty();
- partitionedJobProperty.setJobName("LOAN_COB");
- partitionedJobProperty.setPartitionSize(99);
- partitionedJobProperty.setChunkSize(10);
- partitionedJobProperty.setThreadCount(10);
+ partitionedJobProperty.setThreadPoolCorePoolSize(10);
+ partitionedJobProperty.setThreadPoolMaxPoolSize(20);
+ partitionedJobProperty.setThreadPoolQueueCapacity(10);
jobProperties.add(partitionedJobProperty);
partitionedJob.setPartitionedJobProperties(jobProperties);
return partitionedJob;
@@ -118,7 +97,9 @@ class FineractPartitionJobConfigValidationConditionTest {
partitionedJobProperty.setJobName("LOAN_COB");
partitionedJobProperty.setPartitionSize(0);
partitionedJobProperty.setChunkSize(1);
- partitionedJobProperty.setThreadCount(1);
+ partitionedJobProperty.setThreadPoolCorePoolSize(1);
+ partitionedJobProperty.setThreadPoolMaxPoolSize(1);
+ partitionedJobProperty.setThreadPoolQueueCapacity(1);
jobProperties.add(partitionedJobProperty);
partitionedJob.setPartitionedJobProperties(jobProperties);
return partitionedJob;
diff --git a/fineract-provider/src/test/resources/application-test.properties
b/fineract-provider/src/test/resources/application-test.properties
index 584ff5a36..caa5ff6b4 100644
--- a/fineract-provider/src/test/resources/application-test.properties
+++ b/fineract-provider/src/test/resources/application-test.properties
@@ -42,7 +42,9 @@ fineract.mode.batch-enabled=true
fineract.partitioned-job.partitioned-job-properties[0].job-name=LOAN_COB
fineract.partitioned-job.partitioned-job-properties[0].chunk-size=100
fineract.partitioned-job.partitioned-job-properties[0].partition-size=100
-fineract.partitioned-job.partitioned-job-properties[0].thread-count=1
+fineract.partitioned-job.partitioned-job-properties[0].thread-pool-core-pool-size=1
+fineract.partitioned-job.partitioned-job-properties[0].thread-pool-max-pool-size=1
+fineract.partitioned-job.partitioned-job-properties[0].thread-pool-queue-capacity=1
fineract.partitioned-job.partitioned-job-properties[0].retry-limit=5
fineract.remote-job-message-handler.spring-events.enabled=${FINERACT_REMOTE_JOB_MESSAGE_HANDLER_SPRING_EVENTS_ENABLED:true}