This is an automated email from the ASF dual-hosted git repository. avikg pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/fineract.git
The following commit(s) were added to refs/heads/develop by this push: new e4b270c Feat: Node aware job scheduler (#1886) e4b270c is described below commit e4b270c0f5be571d7268a87a2cd612acd4ef84d8 Author: Benura Abeywardena <43112139+bla...@users.noreply.github.com> AuthorDate: Thu Oct 14 16:24:21 2021 +0530 Feat: Node aware job scheduler (#1886) --- docker-compose.yml | 1 + .../glaccount/domain/GLAccountRepository.java | 3 ++- .../domain/GLAccountRepositoryWrapper.java | 3 ++- .../journalentry/data/JournalEntryData.java | 3 ++- .../constants/ChartOfAcountsConstants.java | 4 +-- .../chartofaccounts/ChartOfAccountsWorkbook.java | 17 +++++++------ .../jobs/domain/ScheduledJobDetail.java | 18 ++++++++++++++ .../jobs/domain/ScheduledJobDetailRepository.java | 7 ++++++ .../exception/JobNodeIdMismatchingException.java} | 18 ++++++++------ .../infrastructure/jobs/service/JobName.java | 3 ++- .../jobs/service/JobRegisterServiceImpl.java | 29 +++++++++++++++++++--- .../service/SchedularWritePlatformService.java | 2 +- ...dularWritePlatformServiceJpaRepositoryImpl.java | 4 +-- .../portfolio/loanaccount/domain/Loan.java | 3 ++- .../service/ScheduledJobRunnerService.java | 2 ++ .../service/ScheduledJobRunnerServiceImpl.java | 26 ++++++++++++++++++- .../core_db/V373__node_aware_scheduler_jobs.sql | 26 +++++++++++++++++++ kubernetes/fineract-server-deployment.yml | 2 ++ 18 files changed, 141 insertions(+), 30 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 3724c34..2513129 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -45,6 +45,7 @@ services: environment: - DRIVERCLASS_NAME=org.drizzle.jdbc.DrizzleDriver - PROTOCOL=jdbc + - node_id=1 - SUB_PROTOCOL=mysql:thin - fineract_tenants_driver=org.drizzle.jdbc.DrizzleDriver - fineract_tenants_url=jdbc:mysql:thin://fineractmysql:3306/fineract_tenants diff --git a/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java b/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java index cf258bd..59dd985 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java +++ b/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java @@ -23,7 +23,8 @@ import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaSpecificationExecutor; public interface GLAccountRepository extends JpaRepository<GLAccount, Long>, JpaSpecificationExecutor<GLAccount> { + // no added behaviour - //adding behaviour to fetch id by glcode for opening balance bulk import + // adding behaviour to fetch id by glcode for opening balance bulk import Optional<GLAccount> findOneByGlCode(String glCode); } diff --git a/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepositoryWrapper.java b/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepositoryWrapper.java index 1482c84..b66345e 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepositoryWrapper.java +++ b/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepositoryWrapper.java @@ -40,7 +40,8 @@ public class GLAccountRepositoryWrapper { public GLAccount findOneWithNotFoundDetection(final Long id) { return this.repository.findById(id).orElseThrow(() -> new GLAccountNotFoundException(id)); } - //finding account id by glcode for opening balance bulk import + + // finding account id by glcode for opening balance bulk import public GLAccount findOneByGlCodeWithNotFoundDetection(final String glCode) { return this.repository.findOneByGlCode(glCode).orElseThrow(() -> new GLAccountNotFoundException(glCode)); } diff --git a/fineract-provider/src/main/java/org/apache/fineract/accounting/journalentry/data/JournalEntryData.java b/fineract-provider/src/main/java/org/apache/fineract/accounting/journalentry/data/JournalEntryData.java index 22865ed..42cb2dd 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/accounting/journalentry/data/JournalEntryData.java +++ b/fineract-provider/src/main/java/org/apache/fineract/accounting/journalentry/data/JournalEntryData.java @@ -90,7 +90,8 @@ public class JournalEntryData { private String routingCode; private String receiptNumber; private String bankNumber; - //for opening bal bulk import + + // for opening bal bulk import public JournalEntryData(Long officeId, LocalDate transactionDate, String currencyCode, List<CreditDebit> credits, List<CreditDebit> debits, String locale, String dateFormat) { this.officeId = officeId; diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/constants/ChartOfAcountsConstants.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/constants/ChartOfAcountsConstants.java index 72b5414..0dd414b 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/constants/ChartOfAcountsConstants.java +++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/constants/ChartOfAcountsConstants.java @@ -34,7 +34,7 @@ public final class ChartOfAcountsConstants { public static final int TAG_COL = 7;// H public static final int TAG_ID_COL = 8;// I public static final int DESCRIPTION_COL = 9;// J - //adding for opening balance bulk import + // adding for opening balance bulk import public static final int OFFICE_COL = 10; // K public static final int OFFICE_COL_ID = 11; // L public static final int CURRENCY_CODE = 12; // M @@ -47,7 +47,7 @@ public final class ChartOfAcountsConstants { public static final int LOOKUP_TAG_COL = 21; // V public static final int LOOKUP_TAG_ID_COL = 22; // W - //adding for opening balance bulk import + // adding for opening balance bulk import public static final int LOOKUP_OFFICE_COL = 23; // X public static final int LOOKUP_OFFICE_ID_COL = 24; // Y diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/populator/chartofaccounts/ChartOfAccountsWorkbook.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/populator/chartofaccounts/ChartOfAccountsWorkbook.java index 68a775d..5f96176 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/populator/chartofaccounts/ChartOfAccountsWorkbook.java +++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/bulkimport/populator/chartofaccounts/ChartOfAccountsWorkbook.java @@ -113,9 +113,12 @@ public class ChartOfAccountsWorkbook extends AbstractWorkbookPopulator { CellRangeAddressList tagRange = new CellRangeAddressList(1, SpreadsheetVersion.EXCEL97.getLastRowIndex(), ChartOfAcountsConstants.TAG_COL, ChartOfAcountsConstants.TAG_COL); CellRangeAddressList officeNameRange = new CellRangeAddressList(1, SpreadsheetVersion.EXCEL97.getLastRowIndex(), - ChartOfAcountsConstants.OFFICE_COL, ChartOfAcountsConstants.OFFICE_COL); // validation for opening bal office column + ChartOfAcountsConstants.OFFICE_COL, ChartOfAcountsConstants.OFFICE_COL); // validation for opening bal + // office column CellRangeAddressList currencyCodeRange = new CellRangeAddressList(1, SpreadsheetVersion.EXCEL97.getLastRowIndex(), - ChartOfAcountsConstants.CURRENCY_CODE, ChartOfAcountsConstants.CURRENCY_CODE);// validation for currency code for opening balance + ChartOfAcountsConstants.CURRENCY_CODE, ChartOfAcountsConstants.CURRENCY_CODE);// validation for currency + // code for opening + // balance DataValidationHelper validationHelper = new HSSFDataValidationHelper((HSSFSheet) chartOfAccountsSheet); setNames(chartOfAccountsSheet, accountTypesNoDuplicatesList, offices); @@ -186,7 +189,7 @@ public class ChartOfAccountsWorkbook extends AbstractWorkbookPopulator { writeFormula(ChartOfAcountsConstants.TAG_ID_COL, row, "IF(ISERROR(VLOOKUP($H" + (rowNo + 1) + ",$V$2:$W$" + (glAccounts.size() + 1) + ",2,FALSE))," + "\"\",(VLOOKUP($H" + (rowNo + 1) + ",$V$2:$W$" + (glAccounts.size() + 1) + ",2,FALSE)))"); - //auto populate office id for bulk import of opening balance + // auto populate office id for bulk import of opening balance writeFormula(ChartOfAcountsConstants.OFFICE_COL_ID, row, "IF(ISERROR(VLOOKUP($K" + (rowNo + 1) + ",$X$2:$Y$" + (offices.size() + 1) + ",2,FALSE)),\"\",(VLOOKUP($K" + (rowNo + 1) + ",$X$2:$Y$" + (offices.size() + 1) + ",2,FALSE)))"); @@ -236,7 +239,7 @@ public class ChartOfAccountsWorkbook extends AbstractWorkbookPopulator { accountTypeIndex++; } } - //opening balance lookup table of offices + // opening balance lookup table of offices startIndex = 1; rowIndex = 1; for (OfficeData office : offices) { @@ -283,7 +286,7 @@ public class ChartOfAccountsWorkbook extends AbstractWorkbookPopulator { chartOfAccountsSheet.setColumnWidth(ChartOfAcountsConstants.LOOKUP_ACCOUNT_ID_COL, TemplatePopulateImportConstants.MEDIUM_COL_SIZE); chartOfAccountsSheet.setColumnWidth(ChartOfAcountsConstants.LOOKUP_TAG_COL, TemplatePopulateImportConstants.SMALL_COL_SIZE); chartOfAccountsSheet.setColumnWidth(ChartOfAcountsConstants.LOOKUP_TAG_ID_COL, TemplatePopulateImportConstants.SMALL_COL_SIZE); - //adding lookup for opening balance bulk import + // adding lookup for opening balance bulk import chartOfAccountsSheet.setColumnWidth(ChartOfAcountsConstants.LOOKUP_OFFICE_COL, TemplatePopulateImportConstants.MEDIUM_COL_SIZE); chartOfAccountsSheet.setColumnWidth(ChartOfAcountsConstants.LOOKUP_OFFICE_ID_COL, TemplatePopulateImportConstants.SMALL_COL_SIZE); @@ -297,7 +300,7 @@ public class ChartOfAccountsWorkbook extends AbstractWorkbookPopulator { writeString(ChartOfAcountsConstants.TAG_COL, rowHeader, "Tag"); writeString(ChartOfAcountsConstants.TAG_ID_COL, rowHeader, "Tag Id"); writeString(ChartOfAcountsConstants.DESCRIPTION_COL, rowHeader, "Description *"); - //adding data for opening balance bulk import + // adding data for opening balance bulk import writeString(ChartOfAcountsConstants.OFFICE_COL, rowHeader, "Parent Office for Opening Balance"); writeString(ChartOfAcountsConstants.OFFICE_COL_ID, rowHeader, "Parent Office Code Opening Balance"); writeString(ChartOfAcountsConstants.CURRENCY_CODE, rowHeader, "Currency Code"); @@ -309,7 +312,7 @@ public class ChartOfAccountsWorkbook extends AbstractWorkbookPopulator { writeString(ChartOfAcountsConstants.LOOKUP_TAG_ID_COL, rowHeader, "Lookup Tag Id"); writeString(ChartOfAcountsConstants.LOOKUP_ACCOUNT_NAME_COL, rowHeader, "Lookup Account name *"); writeString(ChartOfAcountsConstants.LOOKUP_ACCOUNT_ID_COL, rowHeader, "Lookup Account Id"); - //adding lookup for opening balance bulk import + // adding lookup for opening balance bulk import writeString(ChartOfAcountsConstants.LOOKUP_OFFICE_COL, rowHeader, "Lookup Office Name"); writeString(ChartOfAcountsConstants.LOOKUP_OFFICE_ID_COL, rowHeader, "Lookup Office Id"); diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetail.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetail.java index 67a384c..f3f4462 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetail.java +++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetail.java @@ -41,6 +41,12 @@ public class ScheduledJobDetail extends AbstractPersistableCustom { @Column(name = "display_name") private String jobDisplayName; + @Column(name = "node_id") + private Integer nodeId; + + @Column(name = "is_mismatched_job") + private boolean isMismatchedJob; + @Column(name = "cron_expression") private String cronExpression; @@ -135,6 +141,14 @@ public class ScheduledJobDetail extends AbstractPersistableCustom { this.jobKey = jobKey; } + public boolean getIsMismatchedJob() { + return this.isMismatchedJob; + } + + public void setIsMismatchedJob(final boolean isMismatchedJob) { + this.isMismatchedJob = isMismatchedJob; + } + public void updateErrorLog(final String errorLog) { this.errorLog = errorLog; } @@ -147,6 +161,10 @@ public class ScheduledJobDetail extends AbstractPersistableCustom { this.currentlyRunning = currentlyRunning; } + public Integer getNodeId() { + return this.nodeId; + } + public Map<String, Object> update(final JsonCommand command) { final Map<String, Object> actualChanges = new LinkedHashMap<>(9); diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetailRepository.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetailRepository.java index 216a362..c22cc17 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetailRepository.java +++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/domain/ScheduledJobDetailRepository.java @@ -18,6 +18,7 @@ */ package org.apache.fineract.infrastructure.jobs.domain; +import java.util.List; import javax.persistence.LockModeType; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaSpecificationExecutor; @@ -38,4 +39,10 @@ public interface ScheduledJobDetailRepository @Query("select jobDetail from ScheduledJobDetail jobDetail where jobDetail.jobKey = :jobKey") ScheduledJobDetail findByJobKeyWithLock(@Param("jobKey") String jobKey); + @Query("select jobDetail from ScheduledJobDetail jobDetail where jobDetail.isMismatchedJob = :isMismatchedJob") + List<ScheduledJobDetail> findAllMismatchedJobs(@Param("isMismatchedJob") boolean isMismatchedJob); + + @Query("select jobDetail from ScheduledJobDetail jobDetail where jobDetail.nodeId = :nodeId or jobDetail.nodeId = 0") + List<ScheduledJobDetail> findAllJobs(@Param("nodeId") Integer nodeId); + } diff --git a/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/exception/JobNodeIdMismatchingException.java similarity index 59% copy from fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java copy to fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/exception/JobNodeIdMismatchingException.java index cf258bd..1bb5c6d 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/accounting/glaccount/domain/GLAccountRepository.java +++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/exception/JobNodeIdMismatchingException.java @@ -16,14 +16,16 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.fineract.accounting.glaccount.domain; -import java.util.Optional; -import org.springframework.data.jpa.repository.JpaRepository; -import org.springframework.data.jpa.repository.JpaSpecificationExecutor; +package org.apache.fineract.infrastructure.jobs.exception; + +import org.apache.fineract.infrastructure.core.exception.AbstractPlatformDomainRuleException; + +public class JobNodeIdMismatchingException extends AbstractPlatformDomainRuleException { + + public JobNodeIdMismatchingException(final String nodeId, final String nodeIdProvided) { + super("error.msg.job.cannot.execute.on.node." + nodeIdProvided, + "The node id provided `" + nodeIdProvided + "`" + "` does not match with the configured nodeId.", new Object[] { nodeId }); + } -public interface GLAccountRepository extends JpaRepository<GLAccount, Long>, JpaSpecificationExecutor<GLAccount> { - // no added behaviour - //adding behaviour to fetch id by glcode for opening balance bulk import - Optional<GLAccount> findOneByGlCode(String glCode); } diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java index 7417cc6..2bbc911 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java +++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobName.java @@ -46,7 +46,8 @@ public enum JobName { "Generate AdhocClient Schedule"), UPDATE_EMAIL_OUTBOUND_WITH_CAMPAIGN_MESSAGE( "Update Email Outbound with campaign message"), EXECUTE_EMAIL( "Execute Email"), UPDATE_TRAIL_BALANCE_DETAILS( - "Update Trial Balance Details"); + "Update Trial Balance Details"), EXECUTE_DIRTY_JOBS( + "Execute All Dirty Jobs"); private final String name; diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobRegisterServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobRegisterServiceImpl.java index 3d07344..6bb2061 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobRegisterServiceImpl.java +++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/JobRegisterServiceImpl.java @@ -37,6 +37,7 @@ import org.apache.fineract.infrastructure.jobs.domain.JobParameter; import org.apache.fineract.infrastructure.jobs.domain.JobParameterRepository; import org.apache.fineract.infrastructure.jobs.domain.ScheduledJobDetail; import org.apache.fineract.infrastructure.jobs.domain.SchedulerDetail; +import org.apache.fineract.infrastructure.jobs.exception.JobNodeIdMismatchingException; import org.apache.fineract.infrastructure.jobs.exception.JobNotFoundException; import org.apache.fineract.infrastructure.security.service.TenantDetailsService; import org.quartz.JobDataMap; @@ -50,6 +51,7 @@ import org.quartz.TriggerListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextClosedEvent; @@ -118,12 +120,15 @@ public class JobRegisterServiceImpl implements JobRegisterService, ApplicationLi this.jobParameterRepository = jobParameterRepository; } + @Value("${node_id:1}") + private String nodeId; + @PostConstruct public void loadAllJobs() { final List<FineractPlatformTenant> allTenants = this.tenantDetailsService.findAllTenants(); for (final FineractPlatformTenant tenant : allTenants) { ThreadLocalContextUtil.setTenant(tenant); - final List<ScheduledJobDetail> scheduledJobDetails = this.schedularWritePlatformService.retrieveAllJobs(); + final List<ScheduledJobDetail> scheduledJobDetails = this.schedularWritePlatformService.retrieveAllJobs(nodeId); for (final ScheduledJobDetail jobDetails : scheduledJobDetails) { scheduleJob(jobDetails); jobDetails.updateTriggerMisfired(false); @@ -203,11 +208,12 @@ public class JobRegisterServiceImpl implements JobRegisterService, ApplicationLi schedulerDetail.updateSuspendedState(false); this.schedularWritePlatformService.updateSchedulerDetail(schedulerDetail); if (schedulerDetail.isExecuteInstructionForMisfiredJobs()) { - final List<ScheduledJobDetail> scheduledJobDetails = this.schedularWritePlatformService.retrieveAllJobs(); + final List<ScheduledJobDetail> scheduledJobDetails = this.schedularWritePlatformService.retrieveAllJobs(this.nodeId); for (final ScheduledJobDetail jobDetail : scheduledJobDetails) { if (jobDetail.isTriggerMisfired()) { if (jobDetail.isActiveSchedular()) { executeJob(jobDetail, SchedulerServiceConstants.TRIGGER_TYPE_CRON); + jobDetail.setIsMismatchedJob(false); } final String schedulerName = getSchedulerName(jobDetail); final Scheduler scheduler = this.schedulers.get(schedulerName); @@ -236,7 +242,14 @@ public class JobRegisterServiceImpl implements JobRegisterService, ApplicationLi @Override public void rescheduleJob(final Long jobId) { final ScheduledJobDetail scheduledJobDetail = this.schedularWritePlatformService.findByJobId(jobId); - rescheduleJob(scheduledJobDetail); + final String nodeIdStored = scheduledJobDetail.getNodeId().toString(); + if (nodeIdStored.equals(this.nodeId) || nodeIdStored.equals("0")) { + rescheduleJob(scheduledJobDetail); + } else { + scheduledJobDetail.setIsMismatchedJob(true); + this.schedularWritePlatformService.saveOrUpdate(scheduledJobDetail); + throw new JobNodeIdMismatchingException(nodeIdStored, this.nodeId); + } } @Override @@ -245,7 +258,15 @@ public class JobRegisterServiceImpl implements JobRegisterService, ApplicationLi if (scheduledJobDetail == null) { throw new JobNotFoundException(String.valueOf(jobId)); } - executeJob(scheduledJobDetail, null); + final String nodeIdStored = scheduledJobDetail.getNodeId().toString(); + + if (nodeIdStored.equals(this.nodeId) || nodeIdStored.equals("0")) { + executeJob(scheduledJobDetail, null); + } else { + scheduledJobDetail.setIsMismatchedJob(true); + this.schedularWritePlatformService.saveOrUpdate(scheduledJobDetail); + throw new JobNodeIdMismatchingException(nodeIdStored, this.nodeId); + } } @Override diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformService.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformService.java index 9924303..388b778 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformService.java +++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformService.java @@ -27,7 +27,7 @@ import org.apache.fineract.infrastructure.jobs.domain.SchedulerDetail; public interface SchedularWritePlatformService { - List<ScheduledJobDetail> retrieveAllJobs(); + List<ScheduledJobDetail> retrieveAllJobs(String nodeId); ScheduledJobDetail findByJobKey(String triggerKey); diff --git a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformServiceJpaRepositoryImpl.java b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformServiceJpaRepositoryImpl.java index 0c99d34..51bbf5a 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformServiceJpaRepositoryImpl.java +++ b/fineract-provider/src/main/java/org/apache/fineract/infrastructure/jobs/service/SchedularWritePlatformServiceJpaRepositoryImpl.java @@ -58,8 +58,8 @@ public class SchedularWritePlatformServiceJpaRepositoryImpl implements Schedular } @Override - public List<ScheduledJobDetail> retrieveAllJobs() { - return this.scheduledJobDetailsRepository.findAll(); + public List<ScheduledJobDetail> retrieveAllJobs(final String nodeId) { + return this.scheduledJobDetailsRepository.findAllJobs(Integer.parseInt(nodeId)); } @Override diff --git a/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/domain/Loan.java b/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/domain/Loan.java index 9f93db5..b0c5625 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/domain/Loan.java +++ b/fineract-provider/src/main/java/org/apache/fineract/portfolio/loanaccount/domain/Loan.java @@ -1647,7 +1647,8 @@ public class Loan extends AbstractPersistableCustom { this.fixedPrincipalPercentagePerInstallment)) { this.fixedPrincipalPercentagePerInstallment = command .bigDecimalValueOfParameterNamed(LoanApiConstants.fixedPrincipalPercentagePerInstallmentParamName); - actualChanges.put(LoanApiConstants.fixedPrincipalPercentagePerInstallmentParamName, this.fixedPrincipalPercentagePerInstallment); + actualChanges.put(LoanApiConstants.fixedPrincipalPercentagePerInstallmentParamName, + this.fixedPrincipalPercentagePerInstallment); } return actualChanges; diff --git a/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerService.java b/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerService.java index 3c63230..e0f0cc5 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerService.java +++ b/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerService.java @@ -39,4 +39,6 @@ public interface ScheduledJobRunnerService { void postDividends() throws JobExecutionException; void updateTrialBalanceDetails() throws JobExecutionException; + + void executeMissMatchedJobs() throws JobExecutionException; } diff --git a/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerServiceImpl.java b/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerServiceImpl.java index 4dfe186..a9260c7 100644 --- a/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerServiceImpl.java +++ b/fineract-provider/src/main/java/org/apache/fineract/scheduledjobs/service/ScheduledJobRunnerServiceImpl.java @@ -38,8 +38,11 @@ import org.apache.fineract.infrastructure.core.service.DateUtils; import org.apache.fineract.infrastructure.core.service.RoutingDataSourceServiceFactory; import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil; import org.apache.fineract.infrastructure.jobs.annotation.CronTarget; +import org.apache.fineract.infrastructure.jobs.domain.ScheduledJobDetail; +import org.apache.fineract.infrastructure.jobs.domain.ScheduledJobDetailRepository; import org.apache.fineract.infrastructure.jobs.exception.JobExecutionException; import org.apache.fineract.infrastructure.jobs.service.JobName; +import org.apache.fineract.infrastructure.jobs.service.JobRegisterService; import org.apache.fineract.portfolio.savings.DepositAccountType; import org.apache.fineract.portfolio.savings.DepositAccountUtils; import org.apache.fineract.portfolio.savings.data.DepositAccountData; @@ -53,6 +56,7 @@ import org.apache.fineract.portfolio.shareaccounts.service.ShareAccountSchedular import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -74,6 +78,11 @@ public class ScheduledJobRunnerServiceImpl implements ScheduledJobRunnerService private final ShareAccountDividendReadPlatformService shareAccountDividendReadPlatformService; private final ShareAccountSchedularService shareAccountSchedularService; private final TrialBalanceRepositoryWrapper trialBalanceRepositoryWrapper; + private final JobRegisterService jobRegisterService; + private final ScheduledJobDetailRepository scheduledJobDetailsRepository; + + @Value("${node_id:1}") + private String nodeId; @Autowired public ScheduledJobRunnerServiceImpl(final RoutingDataSourceServiceFactory dataSourceServiceFactory, @@ -83,7 +92,8 @@ public class ScheduledJobRunnerServiceImpl implements ScheduledJobRunnerService final DepositAccountWritePlatformService depositAccountWritePlatformService, final ShareAccountDividendReadPlatformService shareAccountDividendReadPlatformService, final ShareAccountSchedularService shareAccountSchedularService, - final TrialBalanceRepositoryWrapper trialBalanceRepositoryWrapper) { + final TrialBalanceRepositoryWrapper trialBalanceRepositoryWrapper, final JobRegisterService jobRegisterService, + final ScheduledJobDetailRepository scheduledJobDetailsRepository) { this.dataSourceServiceFactory = dataSourceServiceFactory; this.savingsAccountWritePlatformService = savingsAccountWritePlatformService; this.savingsAccountChargeReadPlatformService = savingsAccountChargeReadPlatformService; @@ -92,6 +102,8 @@ public class ScheduledJobRunnerServiceImpl implements ScheduledJobRunnerService this.shareAccountDividendReadPlatformService = shareAccountDividendReadPlatformService; this.shareAccountSchedularService = shareAccountSchedularService; this.trialBalanceRepositoryWrapper = trialBalanceRepositoryWrapper; + this.jobRegisterService = jobRegisterService; + this.scheduledJobDetailsRepository = scheduledJobDetailsRepository; } @Transactional @@ -482,4 +494,16 @@ public class ScheduledJobRunnerServiceImpl implements ScheduledJobRunnerService } + @Override + @CronTarget(jobName = JobName.EXECUTE_DIRTY_JOBS) + public void executeMissMatchedJobs() throws JobExecutionException { + List<ScheduledJobDetail> jobDetails = this.scheduledJobDetailsRepository.findAllMismatchedJobs(true); + + for (ScheduledJobDetail scheduledJobDetail : jobDetails) { + if (scheduledJobDetail.getNodeId().toString().equals(this.nodeId)) { + jobRegisterService.executeJob(scheduledJobDetail.getId()); + } + } + } + } diff --git a/fineract-provider/src/main/resources/sql/migrations/core_db/V373__node_aware_scheduler_jobs.sql b/fineract-provider/src/main/resources/sql/migrations/core_db/V373__node_aware_scheduler_jobs.sql new file mode 100644 index 0000000..9f35521 --- /dev/null +++ b/fineract-provider/src/main/resources/sql/migrations/core_db/V373__node_aware_scheduler_jobs.sql @@ -0,0 +1,26 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. +-- + +ALTER TABLE job ADD COLUMN node_id int; + +UPDATE job SET node_id = 1; + +ALTER TABLE job ADD COLUMN is_mismatched_job TINYINT(1) DEFAULT 1; + +INSERT INTO `job` (`node_id`, `name`, `display_name`, `cron_expression`, `create_time`, `task_priority`, `group_name`, `previous_run_start_time`, `next_run_time`, `job_key`, `initializing_errorlog`, `is_active`, `currently_running`, `updates_allowed`, `scheduler_group`, `is_misfired`,`is_mismatched_job`) VALUES (0,'Execute All Dirty Jobs', 'Execute All Dirty Jobs', '0 1 0 1/1 * ? *', now(), 5, NULL, NULL, NULL, 'Execute All Dirty JobsJobDetail1 _ DEFAULT', NULL, 1, 0, 1, 0, 0,0); diff --git a/kubernetes/fineract-server-deployment.yml b/kubernetes/fineract-server-deployment.yml index b11d60a..1616318 100644 --- a/kubernetes/fineract-server-deployment.yml +++ b/kubernetes/fineract-server-deployment.yml @@ -87,6 +87,8 @@ spec: value: jdbc - name: SUB_PROTOCOL value: mysql:thin + - name: node_id + value: 1 - name: fineract_tenants_driver value: org.drizzle.jdbc.DrizzleDriver - name: fineract_tenants_url