CLOUDSTACK-8061: Extracting volume when it is in migrating state causes
both the operations to fail.
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/fbe54974
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/fbe54974
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/fbe54974
Branch: refs/heads/master
Commit: fbe54974c514572d1c224528446618c31952e25a
Parents: 9aefd9b
Author: Min Chen <[email protected]>
Authored: Wed Dec 10 14:16:00 2014 -0800
Committer: Min Chen <[email protected]>
Committed: Thu Dec 11 11:36:34 2014 -0800
----------------------------------------------------------------------
.../src/com/cloud/vm/VmWorkExtractVolume.java | 38 ++++++
.../com/cloud/storage/VolumeApiServiceImpl.java | 124 ++++++++++++++++++-
2 files changed, 156 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/fbe54974/engine/components-api/src/com/cloud/vm/VmWorkExtractVolume.java
----------------------------------------------------------------------
diff --git a/engine/components-api/src/com/cloud/vm/VmWorkExtractVolume.java
b/engine/components-api/src/com/cloud/vm/VmWorkExtractVolume.java
new file mode 100644
index 0000000..82b5db4
--- /dev/null
+++ b/engine/components-api/src/com/cloud/vm/VmWorkExtractVolume.java
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.vm;
+
+public class VmWorkExtractVolume extends VmWork {
+ private static final long serialVersionUID = -565778516928408602L;
+
+ private long volumeId;
+ private long zoneId;
+
+ public VmWorkExtractVolume(long userId, long accountId, long vmId, String
handlerName, long volumeId, long zoneId) {
+ super(userId, accountId, vmId, handlerName);
+ this.volumeId = volumeId;
+ this.zoneId = zoneId;
+ }
+
+ public long getVolumeId() {
+ return volumeId;
+ }
+
+ public long getZoneId() {
+ return zoneId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/fbe54974/server/src/com/cloud/storage/VolumeApiServiceImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/VolumeApiServiceImpl.java
b/server/src/com/cloud/storage/VolumeApiServiceImpl.java
index 9deaae2..0914c57 100644
--- a/server/src/com/cloud/storage/VolumeApiServiceImpl.java
+++ b/server/src/com/cloud/storage/VolumeApiServiceImpl.java
@@ -141,6 +141,7 @@ import com.cloud.vm.VmWork;
import com.cloud.vm.VmWorkAttachVolume;
import com.cloud.vm.VmWorkConstants;
import com.cloud.vm.VmWorkDetachVolume;
+import com.cloud.vm.VmWorkExtractVolume;
import com.cloud.vm.VmWorkJobHandler;
import com.cloud.vm.VmWorkJobHandlerProxy;
import com.cloud.vm.VmWorkMigrateVolume;
@@ -2043,14 +2044,70 @@ public class VolumeApiServiceImpl extends ManagerBase
implements VolumeApiServic
return volumeStoreRef.getExtractUrl();
}
- dataStoreMgr.getPrimaryDataStore(volume.getPoolId());
- ImageStoreEntity secStore =
(ImageStoreEntity)dataStoreMgr.getImageStore(zoneId);
- secStore.getUri();
+ VMInstanceVO vm = null;
+ if (volume.getInstanceId() != null) {
+ vm = _vmInstanceDao.findById(volume.getInstanceId());
+ }
+
+ if (vm != null) {
+ // serialize VM operation
+ AsyncJobExecutionContext jobContext =
AsyncJobExecutionContext.getCurrentExecutionContext();
+ if
(jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
+ // avoid re-entrance
+
+ VmWorkJobVO placeHolder = null;
+ placeHolder = createPlaceHolderWork(vm.getId());
+ try {
+ return orchestrateExtractVolume(volume.getId(), zoneId);
+ } finally {
+ _workJobDao.expunge(placeHolder.getId());
+ }
+ } else {
+ Outcome<String> outcome =
extractVolumeThroughJobQueue(vm.getId(), volume.getId(), zoneId);
+
+ try {
+ outcome.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Operation is interrupted", e);
+ } catch (java.util.concurrent.ExecutionException e) {
+ throw new RuntimeException("Execution excetion", e);
+ }
+
+ Object jobResult =
_jobMgr.unmarshallResultObject(outcome.getJob());
+ if (jobResult != null) {
+ if (jobResult instanceof ConcurrentOperationException)
+ throw (ConcurrentOperationException)jobResult;
+ else if (jobResult instanceof RuntimeException)
+ throw (RuntimeException)jobResult;
+ else if (jobResult instanceof Throwable)
+ throw new RuntimeException("Unexpected exception",
(Throwable)jobResult);
+ }
+
+ // retrieve the entity url from job result
+ if (jobResult != null && jobResult instanceof String) {
+ return (String)jobResult;
+ }
+ return null;
+ }
+ }
+
+ return orchestrateExtractVolume(volume.getId(), zoneId);
+ }
+
+ private String orchestrateExtractVolume(long volumeId, long zoneId) {
+ // get latest volume state to make sure that it is not updated by
other parallel operations
+ VolumeVO volume = _volsDao.findById(volumeId);
+ if (volume == null || volume.getState() != Volume.State.Ready) {
+ throw new InvalidParameterValueException("Volume to be extracted
has been removed or not in right state!");
+ }
+ // perform extraction
+ ImageStoreEntity secStore =
(ImageStoreEntity)dataStoreMgr.getImageStore(zoneId);
String value = _configDao.getValue(Config.CopyVolumeWait.toString());
NumbersUtil.parseInt(value,
Integer.parseInt(Config.CopyVolumeWait.getDefaultValue()));
+
// Copy volume from primary to secondary storage
- VolumeInfo srcVol = volFactory.getVolume(volume.getId());
+ VolumeInfo srcVol = volFactory.getVolume(volumeId);
AsyncCallFuture<VolumeApiResult> cvAnswer =
volService.copyVolume(srcVol, secStore);
// Check if you got a valid answer.
VolumeApiResult cvResult = null;
@@ -2071,11 +2128,10 @@ public class VolumeApiServiceImpl extends ManagerBase
implements VolumeApiServic
VolumeInfo vol = cvResult.getVolume();
String extractUrl = secStore.createEntityExtractUrl(vol.getPath(),
vol.getFormat(), vol);
- volumeStoreRef = _volumeStoreDao.findByVolume(volumeId);
+ VolumeDataStoreVO volumeStoreRef =
_volumeStoreDao.findByVolume(volumeId);
volumeStoreRef.setExtractUrl(extractUrl);
volumeStoreRef.setExtractUrlCreated(DateUtil.now());
_volumeStoreDao.update(volumeStoreRef.getId(), volumeStoreRef);
-
return extractUrl;
}
@@ -2350,6 +2406,23 @@ public class VolumeApiServiceImpl extends ManagerBase
implements VolumeApiServic
_storagePoolAllocators = storagePoolAllocators;
}
+ public class VmJobVolumeUrlOutcome extends OutcomeImpl<String> {
+
+ public VmJobVolumeUrlOutcome(final AsyncJob job) {
+ super(String.class, job, VmJobCheckInterval.value(), new
Predicate() {
+ @Override
+ public boolean checkCondition() {
+ AsyncJobVO jobVo = _entityMgr.findById(AsyncJobVO.class,
job.getId());
+ assert (jobVo != null);
+ if (jobVo == null || jobVo.getStatus() !=
JobInfo.Status.IN_PROGRESS)
+ return true;
+
+ return false;
+ }
+ }, AsyncJob.Topics.JOB_STATE);
+ }
+ }
+
public class VmJobVolumeOutcome extends OutcomeImpl<Volume> {
private long _volumeId;
@@ -2498,6 +2571,39 @@ public class VolumeApiServiceImpl extends ManagerBase
implements VolumeApiServic
return new VmJobVolumeOutcome(workJob,volumeId);
}
+ public Outcome<String> extractVolumeThroughJobQueue(final Long vmId, final
long volumeId,
+ final long zoneId) {
+
+ final CallContext context = CallContext.current();
+ final User callingUser = context.getCallingUser();
+ final Account callingAccount = context.getCallingAccount();
+
+ final VMInstanceVO vm = _vmInstanceDao.findById(vmId);
+
+ VmWorkJobVO workJob = new VmWorkJobVO(context.getContextId());
+
+ workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER);
+ workJob.setCmd(VmWorkExtractVolume.class.getName());
+
+ workJob.setAccountId(callingAccount.getId());
+ workJob.setUserId(callingUser.getId());
+ workJob.setStep(VmWorkJobVO.Step.Starting);
+ workJob.setVmType(VirtualMachine.Type.Instance);
+ workJob.setVmInstanceId(vm.getId());
+ workJob.setRelated(AsyncJobExecutionContext.getOriginJobId());
+
+ // save work context info (there are some duplications)
+ VmWorkExtractVolume workInfo = new
VmWorkExtractVolume(callingUser.getId(), callingAccount.getId(), vm.getId(),
+ VolumeApiServiceImpl.VM_WORK_JOB_HANDLER, volumeId, zoneId);
+ workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));
+
+ _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE,
vm.getId());
+
+
AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId());
+
+ return new VmJobVolumeUrlOutcome(workJob);
+ }
+
public Outcome<Volume> migrateVolumeThroughJobQueue(final Long vmId, final
long volumeId,
final long destPoolId, final boolean liveMigrate) {
@@ -2566,6 +2672,12 @@ public class VolumeApiServiceImpl extends ManagerBase
implements VolumeApiServic
}
@ReflectionUse
+ private Pair<JobInfo.Status, String>
orchestrateExtractVolume(VmWorkExtractVolume work) throws Exception {
+ String volUrl = orchestrateExtractVolume(work.getVolumeId(),
work.getZoneId());
+ return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED,
_jobMgr.marshallResultObject(volUrl));
+ }
+
+ @ReflectionUse
private Pair<JobInfo.Status, String>
orchestrateAttachVolumeToVM(VmWorkAttachVolume work) throws Exception {
Volume vol = orchestrateAttachVolumeToVM(work.getVmId(),
work.getVolumeId(), work.getDeviceId());