[SYNCOPE-660] Forcing interrupt for heavy tasks - minor improvement
Project: http://git-wip-us.apache.org/repos/asf/syncope/repo Commit: http://git-wip-us.apache.org/repos/asf/syncope/commit/b13e9fe4 Tree: http://git-wip-us.apache.org/repos/asf/syncope/tree/b13e9fe4 Diff: http://git-wip-us.apache.org/repos/asf/syncope/diff/b13e9fe4 Branch: refs/heads/master Commit: b13e9fe4dd226d7ef48b249447289e3a4451c84b Parents: 0ae4ac8 bdb3257 Author: giacomolm <[email protected]> Authored: Mon Jun 22 16:45:28 2015 +0200 Committer: giacomolm <[email protected]> Committed: Mon Jun 22 16:45:28 2015 +0200 ---------------------------------------------------------------------- .../provisioning/java/job/AbstractTaskJob.java | 29 ++++++++------------ 1 file changed, 12 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/syncope/blob/b13e9fe4/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractTaskJob.java ---------------------------------------------------------------------- diff --cc core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractTaskJob.java index 4c49655,0000000..728ab41 mode 100644,000000..100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractTaskJob.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractTaskJob.java @@@ -1,221 -1,0 +1,216 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.syncope.core.provisioning.java.job; + - import java.text.SimpleDateFormat; +import java.util.Date; - import java.util.Locale; +import java.util.concurrent.atomic.AtomicReference; - import org.apache.syncope.common.lib.SyncopeConstants; +import org.apache.syncope.common.lib.types.AuditElements; +import org.apache.syncope.common.lib.types.AuditElements.Result; +import org.apache.syncope.core.persistence.api.dao.TaskDAO; +import org.apache.syncope.core.persistence.api.dao.TaskExecDAO; +import org.apache.syncope.core.persistence.api.entity.EntityFactory; +import org.apache.syncope.core.persistence.api.entity.task.Task; +import org.apache.syncope.core.persistence.api.entity.task.TaskExec; +import org.apache.syncope.core.provisioning.api.job.TaskJob; +import org.apache.syncope.core.misc.AuditManager; ++import org.apache.syncope.core.misc.DataFormat; +import org.apache.syncope.core.misc.ExceptionUtils2; +import org.apache.syncope.core.persistence.api.dao.ConfDAO; +import org.apache.syncope.core.provisioning.api.notification.NotificationManager; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.UnableToInterruptJobException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * Abstract job implementation that delegates to concrete implementation the actual job execution and provides some + * base features. 
+ * <strong>Extending this class will not provide transaction management support.</strong><br/> + * Extend <tt>AbstractTransactionalTaskJob</tt> for this purpose. + * + * @see AbstractTransactionalTaskJob + */ +@DisallowConcurrentExecution +public abstract class AbstractTaskJob implements TaskJob { + + /** + * Task execution status. + */ + public enum Status { + + SUCCESS, + FAILURE + + } + + /** + * Logger. + */ + protected static final Logger LOG = LoggerFactory.getLogger(AbstractTaskJob.class); + + /** + * Task DAO. + */ + @Autowired + protected TaskDAO taskDAO; + + /** + * Task execution DAO. + */ + @Autowired + private TaskExecDAO taskExecDAO; + + /** + * Configuration DAO. + */ + @Autowired + private ConfDAO confDAO; + + /** + * Notification manager. + */ + @Autowired + private NotificationManager notificationManager; + + /** + * Audit manager. + */ + @Autowired + private AuditManager auditManager; + + @Autowired + private EntityFactory entityFactory; + + /** + * Id, set by the caller, for identifying the task to be executed. + */ + protected Long taskId; + + /** + * The actual task to be executed. + */ + protected Task task; + + /** + * The current running thread containing the task to be executed. + */ + protected AtomicReference<Thread> runningThread = new AtomicReference<Thread>(); + + /** + * Task id setter. 
+ * + * @param taskId to be set + */ + @Override + public void setTaskId(final Long taskId) { + this.taskId = taskId; + } + + @Override + public void execute(final JobExecutionContext context) throws JobExecutionException { + this.runningThread.set(Thread.currentThread()); + task = taskDAO.find(taskId); + if (task == null) { + throw new JobExecutionException("Task " + taskId + " not found"); + } + + TaskExec execution = entityFactory.newEntity(TaskExec.class); + execution.setStartDate(new Date()); + execution.setTask(task); + + Result result; + + try { + execution.setMessage(doExecute(context.getMergedJobDataMap().getBoolean(DRY_RUN_JOBDETAIL_KEY))); + execution.setStatus(Status.SUCCESS.name()); + result = Result.SUCCESS; + } catch (JobExecutionException e) { + LOG.error("While executing task " + taskId, e); + result = Result.FAILURE; + + execution.setMessage(ExceptionUtils2.getFullStackTrace(e)); + execution.setStatus(Status.FAILURE.name()); + } + execution.setEndDate(new Date()); + + if (hasToBeRegistered(execution)) { + taskExecDAO.saveAndAdd(taskId, execution); + } + task = taskDAO.save(task); + + notificationManager.createTasks( + AuditElements.EventCategoryType.TASK, + this.getClass().getSimpleName(), + null, + this.getClass().getSimpleName(), // searching for before object is too much expensive ... + result, + task, + execution); + + auditManager.audit( + AuditElements.EventCategoryType.TASK, + task.getClass().getSimpleName(), + null, + null, // searching for before object is too much expensive ... + result, + task, + (Object[]) null); + } + + /** + * The actual execution, delegated to child classes. + * + * @param dryRun whether to actually touch the data + * @return the task execution status to be set + * @throws JobExecutionException if anything goes wrong + */ + protected abstract String doExecute(boolean dryRun) throws JobExecutionException; + + /** + * Template method to determine whether this job's task execution has to be persisted or not. 
+ * + * @param execution task execution + * @return whether to persist or not + */ + protected boolean hasToBeRegistered(final TaskExec execution) { + return false; + } + + @Override + public void interrupt() throws UnableToInterruptJobException { + Thread thread = this.runningThread.getAndSet(null); - if (thread != null) { - LOG.info("Interrupting job time {} ", (new SimpleDateFormat(SyncopeConstants.DEFAULT_DATE_PATTERN, Locale. - getDefault())).format(new Date())); - thread.interrupt(); ++ if (thread == null) { ++ LOG.warn("Unable to retrieve the thread of the current job execution"); ++ } else { ++ LOG.info("Interrupting job from thread {} at {} ", thread.getId(), DataFormat.format(new Date())); ++ ++ long maxRetry = confDAO.find("tasks.interruptMaxRetries", "1").getValues().get(0).getLongValue(); ++ for (int i = 0; i < maxRetry && thread.isAlive(); i++) { ++ thread.interrupt(); ++ } ++ // if the thread is still alive, it should be available in the next stop + if (thread.isAlive()) { - long maxRetry = confDAO.find("tasks.interruptMaxRetries", "0").getValues().get(0).getLongValue(); - for (int i = 0; i <= maxRetry && thread.isAlive(); i++) { - thread.interrupt(); - } - //if the thread is still alive, it should be available in the next stop - if (thread.isAlive()) { - this.runningThread.set(thread); - } ++ this.runningThread.set(thread); + } - } else { - LOG.warn("Unable to retrieve the right thread related to the current job execution"); + } + } +}
