This is an automated email from the ASF dual-hosted git repository. mmoayyed pushed a commit to branch 2_1_X in repository https://gitbox.apache.org/repos/asf/syncope.git
The following commit(s) were added to refs/heads/2_1_X by this push: new 1f3feb88f2 SYNCOPE-1709: Persist Jobs' current status in the database (Backport 2_1_X) (#391) 1f3feb88f2 is described below commit 1f3feb88f25979c73f4305d0e774d7dbe8fef30d Author: Misagh Moayyed <mm1...@gmail.com> AuthorDate: Wed Nov 16 15:09:02 2022 +0400 SYNCOPE-1709: Persist Jobs' current status in the database (Backport 2_1_X) (#391) Co-authored-by: Francesco Chicchiriccò <ilgro...@apache.org> --- .../core/logic/AbstractExecutableLogic.java | 8 +-- .../syncope/core/logic/AbstractJobLogic.java | 20 +++---- .../syncope/core/logic/NotificationLogic.java | 2 +- .../org/apache/syncope/core/logic/ReportLogic.java | 4 +- .../org/apache/syncope/core/logic/TaskLogic.java | 2 +- .../core/persistence/api/dao/JobStatusDAO.java} | 15 +++-- .../core/persistence/api/dao/Reportlet.java | 5 +- .../core/persistence/api/entity/JobStatus.java} | 13 ++-- .../core/persistence/jpa/dao/JPAJobStatusDAO.java | 51 ++++++++++++++++ .../persistence/jpa/entity/JPAEntityFactory.java | 3 + .../core/persistence/jpa/entity/JPAJobStatus.java} | 29 ++++++--- .../provisioning/api/event/JobStatusEvent.java | 52 ++++++++++++++++ .../core/provisioning/api/job/JobDelegate.java | 2 - .../java/job/AbstractInterruptableJob.java | 5 -- .../java/job/AbstractSchedTaskJobDelegate.java | 21 ++++--- .../job/GroupMemberProvisionTaskJobDelegate.java | 6 +- .../provisioning/java/job/JobStatusUpdater.java | 69 ++++++++++++++++++++++ .../DefaultNotificationJobDelegate.java | 18 +++--- .../java/job/report/AbstractReportlet.java | 19 ++++-- .../java/job/report/AuditReportlet.java | 11 ++-- .../java/job/report/DefaultReportJobDelegate.java | 48 ++++++++++----- .../java/job/report/GroupReportlet.java | 9 ++- .../java/job/report/ReconciliationReportlet.java | 25 ++++---- .../java/job/report/StaticReportlet.java | 3 +- .../java/job/report/UserReportlet.java | 7 +-- .../java/pushpull/PullJobDelegate.java | 35 ++++------- 
.../java/pushpull/PushJobDelegate.java | 33 ++++------- .../java/pushpull/SinglePullJobDelegate.java | 1 + .../java/pushpull/SinglePushJobDelegate.java | 2 + .../pushpull/stream/StreamPullJobDelegate.java | 2 + .../pushpull/stream/StreamPushJobDelegate.java | 2 + .../src/main/resources/provisioningContext.xml | 8 +++ .../java/job/JobStatusUpdaterTest.java | 50 ++++++++++++++++ .../java/pushpull/DBPasswordPullActionsTest.java | 3 +- 34 files changed, 418 insertions(+), 165 deletions(-) diff --git a/core/logic/src/main/java/org/apache/syncope/core/logic/AbstractExecutableLogic.java b/core/logic/src/main/java/org/apache/syncope/core/logic/AbstractExecutableLogic.java index aa69180f77..a030cacb75 100644 --- a/core/logic/src/main/java/org/apache/syncope/core/logic/AbstractExecutableLogic.java +++ b/core/logic/src/main/java/org/apache/syncope/core/logic/AbstractExecutableLogic.java @@ -18,8 +18,6 @@ */ package org.apache.syncope.core.logic; -import java.util.Date; -import java.util.List; import org.apache.commons.lang3.tuple.Pair; import org.apache.syncope.common.lib.to.EntityTO; import org.apache.syncope.common.lib.to.ExecTO; @@ -27,20 +25,22 @@ import org.apache.syncope.common.lib.to.JobTO; import org.apache.syncope.common.lib.types.JobAction; import org.apache.syncope.common.rest.api.batch.BatchResponseItem; import org.apache.syncope.core.persistence.api.dao.search.OrderByClause; +import java.util.Date; +import java.util.List; public abstract class AbstractExecutableLogic<T extends EntityTO> extends AbstractJobLogic<T> { public abstract ExecTO execute(String key, Date startAt, boolean dryRun); public abstract Pair<Integer, List<ExecTO>> listExecutions( - String key, int page, int size, List<OrderByClause> orderByClauses); + String key, int page, int size, List<OrderByClause> orderByClauses); public abstract List<ExecTO> listRecentExecutions(int max); public abstract ExecTO deleteExecution(String executionKey); public abstract List<BatchResponseItem> deleteExecutions( 
- String key, Date startedBefore, Date startedAfter, Date endedBefore, Date endedAfter); + String key, Date startedBefore, Date startedAfter, Date endedBefore, Date endedAfter); public abstract JobTO getJob(String key); diff --git a/core/logic/src/main/java/org/apache/syncope/core/logic/AbstractJobLogic.java b/core/logic/src/main/java/org/apache/syncope/core/logic/AbstractJobLogic.java index 5a00435cc6..803c2f03f0 100644 --- a/core/logic/src/main/java/org/apache/syncope/core/logic/AbstractJobLogic.java +++ b/core/logic/src/main/java/org/apache/syncope/core/logic/AbstractJobLogic.java @@ -20,25 +20,25 @@ package org.apache.syncope.core.logic; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import org.apache.commons.lang3.tuple.Triple; import org.apache.syncope.common.lib.to.EntityTO; import org.apache.syncope.common.lib.to.JobTO; import org.apache.syncope.common.lib.types.JobAction; import org.apache.syncope.common.lib.types.JobType; +import org.apache.syncope.core.persistence.api.dao.JobStatusDAO; +import org.apache.syncope.core.persistence.api.entity.JobStatus; import org.apache.syncope.core.provisioning.api.job.JobManager; -import org.apache.syncope.core.provisioning.java.job.AbstractInterruptableJob; import org.apache.syncope.core.provisioning.java.job.SystemLoadReporterJob; import org.apache.syncope.core.provisioning.java.job.TaskJob; import org.apache.syncope.core.provisioning.java.job.notification.NotificationJob; import org.apache.syncope.core.provisioning.java.job.report.ReportJob; -import org.apache.syncope.core.spring.ApplicationContextProvider; import org.quartz.JobDetail; import org.quartz.JobKey; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.Trigger; import org.quartz.impl.matchers.GroupMatcher; -import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.beans.factory.annotation.Autowired; import 
org.springframework.scheduling.quartz.SchedulerFactoryBean; @@ -50,6 +50,9 @@ abstract class AbstractJobLogic<T extends EntityTO> extends AbstractTransactiona @Autowired protected SchedulerFactoryBean scheduler; + @Autowired + protected JobStatusDAO jobStatusDAO; + protected abstract Triple<JobType, String, String> getReference(JobKey jobKey); protected JobTO getJobTO(final JobKey jobKey, final boolean includeCustom) throws SchedulerException { @@ -90,13 +93,10 @@ abstract class AbstractJobLogic<T extends EntityTO> extends AbstractTransactiona jobTO.setStatus("UNKNOWN"); if (jobTO.isRunning()) { try { - Object job = ApplicationContextProvider.getBeanFactory().getBean(jobKey.getName()); - if (job instanceof AbstractInterruptableJob - && ((AbstractInterruptableJob) job).getDelegate() != null) { - - jobTO.setStatus(((AbstractInterruptableJob) job).getDelegate().currentStatus()); - } - } catch (NoSuchBeanDefinitionException e) { + jobTO.setStatus(Optional.ofNullable(jobStatusDAO.find(jobTO.getRefDesc())). + map(JobStatus::getStatus). + orElse(jobTO.getStatus())); + } catch (Exception e) { LOG.warn("Could not find job {} implementation", jobKey, e); } } diff --git a/core/logic/src/main/java/org/apache/syncope/core/logic/NotificationLogic.java b/core/logic/src/main/java/org/apache/syncope/core/logic/NotificationLogic.java index 7b9aa90937..06c5e70985 100644 --- a/core/logic/src/main/java/org/apache/syncope/core/logic/NotificationLogic.java +++ b/core/logic/src/main/java/org/apache/syncope/core/logic/NotificationLogic.java @@ -104,7 +104,7 @@ public class NotificationLogic extends AbstractJobLogic<NotificationTO> { @Override protected Triple<JobType, String, String> getReference(final JobKey jobKey) { return JobManager.NOTIFICATION_JOB.equals(jobKey) - ? Triple.of(JobType.NOTIFICATION, (String) null, NotificationJob.class.getSimpleName()) + ? 
Triple.of(JobType.NOTIFICATION, null, NotificationJob.class.getSimpleName()) : null; } diff --git a/core/logic/src/main/java/org/apache/syncope/core/logic/ReportLogic.java b/core/logic/src/main/java/org/apache/syncope/core/logic/ReportLogic.java index 95aa956f4d..250abcec52 100644 --- a/core/logic/src/main/java/org/apache/syncope/core/logic/ReportLogic.java +++ b/core/logic/src/main/java/org/apache/syncope/core/logic/ReportLogic.java @@ -372,8 +372,8 @@ public class ReportLogic extends AbstractExecutableLogic<ReportTO> { Report report = reportDAO.find(key); return report == null - ? null - : Triple.of(JobType.REPORT, key, binder.buildRefDesc(report)); + ? null + : Triple.of(JobType.REPORT, key, binder.buildRefDesc(report)); } @PreAuthorize("hasRole('" + StandardEntitlement.REPORT_LIST + "')") diff --git a/core/logic/src/main/java/org/apache/syncope/core/logic/TaskLogic.java b/core/logic/src/main/java/org/apache/syncope/core/logic/TaskLogic.java index 8b174e809f..f43e588800 100644 --- a/core/logic/src/main/java/org/apache/syncope/core/logic/TaskLogic.java +++ b/core/logic/src/main/java/org/apache/syncope/core/logic/TaskLogic.java @@ -96,7 +96,7 @@ public class TaskLogic extends AbstractExecutableLogic<TaskTO> { @Autowired private TaskDataBinder binder; - + @Autowired private PropagationTaskExecutor taskExecutor; diff --git a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java b/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/JobStatusDAO.java similarity index 73% copy from core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java copy to core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/JobStatusDAO.java index 1124fb52ae..bb50b6485d 100644 --- a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java +++ 
b/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/JobStatusDAO.java @@ -16,16 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.syncope.core.provisioning.api.job; +package org.apache.syncope.core.persistence.api.dao; -/** - * Implementations of this interface will perform the actual operations required to Quartz's {@link org.quartz.Job}. - */ -public interface JobDelegate { +import org.apache.syncope.core.persistence.api.entity.JobStatus; + +public interface JobStatusDAO extends DAO<JobStatus> { - String currentStatus(); + JobStatus find(String key); - void interrupt(); + JobStatus save(JobStatus jobStatus); - boolean isInterrupted(); + void delete(String key); } diff --git a/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/Reportlet.java b/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/Reportlet.java index 2fb2a86152..50cd03c868 100644 --- a/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/Reportlet.java +++ b/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/Reportlet.java @@ -18,7 +18,6 @@ */ package org.apache.syncope.core.persistence.api.dao; -import java.util.concurrent.atomic.AtomicReference; import org.apache.syncope.common.lib.report.ReportletConf; import org.xml.sax.ContentHandler; import org.xml.sax.SAXException; @@ -42,8 +41,8 @@ public interface Reportlet { * Actual data extraction for reporting. 
* * @param handler SAX content handler for streaming result - * @param status current report status (for job reporting) + * @param refDesc current report status (for job reporting) * @throws SAXException if there is any problem in SAX handling */ - void extract(ContentHandler handler, AtomicReference<String> status) throws SAXException; + void extract(ContentHandler handler, String refDesc) throws SAXException; } diff --git a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java b/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/entity/JobStatus.java similarity index 73% copy from core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java copy to core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/entity/JobStatus.java index 1124fb52ae..ae98e37051 100644 --- a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java +++ b/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/entity/JobStatus.java @@ -16,16 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.syncope.core.provisioning.api.job; +package org.apache.syncope.core.persistence.api.entity; -/** - * Implementations of this interface will perform the actual operations required to Quartz's {@link org.quartz.Job}. 
- */ -public interface JobDelegate { - - String currentStatus(); +public interface JobStatus extends ProvidedKeyEntity { - void interrupt(); + String getStatus(); - boolean isInterrupted(); + void setStatus(String status); } diff --git a/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/dao/JPAJobStatusDAO.java b/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/dao/JPAJobStatusDAO.java new file mode 100644 index 0000000000..ca6cab0527 --- /dev/null +++ b/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/dao/JPAJobStatusDAO.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.syncope.core.persistence.jpa.dao; + +import org.apache.syncope.core.persistence.api.dao.JobStatusDAO; +import org.apache.syncope.core.persistence.api.entity.JobStatus; +import org.apache.syncope.core.persistence.jpa.entity.JPAJobStatus; +import org.springframework.stereotype.Repository; +import org.springframework.transaction.annotation.Transactional; + +@Repository +public class JPAJobStatusDAO extends AbstractDAO<JobStatus> implements JobStatusDAO { + + @Transactional(readOnly = true) + @Override + public JobStatus find(final String key) { + return entityManager().find(JPAJobStatus.class, key); + } + + @Transactional + @Override + public JobStatus save(final JobStatus jobStatus) { + return entityManager().merge(jobStatus); + } + + @Transactional + @Override + public void delete(final String key) { + JobStatus jobStatus = find(key); + if (jobStatus != null) { + entityManager().remove(jobStatus); + } + + } +} diff --git a/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/entity/JPAEntityFactory.java b/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/entity/JPAEntityFactory.java index 9107c0e130..218e7500ac 100644 --- a/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/entity/JPAEntityFactory.java +++ b/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/entity/JPAEntityFactory.java @@ -44,6 +44,7 @@ import org.apache.syncope.core.persistence.api.entity.Entity; import org.apache.syncope.core.persistence.api.entity.EntityFactory; import org.apache.syncope.core.persistence.api.entity.resource.ExternalResource; import org.apache.syncope.core.persistence.api.entity.Logger; +import org.apache.syncope.core.persistence.api.entity.JobStatus; import org.apache.syncope.core.persistence.api.entity.MailTemplate; import org.apache.syncope.core.persistence.api.entity.Notification; import org.apache.syncope.core.persistence.api.entity.policy.PasswordPolicy; 
@@ -316,6 +317,8 @@ public class JPAEntityFactory implements EntityFactory { result = (E) new JPABatch(); } else if (reference.equals(Delegation.class)) { result = (E) new JPADelegation(); + } else if (reference.equals(JobStatus.class)) { + result = (E) new JPAJobStatus(); } else { throw new IllegalArgumentException("Could not find a JPA implementation of " + reference.getName()); } diff --git a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java b/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/entity/JPAJobStatus.java similarity index 55% copy from core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java copy to core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/entity/JPAJobStatus.java index 1124fb52ae..5cda0127f9 100644 --- a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java +++ b/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/entity/JPAJobStatus.java @@ -16,16 +16,29 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.syncope.core.provisioning.api.job; +package org.apache.syncope.core.persistence.jpa.entity; -/** - * Implementations of this interface will perform the actual operations required to Quartz's {@link org.quartz.Job}. 
- */ -public interface JobDelegate { +import javax.persistence.Entity; +import javax.persistence.Table; +import org.apache.syncope.core.persistence.api.entity.JobStatus; + +@Entity +@Table(name = JPAJobStatus.TABLE) +public class JPAJobStatus extends AbstractProvidedKeyEntity implements JobStatus { + + private static final long serialVersionUID = 9061740216669505871L; + + public static final String TABLE = "JobStatus"; - String currentStatus(); + private String jobStatus; - void interrupt(); + @Override + public String getStatus() { + return jobStatus; + } - boolean isInterrupted(); + @Override + public void setStatus(final String status) { + jobStatus = status; + } } diff --git a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/event/JobStatusEvent.java b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/event/JobStatusEvent.java new file mode 100644 index 0000000000..0df61e7082 --- /dev/null +++ b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/event/JobStatusEvent.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.syncope.core.provisioning.api.event; + +import org.springframework.context.ApplicationEvent; + +public class JobStatusEvent extends ApplicationEvent { + + private static final long serialVersionUID = 373067530016978592L; + + private final String jobRefDesc; + + private final String jobStatus; + + public JobStatusEvent(final Object source, final String jobRefDesc, final String jobStatus) { + super(source); + this.jobRefDesc = jobRefDesc; + this.jobStatus = jobStatus; + } + + public String getJobRefDesc() { + return jobRefDesc; + } + + public String getJobStatus() { + return jobStatus; + } + + @Override + public String toString() { + return "JobStatusEvent{" + + "jobRefDesc=" + jobRefDesc + + ", jobStatus=" + jobStatus + + '}'; + } +} diff --git a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java index 1124fb52ae..329cad2845 100644 --- a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java +++ b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/job/JobDelegate.java @@ -23,8 +23,6 @@ package org.apache.syncope.core.provisioning.api.job; */ public interface JobDelegate { - String currentStatus(); - void interrupt(); boolean isInterrupted(); diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractInterruptableJob.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractInterruptableJob.java index b2314c5907..a858d2385b 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractInterruptableJob.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractInterruptableJob.java @@ -28,11 +28,6 @@ public abstract class AbstractInterruptableJob implements 
InterruptableJob { private final JobDelegate embeddedDelegate = new JobDelegate() { - @Override - public String currentStatus() { - return "RUNNING"; - } - @Override public void interrupt() { } diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractSchedTaskJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractSchedTaskJobDelegate.java index 62bdff4a6e..528c61bcde 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractSchedTaskJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractSchedTaskJobDelegate.java @@ -19,7 +19,7 @@ package org.apache.syncope.core.provisioning.java.job; import java.util.Date; -import java.util.concurrent.atomic.AtomicReference; +import java.util.Objects; import org.apache.syncope.common.lib.types.AuditElements; import org.apache.syncope.core.provisioning.api.utils.ExceptionUtils2; import org.apache.syncope.core.persistence.api.dao.TaskDAO; @@ -28,6 +28,8 @@ import org.apache.syncope.core.persistence.api.entity.EntityFactory; import org.apache.syncope.core.persistence.api.entity.task.SchedTask; import org.apache.syncope.core.persistence.api.entity.task.TaskExec; import org.apache.syncope.core.provisioning.api.AuditManager; +import org.apache.syncope.core.provisioning.api.data.TaskDataBinder; +import org.apache.syncope.core.provisioning.api.event.JobStatusEvent; import org.apache.syncope.core.provisioning.api.job.SchedTaskJobDelegate; import org.apache.syncope.core.provisioning.api.notification.NotificationManager; import org.apache.syncope.core.spring.security.AuthContextUtils; @@ -36,6 +38,7 @@ import org.quartz.JobExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; import 
org.springframework.transaction.annotation.Transactional; public abstract class AbstractSchedTaskJobDelegate implements SchedTaskJobDelegate { @@ -62,6 +65,9 @@ public abstract class AbstractSchedTaskJobDelegate implements SchedTaskJobDelega @Autowired protected EntityFactory entityFactory; + @Autowired + protected TaskDataBinder taskDataBinder; + /** * Notification manager. */ @@ -74,15 +80,16 @@ public abstract class AbstractSchedTaskJobDelegate implements SchedTaskJobDelega @Autowired protected AuditManager auditManager; - protected final AtomicReference<String> status = new AtomicReference<>(); + @Autowired + protected ApplicationEventPublisher publisher; protected boolean interrupt; protected boolean interrupted; - @Override - public String currentStatus() { - return status.get(); + protected void setStatus(final String status) { + Objects.requireNonNull(task, "Task cannot be undefined"); + publisher.publishEvent(new JobStatusEvent(this, taskDataBinder.buildRefDesc(task), status)); } @Override @@ -114,7 +121,7 @@ public abstract class AbstractSchedTaskJobDelegate implements SchedTaskJobDelega execution.setStart(new Date()); execution.setTask(task); - status.set("Initialization completed"); + setStatus("Initialization completed"); AuditElements.Result result; @@ -136,7 +143,7 @@ public abstract class AbstractSchedTaskJobDelegate implements SchedTaskJobDelega } task = taskDAO.save(task); - status.set("Done"); + setStatus(null); notificationManager.createTasks( AuthContextUtils.getWho(), diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/GroupMemberProvisionTaskJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/GroupMemberProvisionTaskJobDelegate.java index ab7598f7d2..22e162ac01 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/GroupMemberProvisionTaskJobDelegate.java +++ 
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/GroupMemberProvisionTaskJobDelegate.java @@ -82,13 +82,13 @@ public class GroupMemberProvisionTaskJobDelegate extends AbstractSchedTaskJobDel } result.append("provision\n\n"); - status.set(result.toString()); + setStatus(result.toString()); MembershipCond membershipCond = new MembershipCond(); membershipCond.setGroup(groupKey); List<User> users = searchDAO.search(SearchCond.getLeaf(membershipCond), AnyTypeKind.USER); Collection<String> groupResourceKeys = groupDAO.findAllResourceKeys(groupKey); - status.set("About to " + setStatus("About to " + (action == ProvisionAction.DEPROVISION ? "de" : "") + "provision " + users.size() + " users from " + groupResourceKeys); @@ -116,7 +116,7 @@ public class GroupMemberProvisionTaskJobDelegate extends AbstractSchedTaskJobDel membershipCond = new MembershipCond(); membershipCond.setGroup(groupKey); List<AnyObject> anyObjects = searchDAO.search(SearchCond.getLeaf(membershipCond), AnyTypeKind.ANY_OBJECT); - status.set("About to " + setStatus("About to " + (action == ProvisionAction.DEPROVISION ? "de" : "") + "provision " + anyObjects.size() + " any objects from " + groupResourceKeys); diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/JobStatusUpdater.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/JobStatusUpdater.java new file mode 100644 index 0000000000..7bd15ef5ab --- /dev/null +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/JobStatusUpdater.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.syncope.core.provisioning.java.job; + +import org.apache.syncope.core.persistence.api.dao.JobStatusDAO; +import org.apache.syncope.core.persistence.api.entity.EntityFactory; +import org.apache.syncope.core.persistence.api.entity.JobStatus; +import org.apache.syncope.core.provisioning.api.event.JobStatusEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; + +public class JobStatusUpdater { + protected static final Logger LOG = LoggerFactory.getLogger(JobStatusUpdater.class); + + @Autowired + protected JobStatusDAO jobStatusDAO; + + @Autowired + protected EntityFactory entityFactory; + + public JobStatusUpdater(final JobStatusDAO jobStatusDAO, final EntityFactory entityFactory) { + this.jobStatusDAO = jobStatusDAO; + this.entityFactory = entityFactory; + } + + /** + * It's important to note that responding to job status updates + * must be done in async mode, and via a separate special thread executor + * that attempts to synchronize job execution serially by only making one thread + * active at a given time. 
Not doing so will force the event executor to launch + * separate threads per each status update, which would result in multiple concurrent + * INSERT operations on the database, and failing. + * + * @param event the event + */ + @Async("jobStatusUpdaterThreadExecutor") + @EventListener + public void update(final JobStatusEvent event) { + if (event.getJobStatus() == null) { + LOG.debug("Deleting status for job {}", event.getJobRefDesc()); + jobStatusDAO.delete(event.getJobRefDesc()); + } else { + LOG.debug("Updating job {} with status {}", event.getJobRefDesc(), event.getJobStatus()); + JobStatus jobStatus = entityFactory.newEntity(JobStatus.class); + jobStatus.setKey(event.getJobRefDesc()); + jobStatus.setStatus(event.getJobStatus()); + jobStatusDAO.save(jobStatus); + } + } +} diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/DefaultNotificationJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/DefaultNotificationJobDelegate.java index 10a9019d44..e4a66603b3 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/DefaultNotificationJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/DefaultNotificationJobDelegate.java @@ -23,7 +23,6 @@ import java.util.Date; import java.util.Enumeration; import java.util.List; import java.util.Properties; -import java.util.concurrent.atomic.AtomicReference; import javax.mail.Session; import javax.mail.internet.MimeMessage; import org.apache.commons.lang3.BooleanUtils; @@ -39,6 +38,8 @@ import org.apache.syncope.core.persistence.api.entity.EntityFactory; import org.apache.syncope.core.persistence.api.entity.task.NotificationTask; import org.apache.syncope.core.persistence.api.entity.task.TaskExec; import org.apache.syncope.core.provisioning.api.AuditManager; +import 
org.apache.syncope.core.provisioning.api.event.JobStatusEvent; +import org.apache.syncope.core.provisioning.api.job.JobManager; import org.apache.syncope.core.provisioning.api.notification.NotificationJobDelegate; import org.apache.syncope.core.provisioning.api.notification.NotificationManager; import org.apache.syncope.core.spring.security.AuthContextUtils; @@ -47,6 +48,7 @@ import org.quartz.JobExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.mail.javamail.JavaMailSender; import org.springframework.mail.javamail.JavaMailSenderImpl; @@ -74,7 +76,8 @@ public class DefaultNotificationJobDelegate implements InitializingBean, Notific @Autowired private NotificationManager notificationManager; - private final AtomicReference<String> status = new AtomicReference<>(); + @Autowired + protected ApplicationEventPublisher publisher; private boolean interrupt; @@ -110,9 +113,8 @@ public class DefaultNotificationJobDelegate implements InitializingBean, Notific } } - @Override - public String currentStatus() { - return status.get(); + protected void setStatus(final String status) { + publisher.publishEvent(new JobStatusEvent(this, JobManager.NOTIFICATION_JOB.getName(), status)); } @Override @@ -161,7 +163,7 @@ public class DefaultNotificationJobDelegate implements InitializingBean, Notific + task.getTextBody() + "\n"); } - status.set("Sending notifications to " + task.getRecipients()); + setStatus("Sending notifications to " + task.getRecipients()); for (String to : task.getRecipients()) { try { @@ -253,7 +255,7 @@ public class DefaultNotificationJobDelegate implements InitializingBean, Notific public void execute() throws JobExecutionException { List<NotificationTask> tasks = taskDAO.<NotificationTask>findToExec(TaskType.NOTIFICATION); - 
status.set("Sending out " + tasks.size() + " notifications"); + setStatus("Sending out " + tasks.size() + " notifications"); for (int i = 0; i < tasks.size() && !interrupt; i++) { LOG.debug("Found notification task {} to be executed: starting...", tasks.get(i)); @@ -264,6 +266,8 @@ public class DefaultNotificationJobDelegate implements InitializingBean, Notific LOG.debug("Notification job interrupted"); interrupted = true; } + + setStatus(null); } private boolean hasToBeRegistered(final TaskExec execution) { diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AbstractReportlet.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AbstractReportlet.java index cb52fd0934..f95ea52102 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AbstractReportlet.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AbstractReportlet.java @@ -18,11 +18,13 @@ */ package org.apache.syncope.core.provisioning.java.job.report; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.syncope.core.persistence.api.dao.Reportlet; import org.apache.syncope.common.lib.report.ReportletConf; +import org.apache.syncope.core.persistence.api.dao.Reportlet; +import org.apache.syncope.core.provisioning.api.event.JobStatusEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.transaction.annotation.Transactional; import org.xml.sax.ContentHandler; import org.xml.sax.SAXException; @@ -32,6 +34,9 @@ public abstract class AbstractReportlet implements Reportlet { protected static final Logger LOG = LoggerFactory.getLogger(AbstractReportlet.class); + @Autowired + protected ApplicationEventPublisher publisher; + protected ReportletConf 
conf; @Override @@ -39,18 +44,22 @@ public abstract class AbstractReportlet implements Reportlet { this.conf = conf; } - protected abstract void doExtract(ReportletConf conf, ContentHandler handler, AtomicReference<String> status) + protected void setStatus(final String refDesc, final String status) { + publisher.publishEvent(new JobStatusEvent(this, refDesc, status)); + } + + protected abstract void doExtract(ReportletConf conf, ContentHandler handler, String refDesc) throws SAXException; @Override @Transactional(readOnly = true) - public void extract(final ContentHandler handler, final AtomicReference<String> status) throws SAXException { + public void extract(final ContentHandler handler, final String refDesc) throws SAXException { AttributesImpl atts = new AttributesImpl(); atts.addAttribute("", "", ReportXMLConst.ATTR_NAME, ReportXMLConst.XSD_STRING, conf.getName()); atts.addAttribute("", "", ReportXMLConst.ATTR_CLASS, ReportXMLConst.XSD_STRING, getClass().getName()); handler.startElement("", "", ReportXMLConst.ELEMENT_REPORTLET, atts); - doExtract(conf, handler, status); + doExtract(conf, handler, refDesc); handler.endElement("", "", ReportXMLConst.ELEMENT_REPORTLET); } diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AuditReportlet.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AuditReportlet.java index fb7a4d365d..42d52a2d76 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AuditReportlet.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/AuditReportlet.java @@ -20,7 +20,6 @@ package org.apache.syncope.core.provisioning.java.job.report; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import javax.sql.DataSource; import org.apache.commons.lang3.StringUtils; import 
org.apache.syncope.common.lib.log.AuditEntry; @@ -47,8 +46,8 @@ public class AuditReportlet extends AbstractReportlet { private DataSource datasource; - private void doExtractConf(final ContentHandler handler, final AtomicReference<String> status) throws SAXException { - status.set("Fetching " + conf.getSize() + " rows from the " + LoggerDAO.AUDIT_TABLE + " table"); + private void doExtractConf(final ContentHandler handler, final String refDesc) throws SAXException { + setStatus(refDesc, "Fetching " + conf.getSize() + " rows from the " + LoggerDAO.AUDIT_TABLE + " table"); JdbcTemplate jdbcTemplate = new JdbcTemplate(datasource); jdbcTemplate.setMaxRows(conf.getSize()); @@ -119,14 +118,14 @@ public class AuditReportlet extends AbstractReportlet { } handler.endElement("", "", "events"); - status.set("Fetched " + conf.getSize() + " rows from the SYNCOPEAUDIT table"); + setStatus(refDesc, "Fetched " + conf.getSize() + " rows from the SYNCOPEAUDIT table"); } @Override protected void doExtract( final ReportletConf conf, final ContentHandler handler, - final AtomicReference<String> status) + final String refDesc) throws SAXException { if (conf instanceof AuditReportletConf) { @@ -140,6 +139,6 @@ public class AuditReportlet extends AbstractReportlet { throw new ReportException(new IllegalArgumentException("Could not get to DataSource")); } - doExtractConf(handler, status); + doExtractConf(handler, refDesc); } } diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/DefaultReportJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/DefaultReportJobDelegate.java index d25bbdc76c..7560348fb5 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/DefaultReportJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/DefaultReportJobDelegate.java @@ -23,7 +23,6 @@ import 
java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Date; import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; import java.util.zip.Deflater; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; @@ -43,12 +42,15 @@ import org.apache.syncope.core.persistence.api.dao.Reportlet; import org.apache.syncope.core.persistence.api.entity.EntityFactory; import org.apache.syncope.core.persistence.api.entity.Report; import org.apache.syncope.core.persistence.api.entity.ReportExec; +import org.apache.syncope.core.provisioning.api.data.ReportDataBinder; +import org.apache.syncope.core.provisioning.api.event.JobStatusEvent; import org.apache.syncope.core.provisioning.api.job.report.ReportJobDelegate; import org.apache.syncope.core.spring.implementation.ImplementationManager; import org.quartz.JobExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import org.xml.sax.helpers.AttributesImpl; @@ -83,22 +85,36 @@ public class DefaultReportJobDelegate implements ReportJobDelegate { private ReportExecDAO reportExecDAO; @Autowired - private EntityFactory entityFactory; + protected ReportDataBinder reportDataBinder; - private final AtomicReference<String> status = new AtomicReference<>(); + @Autowired + protected ApplicationEventPublisher publisher; - private boolean interrupt; + protected Report report; - private boolean interrupted; + @Autowired + private EntityFactory entityFactory; - @Override - public String currentStatus() { - return status.get(); + private boolean interrupt; + + private boolean interrupted; + + /** + * Sets status. 
+ * Updating job status is done via an event publishing mechanism + * to allow underlying updates to be done in separate threads, + * and not as part of the current transaction. + * + * @param status the status + */ + protected void setStatus(final String status) { + LOG.debug("Status update: {} {}", reportDataBinder.buildRefDesc(report), status); + publisher.publishEvent(new JobStatusEvent(this, reportDataBinder.buildRefDesc(report), status)); } @Override public void interrupt() { - interrupt = true; + interrupted = true; } @Override @@ -109,7 +125,7 @@ public class DefaultReportJobDelegate implements ReportJobDelegate { @Transactional @Override public void execute(final String reportKey) throws JobExecutionException { - Report report = reportDAO.find(reportKey); + report = reportDAO.find(reportKey); if (report == null) { throw new JobExecutionException("Report " + reportKey + " not found"); } @@ -153,7 +169,7 @@ public class DefaultReportJobDelegate implements ReportJobDelegate { execution.setStatus(ReportExecStatus.RUNNING); execution = reportExecDAO.save(execution); - status.set("Starting"); + setStatus("Starting"); // 3. 
actual report execution StringBuilder reportExecutionMessage = new StringBuilder(); @@ -164,15 +180,15 @@ public class DefaultReportJobDelegate implements ReportJobDelegate { atts.addAttribute("", "", ReportXMLConst.ATTR_NAME, ReportXMLConst.XSD_STRING, report.getName()); handler.startElement("", "", ReportXMLConst.ELEMENT_REPORT, atts); - status.set("Generating report header"); + setStatus("Generating report header"); // iterate over reportlet instances defined for this report for (int i = 0; i < report.getReportlets().size() && !interrupt; i++) { Optional<Reportlet> reportlet = ImplementationManager.buildReportlet(report.getReportlets().get(i)); if (reportlet.isPresent()) { try { - status.set("Invoking reportlet " + report.getReportlets().get(i).getKey()); - reportlet.get().extract(handler, status); + setStatus("Invoking reportlet " + report.getReportlets().get(i).getKey()); + reportlet.get().extract(handler, reportDataBinder.buildRefDesc(report)); } catch (Throwable t) { LOG.error("While executing reportlet {} for report {}", reportlet, reportKey, t); @@ -193,7 +209,7 @@ public class DefaultReportJobDelegate implements ReportJobDelegate { } // report footer - status.set("Generating report footer"); + setStatus("Generating report footer"); handler.endElement("", "", ReportXMLConst.ELEMENT_REPORT); handler.endDocument(); @@ -207,7 +223,7 @@ public class DefaultReportJobDelegate implements ReportJobDelegate { throw new JobExecutionException(e, true); } finally { - status.set("Completed"); + setStatus(null); try { zos.closeEntry(); diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/GroupReportlet.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/GroupReportlet.java index 5c7db28bd3..3ed0152f4c 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/GroupReportlet.java +++ 
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/GroupReportlet.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.StringUtils; import org.apache.syncope.common.lib.EntityTOUtils; import org.apache.syncope.common.lib.SyncopeConstants; @@ -296,7 +295,7 @@ public class GroupReportlet extends AbstractReportlet { protected void doExtract( final ReportletConf conf, final ContentHandler handler, - final AtomicReference<String> status) + final String refDesc) throws SAXException { if (conf instanceof GroupReportletConf) { @@ -310,10 +309,10 @@ public class GroupReportlet extends AbstractReportlet { int total = count(); int pages = (total / AnyDAO.DEFAULT_PAGE_SIZE) + 1; - status.set("Processing " + total + " groups in " + pages + " pages"); + setStatus(refDesc, "Processing " + total + " groups in " + pages + " pages"); for (int page = 1; page <= pages; page++) { - status.set("Processing " + total + " groups: page " + page + " of " + pages); + setStatus(refDesc, "Processing " + total + " groups: page " + page + " of " + pages); List<Group> groups; if (StringUtils.isBlank(this.conf.getMatchingCond())) { @@ -330,7 +329,7 @@ public class GroupReportlet extends AbstractReportlet { doExtract(handler, groups); - status.set("Processed " + total + " groups: page " + page + " of " + pages); + setStatus(refDesc, "Processed " + total + " groups: page " + page + " of " + pages); } } } diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/ReconciliationReportlet.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/ReconciliationReportlet.java index 0feaef893f..58df039876 100644 --- 
a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/ReconciliationReportlet.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/ReconciliationReportlet.java @@ -27,7 +27,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.syncope.common.lib.SyncopeConstants; @@ -370,7 +369,7 @@ public class ReconciliationReportlet extends AbstractReportlet { protected void doExtract( final ReportletConf conf, final ContentHandler handler, - final AtomicReference<String> status) + final String refDesc) throws SAXException { if (conf instanceof ReconciliationReportletConf) { @@ -385,13 +384,13 @@ public class ReconciliationReportlet extends AbstractReportlet { int total = userDAO.count(); int pages = (total / AnyDAO.DEFAULT_PAGE_SIZE) + 1; - status.set("Processing " + total + " users in " + pages + " pages"); + setStatus(refDesc, "Processing " + total + " users in " + pages + " pages"); atts.addAttribute("", "", "total", ReportXMLConst.XSD_INT, String.valueOf(total)); handler.startElement("", "", getAnyElementName(AnyTypeKind.USER) + "s", atts); for (int page = 1; page <= pages; page++) { - status.set("Processing " + total + " users: page " + page + " of " + pages); + setStatus(refDesc, "Processing " + total + " users: page " + page + " of " + pages); doExtract(handler, userDAO.findAll(page, AnyDAO.DEFAULT_PAGE_SIZE)); } @@ -401,13 +400,13 @@ public class ReconciliationReportlet extends AbstractReportlet { int total = searchDAO.count(SyncopeConstants.FULL_ADMIN_REALMS, cond, AnyTypeKind.USER); int pages = (total / AnyDAO.DEFAULT_PAGE_SIZE) + 1; - status.set("Processing " + total + " users in " + pages + " pages"); + setStatus(refDesc, "Processing " + total + " users in " + pages + " 
pages"); atts.addAttribute("", "", "total", ReportXMLConst.XSD_INT, String.valueOf(total)); handler.startElement("", "", getAnyElementName(AnyTypeKind.USER) + "s", atts); for (int page = 1; page <= pages; page++) { - status.set("Processing " + total + " users: page " + page + " of " + pages); + setStatus(refDesc, "Processing " + total + " users: page " + page + " of " + pages); doExtract(handler, searchDAO.search( SyncopeConstants.FULL_ADMIN_REALMS, @@ -425,13 +424,13 @@ public class ReconciliationReportlet extends AbstractReportlet { int total = groupDAO.count(); int pages = (total / AnyDAO.DEFAULT_PAGE_SIZE) + 1; - status.set("Processing " + total + " groups in " + pages + " pages"); + setStatus(refDesc, "Processing " + total + " groups in " + pages + " pages"); atts.addAttribute("", "", "total", ReportXMLConst.XSD_INT, String.valueOf(total)); handler.startElement("", "", getAnyElementName(AnyTypeKind.GROUP) + "s", atts); for (int page = 1; page <= pages; page++) { - status.set("Processing " + total + " groups: page " + page + " of " + pages); + setStatus(refDesc, "Processing " + total + " groups: page " + page + " of " + pages); doExtract(handler, groupDAO.findAll(page, AnyDAO.DEFAULT_PAGE_SIZE)); } @@ -441,13 +440,13 @@ public class ReconciliationReportlet extends AbstractReportlet { int total = searchDAO.count(SyncopeConstants.FULL_ADMIN_REALMS, cond, AnyTypeKind.GROUP); int pages = (total / AnyDAO.DEFAULT_PAGE_SIZE) + 1; - status.set("Processing " + total + " groups in " + pages + " pages"); + setStatus(refDesc, "Processing " + total + " groups in " + pages + " pages"); atts.addAttribute("", "", "total", ReportXMLConst.XSD_INT, String.valueOf(total)); handler.startElement("", "", getAnyElementName(AnyTypeKind.GROUP) + "s", atts); for (int page = 1; page <= pages; page++) { - status.set("Processing " + total + " groups: page " + page + " of " + pages); + setStatus(refDesc, "Processing " + total + " groups: page " + page + " of " + pages); doExtract(handler, 
searchDAO.search( SyncopeConstants.FULL_ADMIN_REALMS, @@ -473,7 +472,9 @@ public class ReconciliationReportlet extends AbstractReportlet { int total = searchDAO.count(SyncopeConstants.FULL_ADMIN_REALMS, cond, AnyTypeKind.ANY_OBJECT); int pages = (total / AnyDAO.DEFAULT_PAGE_SIZE) + 1; - status.set("Processing " + total + " any objects " + anyType.getKey() + " in " + pages + " pages"); + setStatus( + refDesc, + "Processing " + total + " any objects " + anyType.getKey() + " in " + pages + " pages"); atts.clear(); atts.addAttribute("", "", "type", ReportXMLConst.XSD_STRING, anyType.getKey()); @@ -481,7 +482,7 @@ public class ReconciliationReportlet extends AbstractReportlet { handler.startElement("", "", getAnyElementName(AnyTypeKind.ANY_OBJECT) + "s", atts); for (int page = 1; page <= pages; page++) { - status.set("Processing " + total + " any objects " + anyType.getKey() + setStatus(refDesc, "Processing " + total + " any objects " + anyType.getKey() + ": page " + page + " of " + pages); doExtract(handler, searchDAO.search( diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/StaticReportlet.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/StaticReportlet.java index 49e5e2a28a..06b44d8e0b 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/StaticReportlet.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/StaticReportlet.java @@ -18,7 +18,6 @@ */ package org.apache.syncope.core.provisioning.java.job.report; -import java.util.concurrent.atomic.AtomicReference; import org.apache.syncope.core.persistence.api.dao.ReportletConfClass; import org.apache.syncope.common.lib.report.ReportletConf; import org.apache.syncope.common.lib.report.StaticReportletConf; @@ -74,7 +73,7 @@ public class StaticReportlet extends AbstractReportlet { protected void doExtract( final ReportletConf 
conf, final ContentHandler handler, - final AtomicReference<String> status) + final String refDesc) throws SAXException { if (conf instanceof StaticReportletConf) { diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/UserReportlet.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/UserReportlet.java index 4d83d9b82f..a937940e25 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/UserReportlet.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/report/UserReportlet.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.StringUtils; import org.apache.syncope.common.lib.EntityTOUtils; import org.apache.syncope.common.lib.SyncopeConstants; @@ -358,7 +357,7 @@ public class UserReportlet extends AbstractReportlet { protected void doExtract( final ReportletConf conf, final ContentHandler handler, - final AtomicReference<String> status) + final String refDesc) throws SAXException { if (conf instanceof UserReportletConf) { @@ -372,10 +371,10 @@ public class UserReportlet extends AbstractReportlet { int total = count(); int pages = (total / AnyDAO.DEFAULT_PAGE_SIZE) + 1; - status.set("Processing " + total + " users in " + pages + " pages"); + setStatus(refDesc, "Processing " + total + " users in " + pages + " pages"); for (int page = 1; page <= pages; page++) { - status.set("Processing " + total + " users: page " + page + " of " + pages); + setStatus(refDesc, "Processing " + total + " users: page " + page + " of " + pages); List<User> users; if (StringUtils.isBlank(this.conf.getMatchingCond())) { diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PullJobDelegate.java 
b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PullJobDelegate.java index 116d89b954..9d8ab4dbb8 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PullJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PullJobDelegate.java @@ -111,6 +111,15 @@ public class PullJobDelegate extends AbstractProvisioningJobDelegate<PullTask> i }); pair.setLeft(pair.getLeft() + 1); pair.setRight(name.getNameValue()); + + if (!handled.isEmpty()) { + StringBuilder builder = new StringBuilder("Processed:\n"); + handled.forEach((k, v) -> builder.append(' ').append(v.getLeft()).append('\t'). + append(k). + append(" / latest: ").append(v.getRight()). + append('\n')); + setStatus(builder.toString()); + } } @Override @@ -122,24 +131,6 @@ public class PullJobDelegate extends AbstractProvisioningJobDelegate<PullTask> i public void setInterrupted() { this.interrupted = true; } - - @Override - public String currentStatus() { - synchronized (status) { - if (!handled.isEmpty()) { - StringBuilder builder = new StringBuilder("Processed:\n"); - handled.forEach((key, value) -> { - builder.append(' ').append(value.getLeft()).append('\t'). - append(key.getObjectClassValue()). - append(" / latest: ").append(value.getRight()). - append('\n'); - }); - status.set(builder.toString()); - } - } - return status.get(); - } - protected void setGroupOwners(final GroupPullResultHandler ghandler) { ghandler.getGroupOwnerMap().forEach((groupKey, ownerKey) -> { Group group = groupDAO.find(groupKey); @@ -243,11 +234,11 @@ public class PullJobDelegate extends AbstractProvisioningJobDelegate<PullTask> i } } - status.set("Initialization completed"); + setStatus("Initialization completed"); // First realms... 
if (pullTask.getResource().getOrgUnit() != null) { - status.set("Pulling " + pullTask.getResource().getOrgUnit().getObjectClass().getObjectClassValue()); + setStatus("Pulling " + pullTask.getResource().getOrgUnit().getObjectClass()); OrgUnit orgUnit = pullTask.getResource().getOrgUnit(); @@ -307,7 +298,7 @@ public class PullJobDelegate extends AbstractProvisioningJobDelegate<PullTask> i filter(provision -> provision.getMapping() != null).sorted(provisionSorter). collect(Collectors.toList())) { - status.set("Pulling " + provision.getObjectClass().getObjectClassValue()); + setStatus("Pulling " + provision.getObjectClass()); SyncopePullResultHandler handler; switch (provision.getAnyType().getKind()) { @@ -396,7 +387,7 @@ public class PullJobDelegate extends AbstractProvisioningJobDelegate<PullTask> i } } - status.set("Pull done"); + setStatus("Pull done"); String result = createReport(profile.getResults(), pullTask.getResource(), dryRun); LOG.debug("Pull result: {}", result); diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PushJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PushJobDelegate.java index 243eaf0827..69579b021d 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PushJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/PushJobDelegate.java @@ -20,7 +20,6 @@ package org.apache.syncope.core.provisioning.java.pushpull; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -82,7 +81,7 @@ public class PushJobDelegate extends AbstractProvisioningJobDelegate<PushTask> { protected ProvisioningProfile<PushTask, PushActions> profile; - protected final Map<String, MutablePair<Integer, String>> handled = new HashMap<>(); + protected final Map<String, 
MutablePair<Integer, String>> handled = new ConcurrentHashMap<>(); protected final Map<String, PushActions> perContextActions = new ConcurrentHashMap<>(); @@ -94,23 +93,15 @@ public class PushJobDelegate extends AbstractProvisioningJobDelegate<PushTask> { } pair.setLeft(pair.getLeft() + 1); pair.setRight(key); - } - @Override - public String currentStatus() { - synchronized (status) { - if (!handled.isEmpty()) { - StringBuilder builder = new StringBuilder("Processed:\n"); - handled.forEach((key, value) -> { - builder.append(' ').append(value.getLeft()).append('\t'). - append(key). - append(" / latest: ").append(value.getRight()). - append('\n'); - }); - status.set(builder.toString()); - } + if (!handled.isEmpty()) { + StringBuilder builder = new StringBuilder("Processed:\n"); + handled.forEach((k, v) -> builder.append(' ').append(v.getLeft()).append('\t'). + append(k). + append(" / latest: ").append(v.getRight()). + append('\n')); + setStatus(builder.toString()); } - return status.get(); } protected void doHandle( @@ -195,11 +186,11 @@ public class PushJobDelegate extends AbstractProvisioningJobDelegate<PushTask> { } } - status.set("Initialization completed"); + setStatus("Initialization completed"); // First realms... if (pushTask.getResource().getOrgUnit() != null) { - status.set("Pushing realms"); + setStatus("Pushing realms"); RealmPushResultHandler handler = buildRealmHandler(); handler.setProfile(profile); @@ -225,7 +216,7 @@ public class PushJobDelegate extends AbstractProvisioningJobDelegate<PushTask> { filter(provision -> provision.getMapping() != null).sorted(provisionSorter). 
collect(Collectors.toList())) { - status.set("Pushing " + provision.getAnyType().getKey()); + setStatus("Pushing " + provision.getAnyType()); AnyDAO<?> anyDAO = anyUtilsFactory.getInstance(provision.getAnyType().getKind()).dao(); @@ -278,7 +269,7 @@ public class PushJobDelegate extends AbstractProvisioningJobDelegate<PushTask> { interrupted = true; } - status.set("Push done"); + setStatus("Push done"); String result = createReport(profile.getResults(), pushTask.getResource(), dryRun); LOG.debug("Push result: {}", result); diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SinglePullJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SinglePullJobDelegate.java index 6a70e79177..f6b3f2e38f 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SinglePullJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SinglePullJobDelegate.java @@ -110,6 +110,7 @@ public class SinglePullJobDelegate extends PullJobDelegate implements SyncopeSin profile.setConflictResolutionAction(ConflictResolutionAction.FIRSTMATCH); profile.getActions().addAll(getPullActions(pullTaskTO.getActions().stream(). 
map(implementationDAO::find).filter(Objects::nonNull).collect(Collectors.toList()))); + this.task = profile.getTask(); for (PullActions action : profile.getActions()) { action.beforeAll(profile); diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SinglePushJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SinglePushJobDelegate.java index 0cbf65aee1..143c1616d1 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SinglePushJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/SinglePushJobDelegate.java @@ -69,6 +69,8 @@ public class SinglePushJobDelegate extends PushJobDelegate implements SyncopeSin map(implementationDAO::find).filter(Objects::nonNull).collect(Collectors.toList()))); profile.setConflictResolutionAction(ConflictResolutionAction.FIRSTMATCH); + this.task = profile.getTask(); + for (PushActions action : profile.getActions()) { action.beforeAll(profile); } diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/StreamPullJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/StreamPullJobDelegate.java index 711856ae0c..184668b412 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/StreamPullJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/StreamPullJobDelegate.java @@ -222,6 +222,8 @@ public class StreamPullJobDelegate extends PullJobDelegate implements SyncopeStr MappingUtils.getPullItems(provision.getMapping().getItems().stream()), virSchemaDAO.findByProvision(provision).stream().map(VirSchema::asLinkingMappingItem)); + this.task = profile.getTask(); + connector.fullReconciliation( provision.getObjectClass(), 
handler, diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/StreamPushJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/StreamPushJobDelegate.java index 417d7b3bce..289d5da83e 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/StreamPushJobDelegate.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/pushpull/stream/StreamPushJobDelegate.java @@ -148,6 +148,8 @@ public class StreamPushJobDelegate extends PushJobDelegate implements SyncopeStr map(implementationDAO::find).filter(Objects::nonNull).collect(Collectors.toList()))); profile.setConflictResolutionAction(ConflictResolutionAction.FIRSTMATCH); + this.task = profile.getTask(); + for (PushActions action : profile.getActions()) { action.beforeAll(profile); } diff --git a/core/provisioning-java/src/main/resources/provisioningContext.xml b/core/provisioning-java/src/main/resources/provisioningContext.xml index f464c7fd15..64b1427632 100644 --- a/core/provisioning-java/src/main/resources/provisioningContext.xml +++ b/core/provisioning-java/src/main/resources/provisioningContext.xml @@ -131,4 +131,12 @@ under the License. </bean> <bean class="org.apache.syncope.core.provisioning.api.IntAttrNameParser"/> + + <bean class="org.apache.syncope.core.provisioning.java.job.JobStatusUpdater" /> + <!-- + This is a special thread executor that only creates a single worker thread. + This is necessary to allow job status update operations to queue up serially + and not via multiple threads to avoid the "lost update" problem.
+ --> + <task:executor id="jobStatusUpdaterThreadExecutor" pool-size="1" /> </beans> diff --git a/core/provisioning-java/src/test/java/org/apache/syncope/core/provisioning/java/job/JobStatusUpdaterTest.java b/core/provisioning-java/src/test/java/org/apache/syncope/core/provisioning/java/job/JobStatusUpdaterTest.java new file mode 100644 index 0000000000..47b4c2ae3b --- /dev/null +++ b/core/provisioning-java/src/test/java/org/apache/syncope/core/provisioning/java/job/JobStatusUpdaterTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.syncope.core.provisioning.java.job; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.apache.syncope.core.persistence.api.dao.JobStatusDAO; +import org.apache.syncope.core.persistence.api.entity.EntityFactory; +import org.apache.syncope.core.provisioning.api.event.JobStatusEvent; +import org.apache.syncope.core.provisioning.java.AbstractTest; +import org.apache.syncope.core.spring.security.SecureRandomUtils; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.transaction.annotation.Transactional; + +@Transactional("Master") +public class JobStatusUpdaterTest extends AbstractTest { + @Autowired + private EntityFactory entityFactory; + + @Autowired + private JobStatusDAO jobStatusDAO; + + @Test + public void verifyUpdate() { + JobStatusUpdater jobStatusUpdater = new JobStatusUpdater(jobStatusDAO, entityFactory); + final String refDesc = "JobRefDesc-" + SecureRandomUtils.generateRandomNumber(); + jobStatusUpdater.update(new JobStatusEvent(this, refDesc, "Started")); + assertNotNull(jobStatusDAO.find(refDesc)); + jobStatusUpdater.update(new JobStatusEvent(this, refDesc, null)); + assertNull(jobStatusDAO.find(refDesc)); + } +} diff --git a/core/provisioning-java/src/test/java/org/apache/syncope/core/provisioning/java/pushpull/DBPasswordPullActionsTest.java b/core/provisioning-java/src/test/java/org/apache/syncope/core/provisioning/java/pushpull/DBPasswordPullActionsTest.java index 06b46bccec..52412bdb9f 100644 --- a/core/provisioning-java/src/test/java/org/apache/syncope/core/provisioning/java/pushpull/DBPasswordPullActionsTest.java +++ b/core/provisioning-java/src/test/java/org/apache/syncope/core/provisioning/java/pushpull/DBPasswordPullActionsTest.java @@ -28,10 +28,9 @@ import static org.mockito.Mockito.when; import java.util.HashSet; import java.util.Set; -import 
org.apache.syncope.common.lib.patch.AnyPatch; + import org.apache.syncope.common.lib.patch.PasswordPatch; import org.apache.syncope.common.lib.patch.UserPatch; -import org.apache.syncope.common.lib.to.EntityTO; import org.apache.syncope.common.lib.to.ProvisioningReport; import org.apache.syncope.common.lib.to.UserTO; import org.apache.syncope.common.lib.types.CipherAlgorithm;