Remove cancelled jobs from job monitoring; fix the minutes-to-milliseconds conversion (×1000 → ×60000) used for job expiration and blocked-job cancellation.
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/5dd4fb22 Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/5dd4fb22 Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/5dd4fb22 Branch: refs/heads/rbac Commit: 5dd4fb22eff5ec6df3c79bab0ebb99226c954e9b Parents: bbf5a91 Author: Kelven Yang <kelv...@gmail.com> Authored: Mon Mar 3 16:09:40 2014 -0800 Committer: Kelven Yang <kelv...@gmail.com> Committed: Mon Mar 3 17:44:58 2014 -0800 ---------------------------------------------------------------------- .../jobs/impl/AsyncJobManagerImpl.java | 15 ++++++---- .../framework/jobs/impl/AsyncJobMonitor.java | 30 ++++++++++++++++---- 2 files changed, 35 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/5dd4fb22/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index 2be2786..b9246aa 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -85,12 +85,11 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, private static final ConfigKey<Long> JobExpireMinutes = new ConfigKey<Long>(Long.class, "job.expire.minutes", "Advanced", "1440", "Time (in minutes) for async-jobs to be kept in system", true, ConfigKey.Scope.Global, 60l); private static final ConfigKey<Long> JobCancelThresholdMinutes = new ConfigKey<Long>(Long.class, "job.cancel.threshold.minutes", "Advanced", "60", - "Time (in minutes) for async-jobs to be forcely 
cancelled if it has been in process for long", true, ConfigKey.Scope.Global, 60l); + "Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long", true, ConfigKey.Scope.Global, 240l); private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class); private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds - private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_SYNC = 60; // 60 seconds private static final int MAX_ONETIME_SCHEDULE_SIZE = 50; private static final int HEARTBEAT_INTERVAL = 2000; @@ -706,14 +705,16 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, try { s_logger.trace("Begin cleanup expired async-jobs"); - Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value() * 1000); + Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value() * 60000); // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute // hopefully this will be fast enough to balance potential growth of job table //1) Expire unfinished jobs that weren't processed yet List<AsyncJobVO> l = _jobDao.getExpiredUnfinishedJobs(cutTime, 100); for (AsyncJobVO job : l) { - s_logger.trace("Expunging unfinished job " + job); + s_logger.info("Expunging unfinished job " + job); + + _jobMonitor.unregisterByJobId(job.getId()); expungeAsyncJob(job); } @@ -721,15 +722,19 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, List<AsyncJobVO> completedJobs = _jobDao.getExpiredCompletedJobs(cutTime, 100); for (AsyncJobVO job : completedJobs) { s_logger.trace("Expunging completed job " + job); + expungeAsyncJob(job); } // forcefully cancel blocking queue items if they've been staying there for too long - List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value() * 1000, false); + List<SyncQueueItemVO> blockItems = 
_queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value() * 60000, false); if (blockItems != null && blockItems.size() > 0) { for (SyncQueueItemVO item : blockItems) { if (item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) { + s_logger.info("Remove Job-" + item.getContentId() + " from Queue-" + item.getId() + " since it has been blocked for too long"); completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0, "Job is cancelled as it has been blocking others for too long"); + + _jobMonitor.unregisterByJobId(item.getContentId()); } // purge the item and resume queue processing http://git-wip-us.apache.org/repos/asf/cloudstack/blob/5dd4fb22/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java index 6718181..0b6f7a5 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobMonitor.java @@ -17,6 +17,7 @@ package org.apache.cloudstack.framework.jobs.impl; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Timer; import java.util.concurrent.atomic.AtomicInteger; @@ -38,8 +39,7 @@ import com.cloud.utils.component.ManagerBase; public class AsyncJobMonitor extends ManagerBase { public static final Logger s_logger = Logger.getLogger(AsyncJobMonitor.class); - @Inject - private MessageBus _messageBus; + @Inject private MessageBus _messageBus; private final Map<Long, ActiveTaskRecord> _activeTasks = new HashMap<Long, ActiveTaskRecord>(); private final Timer _timer = new Timer(); @@ -86,15 +86,16 @@ public class AsyncJobMonitor extends ManagerBase { synchronized (this) { for (Map.Entry<Long, ActiveTaskRecord> entry : 
_activeTasks.entrySet()) { if (entry.getValue().millisSinceLastJobHeartbeat() > _inactivityWarningThresholdMs) { - s_logger.warn("Task (job-" + entry.getValue().getJobId() + ") has been pending for " + entry.getValue().millisSinceLastJobHeartbeat() / 1000 + - " seconds"); + s_logger.warn("Task (job-" + entry.getValue().getJobId() + ") has been pending for " + + entry.getValue().millisSinceLastJobHeartbeat() / 1000 + " seconds"); } } } } @Override - public boolean configure(String name, Map<String, Object> params) throws ConfigurationException { + public boolean configure(String name, Map<String, Object> params) + throws ConfigurationException { _messageBus.subscribe(AsyncJob.Topics.JOB_HEARTBEAT, MessageDispatcher.getDispatcher(this)); _timer.scheduleAtFixedRate(new ManagedContextTimerTask() { @@ -141,6 +142,25 @@ public class AsyncJobMonitor extends ManagerBase { } } + public void unregisterByJobId(long jobId) { + synchronized (this) { + Iterator<Map.Entry<Long, ActiveTaskRecord>> it = _activeTasks.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<Long, ActiveTaskRecord> entry = it.next(); + if (entry.getValue().getJobId() == jobId) { + s_logger.info("Remove Job-" + entry.getValue().getJobId() + " from job monitoring due to job cancelling"); + + if (entry.getValue().isPoolThread()) + _activePoolThreads.decrementAndGet(); + else + _activeInplaceThreads.decrementAndGet(); + + it.remove(); + } + } + } + } + public int getActivePoolThreads() { return _activePoolThreads.get(); }