This is an automated email from the ASF dual-hosted git repository. mmoayyed pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/syncope.git
The following commit(s) were added to refs/heads/master by this push: new f1cd7ccec1 SYNCOPE-1709: Persist Jobs' current status in the database to support multi-node deployments (#390) f1cd7ccec1 is described below commit f1cd7ccec1382a696d9e09b3454a227de9f55558 Author: Misagh Moayyed <mm1...@gmail.com> AuthorDate: Mon Nov 14 20:00:38 2022 +0400 SYNCOPE-1709: Persist Jobs' current status in the database to support multi-node deployments (#390) Co-authored-by: Francesco Chicchiriccò <ilgro...@apache.org> --- .../core/logic/AbstractExecutableLogic.java | 9 +++- .../syncope/core/logic/AbstractJobLogic.java | 23 ++++---- .../syncope/core/logic/IdRepoLogicContext.java | 17 +++++- .../syncope/core/logic/NotificationLogic.java | 6 ++- .../org/apache/syncope/core/logic/ReportLogic.java | 9 ++-- .../org/apache/syncope/core/logic/TaskLogic.java | 4 +- .../apache/syncope/core/logic/ReportLogicTest.java | 11 +++- .../core/persistence/api/dao/JobStatusDAO.java} | 15 +++--- .../core/persistence/api/dao/Reportlet.java | 5 +- .../core/persistence/api/entity/JobStatus.java} | 13 ++--- .../core/persistence/jpa/PersistenceContext.java | 8 +++ .../core/persistence/jpa/dao/JPAJobStatusDAO.java | 49 +++++++++++++++++ .../persistence/jpa/entity/JPAEntityFactory.java | 3 ++ .../core/persistence/jpa/entity/JPAJobStatus.java} | 29 +++++++--- .../provisioning/api/event/JobStatusEvent.java | 52 ++++++++++++++++++ .../core/provisioning/api/job/JobDelegate.java | 2 - .../provisioning/java/ProvisioningContext.java | 37 +++++++++++-- .../java/job/AbstractInterruptableJob.java | 5 -- .../java/job/AbstractSchedTaskJobDelegate.java | 21 +++++--- .../job/GroupMemberProvisionTaskJobDelegate.java | 6 +-- .../provisioning/java/job/JobStatusUpdater.java | 61 ++++++++++++++++++++++ .../DefaultNotificationJobDelegate.java | 21 +++++--- .../java/job/report/AbstractReportlet.java | 17 ++++-- .../java/job/report/AuditReportlet.java | 13 ++--- .../java/job/report/DefaultReportJobDelegate.java | 44 
+++++++++++----- .../java/job/report/GroupReportlet.java | 9 ++-- .../java/job/report/ReconciliationReportlet.java | 25 ++++----- .../java/job/report/StaticReportlet.java | 3 +- .../java/job/report/UserReportlet.java | 7 ++- .../java/pushpull/PullJobDelegate.java | 32 +++++------- .../java/pushpull/PushJobDelegate.java | 31 +++++------ .../java/pushpull/SinglePullJobDelegate.java | 3 ++ .../java/pushpull/SinglePushJobDelegate.java | 4 ++ .../pushpull/stream/StreamPullJobDelegate.java | 4 ++ .../pushpull/stream/StreamPushJobDelegate.java | 4 ++ .../java/job/JobStatusUpdaterTest.java | 50 ++++++++++++++++++ 36 files changed, 490 insertions(+), 162 deletions(-) diff --git a/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/AbstractExecutableLogic.java b/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/AbstractExecutableLogic.java index db6447eb2a..406ae4abe0 100644 --- a/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/AbstractExecutableLogic.java +++ b/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/AbstractExecutableLogic.java @@ -26,14 +26,19 @@ import org.apache.syncope.common.lib.to.ExecTO; import org.apache.syncope.common.lib.to.JobTO; import org.apache.syncope.common.lib.types.JobAction; import org.apache.syncope.common.rest.api.batch.BatchResponseItem; +import org.apache.syncope.core.persistence.api.dao.JobStatusDAO; import org.apache.syncope.core.persistence.api.dao.search.OrderByClause; import org.apache.syncope.core.provisioning.api.job.JobManager; import org.springframework.scheduling.quartz.SchedulerFactoryBean; public abstract class AbstractExecutableLogic<T extends EntityTO> extends AbstractJobLogic<T> { - public AbstractExecutableLogic(final JobManager jobManager, final SchedulerFactoryBean scheduler) { - super(jobManager, scheduler); + public AbstractExecutableLogic( + final JobManager jobManager, + final SchedulerFactoryBean scheduler, + final JobStatusDAO jobStatusDAO) { + + super(jobManager, 
scheduler, jobStatusDAO); } public abstract ExecTO execute(String key, OffsetDateTime startAt, boolean dryRun); diff --git a/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/AbstractJobLogic.java b/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/AbstractJobLogic.java index c88788507a..275cc847e0 100644 --- a/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/AbstractJobLogic.java +++ b/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/AbstractJobLogic.java @@ -20,19 +20,20 @@ package org.apache.syncope.core.logic; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import org.apache.commons.lang3.tuple.Triple; import org.apache.syncope.common.lib.to.EntityTO; import org.apache.syncope.common.lib.to.JobTO; import org.apache.syncope.common.lib.types.JobAction; import org.apache.syncope.common.lib.types.JobType; +import org.apache.syncope.core.persistence.api.dao.JobStatusDAO; +import org.apache.syncope.core.persistence.api.entity.JobStatus; import org.apache.syncope.core.provisioning.api.job.JobManager; import org.apache.syncope.core.provisioning.api.utils.FormatUtils; -import org.apache.syncope.core.provisioning.java.job.AbstractInterruptableJob; import org.apache.syncope.core.provisioning.java.job.SystemLoadReporterJob; import org.apache.syncope.core.provisioning.java.job.TaskJob; import org.apache.syncope.core.provisioning.java.job.notification.NotificationJob; import org.apache.syncope.core.provisioning.java.job.report.ReportJob; -import org.apache.syncope.core.spring.ApplicationContextProvider; import org.quartz.JobDetail; import org.quartz.JobKey; import org.quartz.Scheduler; @@ -48,9 +49,16 @@ abstract class AbstractJobLogic<T extends EntityTO> extends AbstractTransactiona protected final SchedulerFactoryBean scheduler; - protected AbstractJobLogic(final JobManager jobManager, final SchedulerFactoryBean scheduler) { + protected final JobStatusDAO jobStatusDAO; + + protected 
AbstractJobLogic( + final JobManager jobManager, + final SchedulerFactoryBean scheduler, + final JobStatusDAO jobStatusDAO) { + this.jobManager = jobManager; this.scheduler = scheduler; + this.jobStatusDAO = jobStatusDAO; } protected abstract Triple<JobType, String, String> getReference(JobKey jobKey); @@ -93,12 +101,9 @@ abstract class AbstractJobLogic<T extends EntityTO> extends AbstractTransactiona jobTO.setStatus("UNKNOWN"); if (jobTO.isRunning()) { try { - Object job = ApplicationContextProvider.getBeanFactory().getBean(jobKey.getName()); - if (job instanceof AbstractInterruptableJob - && ((AbstractInterruptableJob) job).getDelegate() != null) { - - jobTO.setStatus(((AbstractInterruptableJob) job).getDelegate().currentStatus()); - } + jobTO.setStatus(Optional.ofNullable(jobStatusDAO.find(jobTO.getRefDesc())). + map(JobStatus::getStatus). + orElse(jobTO.getStatus())); } catch (NoSuchBeanDefinitionException e) { LOG.warn("Could not find job {} implementation", jobKey, e); } diff --git a/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/IdRepoLogicContext.java b/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/IdRepoLogicContext.java index d04964193a..9e004bd247 100644 --- a/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/IdRepoLogicContext.java +++ b/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/IdRepoLogicContext.java @@ -52,6 +52,7 @@ import org.apache.syncope.core.persistence.api.dao.ExternalResourceDAO; import org.apache.syncope.core.persistence.api.dao.FIQLQueryDAO; import org.apache.syncope.core.persistence.api.dao.GroupDAO; import org.apache.syncope.core.persistence.api.dao.ImplementationDAO; +import org.apache.syncope.core.persistence.api.dao.JobStatusDAO; import org.apache.syncope.core.persistence.api.dao.MailTemplateDAO; import org.apache.syncope.core.persistence.api.dao.NotificationDAO; import org.apache.syncope.core.persistence.api.dao.OIDCRPClientAppDAO; @@ -378,10 +379,11 @@ public class 
IdRepoLogicContext { public NotificationLogic notificationLogic( final NotificationDataBinder binder, final JobManager jobManager, + final JobStatusDAO jobStatusDAO, final SchedulerFactoryBean scheduler, final NotificationDAO notificationDAO) { - return new NotificationLogic(jobManager, scheduler, notificationDAO, binder); + return new NotificationLogic(jobManager, scheduler, jobStatusDAO, notificationDAO, binder); } @ConditionalOnMissingBean @@ -435,11 +437,20 @@ public class IdRepoLogicContext { final ConfParamOps confParamOps, final ReportDataBinder binder, final SchedulerFactoryBean scheduler, + final JobStatusDAO jobStatusDAO, final ReportDAO reportDAO, final EntityFactory entityFactory, final ReportExecDAO reportExecDAO) { - return new ReportLogic(jobManager, scheduler, reportDAO, reportExecDAO, confParamOps, binder, entityFactory); + return new ReportLogic( + jobManager, + scheduler, + jobStatusDAO, + reportDAO, + reportExecDAO, + confParamOps, + binder, + entityFactory); } @ConditionalOnMissingBean @@ -518,6 +529,7 @@ public class IdRepoLogicContext { final TaskExecDAO taskExecDAO, final TaskDAO taskDAO, final SchedulerFactoryBean scheduler, + final JobStatusDAO jobStatusDAO, final ConfParamOps confParamOps, final ExternalResourceDAO externalResourceDAO, final NotificationJobDelegate notificationJobDelegate, @@ -528,6 +540,7 @@ public class IdRepoLogicContext { return new TaskLogic( jobManager, scheduler, + jobStatusDAO, taskDAO, taskExecDAO, externalResourceDAO, diff --git a/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/NotificationLogic.java b/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/NotificationLogic.java index 1bf6e8312e..9fea398973 100644 --- a/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/NotificationLogic.java +++ b/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/NotificationLogic.java @@ -28,6 +28,7 @@ import org.apache.syncope.common.lib.to.NotificationTO; import 
org.apache.syncope.common.lib.types.IdRepoEntitlement; import org.apache.syncope.common.lib.types.JobAction; import org.apache.syncope.common.lib.types.JobType; +import org.apache.syncope.core.persistence.api.dao.JobStatusDAO; import org.apache.syncope.core.persistence.api.dao.NotFoundException; import org.apache.syncope.core.persistence.api.dao.NotificationDAO; import org.apache.syncope.core.persistence.api.entity.Notification; @@ -48,10 +49,11 @@ public class NotificationLogic extends AbstractJobLogic<NotificationTO> { public NotificationLogic( final JobManager jobManager, final SchedulerFactoryBean scheduler, + final JobStatusDAO jobStatusDAO, final NotificationDAO notificationDAO, final NotificationDataBinder binder) { - super(jobManager, scheduler); + super(jobManager, scheduler, jobStatusDAO); this.notificationDAO = notificationDAO; this.binder = binder; @@ -112,7 +114,7 @@ public class NotificationLogic extends AbstractJobLogic<NotificationTO> { @Override protected Triple<JobType, String, String> getReference(final JobKey jobKey) { return JobManager.NOTIFICATION_JOB.equals(jobKey) - ? Triple.of(JobType.NOTIFICATION, (String) null, NotificationJob.class.getSimpleName()) + ? 
Triple.of(JobType.NOTIFICATION, null, NotificationJob.class.getSimpleName()) : null; } diff --git a/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/ReportLogic.java b/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/ReportLogic.java index 95803a43a3..83565a2bc4 100644 --- a/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/ReportLogic.java +++ b/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/ReportLogic.java @@ -62,6 +62,7 @@ import org.apache.syncope.common.lib.types.ReportExecExportFormat; import org.apache.syncope.common.lib.types.ReportExecStatus; import org.apache.syncope.common.rest.api.RESTHeaders; import org.apache.syncope.common.rest.api.batch.BatchResponseItem; +import org.apache.syncope.core.persistence.api.dao.JobStatusDAO; import org.apache.syncope.core.persistence.api.dao.NotFoundException; import org.apache.syncope.core.persistence.api.dao.ReportDAO; import org.apache.syncope.core.persistence.api.dao.ReportExecDAO; @@ -112,13 +113,14 @@ public class ReportLogic extends AbstractExecutableLogic<ReportTO> { public ReportLogic( final JobManager jobManager, final SchedulerFactoryBean scheduler, + final JobStatusDAO jobStatusDAO, final ReportDAO reportDAO, final ReportExecDAO reportExecDAO, final ConfParamOps confParamOps, final ReportDataBinder binder, final EntityFactory entityFactory) { - super(jobManager, scheduler); + super(jobManager, scheduler, jobStatusDAO); this.reportDAO = reportDAO; this.reportExecDAO = reportExecDAO; @@ -419,9 +421,8 @@ public class ReportLogic extends AbstractExecutableLogic<ReportTO> { protected Triple<JobType, String, String> getReference(final JobKey jobKey) { String key = JobNamer.getReportKeyFromJobName(jobKey.getName()); - Report report = reportDAO.find(key); - return Optional.ofNullable(report) - .map(report1 -> Triple.of(JobType.REPORT, key, binder.buildRefDesc(report1))).orElse(null); + return Optional.ofNullable(reportDAO.find(key)). 
+ map(f -> Triple.of(JobType.REPORT, key, binder.buildRefDesc(f))).orElse(null); } @PreAuthorize("hasRole('" + IdRepoEntitlement.REPORT_LIST + "')") diff --git a/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/TaskLogic.java b/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/TaskLogic.java index 9eb7051e5f..73b1a2c302 100644 --- a/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/TaskLogic.java +++ b/core/idrepo/logic/src/main/java/org/apache/syncope/core/logic/TaskLogic.java @@ -49,6 +49,7 @@ import org.apache.syncope.common.lib.types.TaskType; import org.apache.syncope.common.rest.api.RESTHeaders; import org.apache.syncope.common.rest.api.batch.BatchResponseItem; import org.apache.syncope.core.persistence.api.dao.ExternalResourceDAO; +import org.apache.syncope.core.persistence.api.dao.JobStatusDAO; import org.apache.syncope.core.persistence.api.dao.NotFoundException; import org.apache.syncope.core.persistence.api.dao.NotificationDAO; import org.apache.syncope.core.persistence.api.dao.TaskDAO; @@ -107,6 +108,7 @@ public class TaskLogic extends AbstractExecutableLogic<TaskTO> { public TaskLogic( final JobManager jobManager, final SchedulerFactoryBean scheduler, + final JobStatusDAO jobStatusDAO, final TaskDAO taskDAO, final TaskExecDAO taskExecDAO, final ExternalResourceDAO resourceDAO, @@ -117,7 +119,7 @@ public class TaskLogic extends AbstractExecutableLogic<TaskTO> { final NotificationJobDelegate notificationJobDelegate, final TaskUtilsFactory taskUtilsFactory) { - super(jobManager, scheduler); + super(jobManager, scheduler, jobStatusDAO); this.taskDAO = taskDAO; this.taskExecDAO = taskExecDAO; diff --git a/core/idrepo/logic/src/test/java/org/apache/syncope/core/logic/ReportLogicTest.java b/core/idrepo/logic/src/test/java/org/apache/syncope/core/logic/ReportLogicTest.java index 366a1f3b67..61391676d2 100644 --- a/core/idrepo/logic/src/test/java/org/apache/syncope/core/logic/ReportLogicTest.java +++ 
b/core/idrepo/logic/src/test/java/org/apache/syncope/core/logic/ReportLogicTest.java @@ -35,6 +35,7 @@ import org.apache.syncope.core.persistence.api.dao.ReportDAO; import org.apache.syncope.core.persistence.api.dao.ReportExecDAO; import org.apache.syncope.core.persistence.api.entity.EntityFactory; import org.apache.syncope.core.persistence.api.entity.ReportExec; +import org.apache.syncope.core.provisioning.api.data.ReportDataBinder; import org.apache.syncope.core.provisioning.api.job.report.ReportJobDelegate; import org.apache.syncope.core.provisioning.java.job.report.DefaultReportJobDelegate; import org.apache.syncope.core.spring.security.SyncopeAuthenticationDetails; @@ -43,6 +44,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.security.authentication.UsernamePasswordAuthenticationToken; import org.springframework.security.core.GrantedAuthority; import org.springframework.security.core.context.SecurityContextHolder; @@ -81,6 +83,12 @@ public class ReportLogicTest extends AbstractTest { @Autowired private EntityFactory entityFactory; + @Autowired + private ReportDataBinder reportDataBinder; + + @Autowired + private ApplicationEventPublisher publisher; + private void checkExport(final String execKey, final ReportExecExportFormat fmt) throws IOException { ReportExecExportFormat format = Optional.ofNullable(fmt).orElse(ReportExecExportFormat.XML); ReportExec reportExec = logic.getReportExec(execKey); @@ -104,7 +112,8 @@ public class ReportLogicTest extends AbstractTest { report = logic.read(report.getKey()); assertTrue(report.getExecutions().isEmpty()); - ReportJobDelegate delegate = new DefaultReportJobDelegate(reportDAO, reportExecDAO, entityFactory); + ReportJobDelegate delegate = new DefaultReportJobDelegate( + reportDAO, reportExecDAO, 
entityFactory, reportDataBinder, publisher); delegate.execute(report.getKey(), "test"); report = logic.read(report.getKey()); diff --git a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java b/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/JobStatusDAO.java similarity index 73% copy from core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java copy to core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/JobStatusDAO.java index 1124fb52ae..bb50b6485d 100644 --- a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java +++ b/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/JobStatusDAO.java @@ -16,16 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.syncope.core.provisioning.api.job; +package org.apache.syncope.core.persistence.api.dao; -/** - * Implementations of this interface will perform the actual operations required to Quartz's {@link org.quartz.Job}. 
- */ -public interface JobDelegate { +import org.apache.syncope.core.persistence.api.entity.JobStatus; + +public interface JobStatusDAO extends DAO<JobStatus> { - String currentStatus(); + JobStatus find(String key); - void interrupt(); + JobStatus save(JobStatus jobStatus); - boolean isInterrupted(); + void delete(String key); } diff --git a/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/Reportlet.java b/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/Reportlet.java index 83a260f6be..22c4468b0c 100644 --- a/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/Reportlet.java +++ b/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/Reportlet.java @@ -18,7 +18,6 @@ */ package org.apache.syncope.core.persistence.api.dao; -import java.util.concurrent.atomic.AtomicReference; import org.apache.syncope.common.lib.report.ReportletConf; import org.xml.sax.ContentHandler; import org.xml.sax.SAXException; @@ -43,8 +42,8 @@ public interface Reportlet { * Actual data extraction for reporting. 
* * @param handler SAX content handler for streaming result - * @param status current report status (for job reporting) + * @param refDesc reference description of the current job (for job status reporting) * @throws SAXException if there is any problem in SAX handling */ - void extract(ContentHandler handler, AtomicReference<String> status) throws SAXException; + void extract(ContentHandler handler, String refDesc) throws SAXException; } diff --git a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java b/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/entity/JobStatus.java similarity index 73% copy from core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java copy to core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/entity/JobStatus.java index 1124fb52ae..ae98e37051 100644 --- a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java +++ b/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/entity/JobStatus.java @@ -16,16 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.syncope.core.provisioning.api.job; +package org.apache.syncope.core.persistence.api.entity; -/** - * Implementations of this interface will perform the actual operations required to Quartz's {@link org.quartz.Job}.
- */ -public interface JobDelegate { - - String currentStatus(); +public interface JobStatus extends ProvidedKeyEntity { - void interrupt(); + String getStatus(); - boolean isInterrupted(); + void setStatus(String status); } diff --git a/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/PersistenceContext.java b/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/PersistenceContext.java index 89bb5beb24..a7acb384c7 100644 --- a/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/PersistenceContext.java +++ b/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/PersistenceContext.java @@ -49,6 +49,7 @@ import org.apache.syncope.core.persistence.api.dao.ExternalResourceDAO; import org.apache.syncope.core.persistence.api.dao.FIQLQueryDAO; import org.apache.syncope.core.persistence.api.dao.GroupDAO; import org.apache.syncope.core.persistence.api.dao.ImplementationDAO; +import org.apache.syncope.core.persistence.api.dao.JobStatusDAO; import org.apache.syncope.core.persistence.api.dao.MailTemplateDAO; import org.apache.syncope.core.persistence.api.dao.NotificationDAO; import org.apache.syncope.core.persistence.api.dao.OIDCJWKSDAO; @@ -106,6 +107,7 @@ import org.apache.syncope.core.persistence.jpa.dao.JPAExternalResourceDAO; import org.apache.syncope.core.persistence.jpa.dao.JPAFIQLQueryDAO; import org.apache.syncope.core.persistence.jpa.dao.JPAGroupDAO; import org.apache.syncope.core.persistence.jpa.dao.JPAImplementationDAO; +import org.apache.syncope.core.persistence.jpa.dao.JPAJobStatusDAO; import org.apache.syncope.core.persistence.jpa.dao.JPAMailTemplateDAO; import org.apache.syncope.core.persistence.jpa.dao.JPANotificationDAO; import org.apache.syncope.core.persistence.jpa.dao.JPAOIDCJWKSDAO; @@ -524,6 +526,12 @@ public class PersistenceContext { return new JPAImplementationDAO(resourceDAO, entityCacheDAO); } + @ConditionalOnMissingBean + @Bean + public JobStatusDAO 
jobStatusDAO() { + return new JPAJobStatusDAO(); + } + @ConditionalOnMissingBean @Bean public MailTemplateDAO mailTemplateDAO() { diff --git a/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/dao/JPAJobStatusDAO.java b/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/dao/JPAJobStatusDAO.java new file mode 100644 index 0000000000..560efd1d13 --- /dev/null +++ b/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/dao/JPAJobStatusDAO.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.syncope.core.persistence.jpa.dao; + +import org.apache.syncope.core.persistence.api.dao.JobStatusDAO; +import org.apache.syncope.core.persistence.api.entity.JobStatus; +import org.apache.syncope.core.persistence.jpa.entity.JPAJobStatus; +import org.springframework.transaction.annotation.Transactional; + +public class JPAJobStatusDAO extends AbstractDAO<JobStatus> implements JobStatusDAO { + + @Transactional(readOnly = true) + @Override + public JobStatus find(final String key) { + return entityManager().find(JPAJobStatus.class, key); + } + + @Transactional + @Override + public JobStatus save(final JobStatus jobStatus) { + return entityManager().merge(jobStatus); + } + + @Transactional + @Override + public void delete(final String key) { + JobStatus jobStatus = find(key); + if (jobStatus != null) { + entityManager().remove(jobStatus); + } + + } +} diff --git a/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/entity/JPAEntityFactory.java b/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/entity/JPAEntityFactory.java index 0b9ee7c7df..4798a2c919 100644 --- a/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/entity/JPAEntityFactory.java +++ b/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/entity/JPAEntityFactory.java @@ -38,6 +38,7 @@ import org.apache.syncope.core.persistence.api.entity.EntityFactory; import org.apache.syncope.core.persistence.api.entity.ExternalResource; import org.apache.syncope.core.persistence.api.entity.FIQLQuery; import org.apache.syncope.core.persistence.api.entity.Implementation; +import org.apache.syncope.core.persistence.api.entity.JobStatus; import org.apache.syncope.core.persistence.api.entity.MailTemplate; import org.apache.syncope.core.persistence.api.entity.Notification; import org.apache.syncope.core.persistence.api.entity.PlainSchema; @@ -306,6 +307,8 @@ public class JPAEntityFactory implements 
EntityFactory { result = (E) new JPADelegation(); } else if (reference.equals(FIQLQuery.class)) { result = (E) new JPAFIQLQuery(); + } else if (reference.equals(JobStatus.class)) { + result = (E) new JPAJobStatus(); } else if (reference.equals(SRARoute.class)) { result = (E) new JPASRARoute(); } else if (reference.equals(AuthModule.class)) { diff --git a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java b/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/entity/JPAJobStatus.java similarity index 55% copy from core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java copy to core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/entity/JPAJobStatus.java index 1124fb52ae..5cda0127f9 100644 --- a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java +++ b/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/entity/JPAJobStatus.java @@ -16,16 +16,29 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.syncope.core.provisioning.api.job; +package org.apache.syncope.core.persistence.jpa.entity; -/** - * Implementations of this interface will perform the actual operations required to Quartz's {@link org.quartz.Job}. 
- */ -public interface JobDelegate { +import javax.persistence.Entity; +import javax.persistence.Table; +import org.apache.syncope.core.persistence.api.entity.JobStatus; + +@Entity +@Table(name = JPAJobStatus.TABLE) +public class JPAJobStatus extends AbstractProvidedKeyEntity implements JobStatus { + + private static final long serialVersionUID = 9061740216669505871L; + + public static final String TABLE = "JobStatus"; - String currentStatus(); + private String jobStatus; - void interrupt(); + @Override + public String getStatus() { + return jobStatus; + } - boolean isInterrupted(); + @Override + public void setStatus(final String status) { + jobStatus = status; + } } diff --git a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/event/JobStatusEvent.java b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/event/JobStatusEvent.java new file mode 100644 index 0000000000..0df61e7082 --- /dev/null +++ b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/event/JobStatusEvent.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.syncope.core.provisioning.api.event; + +import org.springframework.context.ApplicationEvent; + +public class JobStatusEvent extends ApplicationEvent { + + private static final long serialVersionUID = 373067530016978592L; + + private final String jobRefDesc; + + private final String jobStatus; + + public JobStatusEvent(final Object source, final String jobRefDesc, final String jobStatus) { + super(source); + this.jobRefDesc = jobRefDesc; + this.jobStatus = jobStatus; + } + + public String getJobRefDesc() { + return jobRefDesc; + } + + public String getJobStatus() { + return jobStatus; + } + + @Override + public String toString() { + return "JobStatusEvent{" + + "jobRefDesc=" + jobRefDesc + + ", jobStatus=" + jobStatus + + '}'; + } +} diff --git a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java index 1124fb52ae..329cad2845 100644 --- a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java +++ b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java @@ -23,8 +23,6 @@ package org.apache.syncope.core.provisioning.api.job; */ public interface JobDelegate { - String currentStatus(); - void interrupt(); boolean isInterrupted(); diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java index 16db1fbab8..88b8a961e4 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/ProvisioningContext.java @@ -23,6 +23,7 @@ import java.io.PrintStream; import java.nio.charset.StandardCharsets; import java.util.Properties; 
import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import javax.annotation.Resource; import javax.mail.MessagingException; @@ -52,6 +53,7 @@ import org.apache.syncope.core.persistence.api.dao.DynRealmDAO; import org.apache.syncope.core.persistence.api.dao.ExternalResourceDAO; import org.apache.syncope.core.persistence.api.dao.GroupDAO; import org.apache.syncope.core.persistence.api.dao.ImplementationDAO; +import org.apache.syncope.core.persistence.api.dao.JobStatusDAO; import org.apache.syncope.core.persistence.api.dao.MailTemplateDAO; import org.apache.syncope.core.persistence.api.dao.NotificationDAO; import org.apache.syncope.core.persistence.api.dao.PlainAttrDAO; @@ -161,6 +163,7 @@ import org.apache.syncope.core.provisioning.java.data.UserDataBinderImpl; import org.apache.syncope.core.provisioning.java.data.WAConfigDataBinderImpl; import org.apache.syncope.core.provisioning.java.data.wa.WAClientAppDataBinderImpl; import org.apache.syncope.core.provisioning.java.job.DefaultJobManager; +import org.apache.syncope.core.provisioning.java.job.JobStatusUpdater; import org.apache.syncope.core.provisioning.java.job.SchedulerDBInit; import org.apache.syncope.core.provisioning.java.job.SyncopeSpringBeanJobFactory; import org.apache.syncope.core.provisioning.java.job.SystemLoadReporterJob; @@ -187,12 +190,15 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.Lazy; import 
org.springframework.context.annotation.Primary; import org.springframework.core.io.ClassPathResource; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.core.task.support.TaskExecutorAdapter; import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator; import org.springframework.jndi.JndiObjectFactoryBean; import org.springframework.mail.javamail.JavaMailSender; @@ -257,6 +263,18 @@ public class ProvisioningContext { }; } + /** + * This is a special thread executor that only creates a single worker thread. + * This is necessary to allow job status update operations to queue up serially + * and not via multiple threads to avoid the "lost update" problem. + * + * @return the async task executor + */ + @Bean + public AsyncTaskExecutor jobStatusUpdaterThreadExecutor() { + return new TaskExecutorAdapter(Executors.newSingleThreadExecutor()); + } + /** * Used by {@link org.apache.syncope.core.provisioning.java.propagation.PriorityPropagationTaskExecutor}. 
* @@ -357,10 +375,17 @@ public class ProvisioningContext { return jobManager; } + @ConditionalOnMissingBean + @Bean + public JobStatusUpdater jobStatusUpdater(final JobStatusDAO jobStatusDAO, final EntityFactory entityFactory) { + return new JobStatusUpdater(jobStatusDAO, entityFactory); + } + @ConditionalOnMissingBean @Bean public JavaMailSender mailSender(final ProvisioningProperties provisioningProperties) throws IllegalArgumentException, IOException { + JavaMailSenderImpl mailSender = new JavaMailSenderImpl() { @Override @@ -771,14 +796,16 @@ public class ProvisioningContext { final TaskDAO taskDAO, final JavaMailSender mailSender, final AuditManager auditManager, - final NotificationManager notificationManager) { + final NotificationManager notificationManager, + final ApplicationEventPublisher publisher) { return new DefaultNotificationJobDelegate( taskDAO, mailSender, taskUtilsFactory, auditManager, - notificationManager); + notificationManager, + publisher); } @ConditionalOnMissingBean @@ -796,9 +823,11 @@ public class ProvisioningContext { public ReportJobDelegate reportJobDelegate( final ReportDAO reportDAO, final ReportExecDAO reportExecDAO, - final EntityFactory entityFactory) { + final EntityFactory entityFactory, + final ReportDataBinder reportDataBinder, + final ApplicationEventPublisher publisher) { - return new DefaultReportJobDelegate(reportDAO, reportExecDAO, entityFactory); + return new DefaultReportJobDelegate(reportDAO, reportExecDAO, entityFactory, reportDataBinder, publisher); } @ConditionalOnMissingBean diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractInterruptableJob.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractInterruptableJob.java index b2314c5907..a858d2385b 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractInterruptableJob.java +++ 
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractInterruptableJob.java @@ -28,11 +28,6 @@ public abstract class AbstractInterruptableJob implements InterruptableJob { private final JobDelegate embeddedDelegate = new JobDelegate() { - @Override - public String currentStatus() { - return "RUNNING"; - } - @Override public void interrupt() { } diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractSchedTaskJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractSchedTaskJobDelegate.java index 6b6f50f6f5..cbace864bd 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractSchedTaskJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractSchedTaskJobDelegate.java @@ -19,8 +19,8 @@ package org.apache.syncope.core.provisioning.java.job; import java.time.OffsetDateTime; +import java.util.Objects; import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; import org.apache.syncope.common.lib.types.AuditElements; import org.apache.syncope.common.lib.types.TaskType; import org.apache.syncope.core.persistence.api.dao.TaskDAO; @@ -29,6 +29,8 @@ import org.apache.syncope.core.persistence.api.entity.task.SchedTask; import org.apache.syncope.core.persistence.api.entity.task.TaskExec; import org.apache.syncope.core.persistence.api.entity.task.TaskUtilsFactory; import org.apache.syncope.core.provisioning.api.AuditManager; +import org.apache.syncope.core.provisioning.api.data.TaskDataBinder; +import org.apache.syncope.core.provisioning.api.event.JobStatusEvent; import org.apache.syncope.core.provisioning.api.job.JobManager; import org.apache.syncope.core.provisioning.api.job.SchedTaskJobDelegate; import org.apache.syncope.core.provisioning.api.notification.NotificationManager; @@ -39,6 +41,7 @@ import 
org.quartz.JobExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.transaction.annotation.Transactional; public abstract class AbstractSchedTaskJobDelegate<T extends SchedTask> implements SchedTaskJobDelegate { @@ -70,6 +73,9 @@ public abstract class AbstractSchedTaskJobDelegate<T extends SchedTask> implemen @Autowired protected TaskUtilsFactory taskUtilsFactory; + @Autowired + protected TaskDataBinder taskDataBinder; + /** * Notification manager. */ @@ -82,15 +88,16 @@ public abstract class AbstractSchedTaskJobDelegate<T extends SchedTask> implemen @Autowired protected AuditManager auditManager; - protected final AtomicReference<String> status = new AtomicReference<>(); + @Autowired + protected ApplicationEventPublisher publisher; protected boolean interrupt; protected boolean interrupted; - @Override - public String currentStatus() { - return status.get(); + protected void setStatus(final String status) { + Objects.requireNonNull(task, "Task cannot be undefined"); + publisher.publishEvent(new JobStatusEvent(this, taskDataBinder.buildRefDesc(task), status)); } @Override @@ -131,7 +138,7 @@ public abstract class AbstractSchedTaskJobDelegate<T extends SchedTask> implemen execution.setTask(task); execution.setExecutor(executor); - status.set("Initialization completed"); + setStatus("Initialization completed"); AuditElements.Result result; @@ -153,7 +160,7 @@ public abstract class AbstractSchedTaskJobDelegate<T extends SchedTask> implemen } task = (T) taskDAO.save(task); - status.set("Done"); + setStatus(null); notificationManager.createTasks( executor, diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/GroupMemberProvisionTaskJobDelegate.java 
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/GroupMemberProvisionTaskJobDelegate.java index 5e137695d7..74d9ef1e87 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/GroupMemberProvisionTaskJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/GroupMemberProvisionTaskJobDelegate.java @@ -90,13 +90,13 @@ public class GroupMemberProvisionTaskJobDelegate extends AbstractSchedTaskJobDel } result.append("provision\n\n"); - status.set(result.toString()); + setStatus(result.toString()); MembershipCond membershipCond = new MembershipCond(); membershipCond.setGroup(groupKey); List<User> users = searchDAO.search(SearchCond.getLeaf(membershipCond), AnyTypeKind.USER); Collection<String> gResources = groupDAO.findAllResourceKeys(groupKey); - status.set("About to " + setStatus("About to " + (action == ProvisionAction.DEPROVISION ? "de" : "") + "provision " + users.size() + " users from " + gResources); @@ -126,7 +126,7 @@ public class GroupMemberProvisionTaskJobDelegate extends AbstractSchedTaskJobDel membershipCond = new MembershipCond(); membershipCond.setGroup(groupKey); List<AnyObject> anyObjects = searchDAO.search(SearchCond.getLeaf(membershipCond), AnyTypeKind.ANY_OBJECT); - status.set("About to " + setStatus("About to " + (action == ProvisionAction.DEPROVISION ? 
"de" : "") + "provision " + anyObjects.size() + " any objects from " + gResources); diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/JobStatusUpdater.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/JobStatusUpdater.java new file mode 100644 index 0000000000..04bba663ac --- /dev/null +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/JobStatusUpdater.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.syncope.core.provisioning.java.job; + +import org.apache.syncope.core.persistence.api.dao.JobStatusDAO; +import org.apache.syncope.core.persistence.api.entity.EntityFactory; +import org.apache.syncope.core.persistence.api.entity.JobStatus; +import org.apache.syncope.core.provisioning.api.event.JobStatusEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; + +public class JobStatusUpdater { + + protected final JobStatusDAO jobStatusDAO; + + protected final EntityFactory entityFactory; + + public JobStatusUpdater(final JobStatusDAO jobStatusDAO, final EntityFactory entityFactory) { + this.jobStatusDAO = jobStatusDAO; + this.entityFactory = entityFactory; + } + + /** + * It's important to note that responding to job status updates + * must be done in async mode, and via a separate special thread executor + * that attempts to synchronize job execution serially by only making one thread + * active at a given time. Not doing so will force the event executor to launch + * a separate thread for each status update, which would result in multiple concurrent + * INSERT operations on the database and, in turn, failures. 
+ * + * @param event the event + */ + @Async("jobStatusUpdaterThreadExecutor") + @EventListener + public void update(final JobStatusEvent event) { + if (event.getJobStatus() == null) { + jobStatusDAO.delete(event.getJobRefDesc()); + } else { + JobStatus jobStatus = entityFactory.newEntity(JobStatus.class); + jobStatus.setKey(event.getJobRefDesc()); + jobStatus.setStatus(event.getJobStatus()); + jobStatusDAO.save(jobStatus); + } + } +} diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/DefaultNotificationJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/DefaultNotificationJobDelegate.java index 4a0afe5dcd..3d51b85347 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/DefaultNotificationJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/DefaultNotificationJobDelegate.java @@ -20,7 +20,6 @@ package org.apache.syncope.core.provisioning.java.job.notification; import java.time.OffsetDateTime; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; import javax.mail.internet.MimeMessage; import org.apache.commons.lang3.StringUtils; import org.apache.syncope.common.lib.types.AuditElements; @@ -31,6 +30,8 @@ import org.apache.syncope.core.persistence.api.entity.task.NotificationTask; import org.apache.syncope.core.persistence.api.entity.task.TaskExec; import org.apache.syncope.core.persistence.api.entity.task.TaskUtilsFactory; import org.apache.syncope.core.provisioning.api.AuditManager; +import org.apache.syncope.core.provisioning.api.event.JobStatusEvent; +import org.apache.syncope.core.provisioning.api.job.JobManager; import org.apache.syncope.core.provisioning.api.notification.NotificationJobDelegate; import org.apache.syncope.core.provisioning.api.notification.NotificationManager; import 
org.apache.syncope.core.provisioning.api.utils.ExceptionUtils2; @@ -38,6 +39,7 @@ import org.apache.syncope.core.spring.security.AuthContextUtils; import org.quartz.JobExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.mail.javamail.JavaMailSender; import org.springframework.mail.javamail.MimeMessageHelper; import org.springframework.transaction.annotation.Transactional; @@ -56,7 +58,7 @@ public class DefaultNotificationJobDelegate implements NotificationJobDelegate { protected final NotificationManager notificationManager; - protected final AtomicReference<String> status = new AtomicReference<>(); + protected final ApplicationEventPublisher publisher; protected boolean interrupt; @@ -67,18 +69,19 @@ public class DefaultNotificationJobDelegate implements NotificationJobDelegate { final JavaMailSender mailSender, final TaskUtilsFactory taskUtilsFactory, final AuditManager auditManager, - final NotificationManager notificationManager) { + final NotificationManager notificationManager, + final ApplicationEventPublisher publisher) { this.taskDAO = taskDAO; this.mailSender = mailSender; this.taskUtilsFactory = taskUtilsFactory; this.auditManager = auditManager; this.notificationManager = notificationManager; + this.publisher = publisher; } - @Override - public String currentStatus() { - return status.get(); + protected void setStatus(final String status) { + publisher.publishEvent(new JobStatusEvent(this, JobManager.NOTIFICATION_JOB.getName(), status)); } @Override @@ -127,7 +130,7 @@ public class DefaultNotificationJobDelegate implements NotificationJobDelegate { + task.getTextBody() + '\n'); } - status.set("Sending notifications to " + task.getRecipients()); + setStatus("Sending notifications to " + task.getRecipients()); for (String to : task.getRecipients()) { try { @@ -219,7 +222,7 @@ public class DefaultNotificationJobDelegate implements 
NotificationJobDelegate { public void execute(final String executor) throws JobExecutionException { List<NotificationTask> tasks = taskDAO.<NotificationTask>findToExec(TaskType.NOTIFICATION); - status.set("Sending out " + tasks.size() + " notifications"); + setStatus("Sending out " + tasks.size() + " notifications"); for (int i = 0; i < tasks.size() && !interrupt; i++) { LOG.debug("Found notification task {} to be executed: starting...", tasks.get(i)); @@ -230,6 +233,8 @@ public class DefaultNotificationJobDelegate implements NotificationJobDelegate { LOG.debug("Notification job interrupted"); interrupted = true; } + + setStatus(null); } protected static boolean hasToBeRegistered(final TaskExec<NotificationTask> execution) { diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AbstractReportlet.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AbstractReportlet.java index 86f867cb47..f95ea52102 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AbstractReportlet.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AbstractReportlet.java @@ -18,11 +18,13 @@ */ package org.apache.syncope.core.provisioning.java.job.report; -import java.util.concurrent.atomic.AtomicReference; import org.apache.syncope.common.lib.report.ReportletConf; import org.apache.syncope.core.persistence.api.dao.Reportlet; +import org.apache.syncope.core.provisioning.api.event.JobStatusEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.transaction.annotation.Transactional; import org.xml.sax.ContentHandler; import org.xml.sax.SAXException; @@ -32,6 +34,9 @@ public abstract class AbstractReportlet implements Reportlet { protected static 
final Logger LOG = LoggerFactory.getLogger(AbstractReportlet.class); + @Autowired + protected ApplicationEventPublisher publisher; + protected ReportletConf conf; @Override @@ -39,18 +44,22 @@ public abstract class AbstractReportlet implements Reportlet { this.conf = conf; } - protected abstract void doExtract(ReportletConf conf, ContentHandler handler, AtomicReference<String> status) + protected void setStatus(final String refDesc, final String status) { + publisher.publishEvent(new JobStatusEvent(this, refDesc, status)); + } + + protected abstract void doExtract(ReportletConf conf, ContentHandler handler, String refDesc) throws SAXException; @Override @Transactional(readOnly = true) - public void extract(final ContentHandler handler, final AtomicReference<String> status) throws SAXException { + public void extract(final ContentHandler handler, final String refDesc) throws SAXException { AttributesImpl atts = new AttributesImpl(); atts.addAttribute("", "", ReportXMLConst.ATTR_NAME, ReportXMLConst.XSD_STRING, conf.getName()); atts.addAttribute("", "", ReportXMLConst.ATTR_CLASS, ReportXMLConst.XSD_STRING, getClass().getName()); handler.startElement("", "", ReportXMLConst.ELEMENT_REPORTLET, atts); - doExtract(conf, handler, status); + doExtract(conf, handler, refDesc); handler.endElement("", "", ReportXMLConst.ELEMENT_REPORTLET); } diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AuditReportlet.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AuditReportlet.java index 26161914dd..b605bdd994 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AuditReportlet.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AuditReportlet.java @@ -20,7 +20,6 @@ package org.apache.syncope.core.provisioning.java.job.report; import java.util.List; import java.util.Map; -import 
java.util.concurrent.atomic.AtomicReference; import javax.sql.DataSource; import org.apache.commons.lang3.StringUtils; import org.apache.syncope.common.lib.audit.AuditEntry; @@ -47,8 +46,10 @@ public class AuditReportlet extends AbstractReportlet { private DataSource datasource; - private void doExtractConf(final ContentHandler handler, final AtomicReference<String> status) throws SAXException { - status.set("Fetching " + conf.getSize() + " rows from the " + AuditConfDAO.AUDIT_ENTRY_TABLE + " table"); + private void doExtractConf(final ContentHandler handler, final String refDesc) throws SAXException { + setStatus( + refDesc, + "Fetching " + conf.getSize() + " rows from the " + AuditConfDAO.AUDIT_ENTRY_TABLE + " table"); JdbcTemplate jdbcTemplate = new JdbcTemplate(datasource); jdbcTemplate.setMaxRows(conf.getSize()); @@ -122,14 +123,14 @@ public class AuditReportlet extends AbstractReportlet { } handler.endElement("", "", "events"); - status.set("Fetched " + conf.getSize() + " rows from the " + AuditConfDAO.AUDIT_ENTRY_TABLE + " table"); + setStatus(refDesc, "Fetched " + conf.getSize() + " rows from the " + AuditConfDAO.AUDIT_ENTRY_TABLE + " table"); } @Override protected void doExtract( final ReportletConf conf, final ContentHandler handler, - final AtomicReference<String> status) + final String refDesc) throws SAXException { if (conf instanceof AuditReportletConf) { @@ -143,6 +144,6 @@ public class AuditReportlet extends AbstractReportlet { throw new ReportException(new IllegalArgumentException("Could not get to DataSource")); } - doExtractConf(handler, status); + doExtractConf(handler, refDesc); } } diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/DefaultReportJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/DefaultReportJobDelegate.java index b2183d0979..1004593e9e 100644 --- 
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/DefaultReportJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/DefaultReportJobDelegate.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.OffsetDateTime; import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; import java.util.zip.Deflater; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; @@ -42,12 +41,15 @@ import org.apache.syncope.core.persistence.api.dao.Reportlet; import org.apache.syncope.core.persistence.api.entity.EntityFactory; import org.apache.syncope.core.persistence.api.entity.Report; import org.apache.syncope.core.persistence.api.entity.ReportExec; +import org.apache.syncope.core.provisioning.api.data.ReportDataBinder; +import org.apache.syncope.core.provisioning.api.event.JobStatusEvent; import org.apache.syncope.core.provisioning.api.job.report.ReportJobDelegate; import org.apache.syncope.core.provisioning.api.utils.ExceptionUtils2; import org.apache.syncope.core.spring.implementation.ImplementationManager; import org.quartz.JobExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.transaction.annotation.Transactional; import org.xml.sax.helpers.AttributesImpl; @@ -73,7 +75,11 @@ public class DefaultReportJobDelegate implements ReportJobDelegate { protected final EntityFactory entityFactory; - protected final AtomicReference<String> status = new AtomicReference<>(); + protected final ReportDataBinder reportDataBinder; + + protected final ApplicationEventPublisher publisher; + + protected Report report; protected boolean interrupt; @@ -82,16 +88,28 @@ public class DefaultReportJobDelegate implements ReportJobDelegate { public DefaultReportJobDelegate( final ReportDAO reportDAO, final 
ReportExecDAO reportExecDAO, - final EntityFactory entityFactory) { + final EntityFactory entityFactory, + final ReportDataBinder reportDataBinder, + final ApplicationEventPublisher publisher) { this.reportDAO = reportDAO; this.reportExecDAO = reportExecDAO; this.entityFactory = entityFactory; + this.reportDataBinder = reportDataBinder; + this.publisher = publisher; } - @Override - public String currentStatus() { - return status.get(); + /** + * Sets status. + * Updating job status is done via an event publishing mechanism + * to allow underlying updates to be done in separate threads, + * and not as part of the current transaction. + * + * @param status the status + */ + protected void setStatus(final String status) { + LOG.debug("Status update: {} {}", reportDataBinder.buildRefDesc(report), status); + publisher.publishEvent(new JobStatusEvent(this, reportDataBinder.buildRefDesc(report), status)); } @Override @@ -107,7 +125,7 @@ public class DefaultReportJobDelegate implements ReportJobDelegate { @Transactional @Override public void execute(final String reportKey, final String executor) throws JobExecutionException { - Report report = reportDAO.find(reportKey); + report = reportDAO.find(reportKey); if (report == null) { throw new JobExecutionException("Report " + reportKey + " not found"); } @@ -153,7 +171,7 @@ public class DefaultReportJobDelegate implements ReportJobDelegate { execution.setStatus(ReportExecStatus.RUNNING); execution = reportExecDAO.save(execution); - status.set("Starting"); + setStatus("Starting"); // 3. 
actual report execution StringBuilder reportExecutionMessage = new StringBuilder(); @@ -164,15 +182,15 @@ public class DefaultReportJobDelegate implements ReportJobDelegate { atts.addAttribute("", "", ReportXMLConst.ATTR_NAME, ReportXMLConst.XSD_STRING, report.getName()); handler.startElement("", "", ReportXMLConst.ELEMENT_REPORT, atts); - status.set("Generating report header"); + setStatus("Generating report header"); // iterate over reportlet instances defined for this report for (int i = 0; i < report.getReportlets().size() && !interrupt; i++) { Optional<Reportlet> reportlet = ImplementationManager.buildReportlet(report.getReportlets().get(i)); if (reportlet.isPresent()) { try { - status.set("Invoking reportlet " + report.getReportlets().get(i).getKey()); - reportlet.get().extract(handler, status); + setStatus("Invoking reportlet " + report.getReportlets().get(i).getKey()); + reportlet.get().extract(handler, reportDataBinder.buildRefDesc(report)); } catch (Throwable t) { LOG.error("While executing reportlet {} for report {}", report.getReportlets().get(i).getKey(), reportKey, t); @@ -194,7 +212,7 @@ public class DefaultReportJobDelegate implements ReportJobDelegate { } // report footer - status.set("Generating report footer"); + setStatus("Generating report footer"); handler.endElement("", "", ReportXMLConst.ELEMENT_REPORT); handler.endDocument(); @@ -208,7 +226,7 @@ public class DefaultReportJobDelegate implements ReportJobDelegate { throw new JobExecutionException(e, true); } finally { - status.set("Completed"); + setStatus(null); try { zos.closeEntry(); diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/GroupReportlet.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/GroupReportlet.java index bf11408b85..b9b28a2fe5 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/GroupReportlet.java +++ 
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/GroupReportlet.java @@ -21,7 +21,6 @@ package org.apache.syncope.core.provisioning.java.job.report; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.StringUtils; import org.apache.syncope.common.lib.Attr; import org.apache.syncope.common.lib.EntityTOUtils; @@ -306,7 +305,7 @@ public class GroupReportlet extends AbstractReportlet { protected void doExtract( final ReportletConf conf, final ContentHandler handler, - final AtomicReference<String> status) + final String refDesc) throws SAXException { if (conf instanceof GroupReportletConf) { @@ -320,10 +319,10 @@ public class GroupReportlet extends AbstractReportlet { int total = count(); int pages = (total / AnyDAO.DEFAULT_PAGE_SIZE) + 1; - status.set("Processing " + total + " groups in " + pages + " pages"); + setStatus(refDesc, "Processing " + total + " groups in " + pages + " pages"); for (int page = 1; page <= pages; page++) { - status.set("Processing " + total + " groups: page " + page + " of " + pages); + setStatus(refDesc, "Processing " + total + " groups: page " + page + " of " + pages); List<Group> groups; if (StringUtils.isBlank(this.conf.getMatchingCond())) { @@ -342,7 +341,7 @@ public class GroupReportlet extends AbstractReportlet { doExtract(handler, groups); - status.set("Processed " + total + " groups: page " + page + " of " + pages); + setStatus(refDesc, "Processed " + total + " groups: page " + page + " of " + pages); } } } diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/ReconciliationReportlet.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/ReconciliationReportlet.java index 9bc0f66502..a0473245f1 100644 --- 
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/ReconciliationReportlet.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/ReconciliationReportlet.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.syncope.common.lib.SyncopeConstants; @@ -365,7 +364,7 @@ public class ReconciliationReportlet extends AbstractReportlet { protected void doExtract( final ReportletConf conf, final ContentHandler handler, - final AtomicReference<String> status) + final String refDesc) throws SAXException { if (conf instanceof ReconciliationReportletConf) { @@ -380,13 +379,13 @@ public class ReconciliationReportlet extends AbstractReportlet { int total = userDAO.count(); int pages = (total / AnyDAO.DEFAULT_PAGE_SIZE) + 1; - status.set("Processing " + total + " users in " + pages + " pages"); + setStatus(refDesc, "Processing " + total + " users in " + pages + " pages"); atts.addAttribute("", "", "total", ReportXMLConst.XSD_INT, String.valueOf(total)); handler.startElement("", "", getAnyElementName(AnyTypeKind.USER) + 's', atts); for (int page = 1; page <= pages; page++) { - status.set("Processing " + total + " users: page " + page + " of " + pages); + setStatus(refDesc, "Processing " + total + " users: page " + page + " of " + pages); doExtract(handler, userDAO.findAll(page, AnyDAO.DEFAULT_PAGE_SIZE)); } @@ -401,13 +400,13 @@ public class ReconciliationReportlet extends AbstractReportlet { AnyTypeKind.USER); int pages = (total / AnyDAO.DEFAULT_PAGE_SIZE) + 1; - status.set("Processing " + total + " users in " + pages + " pages"); + setStatus(refDesc, "Processing " + total + " users in " + pages + " pages"); atts.addAttribute("", "", "total", ReportXMLConst.XSD_INT, 
String.valueOf(total)); handler.startElement("", "", getAnyElementName(AnyTypeKind.USER) + 's', atts); for (int page = 1; page <= pages; page++) { - status.set("Processing " + total + " users: page " + page + " of " + pages); + setStatus(refDesc, "Processing " + total + " users: page " + page + " of " + pages); doExtract(handler, searchDAO.search( realmDAO.getRoot(), @@ -427,13 +426,13 @@ public class ReconciliationReportlet extends AbstractReportlet { int total = groupDAO.count(); int pages = (total / AnyDAO.DEFAULT_PAGE_SIZE) + 1; - status.set("Processing " + total + " groups in " + pages + " pages"); + setStatus(refDesc, "Processing " + total + " groups in " + pages + " pages"); atts.addAttribute("", "", "total", ReportXMLConst.XSD_INT, String.valueOf(total)); handler.startElement("", "", getAnyElementName(AnyTypeKind.GROUP) + 's', atts); for (int page = 1; page <= pages; page++) { - status.set("Processing " + total + " groups: page " + page + " of " + pages); + setStatus(refDesc, "Processing " + total + " groups: page " + page + " of " + pages); doExtract(handler, groupDAO.findAll(page, AnyDAO.DEFAULT_PAGE_SIZE)); } @@ -448,13 +447,13 @@ public class ReconciliationReportlet extends AbstractReportlet { AnyTypeKind.GROUP); int pages = (total / AnyDAO.DEFAULT_PAGE_SIZE) + 1; - status.set("Processing " + total + " groups in " + pages + " pages"); + setStatus(refDesc, "Processing " + total + " groups in " + pages + " pages"); atts.addAttribute("", "", "total", ReportXMLConst.XSD_INT, String.valueOf(total)); handler.startElement("", "", getAnyElementName(AnyTypeKind.GROUP) + 's', atts); for (int page = 1; page <= pages; page++) { - status.set("Processing " + total + " groups: page " + page + " of " + pages); + setStatus(refDesc, "Processing " + total + " groups: page " + page + " of " + pages); doExtract(handler, searchDAO.search( realmDAO.getRoot(), @@ -487,7 +486,9 @@ public class ReconciliationReportlet extends AbstractReportlet { AnyTypeKind.ANY_OBJECT); int 
pages = (total / AnyDAO.DEFAULT_PAGE_SIZE) + 1; - status.set("Processing " + total + " any objects " + anyType.getKey() + " in " + pages + " pages"); + setStatus( + refDesc, + "Processing " + total + " any objects " + anyType.getKey() + " in " + pages + " pages"); atts.clear(); atts.addAttribute("", "", "type", ReportXMLConst.XSD_STRING, anyType.getKey()); @@ -495,7 +496,7 @@ public class ReconciliationReportlet extends AbstractReportlet { handler.startElement("", "", getAnyElementName(AnyTypeKind.ANY_OBJECT) + 's', atts); for (int page = 1; page <= pages; page++) { - status.set("Processing " + total + " any objects " + anyType.getKey() + setStatus(refDesc, "Processing " + total + " any objects " + anyType.getKey() + ": page " + page + " of " + pages); doExtract(handler, searchDAO.search( diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/StaticReportlet.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/StaticReportlet.java index f6d6f66a0a..23dab1798d 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/StaticReportlet.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/StaticReportlet.java @@ -18,7 +18,6 @@ */ package org.apache.syncope.core.provisioning.java.job.report; -import java.util.concurrent.atomic.AtomicReference; import org.apache.syncope.common.lib.report.ReportletConf; import org.apache.syncope.common.lib.report.StaticReportletConf; import org.apache.syncope.core.persistence.api.dao.ReportletConfClass; @@ -74,7 +73,7 @@ public class StaticReportlet extends AbstractReportlet { protected void doExtract( final ReportletConf conf, final ContentHandler handler, - final AtomicReference<String> status) + final String refDesc) throws SAXException { if (conf instanceof StaticReportletConf) { diff --git 
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/UserReportlet.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/UserReportlet.java index 018aca327b..af2c4a4c5c 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/UserReportlet.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/UserReportlet.java @@ -21,7 +21,6 @@ package org.apache.syncope.core.provisioning.java.job.report; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.StringUtils; import org.apache.syncope.common.lib.Attr; import org.apache.syncope.common.lib.EntityTOUtils; @@ -367,7 +366,7 @@ public class UserReportlet extends AbstractReportlet { protected void doExtract( final ReportletConf conf, final ContentHandler handler, - final AtomicReference<String> status) + final String refDesc) throws SAXException { if (conf instanceof UserReportletConf) { @@ -381,10 +380,10 @@ public class UserReportlet extends AbstractReportlet { int total = count(); int pages = (total / AnyDAO.DEFAULT_PAGE_SIZE) + 1; - status.set("Processing " + total + " users in " + pages + " pages"); + setStatus(refDesc, "Processing " + total + " users in " + pages + " pages"); for (int page = 1; page <= pages; page++) { - status.set("Processing " + total + " users: page " + page + " of " + pages); + setStatus(refDesc, "Processing " + total + " users: page " + page + " of " + pages); List<User> users; if (StringUtils.isBlank(this.conf.getMatchingCond())) { diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PullJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PullJobDelegate.java index 6f357c7fb8..ac95b54940 100644 --- 
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PullJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PullJobDelegate.java @@ -117,6 +117,15 @@ public class PullJobDelegate extends AbstractProvisioningJobDelegate<PullTask> i }); pair.setLeft(pair.getLeft() + 1); pair.setRight(name.getNameValue()); + + if (!handled.isEmpty()) { + StringBuilder builder = new StringBuilder("Processed:\n"); + handled.forEach((k, v) -> builder.append(' ').append(v.getLeft()).append('\t'). + append(k). + append(" / latest: ").append(v.getRight()). + append('\n')); + setStatus(builder.toString()); + } } @Override @@ -129,21 +138,6 @@ public class PullJobDelegate extends AbstractProvisioningJobDelegate<PullTask> i this.interrupted = true; } - @Override - public String currentStatus() { - synchronized (status) { - if (!handled.isEmpty()) { - StringBuilder builder = new StringBuilder("Processed:\n"); - handled.forEach((key, value) -> builder.append(' ').append(value.getLeft()).append('\t'). - append(key). - append(" / latest: ").append(value.getRight()). - append('\n')); - status.set(builder.toString()); - } - } - return status.get(); - } - protected void setGroupOwners(final GroupPullResultHandler ghandler) { ghandler.getGroupOwnerMap().forEach((groupKey, ownerKey) -> { Group group = groupDAO.find(groupKey); @@ -249,11 +243,11 @@ public class PullJobDelegate extends AbstractProvisioningJobDelegate<PullTask> i } } - status.set("Initialization completed"); + setStatus("Initialization completed"); // First realms... 
if (pullTask.getResource().getOrgUnit() != null) { - status.set("Pulling " + pullTask.getResource().getOrgUnit().getObjectClass()); + setStatus("Pulling " + pullTask.getResource().getOrgUnit().getObjectClass()); OrgUnit orgUnit = pullTask.getResource().getOrgUnit(); @@ -315,7 +309,7 @@ public class PullJobDelegate extends AbstractProvisioningJobDelegate<PullTask> i filter(provision -> provision.getMapping() != null).sorted(provisionSorter). collect(Collectors.toList())) { - status.set("Pulling " + provision.getObjectClass()); + setStatus("Pulling " + provision.getObjectClass()); AnyType anyType = anyTypeDAO.find(provision.getAnyType()); @@ -416,7 +410,7 @@ public class PullJobDelegate extends AbstractProvisioningJobDelegate<PullTask> i } } - status.set("Pull done"); + setStatus("Pull done"); String result = createReport(profile.getResults(), pullTask.getResource(), dryRun); LOG.debug("Pull result: {}", result); diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PushJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PushJobDelegate.java index 124a189059..10a966d2ea 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PushJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PushJobDelegate.java @@ -19,7 +19,6 @@ package org.apache.syncope.core.provisioning.java.pushpull; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -83,7 +82,7 @@ public class PushJobDelegate extends AbstractProvisioningJobDelegate<PushTask> { protected ProvisioningProfile<PushTask, PushActions> profile; - protected final Map<String, MutablePair<Integer, String>> handled = new HashMap<>(); + protected final Map<String, MutablePair<Integer, String>> handled = new ConcurrentHashMap<>(); protected final 
Map<String, PushActions> perContextActions = new ConcurrentHashMap<>(); @@ -95,21 +94,15 @@ public class PushJobDelegate extends AbstractProvisioningJobDelegate<PushTask> { } pair.setLeft(pair.getLeft() + 1); pair.setRight(key); - } - @Override - public String currentStatus() { - synchronized (status) { - if (!handled.isEmpty()) { - StringBuilder builder = new StringBuilder("Processed:\n"); - handled.forEach((key, value) -> builder.append(' ').append(value.getLeft()).append('\t'). - append(key). - append(" / latest: ").append(value.getRight()). - append('\n')); - status.set(builder.toString()); - } + if (!handled.isEmpty()) { + StringBuilder builder = new StringBuilder("Processed:\n"); + handled.forEach((k, v) -> builder.append(' ').append(v.getLeft()).append('\t'). + append(k). + append(" / latest: ").append(v.getRight()). + append('\n')); + setStatus(builder.toString()); } - return status.get(); } protected void doHandle( @@ -196,11 +189,11 @@ public class PushJobDelegate extends AbstractProvisioningJobDelegate<PushTask> { } } - status.set("Initialization completed"); + setStatus("Initialization completed"); // First realms... if (pushTask.getResource().getOrgUnit() != null) { - status.set("Pushing realms"); + setStatus("Pushing realms"); RealmPushResultHandler handler = buildRealmHandler(); handler.setProfile(profile); @@ -226,7 +219,7 @@ public class PushJobDelegate extends AbstractProvisioningJobDelegate<PushTask> { filter(provision -> provision.getMapping() != null).sorted(provisionSorter). 
collect(Collectors.toList())) { - status.set("Pushing " + provision.getAnyType()); + setStatus("Pushing " + provision.getAnyType()); AnyType anyType = anyTypeDAO.find(provision.getAnyType()); @@ -283,7 +276,7 @@ public class PushJobDelegate extends AbstractProvisioningJobDelegate<PushTask> { interrupted = true; } - status.set("Push done"); + setStatus("Push done"); String result = createReport(profile.getResults(), pushTask.getResource(), dryRun); LOG.debug("Push result: {}", result); diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SinglePullJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SinglePullJobDelegate.java index a71163ec3d..c11e5421ed 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SinglePullJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SinglePullJobDelegate.java @@ -32,6 +32,7 @@ import org.apache.syncope.common.lib.types.ClientExceptionType; import org.apache.syncope.common.lib.types.ConflictResolutionAction; import org.apache.syncope.common.lib.types.MatchingRule; import org.apache.syncope.common.lib.types.PullMode; +import org.apache.syncope.common.lib.types.TaskType; import org.apache.syncope.common.lib.types.UnmatchingRule; import org.apache.syncope.core.persistence.api.dao.ImplementationDAO; import org.apache.syncope.core.persistence.api.dao.RealmDAO; @@ -112,6 +113,8 @@ public class SinglePullJobDelegate extends PullJobDelegate implements SyncopeSin profile.getActions().addAll(getPullActions(pullTaskTO.getActions().stream(). 
map(implementationDAO::find).filter(Objects::nonNull).collect(Collectors.toList()))); profile.setExecutor(executor); + this.task = profile.getTask(); + this.taskType = TaskType.PULL; for (PullActions action : profile.getActions()) { action.beforeAll(profile); diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SinglePushJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SinglePushJobDelegate.java index 9e67fce043..b40edf9861 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SinglePushJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SinglePushJobDelegate.java @@ -26,6 +26,7 @@ import org.apache.syncope.common.lib.to.ProvisioningReport; import org.apache.syncope.common.lib.to.PushTaskTO; import org.apache.syncope.common.lib.types.ConflictResolutionAction; import org.apache.syncope.common.lib.types.MatchingRule; +import org.apache.syncope.common.lib.types.TaskType; import org.apache.syncope.common.lib.types.UnmatchingRule; import org.apache.syncope.core.persistence.api.dao.ImplementationDAO; import org.apache.syncope.core.persistence.api.entity.Any; @@ -72,6 +73,9 @@ public class SinglePushJobDelegate extends PushJobDelegate implements SyncopeSin map(implementationDAO::find).filter(Objects::nonNull).collect(Collectors.toList()))); profile.setConflictResolutionAction(ConflictResolutionAction.FIRSTMATCH); + this.task = profile.getTask(); + this.taskType = TaskType.PUSH; + for (PushActions action : profile.getActions()) { action.beforeAll(profile); } diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/StreamPullJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/StreamPullJobDelegate.java index 915edfbd49..3b228d23e2 100644 --- 
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/StreamPullJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/StreamPullJobDelegate.java @@ -33,6 +33,7 @@ import org.apache.syncope.common.lib.types.ConflictResolutionAction; import org.apache.syncope.common.lib.types.IdMImplementationType; import org.apache.syncope.common.lib.types.MappingPurpose; import org.apache.syncope.common.lib.types.PullMode; +import org.apache.syncope.common.lib.types.TaskType; import org.apache.syncope.core.persistence.api.dao.ImplementationDAO; import org.apache.syncope.core.persistence.api.dao.RealmDAO; import org.apache.syncope.core.persistence.api.entity.AnyType; @@ -218,6 +219,9 @@ public class StreamPullJobDelegate extends PullJobDelegate implements SyncopeStr virSchemaDAO.find(resource.getKey(), anyType.getKey()).stream(). map(VirSchema::asLinkingMappingItem)); + this.task = profile.getTask(); + this.taskType = TaskType.PULL; + connector.fullReconciliation( new ObjectClass(provision.getObjectClass()), handler, diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/StreamPushJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/StreamPushJobDelegate.java index 7c9c765f85..7628132fe5 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/StreamPushJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/StreamPushJobDelegate.java @@ -29,6 +29,7 @@ import org.apache.syncope.common.lib.to.PushTaskTO; import org.apache.syncope.common.lib.types.ConflictResolutionAction; import org.apache.syncope.common.lib.types.IdMImplementationType; import org.apache.syncope.common.lib.types.MappingPurpose; +import org.apache.syncope.common.lib.types.TaskType; import 
org.apache.syncope.core.persistence.api.dao.ImplementationDAO; import org.apache.syncope.core.persistence.api.entity.Any; import org.apache.syncope.core.persistence.api.entity.AnyType; @@ -146,6 +147,9 @@ public class StreamPushJobDelegate extends PushJobDelegate implements SyncopeStr map(implementationDAO::find).filter(Objects::nonNull).collect(Collectors.toList()))); profile.setConflictResolutionAction(ConflictResolutionAction.FIRSTMATCH); + this.task = profile.getTask(); + this.taskType = TaskType.PUSH; + for (PushActions action : profile.getActions()) { action.beforeAll(profile); } diff --git a/core/provisioning-java/src/test/java/org/apache/syncope/core/provisioning/java/job/JobStatusUpdaterTest.java b/core/provisioning-java/src/test/java/org/apache/syncope/core/provisioning/java/job/JobStatusUpdaterTest.java new file mode 100644 index 0000000000..47b4c2ae3b --- /dev/null +++ b/core/provisioning-java/src/test/java/org/apache/syncope/core/provisioning/java/job/JobStatusUpdaterTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.syncope.core.provisioning.java.job; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.apache.syncope.core.persistence.api.dao.JobStatusDAO; +import org.apache.syncope.core.persistence.api.entity.EntityFactory; +import org.apache.syncope.core.provisioning.api.event.JobStatusEvent; +import org.apache.syncope.core.provisioning.java.AbstractTest; +import org.apache.syncope.core.spring.security.SecureRandomUtils; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.transaction.annotation.Transactional; + +@Transactional("Master") +public class JobStatusUpdaterTest extends AbstractTest { + @Autowired + private EntityFactory entityFactory; + + @Autowired + private JobStatusDAO jobStatusDAO; + + @Test + public void verifyUpdate() { + JobStatusUpdater jobStatusUpdater = new JobStatusUpdater(jobStatusDAO, entityFactory); + final String refDesc = "JobRefDesc-" + SecureRandomUtils.generateRandomNumber(); + jobStatusUpdater.update(new JobStatusEvent(this, refDesc, "Started")); + assertNotNull(jobStatusDAO.find(refDesc)); + jobStatusUpdater.update(new JobStatusEvent(this, refDesc, null)); + assertNull(jobStatusDAO.find(refDesc)); + } +}