This is an automated email from the ASF dual-hosted git repository. avikg pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/fineract.git
The following commit(s) were added to refs/heads/develop by this push: new c243a96 FINERACT-1442 : Multi-threading-for-savings-interest-posting-job (#1971) c243a96 is described below commit c243a96548a056ecc30b40e3aff441c22f3f3d1b Author: Manoj <56669674+fynma...@users.noreply.github.com> AuthorDate: Thu Nov 18 23:13:55 2021 +0530 FINERACT-1442 : Multi-threading-for-savings-interest-posting-job (#1971) --- .../jobs/service/JobRegisterServiceImpl.java | 4 + .../service/SavingsAccountReadPlatformService.java | 1 + .../SavingsAccountReadPlatformServiceImpl.java | 12 ++ .../service/SavingsSchedularInterestPoster.java | 138 +++++++++++++++++++ .../savings/service/SavingsSchedularService.java | 3 +- .../service/SavingsSchedularServiceImpl.java | 147 +++++++++++++++++---- ..._for_parallelizing_savings_interest_posting.sql | 22 +++ 7 files changed, 299 insertions(+), 28 deletions(-) diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobRegisterServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobRegisterServiceImpl.java index 6bb2061..af229e7 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobRegisterServiceImpl.java +++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobRegisterServiceImpl.java @@ -381,6 +381,10 @@ public class JobRegisterServiceImpl implements JobRegisterService, ApplicationLi jobDetailFactoryBean.setTargetMethod(jobDetails.methodName); jobDetailFactoryBean.setGroup(scheduledJobDetail.getGroupName()); jobDetailFactoryBean.setConcurrent(false); + Map<String, String> jobParameterMap = getJobParameter(scheduledJobDetail); + if (!jobParameterMap.isEmpty()) { + jobDetailFactoryBean.setArguments(jobParameterMap); + } jobDetailFactoryBean.afterPropertiesSet(); return jobDetailFactoryBean.getObject(); } diff --git a/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/service/SavingsAccountReadPlatformService.java b/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/service/SavingsAccountReadPlatformService.java index 9bfa4fa..9c794fb 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/service/SavingsAccountReadPlatformService.java +++ b/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/service/SavingsAccountReadPlatformService.java @@ -62,4 +62,5 @@ public interface SavingsAccountReadPlatformService { String retrieveAccountNumberByAccountId(Long accountId); + List<Long> getAccountsIdsByStatusPaged(Integer status, int pageSize, Long maxSavingsIdInList); } diff --git a/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/service/SavingsAccountReadPlatformServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/service/SavingsAccountReadPlatformServiceImpl.java index 565a278..9f0b854 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/service/SavingsAccountReadPlatformServiceImpl.java +++ b/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/service/SavingsAccountReadPlatformServiceImpl.java @@ -1240,4 +1240,16 @@ public class SavingsAccountReadPlatformServiceImpl implements SavingsAccountRead throw new SavingsAccountNotFoundException(accountId, e); } } + + @Override + public List<Long> getAccountsIdsByStatusPaged(Integer status, int pageSize, Long maxSavingsIdInList) { + String sql = new StringBuilder().append(" SELECT sa.id FROM m_savings_account sa ") + .append(" where sa.id > ? and sa.status_enum = ? ").append(" order by sa.id limit ?").toString(); + + try { + return this.jdbcTemplate.queryForList(sql, Long.class, new Object[] { maxSavingsIdInList, status, pageSize }); + } catch (EmptyResultDataAccessException e) { + return new ArrayList<>(); + } + } } diff --git a/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/service/SavingsSchedularInterestPoster.java b/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/service/SavingsSchedularInterestPoster.java new file mode 100644 index 0000000..6dcbdae --- /dev/null +++ b/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/service/SavingsSchedularInterestPoster.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.portfolio.savings.service; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.security.SecureRandom; +import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import org.apache.fineract.infrastructure.core.domain.FineractPlatformTenant; +import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil; +import org.apache.fineract.infrastructure.jobs.exception.JobExecutionException; +import org.apache.fineract.portfolio.savings.domain.SavingsAccount; +import org.apache.fineract.portfolio.savings.domain.SavingsAccountAssembler; +import org.apache.fineract.portfolio.savings.domain.SavingsAccountRepositoryWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.dao.CannotAcquireLockException; +import org.springframework.orm.ObjectOptimisticLockingFailureException; +import org.springframework.stereotype.Component; + +/** + * @author manoj + */ + +@Component +@Scope("prototype") +public class SavingsSchedularInterestPoster implements Callable<Void> { + + private static final Logger LOG = LoggerFactory.getLogger(SavingsSchedularInterestPoster.class); + private static final SecureRandom random = new SecureRandom(); + + private Collection<Long> savingsIds; + private SavingsAccountWritePlatformService savingsAccountWritePlatformService; + private SavingsAccountRepositoryWrapper savingsAccountRepository; + private SavingsAccountAssembler savingAccountAssembler; + private FineractPlatformTenant tenant; + + public void setSavingsIds(Collection<Long> savingsIds) { + this.savingsIds = savingsIds; + } + + public void setSavingsAccountWritePlatformService(SavingsAccountWritePlatformService savingsAccountWritePlatformService) { + this.savingsAccountWritePlatformService = savingsAccountWritePlatformService; + } + + public void setSavingsAccountRepository(SavingsAccountRepositoryWrapper savingsAccountRepository) { + this.savingsAccountRepository = savingsAccountRepository; + } + + public void setSavingAccountAssembler(SavingsAccountAssembler savingAccountAssembler) { + this.savingAccountAssembler = savingAccountAssembler; + } + + public void setTenant(FineractPlatformTenant tenant) { + this.tenant = tenant; + } + + @Override + @SuppressFBWarnings(value = { + "DMI_RANDOM_USED_ONLY_ONCE" }, justification = "False positive for random object created and used only once") + public Void call() throws org.apache.fineract.infrastructure.jobs.exception.JobExecutionException { + ThreadLocalContextUtil.setTenant(tenant); + Integer maxNumberOfRetries = tenant.getConnection().getMaxRetriesOnDeadlock(); + Integer maxIntervalBetweenRetries = tenant.getConnection().getMaxIntervalBetweenRetries(); + + int i = 0; + if (!savingsIds.isEmpty()) { + List<Throwable> errors = new ArrayList<>(); + for (Long savingsId : savingsIds) { + LOG.info("Savings ID {}", savingsId); + Integer numberOfRetries = 0; + while (numberOfRetries <= maxNumberOfRetries) { + try { + SavingsAccount savingsAccount = this.savingsAccountRepository.findOneWithNotFoundDetection(savingsId); + this.savingAccountAssembler.assignSavingAccountHelpers(savingsAccount); + boolean postInterestAsOn = false; + LocalDate transactionDate = null; + this.savingsAccountWritePlatformService.postInterest(savingsAccount, postInterestAsOn, transactionDate); + + numberOfRetries = maxNumberOfRetries + 1; + } catch (CannotAcquireLockException | ObjectOptimisticLockingFailureException exception) { + LOG.info("Interest posting job for savings ID {} has been retried {} time(s)", savingsId, numberOfRetries); + // Fail if the transaction has been retired for + // maxNumberOfRetries + if (numberOfRetries >= maxNumberOfRetries) { + LOG.error( + "Interest posting job for savings ID {} has been retried for the max allowed attempts of {} and will be rolled back", + savingsId, numberOfRetries); + errors.add(exception); + break; + } + // Else sleep for a random time (between 1 to 10 + // seconds) and continue + try { + int randomNum = random.nextInt(maxIntervalBetweenRetries + 1); + Thread.sleep(1000 + (randomNum * 1000)); + numberOfRetries = numberOfRetries + 1; + } catch (InterruptedException e) { + LOG.error("Interest posting job for savings retry failed due to InterruptedException", e); + errors.add(e); + break; + } + } catch (Exception e) { + LOG.error("Interest posting job for savings failed for account {}", savingsId, e); + numberOfRetries = maxNumberOfRetries + 1; + errors.add(e); + } + i++; + } + LOG.info("Savings count {}", i); + } + if (!errors.isEmpty()) { + throw new JobExecutionException(errors); + } + } + return null; + } +} diff --git a/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/service/SavingsSchedularService.java b/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/service/SavingsSchedularService.java index a99ecd8..3c834ef 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/service/SavingsSchedularService.java +++ b/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/service/SavingsSchedularService.java @@ -18,11 +18,12 @@ */ package org.apache.fineract.portfolio.savings.service; +import java.util.Map; import org.apache.fineract.infrastructure.jobs.exception.JobExecutionException; public interface SavingsSchedularService { - void postInterestForAccounts() throws JobExecutionException; + void postInterestForAccounts(Map<String, String> jobParameters) throws JobExecutionException; void updateSavingsDormancyStatus() throws JobExecutionException; diff --git a/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/service/SavingsSchedularServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/service/SavingsSchedularServiceImpl.java index c3464de..c684440 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/service/SavingsSchedularServiceImpl.java +++ b/fineract-provider/src/main/java/org/apache/fineract/portfolio/savings/service/SavingsSchedularServiceImpl.java @@ -22,20 +22,27 @@ import static org.apache.fineract.portfolio.savings.domain.SavingsAccountStatusT import java.time.LocalDate; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.fineract.infrastructure.core.service.DateUtils; +import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil; import org.apache.fineract.infrastructure.jobs.annotation.CronTarget; import org.apache.fineract.infrastructure.jobs.exception.JobExecutionException; import org.apache.fineract.infrastructure.jobs.service.JobName; -import org.apache.fineract.portfolio.savings.domain.SavingsAccount; import org.apache.fineract.portfolio.savings.domain.SavingsAccountAssembler; import org.apache.fineract.portfolio.savings.domain.SavingsAccountRepositoryWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.domain.Page; -import org.springframework.data.domain.PageRequest; +import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; @Service public class SavingsSchedularServiceImpl implements SavingsSchedularService { @@ -46,45 +53,131 @@ public class SavingsSchedularServiceImpl implements SavingsSchedularService { private final SavingsAccountWritePlatformService savingsAccountWritePlatformService; private final SavingsAccountReadPlatformService savingAccountReadPlatformService; private final SavingsAccountRepositoryWrapper savingsAccountRepository; + private final ApplicationContext applicationContext; @Autowired public SavingsSchedularServiceImpl(final SavingsAccountAssembler savingAccountAssembler, final SavingsAccountWritePlatformService savingsAccountWritePlatformService, final SavingsAccountReadPlatformService savingAccountReadPlatformService, - final SavingsAccountRepositoryWrapper savingsAccountRepository) { + final SavingsAccountRepositoryWrapper savingsAccountRepository, final ApplicationContext applicationContext) { this.savingAccountAssembler = savingAccountAssembler; this.savingsAccountWritePlatformService = savingsAccountWritePlatformService; this.savingAccountReadPlatformService = savingAccountReadPlatformService; this.savingsAccountRepository = savingsAccountRepository; + this.applicationContext = applicationContext; } @Override @CronTarget(jobName = JobName.POST_INTEREST_FOR_SAVINGS) - public void postInterestForAccounts() throws JobExecutionException { - int page = 0; - Integer initialSize = 500; - Integer totalPageSize = 0; - List<Throwable> errors = new ArrayList<>(); - do { - PageRequest pageRequest = PageRequest.of(page, initialSize); - Page<SavingsAccount> savingsAccounts = this.savingsAccountRepository.findByStatus(ACTIVE.getValue(), pageRequest); - for (SavingsAccount savingsAccount : savingsAccounts.getContent()) { - try { - this.savingAccountAssembler.assignSavingAccountHelpers(savingsAccount); - boolean postInterestAsOn = false; - LocalDate transactionDate = null; - this.savingsAccountWritePlatformService.postInterest(savingsAccount, postInterestAsOn, transactionDate); - } catch (Exception e) { - LOG.error("Failed to post interest for Savings with id {}", savingsAccount.getId(), e); - errors.add(e); - } + public void postInterestForAccounts(Map<String, String> jobParameters) throws JobExecutionException { + + final int threadPoolSize = Integer.parseInt(jobParameters.get("thread-pool-size")); + final int batchSize = Integer.parseInt(jobParameters.get("batch-size")); + final int pageSize = batchSize * threadPoolSize; + Long maxSavingsIdInList = 0L; + // initialise the executor service with fetched configurations + final ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize); + + List<Long> savingsIds = Collections.synchronizedList( + this.savingAccountReadPlatformService.getAccountsIdsByStatusPaged(ACTIVE.getValue(), pageSize, maxSavingsIdInList)); + if (!CollectionUtils.isEmpty(savingsIds)) { + do { + int totalFilteredRecords = savingsIds.size(); + LOG.info("Starting Interest posting - total filtered records - {}", totalFilteredRecords); + postInterest(savingsIds, threadPoolSize, batchSize, executorService); + maxSavingsIdInList = savingsIds.get(savingsIds.size() - 1); + savingsIds = Collections.synchronizedList( + this.savingAccountReadPlatformService.getAccountsIdsByStatusPaged(ACTIVE.getValue(), pageSize, maxSavingsIdInList)); + } while (!CollectionUtils.isEmpty(savingsIds)); + } + // shutdown the executor when done + executorService.shutdownNow(); + } + + private void postInterest(List<Long> savingsIds, int threadPoolSize, int batchSize, ExecutorService executorService) { + List<Callable<Void>> posters = new ArrayList<>(); + int fromIndex = 0; + // get the size of current paginated dataset + int size = savingsIds.size(); + // calculate the batch size + batchSize = (int) Math.ceil((double) size / threadPoolSize); + + if (batchSize == 0) { + return; + } + + int toIndex = (batchSize > size - 1) ? size : batchSize; + while (toIndex < size && savingsIds.get(toIndex - 1).equals(savingsIds.get(toIndex))) { + toIndex++; + } + boolean lastBatch = false; + int loopCount = size / batchSize + 1; + + for (long i = 0; i < loopCount; i++) { + List<Long> subList = safeSubList(savingsIds, fromIndex, toIndex); + SavingsSchedularInterestPoster poster = (SavingsSchedularInterestPoster) this.applicationContext + .getBean("savingsSchedularInterestPoster"); + poster.setSavingsIds(subList); + poster.setTenant(ThreadLocalContextUtil.getTenant()); + poster.setSavingsAccountWritePlatformService(savingsAccountWritePlatformService); + poster.setSavingsAccountRepository(savingsAccountRepository); + poster.setSavingAccountAssembler(savingAccountAssembler); + posters.add(poster); + + if (lastBatch) { + break; + } + if (toIndex + batchSize > size - 1) { + lastBatch = true; } - page++; - totalPageSize = savingsAccounts.getTotalPages(); - } while (page < totalPageSize); + fromIndex = fromIndex + (toIndex - fromIndex); + toIndex = (toIndex + batchSize > size - 1) ? size : toIndex + batchSize; + while (toIndex < size && savingsIds.get(toIndex - 1).equals(savingsIds.get(toIndex))) { + toIndex++; + } + } + try { + List<Future<Void>> responses = executorService.invokeAll(posters); + checkCompletion(responses); + } catch (InterruptedException e1) { + LOG.error("Interrupted while postInterest", e1); + } + } + + // break the lists into sub lists + public <T> List<T> safeSubList(List<T> list, int fromIndex, int toIndex) { + int size = list.size(); + if (fromIndex >= size || toIndex <= 0 || fromIndex >= toIndex) { + return Collections.emptyList(); + } + + fromIndex = Math.max(0, fromIndex); + toIndex = Math.min(size, toIndex); - if (!errors.isEmpty()) { - throw new JobExecutionException(errors); + return list.subList(fromIndex, toIndex); + } + + // checks the execution of task by each thread in the executor service + private void checkCompletion(List<Future<Void>> responses) { + try { + for (Future f : responses) { + f.get(); + } + boolean allThreadsExecuted = false; + int noOfThreadsExecuted = 0; + for (Future<Void> future : responses) { + if (future.isDone()) { + noOfThreadsExecuted++; + } + } + allThreadsExecuted = noOfThreadsExecuted == responses.size(); + if (!allThreadsExecuted) { + LOG.error("All threads could not execute."); + } + } catch (InterruptedException e1) { + LOG.error("Interrupted while interest posting entries", e1); + } catch (ExecutionException e2) { + LOG.error("Execution exception while interest posting entries", e2); } } diff --git a/fineract-provider/src/main/resources/sql/migrations/core_db/V377__job_prams_for_parallelizing_savings_interest_posting.sql b/fineract-provider/src/main/resources/sql/migrations/core_db/V377__job_prams_for_parallelizing_savings_interest_posting.sql new file mode 100644 index 0000000..6983df6 --- /dev/null +++ b/fineract-provider/src/main/resources/sql/migrations/core_db/V377__job_prams_for_parallelizing_savings_interest_posting.sql @@ -0,0 +1,22 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. +-- + + +INSERT IGNORE INTO `job_parameters`(`job_id`,`parameter_name`,`parameter_value`) VALUES((select id from job where name = 'Post Interest For Savings'),'thread-pool-size',10); +INSERT IGNORE INTO `job_parameters`(`job_id`,`parameter_name`,`parameter_value`) VALUES((select id from job where name = 'Post Interest For Savings'),'batch-size',100);