Updated Branches: refs/heads/vmsync a72ddad92 -> 309f8da6d
jobs changes Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/2b96665b Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/2b96665b Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/2b96665b Branch: refs/heads/vmsync Commit: 2b96665bf408486f23003f878fa1f05ec5d5b963 Parents: a72ddad Author: Alex Huang <[email protected]> Authored: Fri Jun 14 16:27:01 2013 -0700 Committer: Alex Huang <[email protected]> Committed: Mon Jun 17 17:04:00 2013 -0700 ---------------------------------------------------------------------- .../api/response/AsyncJobResponse.java | 4 +- api/src/org/apache/cloudstack/jobs/Job.java | 67 ------- api/src/org/apache/cloudstack/jobs/JobInfo.java | 65 +++++++ .../com/cloud/vm/VirtualMachineManagerImpl.java | 7 +- .../cloud/vm/VmWorkTestApiJobDispatcher.java | 2 +- .../cloud/async/AsyncJobExecutionContext.java | 173 ------------------ .../cloudstack/framework/jobs/AsyncJob.java | 4 +- .../jobs/AsyncJobExecutionContext.java | 178 +++++++++++++++++++ .../jobs/JobCancellationException.java | 48 +++++ .../cloudstack/framework/jobs/Outcome.java | 62 +++++++ .../framework/jobs/impl/AsyncJobVO.java | 4 +- .../framework/jobs/impl/OutcomeImpl.java | 111 ++++++++++++ .../com/cloud/async/AsyncJobManagerImpl.java | 14 +- .../com/cloud/server/ManagementServerImpl.java | 2 +- .../com/cloud/storage/VolumeManagerImpl.java | 2 +- .../src/com/cloud/vm/SystemVmLoadScanner.java | 2 +- .../vm/VmWorkMockVirtualMachineManagerImpl.java | 2 +- .../cloud/vm/VmWorkTestWorkJobDispatcher.java | 2 +- utils/src/com/cloud/utils/SerialVersionUID.java | 1 + 19 files changed, 486 insertions(+), 264 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2b96665b/api/src/org/apache/cloudstack/api/response/AsyncJobResponse.java ---------------------------------------------------------------------- diff --git a/api/src/org/apache/cloudstack/api/response/AsyncJobResponse.java b/api/src/org/apache/cloudstack/api/response/AsyncJobResponse.java index 622c7c0..f5c0c1e 100644 --- a/api/src/org/apache/cloudstack/api/response/AsyncJobResponse.java +++ b/api/src/org/apache/cloudstack/api/response/AsyncJobResponse.java @@ -24,11 +24,11 @@ import org.apache.cloudstack.api.ApiConstants; import org.apache.cloudstack.api.BaseResponse; import org.apache.cloudstack.api.EntityReference; import org.apache.cloudstack.api.ResponseObject; -import org.apache.cloudstack.jobs.Job; +import org.apache.cloudstack.jobs.JobInfo; import com.cloud.serializer.Param; -@EntityReference(value = Job.class) +@EntityReference(value = JobInfo.class) public class AsyncJobResponse extends BaseResponse { @SerializedName("accountid") @Param(description="the account that executed the async command") http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2b96665b/api/src/org/apache/cloudstack/jobs/Job.java ---------------------------------------------------------------------- diff --git a/api/src/org/apache/cloudstack/jobs/Job.java b/api/src/org/apache/cloudstack/jobs/Job.java deleted file mode 100644 index d238da3..0000000 --- a/api/src/org/apache/cloudstack/jobs/Job.java +++ /dev/null @@ -1,67 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package org.apache.cloudstack.jobs; - -import java.util.Date; - -import org.apache.cloudstack.api.Identity; -import org.apache.cloudstack.api.InternalIdentity; - -public interface Job extends Identity, InternalIdentity { - - - - String getType(); - - String getDispatcher(); - - int getPendingSignals(); - - long getUserId(); - - long getAccountId(); - - String getCmd(); - - int getCmdVersion(); - - String getCmdInfo(); - - int getStatus(); - - int getProcessStatus(); - - int getResultCode(); - - String getResult(); - - Long getInitMsid(); - - Long getExecutingMsid(); - - Long getCompleteMsid(); - - Date getCreated(); - - Date getLastUpdated(); - - Date getLastPolled(); - - String getInstanceType(); - - Long getInstanceId(); -} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2b96665b/api/src/org/apache/cloudstack/jobs/JobInfo.java ---------------------------------------------------------------------- diff --git a/api/src/org/apache/cloudstack/jobs/JobInfo.java b/api/src/org/apache/cloudstack/jobs/JobInfo.java new file mode 100644 index 0000000..bce9627 --- /dev/null +++ b/api/src/org/apache/cloudstack/jobs/JobInfo.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.cloudstack.jobs; + +import java.util.Date; + +import org.apache.cloudstack.api.Identity; +import org.apache.cloudstack.api.InternalIdentity; + +public interface JobInfo extends Identity, InternalIdentity { + + String getType(); + + String getDispatcher(); + + int getPendingSignals(); + + long getUserId(); + + long getAccountId(); + + String getCmd(); + + int getCmdVersion(); + + String getCmdInfo(); + + int getStatus(); + + int getProcessStatus(); + + int getResultCode(); + + String getResult(); + + Long getInitMsid(); + + Long getExecutingMsid(); + + Long getCompleteMsid(); + + Date getCreated(); + + Date getLastUpdated(); + + Date getLastPolled(); + + String getInstanceType(); + + Long getInstanceId(); +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2b96665b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java ---------------------------------------------------------------------- diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java index 1f2ca57..4fee49e 100755 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -47,6 +47,7 @@ import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager; import org.apache.cloudstack.engine.subsystem.api.storage.StoragePoolAllocator; import org.apache.cloudstack.framework.jobs.AsyncJob; import org.apache.cloudstack.framework.jobs.AsyncJobConstants; +import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; import org.apache.cloudstack.framework.jobs.AsyncJobManager; import org.apache.cloudstack.framework.messagebus.MessageBus; import org.apache.cloudstack.framework.messagebus.MessageDispatcher; @@ -84,7 +85,6 @@ import com.cloud.agent.api.to.VirtualMachineTO; import com.cloud.agent.manager.Commands; import com.cloud.agent.manager.allocator.HostAllocator; import com.cloud.alert.AlertManager; -import com.cloud.async.AsyncJobExecutionContext; import com.cloud.dao.EntityManager; import com.cloud.dc.ClusterDetailsDao; import com.cloud.dc.ClusterDetailsVO; @@ -186,11 +186,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac protected AgentManager _agentMgr; @Inject protected VMInstanceDao _vmDao; -/* - @Inject - protected ItWorkDao _workDao; -*/ - @Inject protected NicDao _nicsDao; @Inject http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2b96665b/engine/orchestration/test/com/cloud/vm/VmWorkTestApiJobDispatcher.java ---------------------------------------------------------------------- diff --git a/engine/orchestration/test/com/cloud/vm/VmWorkTestApiJobDispatcher.java b/engine/orchestration/test/com/cloud/vm/VmWorkTestApiJobDispatcher.java index 926fcbb..766929a 100644 --- a/engine/orchestration/test/com/cloud/vm/VmWorkTestApiJobDispatcher.java +++ b/engine/orchestration/test/com/cloud/vm/VmWorkTestApiJobDispatcher.java @@ -24,10 +24,10 @@ import javax.inject.Inject; import org.apache.cloudstack.framework.jobs.AsyncJob; import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher; +import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; import org.apache.cloudstack.framework.jobs.AsyncJobManager; import org.apache.cloudstack.vm.jobs.VmWorkJobVO; -import com.cloud.async.AsyncJobExecutionContext; import com.cloud.utils.component.AdapterBase; import com.cloud.utils.db.Transaction; http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2b96665b/framework/jobs/src/com/cloud/async/AsyncJobExecutionContext.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/com/cloud/async/AsyncJobExecutionContext.java b/framework/jobs/src/com/cloud/async/AsyncJobExecutionContext.java deleted file mode 100644 index 0e05a98..0000000 --- a/framework/jobs/src/com/cloud/async/AsyncJobExecutionContext.java +++ /dev/null @@ -1,173 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package com.cloud.async; - -import javax.inject.Inject; - -import org.apache.log4j.Logger; - -import org.apache.cloudstack.framework.jobs.AsyncJob; -import org.apache.cloudstack.framework.jobs.AsyncJobConstants; -import org.apache.cloudstack.framework.jobs.AsyncJobManager; -import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao; -import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO; -import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper; -import org.apache.cloudstack.framework.jobs.impl.SyncQueueItem; - -import com.cloud.exception.ConcurrentOperationException; -import com.cloud.exception.InsufficientCapacityException; -import com.cloud.exception.ResourceUnavailableException; -import com.cloud.utils.component.ComponentContext; - -public class AsyncJobExecutionContext { - private AsyncJob _job; - - @Inject private AsyncJobManager _jobMgr; - @Inject private AsyncJobJoinMapDao _joinMapDao; - - private static ThreadLocal<AsyncJobExecutionContext> s_currentExectionContext = new ThreadLocal<AsyncJobExecutionContext>(); - - public AsyncJobExecutionContext() { - } - - public AsyncJobExecutionContext(AsyncJob job) { - _job = job; - } - - public SyncQueueItem getSyncSource() { - return _job.getSyncSource(); - } - - public void resetSyncSource() { - _job.setSyncSource(null); - } - - public AsyncJob getJob() { - if(_job == null) { - _job = _jobMgr.getPseudoJob(); - } - - return _job; - } - - public void setJob(AsyncJob job) { - _job = job; - } - - public void completeAsyncJob(int jobStatus, int resultCode, Object resultObject) { - assert(_job != null); - _jobMgr.completeAsyncJob(_job.getId(), jobStatus, resultCode, resultObject); - } - - public void updateAsyncJobStatus(int processStatus, Object resultObject) { - assert(_job != null); - _jobMgr.updateAsyncJobStatus(_job.getId(), processStatus, resultObject); - } - - public void updateAsyncJobAttachment(String instanceType, Long instanceId) { - assert(_job != null); - _jobMgr.updateAsyncJobAttachment(_job.getId(), instanceType, instanceId); - } - - public void logJobJournal(AsyncJob.JournalType journalType, String journalText, String journalObjJson) { - assert(_job != null); - _jobMgr.logJobJournal(_job.getId(), journalType, journalText, journalObjJson); - } - - public void log(Logger logger, String journalText) { - _jobMgr.logJobJournal(_job.getId(), AsyncJob.JournalType.SUCCESS, journalText, null); - logger.debug(journalText); - } - - public void joinJob(long joinJobId) { - assert(_job != null); - _jobMgr.joinJob(_job.getId(), joinJobId); - } - - public void joinJob(long joinJobId, String wakeupHandler, String wakeupDispatcher, - String[] wakeupTopcisOnMessageBus, long wakeupIntervalInMilliSeconds, long timeoutInMilliSeconds) { - assert(_job != null); - _jobMgr.joinJob(_job.getId(), joinJobId, wakeupHandler, wakeupDispatcher, wakeupTopcisOnMessageBus, - wakeupIntervalInMilliSeconds, timeoutInMilliSeconds); - } - - // - // check failure exception before we disjoin the worker job - // TODO : it is ugly and this will become unnecessary after we switch to full-async mode - // - public void disjoinJob(long joinedJobId) throws InsufficientCapacityException, - ConcurrentOperationException, ResourceUnavailableException { - assert(_job != null); - - AsyncJobJoinMapVO record = _joinMapDao.getJoinRecord(_job.getId(), joinedJobId); - if(record.getJoinStatus() == AsyncJobConstants.STATUS_FAILED && record.getJoinResult() != null) { - Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult()); - if(exception != null && exception instanceof Exception) { - if(exception instanceof InsufficientCapacityException) - throw (InsufficientCapacityException)exception; - else if(exception instanceof ConcurrentOperationException) - throw (ConcurrentOperationException)exception; - else if(exception instanceof ResourceUnavailableException) - throw (ResourceUnavailableException)exception; - else - throw new RuntimeException((Exception)exception); - } - } - - _jobMgr.disjoinJob(_job.getId(), joinedJobId); - } - - public void completeJoin(int joinStatus, String joinResult) { - assert(_job != null); - _jobMgr.completeJoin(_job.getId(), joinStatus, joinResult); - } - - public void completeJobAndJoin(int joinStatus, String joinResult) { - assert(_job != null); - _jobMgr.completeJoin(_job.getId(), joinStatus, joinResult); - _jobMgr.completeAsyncJob(_job.getId(), joinStatus, 0, null); - } - - public static AsyncJobExecutionContext getCurrentExecutionContext() { - AsyncJobExecutionContext context = s_currentExectionContext.get(); - if(context == null) { - context = new AsyncJobExecutionContext(); - context = ComponentContext.inject(context); - context.getJob(); - setCurrentExecutionContext(context); - } - - return context; - } - - public static AsyncJobExecutionContext registerPseudoExecutionContext() { - AsyncJobExecutionContext context = s_currentExectionContext.get(); - if (context == null) { - context = new AsyncJobExecutionContext(); - context = ComponentContext.inject(context); - context.getJob(); - setCurrentExecutionContext(context); - } - - return context; - } - - // This is intended to be package level access for AsyncJobManagerImpl only. - static void setCurrentExecutionContext(AsyncJobExecutionContext currentContext) { - s_currentExectionContext.set(currentContext); - } -} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2b96665b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java index 8f52073..dfb67f8 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJob.java @@ -19,9 +19,9 @@ package org.apache.cloudstack.framework.jobs; import java.util.Date; import org.apache.cloudstack.framework.jobs.impl.SyncQueueItem; -import org.apache.cloudstack.jobs.Job; +import org.apache.cloudstack.jobs.JobInfo; -public interface AsyncJob extends Job { +public interface AsyncJob extends JobInfo { public enum JournalType { SUCCESS, FAILURE http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2b96665b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java new file mode 100644 index 0000000..3d5c326 --- /dev/null +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.cloudstack.framework.jobs; + +import org.apache.log4j.Logger; + +import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao; +import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO; +import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper; +import org.apache.cloudstack.framework.jobs.impl.SyncQueueItem; + +import com.cloud.exception.ConcurrentOperationException; +import com.cloud.exception.InsufficientCapacityException; +import com.cloud.exception.ResourceUnavailableException; +import com.cloud.utils.component.ComponentContext; + +public class AsyncJobExecutionContext { + private AsyncJob _job; + + static private AsyncJobManager _jobMgr; + static private AsyncJobJoinMapDao _joinMapDao; + + public static void init(AsyncJobManager jobMgr, AsyncJobJoinMapDao joinMapDao) { + _jobMgr = jobMgr; + _joinMapDao = joinMapDao; + } + + private static ThreadLocal<AsyncJobExecutionContext> s_currentExectionContext = new ThreadLocal<AsyncJobExecutionContext>(); + + public AsyncJobExecutionContext() { + } + + public AsyncJobExecutionContext(AsyncJob job) { + _job = job; + } + + public SyncQueueItem getSyncSource() { + return _job.getSyncSource(); + } + + public void resetSyncSource() { + _job.setSyncSource(null); + } + + public AsyncJob getJob() { + if(_job == null) { + _job = _jobMgr.getPseudoJob(); + } + + return _job; + } + + public void setJob(AsyncJob job) { + _job = job; + } + + public void completeAsyncJob(int jobStatus, int resultCode, Object resultObject) { + assert(_job != null); + _jobMgr.completeAsyncJob(_job.getId(), jobStatus, resultCode, resultObject); + } + + public void updateAsyncJobStatus(int processStatus, Object resultObject) { + assert(_job != null); + _jobMgr.updateAsyncJobStatus(_job.getId(), processStatus, resultObject); + } + + public void updateAsyncJobAttachment(String instanceType, Long instanceId) { + assert(_job != null); + _jobMgr.updateAsyncJobAttachment(_job.getId(), instanceType, instanceId); + } + + public void logJobJournal(AsyncJob.JournalType journalType, String journalText, String journalObjJson) { + assert(_job != null); + _jobMgr.logJobJournal(_job.getId(), journalType, journalText, journalObjJson); + } + + public void log(Logger logger, String journalText) { + _jobMgr.logJobJournal(_job.getId(), AsyncJob.JournalType.SUCCESS, journalText, null); + logger.debug(journalText); + } + + public void joinJob(long joinJobId) { + assert(_job != null); + _jobMgr.joinJob(_job.getId(), joinJobId); + } + + public void joinJob(long joinJobId, String wakeupHandler, String wakeupDispatcher, + String[] wakeupTopcisOnMessageBus, long wakeupIntervalInMilliSeconds, long timeoutInMilliSeconds) { + assert(_job != null); + _jobMgr.joinJob(_job.getId(), joinJobId, wakeupHandler, wakeupDispatcher, wakeupTopcisOnMessageBus, + wakeupIntervalInMilliSeconds, timeoutInMilliSeconds); + } + + // + // check failure exception before we disjoin the worker job + // TODO : it is ugly and this will become unnecessary after we switch to full-async mode + // + public void disjoinJob(long joinedJobId) throws InsufficientCapacityException, + ConcurrentOperationException, ResourceUnavailableException { + assert(_job != null); + + AsyncJobJoinMapVO record = _joinMapDao.getJoinRecord(_job.getId(), joinedJobId); + if(record.getJoinStatus() == AsyncJobConstants.STATUS_FAILED && record.getJoinResult() != null) { + Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult()); + if(exception != null && exception instanceof Exception) { + if(exception instanceof InsufficientCapacityException) + throw (InsufficientCapacityException)exception; + else if(exception instanceof ConcurrentOperationException) + throw (ConcurrentOperationException)exception; + else if(exception instanceof ResourceUnavailableException) + throw (ResourceUnavailableException)exception; + else + throw new RuntimeException((Exception)exception); + } + } + + _jobMgr.disjoinJob(_job.getId(), joinedJobId); + } + + public void completeJoin(int joinStatus, String joinResult) { + assert(_job != null); + _jobMgr.completeJoin(_job.getId(), joinStatus, joinResult); + } + + public void completeJobAndJoin(int joinStatus, String joinResult) { + assert(_job != null); + _jobMgr.completeJoin(_job.getId(), joinStatus, joinResult); + _jobMgr.completeAsyncJob(_job.getId(), joinStatus, 0, null); + } + + public static AsyncJobExecutionContext getCurrentExecutionContext() { + AsyncJobExecutionContext context = s_currentExectionContext.get(); + if(context == null) { + context = new AsyncJobExecutionContext(); + context = ComponentContext.inject(context); + context.getJob(); + setCurrentExecutionContext(context); + } + + return context; + } + + public static AsyncJobExecutionContext registerPseudoExecutionContext() { + AsyncJobExecutionContext context = s_currentExectionContext.get(); + if (context == null) { + context = new AsyncJobExecutionContext(); + context.getJob(); + setCurrentExecutionContext(context); + } + + return context; + } + + public static AsyncJobExecutionContext unregister() { + AsyncJobExecutionContext context = s_currentExectionContext.get(); + setCurrentExecutionContext(null); + return context; + } + + // This is intended to be package level access for AsyncJobManagerImpl only. + public static void setCurrentExecutionContext(AsyncJobExecutionContext currentContext) { + s_currentExectionContext.set(currentContext); + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2b96665b/framework/jobs/src/org/apache/cloudstack/framework/jobs/JobCancellationException.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/JobCancellationException.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/JobCancellationException.java new file mode 100644 index 0000000..a433b2b --- /dev/null +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/JobCancellationException.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.cloudstack.framework.jobs; + +import com.cloud.utils.SerialVersionUID; +import com.cloud.utils.exception.CloudRuntimeException; + +/** + * This exception is fired when the job has been cancelled + * + */ +public class JobCancellationException extends CloudRuntimeException { + + private static final long serialVersionUID = SerialVersionUID.AffinityConflictException; + + public enum Reason { + RequestedByUser, + RequestedByCaller, + TimedOut; + } + + + Reason reason; + + public JobCancellationException(Reason reason) { + super("The job was cancelled due to " + reason.toString()); + this.reason = reason; + } + + public Reason getReason() { + return reason; + } + +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2b96665b/framework/jobs/src/org/apache/cloudstack/framework/jobs/Outcome.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/Outcome.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/Outcome.java new file mode 100644 index 0000000..b400b71 --- /dev/null +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/Outcome.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.cloudstack.framework.jobs; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Outcome is returned by clients of jobs framework as a way to wait for the + * outcome of a job. It fully complies with how Future interface is designed. + * In addition, it allows the callee to file a task to be scheduled when the + * job completes. + * + * Note that the callee should schedule a job when using the Task interface. + * It shouldn't try to complete the job in the schedule code as that will take + * up threads in the jobs framework. + * + * For the client of the jobs framework, you can either use the OutcomeImpl + * class to implement this interface or you can add to this interface to + * allow for your specific exceptions to be thrown. + * + * @param <T> Object returned to the callee when the job completes + */ +public interface Outcome<T> extends Future<T> { + AsyncJob getJob(); + + /** + * In addition to the normal Future methods, Outcome allows the ability + * to register a schedule task to be performed when the job is completed. + * + * @param listener + */ + void execute(Task<T> task); + + void execute(Task<T> task, long wait, TimeUnit unit); + + /** + * Listener is used by Outcome to schedule a task to run when a job + * completes. + * + * @param <T> T result returned + */ + public interface Task<T> extends Runnable { + void schedule(AsyncJobExecutionContext context, T result); + + void scheduleOnError(AsyncJobExecutionContext context, Throwable e); + } +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2b96665b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java index 13b0d12..0e103b9 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java @@ -34,7 +34,7 @@ import javax.persistence.TemporalType; import javax.persistence.Transient; import org.apache.cloudstack.framework.jobs.AsyncJob; -import org.apache.cloudstack.jobs.Job; +import org.apache.cloudstack.jobs.JobInfo; import com.cloud.utils.UuidUtils; import com.cloud.utils.db.GenericDao; @@ -43,7 +43,7 @@ import com.cloud.utils.db.GenericDao; @Table(name="async_job") @Inheritance(strategy=InheritanceType.JOINED) @DiscriminatorColumn(name="job_type", discriminatorType=DiscriminatorType.STRING, length=32) -public class AsyncJobVO implements AsyncJob, Job { +public class AsyncJobVO implements AsyncJob, JobInfo { @Id @GeneratedValue(strategy=GenerationType.IDENTITY) http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2b96665b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/OutcomeImpl.java ---------------------------------------------------------------------- diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/OutcomeImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/OutcomeImpl.java new file mode 100644 index 0000000..f1e4f4b --- /dev/null +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/OutcomeImpl.java @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.cloudstack.framework.jobs.impl; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.cloudstack.framework.jobs.AsyncJob; +import org.apache.cloudstack.framework.jobs.AsyncJobManager; +import org.apache.cloudstack.framework.jobs.Outcome; + +import com.cloud.utils.Predicate; + +public class OutcomeImpl<T> implements Outcome<T> { + protected AsyncJob _job; + protected Class<T> _clazz; + protected String[] _topics; + protected Predicate _predicate; + protected long _checkIntervalInMs; + + protected T _result; + + private static AsyncJobManager s_jobMgr; + + public static void init(AsyncJobManager jobMgr) { + s_jobMgr = jobMgr; + } + + public OutcomeImpl(Class<T> clazz, AsyncJob job, long checkIntervalInMs, Predicate predicate, String... topics) { + _clazz = clazz; + _job = job; + _topics = topics; + _predicate = predicate; + _checkIntervalInMs = checkIntervalInMs; + } + + @Override + public AsyncJob getJob() { + return _job; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + s_jobMgr.waitAndCheck(_topics, _checkIntervalInMs, -1, _predicate); + return retrieve(); + } + + @Override + public T get(long timeToWait, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + s_jobMgr.waitAndCheck(_topics, _checkIntervalInMs, unit.toMillis(timeToWait), _predicate); + return retrieve(); + } + + /** + * This method can be overridden by children classes to retrieve the + * actual object. + */ + protected T retrieve() { + return _result; + } + + protected Outcome<T> set(T result) { + _result = result; + return this; + } + + @Override + public boolean isCancelled() { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isDone() { + // TODO Auto-generated method stub + return false; + } + + @Override + public void execute(Task<T> task) { + // TODO Auto-generated method stub + + } + + @Override + public void execute(Task<T> task, long wait, TimeUnit unit) { + // TODO Auto-generated method stub + + } + +} http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2b96665b/server/src/com/cloud/async/AsyncJobManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index 084585f..332587a 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -42,6 +42,7 @@ import org.apache.cloudstack.context.CallContext; import org.apache.cloudstack.framework.jobs.AsyncJob; import org.apache.cloudstack.framework.jobs.AsyncJobConstants; import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher; +import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; import org.apache.cloudstack.framework.jobs.AsyncJobManager; import org.apache.cloudstack.framework.jobs.dao.AsyncJobDao; import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao; @@ -51,6 +52,7 @@ import org.apache.cloudstack.framework.jobs.impl.AsyncJobJournalVO; import org.apache.cloudstack.framework.jobs.impl.AsyncJobMBeanImpl; import org.apache.cloudstack.framework.jobs.impl.AsyncJobMonitor; import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO; +import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl; import org.apache.cloudstack.framework.jobs.impl.SyncQueueItem; import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO; import org.apache.cloudstack.framework.jobs.impl.SyncQueueManager; @@ -537,8 +539,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } _jobMonitor.registerActiveTask(runNumber, job.getId()); - AsyncJobExecutionContext.setCurrentExecutionContext( - (AsyncJobExecutionContext)ComponentContext.inject(new AsyncJobExecutionContext(job)) + AsyncJobExecutionContext.setCurrentExecutionContext((AsyncJobExecutionContext)ComponentContext.inject(new AsyncJobExecutionContext(job)) ); // execute the job @@ -847,12 +848,10 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, @Override public boolean configure(String name, Map<String, Object> params) throws ConfigurationException { - int expireMinutes = NumbersUtil.parseInt( - _configDao.getValue(Config.JobExpireMinutes.key()), 24*60); + int expireMinutes = NumbersUtil.parseInt(_configDao.getValue(Config.JobExpireMinutes.key()), 24 * 60); _jobExpireSeconds = (long)expireMinutes*60; - _jobCancelThresholdSeconds = NumbersUtil.parseInt( - _configDao.getValue(Config.JobCancelThresholdMinutes.key()), 60); + _jobCancelThresholdSeconds = NumbersUtil.parseInt(_configDao.getValue(Config.JobCancelThresholdMinutes.key()), 60); _jobCancelThresholdSeconds *= 60; try { @@ -870,6 +869,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, throw new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl"); } + AsyncJobExecutionContext.init(this, _joinMapDao); + OutcomeImpl.init(this); + return true; } http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2b96665b/server/src/com/cloud/server/ManagementServerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/server/ManagementServerImpl.java b/server/src/com/cloud/server/ManagementServerImpl.java index 4ad60ea..6de8840 100755 --- a/server/src/com/cloud/server/ManagementServerImpl.java +++ b/server/src/com/cloud/server/ManagementServerImpl.java @@ -425,6 +425,7 @@ import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager; import org.apache.cloudstack.engine.subsystem.api.storage.StoragePoolAllocator; import org.apache.cloudstack.framework.jobs.AsyncJob; import org.apache.cloudstack.framework.jobs.AsyncJobConstants; +import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; import org.apache.cloudstack.framework.jobs.AsyncJobManager; import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao; import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; @@ -442,7 +443,6 @@ import com.cloud.alert.AlertManager; import com.cloud.alert.AlertVO; import com.cloud.alert.dao.AlertDao; import com.cloud.api.ApiDBUtils; -import com.cloud.async.AsyncJobExecutionContext; import com.cloud.capacity.Capacity; import com.cloud.capacity.CapacityVO; import com.cloud.capacity.dao.CapacityDao; http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2b96665b/server/src/com/cloud/storage/VolumeManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/storage/VolumeManagerImpl.java b/server/src/com/cloud/storage/VolumeManagerImpl.java index 9d3f4df..2eeb206 100644 --- a/server/src/com/cloud/storage/VolumeManagerImpl.java +++ b/server/src/com/cloud/storage/VolumeManagerImpl.java @@ -67,6 +67,7 @@ import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService; import org.apache.cloudstack.engine.subsystem.api.storage.VolumeService.VolumeApiResult; import org.apache.cloudstack.framework.async.AsyncCallFuture; import org.apache.cloudstack.framework.jobs.AsyncJob; +import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; import org.apache.cloudstack.framework.jobs.AsyncJobManager; import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao; import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; @@ -79,7 +80,6 @@ import com.cloud.agent.api.to.VirtualMachineTO; import com.cloud.agent.api.to.VolumeTO; import com.cloud.alert.AlertManager; import com.cloud.api.ApiDBUtils; -import com.cloud.async.AsyncJobExecutionContext; import com.cloud.capacity.CapacityManager; import com.cloud.capacity.dao.CapacityDao; import com.cloud.configuration.Config; http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2b96665b/server/src/com/cloud/vm/SystemVmLoadScanner.java ---------------------------------------------------------------------- diff --git a/server/src/com/cloud/vm/SystemVmLoadScanner.java b/server/src/com/cloud/vm/SystemVmLoadScanner.java index 3d7953f..704129d 100644 --- a/server/src/com/cloud/vm/SystemVmLoadScanner.java +++ b/server/src/com/cloud/vm/SystemVmLoadScanner.java @@ -23,8 +23,8 @@ import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.apache.cloudstack.context.CallContext; +import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; -import com.cloud.async.AsyncJobExecutionContext; import com.cloud.utils.Pair; import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.db.GlobalLock; http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2b96665b/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java ---------------------------------------------------------------------- diff --git a/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java b/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java index 642dfc1..2c1249e 100644 --- a/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java +++ b/server/test/com/cloud/vm/VmWorkMockVirtualMachineManagerImpl.java @@ -26,11 +26,11 @@ import javax.naming.ConfigurationException; import org.apache.log4j.Logger; import org.apache.cloudstack.framework.jobs.AsyncJobConstants; +import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; import org.apache.cloudstack.framework.messagebus.MessageBus; import com.cloud.agent.api.to.NicTO; import com.cloud.agent.api.to.VirtualMachineTO; -import com.cloud.async.AsyncJobExecutionContext; import com.cloud.deploy.DeployDestination; import com.cloud.deploy.DeploymentPlan; import com.cloud.exception.AgentUnavailableException; http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2b96665b/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java ---------------------------------------------------------------------- diff --git a/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java b/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java index 288b793..72210b4 100644 --- a/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java +++ b/server/test/com/cloud/vm/VmWorkTestWorkJobDispatcher.java @@ -5,8 +5,8 @@ import org.apache.log4j.Logger; import org.apache.cloudstack.framework.jobs.AsyncJob; import org.apache.cloudstack.framework.jobs.AsyncJobConstants; import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher; +import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; -import com.cloud.async.AsyncJobExecutionContext; import com.cloud.utils.component.AdapterBase; public class VmWorkTestWorkJobDispatcher extends AdapterBase implements AsyncJobDispatcher { http://git-wip-us.apache.org/repos/asf/cloudstack/blob/2b96665b/utils/src/com/cloud/utils/SerialVersionUID.java ---------------------------------------------------------------------- diff --git a/utils/src/com/cloud/utils/SerialVersionUID.java b/utils/src/com/cloud/utils/SerialVersionUID.java index 856d563..90fafdd 100755 --- a/utils/src/com/cloud/utils/SerialVersionUID.java +++ b/utils/src/com/cloud/utils/SerialVersionUID.java @@ -62,4 +62,5 @@ public interface SerialVersionUID { public static final long CallFailedException = Base | 0x28; public static final long UnableDeleteHostException = Base | 0x29; public static final long AffinityConflictException = Base | 0x2a; + public static final long JobCancellationException = Base | 0x2b; }
