Updated Branches: refs/heads/vmsync 5585b5ea6 -> 55935f986
more job cleanup Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/55935f98 Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/55935f98 Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/55935f98 Branch: refs/heads/vmsync Commit: 55935f98688b1f4b9740d7941c8329071943b48e Parents: 5585b5e Author: Kelven Yang <kelv...@gmail.com> Authored: Tue Apr 9 11:14:49 2013 -0700 Committer: Kelven Yang <kelv...@gmail.com> Committed: Tue Apr 9 11:14:49 2013 -0700 ---------------------------------------------------------------------- client/tomcatconf/applicationContext.xml.in | 1 - .../com/cloud/async/AsyncJobExecutionContext.java | 55 ++++++++++++ server/src/com/cloud/async/AsyncJobExecutor.java | 39 -------- server/src/com/cloud/async/AsyncJobManager.java | 2 +- .../src/com/cloud/async/AsyncJobManagerImpl.java | 28 ++++--- server/src/com/cloud/async/AsyncJobResult.java | 9 -- .../src/com/cloud/async/BaseAsyncJobExecutor.java | 69 --------------- server/src/com/cloud/async/SyncQueueManager.java | 1 - .../src/com/cloud/server/ManagementServerImpl.java | 8 +- .../src/com/cloud/storage/VolumeManagerImpl.java | 18 ++-- 10 files changed, 85 insertions(+), 145 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/55935f98/client/tomcatconf/applicationContext.xml.in ---------------------------------------------------------------------- diff --git a/client/tomcatconf/applicationContext.xml.in b/client/tomcatconf/applicationContext.xml.in index ca6b402..e492f18 100644 --- a/client/tomcatconf/applicationContext.xml.in +++ b/client/tomcatconf/applicationContext.xml.in @@ -594,7 +594,6 @@ <bean id="apiRateLimitServiceImpl" class="org.apache.cloudstack.ratelimit.ApiRateLimitServiceImpl"/> <bean id="alertManagerImpl" class="com.cloud.alert.AlertManagerImpl" /> - <bean id="asyncJobExecutorContextImpl" class="com.cloud.async.AsyncJobExecutorContextImpl" /> <bean id="asyncJobManagerImpl" class="com.cloud.async.AsyncJobManagerImpl" /> <bean id="autoScaleManagerImpl" class="com.cloud.network.as.AutoScaleManagerImpl" /> <bean id="capacityManagerImpl" class="com.cloud.capacity.CapacityManagerImpl" /> http://git-wip-us.apache.org/repos/asf/cloudstack/blob/55935f98/server/src/com/cloud/async/AsyncJobExecutionContext.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobExecutionContext.java b/server/src/com/cloud/async/AsyncJobExecutionContext.java new file mode 100644 index 0000000..d8cf3aa --- /dev/null +++ b/server/src/com/cloud/async/AsyncJobExecutionContext.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.async; + +public class AsyncJobExecutionContext { + private SyncQueueItemVO _syncSource; + private AsyncJobVO _job; + + private static ThreadLocal<AsyncJobExecutionContext> s_currentExectionContext = new ThreadLocal<AsyncJobExecutionContext>(); + + public AsyncJobExecutionContext() { + } + + public AsyncJobExecutionContext(AsyncJobVO job) { + _job = job; + } + + public SyncQueueItemVO getSyncSource() { + return _syncSource; + } + + public void setSyncSource(SyncQueueItemVO syncSource) { + _syncSource = syncSource; + } + + public AsyncJobVO getJob() { + return _job; + } + + public void setJob(AsyncJobVO job) { + _job = job; + } + + public static AsyncJobExecutionContext getCurrentExecutionContext() { + return s_currentExectionContext.get(); + } + + public static void setCurrentExecutionContext(AsyncJobExecutionContext currentContext) { + s_currentExectionContext.set(currentContext); + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/55935f98/server/src/com/cloud/async/AsyncJobExecutor.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobExecutor.java b/server/src/com/cloud/async/AsyncJobExecutor.java deleted file mode 100644 index d224c8f..0000000 --- a/server/src/com/cloud/async/AsyncJobExecutor.java +++ /dev/null @@ -1,39 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package com.cloud.async; - - -public interface AsyncJobExecutor { - public AsyncJobManager getAsyncJobMgr(); - public void setAsyncJobMgr(AsyncJobManager asyncMgr); - public SyncQueueItemVO getSyncSource(); - public void setSyncSource(SyncQueueItemVO syncSource); - public AsyncJobVO getJob(); - public void setJob(AsyncJobVO job); - public void setFromPreviousSession(boolean value); - public boolean isFromPreviousSession(); - - /** - * - * otherwise return false and once the executor finally has completed with the sync source, - * it needs to call AsyncJobManager.releaseSyncSource - * - * if executor does not have a sync source, always return true - */ - public boolean execute(); -} - http://git-wip-us.apache.org/repos/asf/cloudstack/blob/55935f98/server/src/com/cloud/async/AsyncJobManager.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobManager.java b/server/src/com/cloud/async/AsyncJobManager.java index 206df6a..f7ffc5a 100644 --- a/server/src/com/cloud/async/AsyncJobManager.java +++ b/server/src/com/cloud/async/AsyncJobManager.java @@ -36,7 +36,7 @@ public interface AsyncJobManager extends Manager { public void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject); public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId); - public void releaseSyncSource(AsyncJobExecutor executor); + public void releaseSyncSource(); public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit); http://git-wip-us.apache.org/repos/asf/cloudstack/blob/55935f98/server/src/com/cloud/async/AsyncJobManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index 761ac36..85e2a56 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -325,7 +325,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, txt.start(); AsyncJobVO job = _jobDao.findById(jobId); if(job != null) { - jobResult.setCmdOriginator(job.getCmdOriginator()); jobResult.setJobStatus(job.getStatus()); jobResult.setProcessStatus(job.getProcessStatus()); jobResult.setResult(job.getResult()); @@ -391,6 +390,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } catch(Exception e) { s_logger.warn("Unable to register active job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e)); } + + AsyncJobExecutionContext.setCurrentExecutionContext(new AsyncJobExecutionContext(job)); BaseAsyncCmd cmdObj = null; Transaction txn = Transaction.open(Transaction.CLOUD_DB); @@ -500,6 +501,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, s_logger.error("Caught: " + th); } catch (Throwable th2) { } + } finally { + AsyncJobExecutionContext.setCurrentExecutionContext(null); } } }; @@ -535,17 +538,20 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } @Override - public void releaseSyncSource(AsyncJobExecutor executor) { - if(executor.getSyncSource() != null) { + public void releaseSyncSource() { + AsyncJobExecutionContext executionContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + assert(executionContext != null); + + if(executionContext.getSyncSource() != null) { if(s_logger.isDebugEnabled()) { - s_logger.debug("Release sync source for job-" + executor.getJob().getId() + " sync source: " - + executor.getSyncSource().getContentType() + "-" - + executor.getSyncSource().getContentId()); - } - - _queueMgr.purgeItem(executor.getSyncSource().getId()); - checkQueue(executor.getSyncSource().getQueueId()); - } + s_logger.debug("Release sync source for job-" + executionContext.getJob().getId() + " sync source: " + + executionContext.getSyncSource().getContentType() + "-" + + executionContext.getSyncSource().getContentId()); + } + + _queueMgr.purgeItem(executionContext.getSyncSource().getId()); + checkQueue(executionContext.getSyncSource().getQueueId()); + } } private void checkQueue(long queueId) { http://git-wip-us.apache.org/repos/asf/cloudstack/blob/55935f98/server/src/com/cloud/async/AsyncJobResult.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobResult.java b/server/src/com/cloud/async/AsyncJobResult.java index cf343ea..4c37929 100644 --- a/server/src/com/cloud/async/AsyncJobResult.java +++ b/server/src/com/cloud/async/AsyncJobResult.java @@ -23,7 +23,6 @@ public class AsyncJobResult { public static final int STATUS_SUCCEEDED = 1; public static final int STATUS_FAILED = 2; - private String cmdOriginator; private long jobId; private int jobStatus; private int processStatus; @@ -39,14 +38,6 @@ public class AsyncJobResult { result = ""; } - public String getCmdOriginator() { - return cmdOriginator; - } - - public void setCmdOriginator(String cmdOriginator) { - this.cmdOriginator = cmdOriginator; - } - public long getJobId() { return jobId; } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/55935f98/server/src/com/cloud/async/BaseAsyncJobExecutor.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/BaseAsyncJobExecutor.java b/server/src/com/cloud/async/BaseAsyncJobExecutor.java deleted file mode 100644 index 122b34b..0000000 --- a/server/src/com/cloud/async/BaseAsyncJobExecutor.java +++ /dev/null @@ -1,69 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package com.cloud.async; - - -public abstract class BaseAsyncJobExecutor implements AsyncJobExecutor { - private SyncQueueItemVO _syncSource; - private AsyncJobVO _job; - private boolean _fromPreviousSession; - private AsyncJobManager _asyncJobMgr; - - private static ThreadLocal<AsyncJobExecutor> s_currentExector = new ThreadLocal<AsyncJobExecutor>(); - - public AsyncJobManager getAsyncJobMgr() { - return _asyncJobMgr; - } - - public void setAsyncJobMgr(AsyncJobManager asyncMgr) { - _asyncJobMgr = asyncMgr; - } - - public SyncQueueItemVO getSyncSource() { - return _syncSource; - } - - public void setSyncSource(SyncQueueItemVO syncSource) { - _syncSource = syncSource; - } - - public AsyncJobVO getJob() { - return _job; - } - - public void setJob(AsyncJobVO job) { - _job = job; - } - - public void setFromPreviousSession(boolean value) { - _fromPreviousSession = value; - } - - public boolean isFromPreviousSession() { - return _fromPreviousSession; - } - - public abstract boolean execute(); - - public static AsyncJobExecutor getCurrentExecutor() { - return s_currentExector.get(); - } - - public static void setCurrentExecutor(AsyncJobExecutor currentExecutor) { - s_currentExector.set(currentExecutor); - } -} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/55935f98/server/src/com/cloud/async/SyncQueueManager.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/SyncQueueManager.java b/server/src/com/cloud/async/SyncQueueManager.java index a7032da..fb4f2d3 100644 --- a/server/src/com/cloud/async/SyncQueueManager.java +++ b/server/src/com/cloud/async/SyncQueueManager.java @@ -20,7 +20,6 @@ import java.util.List; import com.cloud.utils.component.Manager; - public interface SyncQueueManager extends Manager { public SyncQueueVO queue(String syncObjType, long syncObjId, String itemType, long itemId, long queueSizeLimit); public SyncQueueItemVO dequeueFromOne(long queueId, Long msid); http://git-wip-us.apache.org/repos/asf/cloudstack/blob/55935f98/server/src/com/cloud/server/ManagementServerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/server/ManagementServerImpl.java b/server/src/com/cloud/server/ManagementServerImpl.java index d0904e1..b8de365 100755 --- a/server/src/com/cloud/server/ManagementServerImpl.java +++ b/server/src/com/cloud/server/ManagementServerImpl.java @@ -2833,9 +2833,9 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe volume.getName(), ApiDBUtils.findAccountById(accountId).getUuid(), UploadVO.Status.COPY_IN_PROGRESS.toString(), uploadJob.getUuid()); resultObj.setResponseName(cmd.getCommandName()); - AsyncJobExecutor asyncExecutor = BaseAsyncJobExecutor.getCurrentExecutor(); - if (asyncExecutor != null) { - job = asyncExecutor.getJob(); + AsyncJobExecutionContext asyncExecutionContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if (asyncExecutionContext != null) { + job = asyncExecutionContext.getJob(); _asyncMgr.updateAsyncJobAttachment(job.getId(), Upload.Type.VOLUME.toString(), volumeId); _asyncMgr.updateAsyncJobStatus(job.getId(), AsyncJobResult.STATUS_IN_PROGRESS, resultObj); } @@ -2857,7 +2857,7 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe // Update the async job. resultObj.setResultString(errorString); resultObj.setUploadStatus(UploadVO.Status.COPY_ERROR.toString()); - if (asyncExecutor != null) { + if (asyncExecutionContext != null) { _asyncMgr.completeAsyncJob(job.getId(), AsyncJobResult.STATUS_FAILED, 0, resultObj); } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/55935f98/server/src/com/cloud/storage/VolumeManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/storage/VolumeManagerImpl.java b/server/src/com/cloud/storage/VolumeManagerImpl.java index ff0235f..1f58a90 100644 --- a/server/src/com/cloud/storage/VolumeManagerImpl.java +++ b/server/src/com/cloud/storage/VolumeManagerImpl.java @@ -71,10 +71,9 @@ import com.cloud.agent.api.AttachVolumeCommand; import com.cloud.agent.api.to.VolumeTO; import com.cloud.alert.AlertManager; import com.cloud.api.ApiDBUtils; -import com.cloud.async.AsyncJobExecutor; import com.cloud.async.AsyncJobManager; import com.cloud.async.AsyncJobVO; -import com.cloud.async.BaseAsyncJobExecutor; +import com.cloud.async.AsyncJobExecutionContext; import com.cloud.capacity.CapacityManager; import com.cloud.capacity.dao.CapacityDao; import com.cloud.configuration.Config; @@ -1743,10 +1742,10 @@ public class VolumeManagerImpl extends ManagerBase implements VolumeManager { } - AsyncJobExecutor asyncExecutor = BaseAsyncJobExecutor - .getCurrentExecutor(); - if (asyncExecutor != null) { - AsyncJobVO job = asyncExecutor.getJob(); + AsyncJobExecutionContext asyncExecutionContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + + if (asyncExecutionContext != null) { + AsyncJobVO job = asyncExecutionContext.getJob(); if (s_logger.isInfoEnabled()) { s_logger.info("Trying to attaching volume " + volumeId @@ -1826,10 +1825,9 @@ public class VolumeManagerImpl extends ManagerBase implements VolumeManager { "Please specify a VM that is either running or stopped."); } - AsyncJobExecutor asyncExecutor = BaseAsyncJobExecutor - .getCurrentExecutor(); - if (asyncExecutor != null) { - AsyncJobVO job = asyncExecutor.getJob(); + AsyncJobExecutionContext asyncExecutionContext = AsyncJobExecutionContext.getCurrentExecutionContext(); + if (asyncExecutionContext != null) { + AsyncJobVO job = asyncExecutionContext.getJob(); if (s_logger.isInfoEnabled()) { s_logger.info("Trying to attaching volume " + volumeId