DaanHoogland commented on a change in pull request #4053:
URL: https://github.com/apache/cloudstack/pull/4053#discussion_r426669651



##########
File path: server/src/main/java/com/cloud/storage/secondary/SecondaryStorageVmManager.java
##########
@@ -31,6 +32,7 @@
     public static final int DEFAULT_SS_VM_CPUMHZ = 500;             // 500 MHz
     public static final int DEFAULT_SS_VM_MTUSIZE = 1500;
     public static final int DEFAULT_SS_VM_CAPACITY = 50;            // max 
command execution session per SSVM
+    public static final int DEFAULT_MIGRATE_SS_VM_CAPACITY = 2;     // number 
of concurrent migrate operations to happen per SSVM

Review comment:
       is this being overridden by a global setting somewhere, or is it a hard-coded restriction?

##########
File path: api/src/main/java/org/apache/cloudstack/api/command/admin/storage/MigrateSecondaryStorageDataCmd.java
##########
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.api.command.admin.storage;
+
+import java.util.List;
+
+import org.apache.cloudstack.acl.RoleType;
+import org.apache.cloudstack.api.APICommand;
+import org.apache.cloudstack.api.ApiConstants;
+import org.apache.cloudstack.api.BaseAsyncCmd;
+import org.apache.cloudstack.api.Parameter;
+import org.apache.cloudstack.api.response.ImageStoreResponse;
+import org.apache.cloudstack.api.response.MigrationResponse;
+import org.apache.cloudstack.context.CallContext;
+import org.apache.log4j.Logger;
+
+import com.cloud.event.EventTypes;
+import com.cloud.utils.StringUtils;
+
+@APICommand(name = MigrateSecondaryStorageDataCmd.APINAME,
+        description = "migrates data objects from one secondary storage to 
destination image store(s)",
+        responseObject = MigrationResponse.class,
+        requestHasSensitiveInfo = false,
+        responseHasSensitiveInfo = false,
+        since = "4.14.0",

Review comment:
       we won't make `"4.14.0"` ;)

##########
File path: api/src/main/java/org/apache/cloudstack/api/command/admin/storage/MigrateSecondaryStorageDataCmd.java
##########
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.api.command.admin.storage;
+
+import java.util.List;
+
+import org.apache.cloudstack.acl.RoleType;
+import org.apache.cloudstack.api.APICommand;
+import org.apache.cloudstack.api.ApiConstants;
+import org.apache.cloudstack.api.BaseAsyncCmd;
+import org.apache.cloudstack.api.Parameter;
+import org.apache.cloudstack.api.response.ImageStoreResponse;
+import org.apache.cloudstack.api.response.MigrationResponse;
+import org.apache.cloudstack.context.CallContext;
+import org.apache.log4j.Logger;
+
+import com.cloud.event.EventTypes;
+import com.cloud.utils.StringUtils;
+
+@APICommand(name = MigrateSecondaryStorageDataCmd.APINAME,
+        description = "migrates data objects from one secondary storage to 
destination image store(s)",
+        responseObject = MigrationResponse.class,
+        requestHasSensitiveInfo = false,
+        responseHasSensitiveInfo = false,
+        since = "4.14.0",
+        authorized = {RoleType.Admin})
+public class MigrateSecondaryStorageDataCmd extends BaseAsyncCmd {
+
+    public static final Logger s_logger = 
Logger.getLogger(MigrateSecondaryStorageDataCmd.class.getName());

Review comment:
       this naming convention is not advised for static final fields; rather use `LOG` or `LOGGER`
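
       A minimal sketch of the suggested declaration (only the field name changes, the existing `Logger.getLogger` call stays the same):

```java
public static final Logger LOGGER = Logger.getLogger(MigrateSecondaryStorageDataCmd.class.getName());
```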

##########
File path: api/src/main/java/org/apache/cloudstack/api/command/admin/storage/ListImageStoresCmd.java
##########
@@ -52,6 +51,9 @@
     @Parameter(name = ApiConstants.ID, type = CommandType.UUID, entityType = 
ImageStoreResponse.class, description = "the ID of the storage pool")
     private Long id;
 
+    @Parameter(name = ApiConstants.READ_ONLY, type = CommandType.BOOLEAN, 
entityType = ImageStoreResponse.class, description = "read-only status of the 
image store")

Review comment:
       please add `since = "4.15"`
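
       For illustration, the parameter with `since` added (the field name `readOnly` below is an assumption, it is not shown in the diff):

```java
@Parameter(name = ApiConstants.READ_ONLY, type = CommandType.BOOLEAN, entityType = ImageStoreResponse.class,
        description = "read-only status of the image store", since = "4.15")
private Boolean readOnly;
```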

##########
File path: engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/DataMigrationUtility.java
##########
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.engine.orchestration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import 
org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.TemplateDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
+import org.apache.cloudstack.storage.ImageStoreService;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
+import org.apache.log4j.Logger;
+
+import com.cloud.host.HostVO;
+import com.cloud.host.Status;
+import com.cloud.host.dao.HostDao;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.SnapshotVO;
+import com.cloud.storage.VMTemplateVO;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.storage.dao.VMTemplateDao;
+import com.cloud.utils.Pair;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.vm.SecondaryStorageVm;
+import com.cloud.vm.SecondaryStorageVmVO;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.dao.SecondaryStorageVmDao;
+
+public class DataMigrationUtility {
+    @Inject
+    SecondaryStorageVmDao secStorageVmDao;
+    @Inject
+    TemplateDataStoreDao templateDataStoreDao;
+    @Inject
+    SnapshotDataStoreDao snapshotDataStoreDao;
+    @Inject
+    VolumeDataStoreDao volumeDataStoreDao;
+    @Inject
+    VMTemplateDao templateDao;
+    @Inject
+    VolumeDataFactory volumeFactory;
+    @Inject
+    TemplateDataFactory templateFactory;
+    @Inject
+    SnapshotDataFactory snapshotFactory;
+    @Inject
+    HostDao hostDao;
+    @Inject
+    SnapshotDao snapshotDao;
+
+    private static final Logger s_logger = 
Logger.getLogger(DataMigrationUtility.class);
+
+    /** This function verifies if the given image store comprises of data 
objects that are not in either the "Ready" or
+     * "Allocated" state - in such a case, if the migration policy is 
complete, the migration is terminated
+     */

Review comment:
       So, if there is any file in any other state, the migration is not ready, is it?

##########
File path: engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/DataMigrationUtility.java
##########
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.engine.orchestration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import 
org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.TemplateDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
+import org.apache.cloudstack.storage.ImageStoreService;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
+import org.apache.log4j.Logger;
+
+import com.cloud.host.HostVO;
+import com.cloud.host.Status;
+import com.cloud.host.dao.HostDao;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.SnapshotVO;
+import com.cloud.storage.VMTemplateVO;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.storage.dao.VMTemplateDao;
+import com.cloud.utils.Pair;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.vm.SecondaryStorageVm;
+import com.cloud.vm.SecondaryStorageVmVO;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.dao.SecondaryStorageVmDao;
+
+public class DataMigrationUtility {
+    @Inject
+    SecondaryStorageVmDao secStorageVmDao;
+    @Inject
+    TemplateDataStoreDao templateDataStoreDao;
+    @Inject
+    SnapshotDataStoreDao snapshotDataStoreDao;
+    @Inject
+    VolumeDataStoreDao volumeDataStoreDao;
+    @Inject
+    VMTemplateDao templateDao;
+    @Inject
+    VolumeDataFactory volumeFactory;
+    @Inject
+    TemplateDataFactory templateFactory;
+    @Inject
+    SnapshotDataFactory snapshotFactory;
+    @Inject
+    HostDao hostDao;
+    @Inject
+    SnapshotDao snapshotDao;
+
+    private static final Logger s_logger = 
Logger.getLogger(DataMigrationUtility.class);
+
+    /** This function verifies if the given image store comprises of data 
objects that are not in either the "Ready" or
+     * "Allocated" state - in such a case, if the migration policy is 
complete, the migration is terminated
+     */
+    private boolean filesReady(Long srcDataStoreId) {

Review comment:
       `filesReady` in this case means "files ready to migrate", does it? If so, can we rename it to that, please?

##########
File path: services/secondary-storage/controller/src/main/java/org/apache/cloudstack/secondarystorage/PremiumSecondaryStorageManagerImpl.java
##########
@@ -135,13 +152,43 @@ public boolean configure(String name, Map<String, Object> 
params) throws Configu
             }
 
             alreadyRunning = 
_secStorageVmDao.getSecStorageVmListInStates(null, dataCenterId, State.Running, 
State.Migrating, State.Starting);
-
             List<CommandExecLogVO> activeCmds = 
findActiveCommands(dataCenterId, cutTime);
+            // Find running copy / migrate commands running arranged in 
ascending order of their creation time i.e., oldest first
+            List<CommandExecLogVO> copyCmdsInPipeline = 
findAllActiveCopyCommands(dataCenterId, cutTime);
+            // Count of total hosts
+            Integer hostsCount = _hostDao.countAllByType(Host.Type.Routing);
+            // Maximum number of allowed SSVMs for migration task
+            Integer maxSsvms = (hostsCount < 
MaxNumberOfSsvmsForMigration.value()) ? hostsCount : 
MaxNumberOfSsvmsForMigration.value();
+            int halfLimit = Math.round((float) (alreadyRunning.size() * 
migrateCapPerSSVM) / 2);
+            currentTime = DateUtil.currentGMTTime().getTime();
+

Review comment:
       please factor this out into a suitably named method.
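
       One possible shape (the method name is illustrative only, assuming access to the same DAO and setting):

```java
// Maximum number of SSVMs allowed for migration work, capped by the number of routing hosts.
private int getMaxSsvmsForMigration() {
    int hostsCount = _hostDao.countAllByType(Host.Type.Routing);
    return Math.min(hostsCount, MaxNumberOfSsvmsForMigration.value());
}
```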

##########
File path: engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/DataMigrationUtility.java
##########
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.engine.orchestration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import 
org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.TemplateDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
+import org.apache.cloudstack.storage.ImageStoreService;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
+import org.apache.log4j.Logger;
+
+import com.cloud.host.HostVO;
+import com.cloud.host.Status;
+import com.cloud.host.dao.HostDao;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.SnapshotVO;
+import com.cloud.storage.VMTemplateVO;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.storage.dao.VMTemplateDao;
+import com.cloud.utils.Pair;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.vm.SecondaryStorageVm;
+import com.cloud.vm.SecondaryStorageVmVO;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.dao.SecondaryStorageVmDao;
+
+public class DataMigrationUtility {
+    @Inject
+    SecondaryStorageVmDao secStorageVmDao;
+    @Inject
+    TemplateDataStoreDao templateDataStoreDao;
+    @Inject
+    SnapshotDataStoreDao snapshotDataStoreDao;
+    @Inject
+    VolumeDataStoreDao volumeDataStoreDao;
+    @Inject
+    VMTemplateDao templateDao;
+    @Inject
+    VolumeDataFactory volumeFactory;
+    @Inject
+    TemplateDataFactory templateFactory;
+    @Inject
+    SnapshotDataFactory snapshotFactory;
+    @Inject
+    HostDao hostDao;
+    @Inject
+    SnapshotDao snapshotDao;
+
+    private static final Logger s_logger = 
Logger.getLogger(DataMigrationUtility.class);
+
+    /** This function verifies if the given image store comprises of data 
objects that are not in either the "Ready" or
+     * "Allocated" state - in such a case, if the migration policy is 
complete, the migration is terminated
+     */
+    private boolean filesReady(Long srcDataStoreId) {
+        String[] validStates = new String[]{"Ready", "Allocated"};
+        boolean isReady = true;
+        List<TemplateDataStoreVO> templates = 
templateDataStoreDao.listByStoreId(srcDataStoreId);
+        for (TemplateDataStoreVO template : templates) {
+            isReady &= 
(Arrays.asList(validStates).contains(template.getState().toString()));
+        }
+        List<SnapshotDataStoreVO> snapshots = 
snapshotDataStoreDao.listByStoreId(srcDataStoreId, DataStoreRole.Image);
+        for (SnapshotDataStoreVO snapshot : snapshots) {
+            isReady &= 
(Arrays.asList(validStates).contains(snapshot.getState().toString()));
+        }
+        List<VolumeDataStoreVO> volumes = 
volumeDataStoreDao.listByStoreId(srcDataStoreId);
+        for (VolumeDataStoreVO volume : volumes) {
+            isReady &= 
(Arrays.asList(validStates).contains(volume.getState().toString()));
+        }
+        return isReady;
+    }
+
+    protected void 
checkIfCompleteMigrationPossible(ImageStoreService.MigrationPolicy policy, Long 
srcDataStoreId) {
+        if (policy == ImageStoreService.MigrationPolicy.COMPLETE) {
+            if (!filesReady(srcDataStoreId)) {
+                throw new CloudRuntimeException("Complete migration failed as 
there are data objects which are not Ready");
+            }
+        }
+        return;
+    }
+
+    protected Long getFileSize(DataObject file, Map<DataObject, 
Pair<List<SnapshotInfo>, Long>> snapshotChain) {
+        Long size = file.getSize();
+        Pair<List<SnapshotInfo>, Long> chain = snapshotChain.get(file);
+        if (file instanceof SnapshotInfo && chain.first() != null) {
+            size = chain.second();
+        }
+        return size;
+    }
+
+    /**
+     * Sorts the datastores in decreasing order of their free capacities, so 
as to make
+     * an informed decision of picking the datastore with maximum free 
capactiy for migration
+     */
+    protected List<Long> sortDataStores(Map<Long, Pair<Long, Long>> 
storageCapacities) {
+        List<Map.Entry<Long, Pair<Long, Long>>> list =
+                new LinkedList<Map.Entry<Long, Pair<Long, 
Long>>>((storageCapacities.entrySet()));
+
+        Collections.sort(list, new Comparator<Map.Entry<Long, Pair<Long, 
Long>>>() {
+            @Override
+            public int compare(Map.Entry<Long, Pair<Long, Long>> e1, 
Map.Entry<Long, Pair<Long, Long>> e2) {
+                return e2.getValue().first() > e1.getValue().first() ? 1 : -1;

Review comment:
       no 0 possible for equal values?
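
       For example, `Long.compare` keeps the same descending order and returns 0 when the free capacities are equal:

```java
Collections.sort(list, (e1, e2) -> Long.compare(e2.getValue().first(), e1.getValue().first()));
```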

##########
File path: services/secondary-storage/controller/src/main/java/org/apache/cloudstack/secondarystorage/PremiumSecondaryStorageManagerImpl.java
##########
@@ -135,13 +152,43 @@ public boolean configure(String name, Map<String, Object> 
params) throws Configu
             }
 
             alreadyRunning = 
_secStorageVmDao.getSecStorageVmListInStates(null, dataCenterId, State.Running, 
State.Migrating, State.Starting);
-
             List<CommandExecLogVO> activeCmds = 
findActiveCommands(dataCenterId, cutTime);
+            // Find running copy / migrate commands running arranged in 
ascending order of their creation time i.e., oldest first
+            List<CommandExecLogVO> copyCmdsInPipeline = 
findAllActiveCopyCommands(dataCenterId, cutTime);
+            // Count of total hosts
+            Integer hostsCount = _hostDao.countAllByType(Host.Type.Routing);
+            // Maximum number of allowed SSVMs for migration task
+            Integer maxSsvms = (hostsCount < 
MaxNumberOfSsvmsForMigration.value()) ? hostsCount : 
MaxNumberOfSsvmsForMigration.value();
+            int halfLimit = Math.round((float) (alreadyRunning.size() * 
migrateCapPerSSVM) / 2);
+            currentTime = DateUtil.currentGMTTime().getTime();
+
             if (alreadyRunning.size() * _capacityPerSSVM - activeCmds.size() < 
_standbyCapacity) {
                 s_logger.info("secondary storage command execution standby 
capactiy low (running VMs: " + alreadyRunning.size() + ", active cmds: " + 
activeCmds.size() +
                         "), starting a new one");
                 return new Pair<AfterScanAction, 
Object>(AfterScanAction.expand, SecondaryStorageVm.Role.commandExecutor);
             }
+            // Scale the number of SSVMs if the number of Copy operations is 
greater than the number of SSVMs running and the copy operation has been in 
pipeline for
+            // more than half of the total time allocated for secondary 
storage operation
+            else if (!copyCmdsInPipeline.isEmpty()  && 
copyCmdsInPipeline.size() >= halfLimit &&
+                    (((currentTime - copyCmdsInPipeline.get(halfLimit - 
1).getCreated().getTime()) > _maxExecutionTimeMs/2 )) &&
+                            (currentTime > nextSpawnTime) &&  
alreadyRunning.size() <=  maxSsvms) {
+                    nextSpawnTime = currentTime + _maxExecutionTimeMs/2;
+                    s_logger.debug("scaling SSVM to handle migration tasks");
+                    return new Pair<AfterScanAction, 
Object>(AfterScanAction.expand, SecondaryStorageVm.Role.templateProcessor);
+                }
+
+            // Scale down the number of SSVMs if the load on them has reduced
+            if ((copyCmdsInPipeline.size() < halfLimit && 
alreadyRunning.size() * _capacityPerSSVM - activeCmds.size() > 
_standbyCapacity) && alreadyRunning.size() > 1) {
+                Collections.reverse(alreadyRunning);
+                for(SecondaryStorageVmVO vm : alreadyRunning) {
+                    long count = copyCmdsInPipeline.stream().map(cmd -> 
cmd.getInstanceId() == vm.getId()).count();
+                    count += activeCmds.stream().map(cmd -> 
cmd.getInstanceId() == vm.getId()).count();
+                    if (count == 0) {
+                        destroySecStorageVm(vm.getId());
+                        break;
+                    }
+                }
+            }

Review comment:
       please factor this out into suitably named methods. This method was already too long to begin with, but let's not add to the lack of maintainability.
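
       A sketch of extracting the scale-down block (the method name is illustrative; note that `filter` is used here so that only the commands belonging to the SSVM are counted):

```java
private void scaleDownIdleSsvm(List<SecondaryStorageVmVO> alreadyRunning,
        List<CommandExecLogVO> copyCmdsInPipeline, List<CommandExecLogVO> activeCmds) {
    Collections.reverse(alreadyRunning);
    for (SecondaryStorageVmVO vm : alreadyRunning) {
        // count copy and active commands currently assigned to this SSVM
        long count = copyCmdsInPipeline.stream().filter(cmd -> cmd.getInstanceId() == vm.getId()).count();
        count += activeCmds.stream().filter(cmd -> cmd.getInstanceId() == vm.getId()).count();
        if (count == 0) {
            destroySecStorageVm(vm.getId());
            break;
        }
    }
}
```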

##########
File path: server/src/main/java/com/cloud/api/query/ViewResponseHelper.java
##########
@@ -84,8 +82,10 @@
 import com.cloud.api.query.vo.UserAccountJoinVO;
 import com.cloud.api.query.vo.UserVmJoinVO;
 import com.cloud.api.query.vo.VolumeJoinVO;
-import com.cloud.storage.StoragePoolTagVO;
+import com.cloud.configuration.Resource;
+import com.cloud.domain.Domain;
 import com.cloud.storage.Storage.ImageFormat;
+import com.cloud.storage.StoragePoolTagVO;
 import com.cloud.storage.VolumeStats;

Review comment:
       this file is not changed. Better to remove the diff from the PR.

##########
File path: server/src/main/java/com/cloud/storage/ImageStoreDetailsUtil.java
##########
@@ -30,7 +30,6 @@
 import com.google.common.base.Preconditions;
 
 public class ImageStoreDetailsUtil {
-

Review comment:
       I really like this style, but in spite of it looking neater, the removal of the line is a potential conflict before we get a chance to merge. Better to have those kinds of changes in isolated contributions.

##########
File path: engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/DataMigrationUtility.java
##########
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.engine.orchestration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import 
org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.TemplateDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
+import org.apache.cloudstack.storage.ImageStoreService;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
+import org.apache.log4j.Logger;
+
+import com.cloud.host.HostVO;
+import com.cloud.host.Status;
+import com.cloud.host.dao.HostDao;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.SnapshotVO;
+import com.cloud.storage.VMTemplateVO;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.storage.dao.VMTemplateDao;
+import com.cloud.utils.Pair;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.vm.SecondaryStorageVm;
+import com.cloud.vm.SecondaryStorageVmVO;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.dao.SecondaryStorageVmDao;
+
+public class DataMigrationUtility {
+    @Inject
+    SecondaryStorageVmDao secStorageVmDao;
+    @Inject
+    TemplateDataStoreDao templateDataStoreDao;
+    @Inject
+    SnapshotDataStoreDao snapshotDataStoreDao;
+    @Inject
+    VolumeDataStoreDao volumeDataStoreDao;
+    @Inject
+    VMTemplateDao templateDao;
+    @Inject
+    VolumeDataFactory volumeFactory;
+    @Inject
+    TemplateDataFactory templateFactory;
+    @Inject
+    SnapshotDataFactory snapshotFactory;
+    @Inject
+    HostDao hostDao;
+    @Inject
+    SnapshotDao snapshotDao;
+
+    private static final Logger s_logger = 
Logger.getLogger(DataMigrationUtility.class);
+
+    /** This function verifies if the given image store comprises of data 
objects that are not in either the "Ready" or
+     * "Allocated" state - in such a case, if the migration policy is 
complete, the migration is terminated
+     */
+    private boolean filesReady(Long srcDataStoreId) {
+        String[] validStates = new String[]{"Ready", "Allocated"};
+        boolean isReady = true;
+        List<TemplateDataStoreVO> templates = 
templateDataStoreDao.listByStoreId(srcDataStoreId);
+        for (TemplateDataStoreVO template : templates) {
+            isReady &= 
(Arrays.asList(validStates).contains(template.getState().toString()));
+        }
+        List<SnapshotDataStoreVO> snapshots = 
snapshotDataStoreDao.listByStoreId(srcDataStoreId, DataStoreRole.Image);
+        for (SnapshotDataStoreVO snapshot : snapshots) {
+            isReady &= 
(Arrays.asList(validStates).contains(snapshot.getState().toString()));
+        }
+        List<VolumeDataStoreVO> volumes = 
volumeDataStoreDao.listByStoreId(srcDataStoreId);
+        for (VolumeDataStoreVO volume : volumes) {
+            isReady &= 
(Arrays.asList(validStates).contains(volume.getState().toString()));
+        }
+        return isReady;
+    }
+
+    protected void 
checkIfCompleteMigrationPossible(ImageStoreService.MigrationPolicy policy, Long 
srcDataStoreId) {
+        if (policy == ImageStoreService.MigrationPolicy.COMPLETE) {
+            if (!filesReady(srcDataStoreId)) {
+                throw new CloudRuntimeException("Complete migration failed as 
there are data objects which are not Ready");
+            }
+        }
+        return;
+    }
+
+    protected Long getFileSize(DataObject file, Map<DataObject, 
Pair<List<SnapshotInfo>, Long>> snapshotChain) {
+        Long size = file.getSize();
+        Pair<List<SnapshotInfo>, Long> chain = snapshotChain.get(file);
+        if (file instanceof SnapshotInfo && chain.first() != null) {
+            size = chain.second();
+        }
+        return size;
+    }
+
+    /**
+     * Sorts the datastores in decreasing order of their free capacities, so 
as to make
+     * an informed decision of picking the datastore with maximum free 
capactiy for migration
+     */
+    protected List<Long> sortDataStores(Map<Long, Pair<Long, Long>> 
storageCapacities) {
+        List<Map.Entry<Long, Pair<Long, Long>>> list =
+                new LinkedList<Map.Entry<Long, Pair<Long, 
Long>>>((storageCapacities.entrySet()));
+
+        Collections.sort(list, new Comparator<Map.Entry<Long, Pair<Long, 
Long>>>() {
+            @Override
+            public int compare(Map.Entry<Long, Pair<Long, Long>> e1, 
Map.Entry<Long, Pair<Long, Long>> e2) {
+                return e2.getValue().first() > e1.getValue().first() ? 1 : -1;
+            }
+        });
+        HashMap<Long, Pair<Long, Long>> temp = new LinkedHashMap<>();
+        for (Map.Entry<Long, Pair<Long, Long>> value : list) {
+            temp.put(value.getKey(), value.getValue());
+        }
+
+        return new ArrayList<>(temp.keySet());
+    }
+
+    protected List<DataObject> getSortedValidSourcesList(DataStore 
srcDataStore, Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains) {
+        List<DataObject> files = new ArrayList<>();
+        files.addAll(getAllValidTemplates(srcDataStore));
+        files.addAll(getAllValidSnapshotsAndChains(srcDataStore, 
snapshotChains));
+        files.addAll(getAllValidVolumes(srcDataStore));
+
+        files = sortFilesOnSize(files, snapshotChains);
+
+        return files;
+    }
+
+    protected List<DataObject> sortFilesOnSize(List<DataObject> files, 
Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains) {
+        Collections.sort(files, new Comparator<DataObject>() {
+            @Override
+            public int compare(DataObject o1, DataObject o2) {
+                Long size1 = o1.getSize();
+                Long size2 = o2.getSize();
+                if (o1 instanceof SnapshotInfo) {
+                    size1 = snapshotChains.get(o1).second();
+                }
+                if (o2 instanceof  SnapshotInfo) {
+                    size2 = snapshotChains.get(o2).second();
+                }
+                return (int) (size2 - size1);
+            }
+        });
+        return files;
+    }
+
+    // Gets list of all valid templates, i.e, templates in "Ready" state for 
migration
+    protected List<DataObject> getAllValidTemplates(DataStore srcDataStore) {
+
+        List<DataObject> files = new LinkedList<>();
+        List<TemplateDataStoreVO> templates = 
templateDataStoreDao.listByStoreId(srcDataStore.getId());
+        for (TemplateDataStoreVO template : templates) {
+            VMTemplateVO templateVO = 
templateDao.findById(template.getTemplateId());
+            if (template.getState() == 
ObjectInDataStoreStateMachine.State.Ready && !templateVO.isPublicTemplate()) {
+                
files.add(templateFactory.getTemplate(template.getTemplateId(), srcDataStore));
+            }
+        }
+        return files;
+    }
+
+    /** Returns parent snapshots and snapshots that do not have any children; 
snapshotChains comprises of the snapshot chain info
+     * for each parent snapshot and the cumulative size of the chain - this is 
done to ensure that all the snapshots in a chain
+     * are migrated to the same datastore
+     */

Review comment:
       this triggers the thought: should the chain be migrated at all, or be coalesced first?

##########
File path: server/src/main/java/com/cloud/configuration/Config.java
##########
@@ -904,6 +905,14 @@
             "random",
             "'random', 'firstfit', 'userdispersing', 
'userconcentratedpod_random', 'userconcentratedpod_firstfit', 
'firstfitleastconsumed' : Order in which hosts within a cluster will be 
considered for VM/volume allocation.",
             null),
+    ImageStoreAllocationAlgorithm(

Review comment:
       adding keys here is an obsolete way of creating configuration details; please use a `ConfigKey<String>`

##########
File path: server/pom.xml
##########
@@ -89,6 +89,11 @@
             <groupId>commons-codec</groupId>
             <artifactId>commons-codec</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+            <version>3.6.1</version>

Review comment:
       the version should go in the top-level pom and not here.

##########
File path: server/src/main/java/com/cloud/configuration/Config.java
##########
@@ -1808,7 +1817,10 @@
     // StatsCollector
     StatsOutPutGraphiteHost("Advanced", ManagementServer.class, String.class, 
"stats.output.uri", "", "URI to additionally send StatsCollector statistics 
to", null),
 
-    SSVMPSK("Hidden", ManagementServer.class, String.class, 
"upload.post.secret.key", "", "PSK with SSVM", null);
+    SSVMPSK("Hidden", ManagementServer.class, String.class, 
"upload.post.secret.key", "", "PSK with SSVM", null),
+
+    SecStorageMaxMigrateSessions(
+            "Advanced", AgentManager.class, Integer.class, 
"secstorage.max.migrate.sessions","2","The max number of concurrent copy 
command execution sessions that an SSVM can handle",null);

Review comment:
       please use `ConfigKey<>`s
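
       For illustration, the same setting expressed as a `ConfigKey` (the surrounding class and the dynamic flag are assumptions; the key name, default and description are taken from the entry above):

```java
static final ConfigKey<Integer> SecStorageMaxMigrateSessions = new ConfigKey<Integer>("Advanced", Integer.class,
        "secstorage.max.migrate.sessions", "2",
        "The max number of concurrent copy command execution sessions that an SSVM can handle", true);
```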

##########
File path: engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/DataMigrationUtility.java
##########
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.engine.orchestration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import 
org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.TemplateDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
+import org.apache.cloudstack.storage.ImageStoreService;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
+import org.apache.log4j.Logger;
+
+import com.cloud.host.HostVO;
+import com.cloud.host.Status;
+import com.cloud.host.dao.HostDao;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.SnapshotVO;
+import com.cloud.storage.VMTemplateVO;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.storage.dao.VMTemplateDao;
+import com.cloud.utils.Pair;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.vm.SecondaryStorageVm;
+import com.cloud.vm.SecondaryStorageVmVO;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.dao.SecondaryStorageVmDao;
+
+public class DataMigrationUtility {
+    @Inject
+    SecondaryStorageVmDao secStorageVmDao;
+    @Inject
+    TemplateDataStoreDao templateDataStoreDao;
+    @Inject
+    SnapshotDataStoreDao snapshotDataStoreDao;
+    @Inject
+    VolumeDataStoreDao volumeDataStoreDao;
+    @Inject
+    VMTemplateDao templateDao;
+    @Inject
+    VolumeDataFactory volumeFactory;
+    @Inject
+    TemplateDataFactory templateFactory;
+    @Inject
+    SnapshotDataFactory snapshotFactory;
+    @Inject
+    HostDao hostDao;
+    @Inject
+    SnapshotDao snapshotDao;
+
+    private static final Logger s_logger = 
Logger.getLogger(DataMigrationUtility.class);
+
+    /** This function verifies if the given image store comprises of data 
objects that are not in either the "Ready" or
+     * "Allocated" state - in such a case, if the migration policy is 
complete, the migration is terminated
+     */
+    private boolean filesReady(Long srcDataStoreId) {
+        String[] validStates = new String[]{"Ready", "Allocated"};
+        boolean isReady = true;
+        List<TemplateDataStoreVO> templates = 
templateDataStoreDao.listByStoreId(srcDataStoreId);
+        for (TemplateDataStoreVO template : templates) {
+            isReady &= 
(Arrays.asList(validStates).contains(template.getState().toString()));
+        }
+        List<SnapshotDataStoreVO> snapshots = 
snapshotDataStoreDao.listByStoreId(srcDataStoreId, DataStoreRole.Image);
+        for (SnapshotDataStoreVO snapshot : snapshots) {
+            isReady &= 
(Arrays.asList(validStates).contains(snapshot.getState().toString()));
+        }
+        List<VolumeDataStoreVO> volumes = 
volumeDataStoreDao.listByStoreId(srcDataStoreId);
+        for (VolumeDataStoreVO volume : volumes) {
+            isReady &= 
(Arrays.asList(validStates).contains(volume.getState().toString()));
+        }
+        return isReady;
+    }
+
+    protected void 
checkIfCompleteMigrationPossible(ImageStoreService.MigrationPolicy policy, Long 
srcDataStoreId) {
+        if (policy == ImageStoreService.MigrationPolicy.COMPLETE) {
+            if (!filesReady(srcDataStoreId)) {
+                throw new CloudRuntimeException("Complete migration failed as 
there are data objects which are not Ready");
+            }
+        }
+        return;
+    }
+
+    protected Long getFileSize(DataObject file, Map<DataObject, 
Pair<List<SnapshotInfo>, Long>> snapshotChain) {
+        Long size = file.getSize();
+        Pair<List<SnapshotInfo>, Long> chain = snapshotChain.get(file);
+        if (file instanceof SnapshotInfo && chain.first() != null) {
+            size = chain.second();
+        }
+        return size;
+    }
+
+    /**
+     * Sorts the datastores in decreasing order of their free capacities, so 
as to make
+     * an informed decision of picking the datastore with maximum free 
capactiy for migration
+     */
+    protected List<Long> sortDataStores(Map<Long, Pair<Long, Long>> 
storageCapacities) {
+        List<Map.Entry<Long, Pair<Long, Long>>> list =
+                new LinkedList<Map.Entry<Long, Pair<Long, 
Long>>>((storageCapacities.entrySet()));
+
+        Collections.sort(list, new Comparator<Map.Entry<Long, Pair<Long, 
Long>>>() {
+            @Override
+            public int compare(Map.Entry<Long, Pair<Long, Long>> e1, 
Map.Entry<Long, Pair<Long, Long>> e2) {
+                return e2.getValue().first() > e1.getValue().first() ? 1 : -1;
+            }
+        });
+        HashMap<Long, Pair<Long, Long>> temp = new LinkedHashMap<>();
+        for (Map.Entry<Long, Pair<Long, Long>> value : list) {
+            temp.put(value.getKey(), value.getValue());
+        }
+
+        return new ArrayList<>(temp.keySet());
+    }
+
+    protected List<DataObject> getSortedValidSourcesList(DataStore 
srcDataStore, Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains) {
+        List<DataObject> files = new ArrayList<>();
+        files.addAll(getAllValidTemplates(srcDataStore));
+        files.addAll(getAllValidSnapshotsAndChains(srcDataStore, 
snapshotChains));
+        files.addAll(getAllValidVolumes(srcDataStore));
+
+        files = sortFilesOnSize(files, snapshotChains);
+
+        return files;
+    }
+
+    protected List<DataObject> sortFilesOnSize(List<DataObject> files, 
Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains) {
+        Collections.sort(files, new Comparator<DataObject>() {
+            @Override
+            public int compare(DataObject o1, DataObject o2) {
+                Long size1 = o1.getSize();
+                Long size2 = o2.getSize();
+                if (o1 instanceof SnapshotInfo) {
+                    size1 = snapshotChains.get(o1).second();
+                }
+                if (o2 instanceof  SnapshotInfo) {
+                    size2 = snapshotChains.get(o2).second();
+                }
+                return (int) (size2 - size1);
+            }
+        });
+        return files;
+    }
+
+    // Gets list of all valid templates, i.e, templates in "Ready" state for 
migration
+    protected List<DataObject> getAllValidTemplates(DataStore srcDataStore) {
+
+        List<DataObject> files = new LinkedList<>();
+        List<TemplateDataStoreVO> templates = 
templateDataStoreDao.listByStoreId(srcDataStore.getId());
+        for (TemplateDataStoreVO template : templates) {
+            VMTemplateVO templateVO = 
templateDao.findById(template.getTemplateId());
+            if (template.getState() == 
ObjectInDataStoreStateMachine.State.Ready && !templateVO.isPublicTemplate()) {
+                
files.add(templateFactory.getTemplate(template.getTemplateId(), srcDataStore));
+            }
+        }
+        return files;
+    }
+
+    /** Returns parent snapshots and snapshots that do not have any children; 
snapshotChains comprises of the snapshot chain info
+     * for each parent snapshot and the cumulative size of the chain - this is 
done to ensure that all the snapshots in a chain
+     * are migrated to the same datastore
+     */
+    protected List<DataObject> getAllValidSnapshotsAndChains(DataStore 
srcDataStore, Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains) {
+        List<SnapshotInfo> files = new LinkedList<>();
+        List<SnapshotDataStoreVO> snapshots = 
snapshotDataStoreDao.listByStoreId(srcDataStore.getId(), DataStoreRole.Image);
+        for (SnapshotDataStoreVO snapshot : snapshots) {
+            SnapshotVO snapshotVO = 
snapshotDao.findById(snapshot.getSnapshotId());
+            if (snapshot.getState() == 
ObjectInDataStoreStateMachine.State.Ready && snapshot.getParentSnapshotId() == 
0 ) {
+                SnapshotInfo snap = 
snapshotFactory.getSnapshot(snapshotVO.getSnapshotId(), DataStoreRole.Image);
+                files.add(snap);
+            }
+        }
+
+        for (SnapshotInfo parent : files) {
+            List<SnapshotInfo> chain = new ArrayList<>();
+            chain.add(parent);
+            for (int i =0; i< chain.size(); i++) {
+                SnapshotInfo child = chain.get(i);
+                List<SnapshotInfo> children = child.getChildren();
+                if (children != null) {
+                    chain.addAll(children);
+                }
+            }
+            snapshotChains.put(parent, new Pair<List<SnapshotInfo>, 
Long>(chain, getSizeForChain(chain)));
+        }
+
+        return (List<DataObject>) (List<?>) files;
+    }
+
+    // Finds the cumulative file size for all data objects in the chain
+    protected Long getSizeForChain(List<SnapshotInfo> chain) {
+        Long size = 0L;
+        for (SnapshotInfo snapshot : chain) {
+            size += snapshot.getSize();
+        }
+        return size;
+    }
+
+    // Returns a list of volumes that are in "Ready" state

Review comment:
       rename to `getAllReadyVolumes` and delete the comment?

##########
File path: services/secondary-storage/server/src/main/java/org/apache/cloudstack/storage/resource/NfsSecondaryStorageResource.java
##########
@@ -1285,6 +1288,75 @@ protected File findFile(String path) {
         return srcFile;
     }
 
+    protected Answer copyFromNfsToNfs(CopyCommand cmd) {
+        final DataTO srcData = cmd.getSrcTO();
+        final DataTO destData = cmd.getDestTO();
+        DataStoreTO srcDataStore = srcData.getDataStore();
+        NfsTO srcStore = (NfsTO)srcDataStore;
+        DataStoreTO destDataStore = destData.getDataStore();
+        final NfsTO destStore = (NfsTO) destDataStore;
+        try {
+            File srcFile = new File(getDir(srcStore.getUrl(), _nfsVersion), 
srcData.getPath());
+            File destFile = new File(getDir(destStore.getUrl(), _nfsVersion), 
destData.getPath());
+            ImageFormat format = getTemplateFormat(srcFile.getName());
+
+            if (srcFile == null) {
+                return new CopyCmdAnswer("Can't find src file:" + srcFile);
+            }
+
+            if (srcData instanceof TemplateObjectTO || srcData instanceof 
VolumeObjectTO) {
+                File srcDir = null;
+                if (srcFile.isFile()) {
+                    srcDir = new File(srcFile.getParent());
+                }
+                File destDir = null;
+                if (destFile.isFile()) {
+                    destDir = new File(destFile.getParent());
+                }
+
+                try {
+                    FileUtils.copyDirectory((srcDir == null ? srcFile : 
srcDir), (destDir == null? destFile : destDir));
+                } catch (IOException e) {
+                    String msg = "Failed to copy file to destination";
+                    s_logger.info(msg);
+                    return new CopyCmdAnswer(msg);
+                }
+            } else {
+                destFile = new File(destFile, srcFile.getName());
+                try {
+                    FileUtils.copyFile(srcFile, destFile);
+                } catch (IOException e) {
+                    String msg = "Failed to copy file to destination";
+                    s_logger.info(msg);
+                    return new CopyCmdAnswer(msg);
+                }
+            }
+
+            DataTO retObj = null;
+            if (destData.getObjectType() == DataObjectType.TEMPLATE) {
+                TemplateObjectTO newTemplate = new TemplateObjectTO();
+                newTemplate.setPath(destData.getPath() + File.separator + 
srcFile.getName());
+                newTemplate.setSize(getVirtualSize(srcFile, format));
+                newTemplate.setPhysicalSize(srcFile.length());
+                newTemplate.setFormat(format);
+                retObj = newTemplate;
+            } else if (destData.getObjectType() == DataObjectType.VOLUME) {
+                VolumeObjectTO newVol = new VolumeObjectTO();
+                newVol.setPath(destData.getPath() + File.separator + 
srcFile.getName());
+                newVol.setSize(srcFile.length());
+                retObj = newVol;
+            } else if (destData.getObjectType() == DataObjectType.SNAPSHOT) {
+                SnapshotObjectTO newSnapshot = new SnapshotObjectTO();
+                newSnapshot.setPath(destData.getPath() + File.separator + 
destFile.getName());
+                retObj = newSnapshot;
+            }
+            return new CopyCmdAnswer(retObj);
+            } catch (Exception e) {
+            s_logger.error("failed to copy file" + srcData.getPath(), e);
+            return new CopyCmdAnswer("failed to copy file" + srcData.getPath() 
+ e.toString());

Review comment:
       some kind of indentation problem here. The method is a bit too long for a clear view of what happens; can you modularise it a bit more, please? Also, there is more than one return point, which makes the flow hard to follow.
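
       For example, the answer construction could move into a helper (the name is illustrative), which also cuts down the number of return points in `copyFromNfsToNfs`:

```java
private DataTO buildCopyResult(DataTO destData, File srcFile, File destFile, ImageFormat format) {
    if (destData.getObjectType() == DataObjectType.TEMPLATE) {
        TemplateObjectTO newTemplate = new TemplateObjectTO();
        newTemplate.setPath(destData.getPath() + File.separator + srcFile.getName());
        newTemplate.setSize(getVirtualSize(srcFile, format));
        newTemplate.setPhysicalSize(srcFile.length());
        newTemplate.setFormat(format);
        return newTemplate;
    }
    if (destData.getObjectType() == DataObjectType.VOLUME) {
        VolumeObjectTO newVol = new VolumeObjectTO();
        newVol.setPath(destData.getPath() + File.separator + srcFile.getName());
        newVol.setSize(srcFile.length());
        return newVol;
    }
    if (destData.getObjectType() == DataObjectType.SNAPSHOT) {
        SnapshotObjectTO newSnapshot = new SnapshotObjectTO();
        newSnapshot.setPath(destData.getPath() + File.separator + destFile.getName());
        return newSnapshot;
    }
    return null;
}
```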

##########
File path: api/src/main/java/org/apache/cloudstack/api/command/admin/storage/UpdateImageStoreCmd.java
##########
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.api.command.admin.storage;
+
+import org.apache.cloudstack.api.APICommand;
+import org.apache.cloudstack.api.ApiConstants;
+import org.apache.cloudstack.api.ApiErrorCode;
+import org.apache.cloudstack.api.BaseCmd;
+import org.apache.cloudstack.api.Parameter;
+import org.apache.cloudstack.api.ServerApiException;
+import org.apache.cloudstack.api.response.ImageStoreResponse;
+import org.apache.cloudstack.context.CallContext;
+import org.apache.log4j.Logger;
+
+import com.cloud.storage.ImageStore;
+
+@APICommand(name = UpdateImageStoreCmd.APINAME, description = "Updates image 
store read-only status", responseObject = ImageStoreResponse.class, entityType 
= {ImageStore.class},

Review comment:
       can you add `since = "4.15"`?

##########
File path: engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/DataMigrationUtility.java
##########
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.engine.orchestration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import 
org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.TemplateDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
+import org.apache.cloudstack.storage.ImageStoreService;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
+import org.apache.log4j.Logger;
+
+import com.cloud.host.HostVO;
+import com.cloud.host.Status;
+import com.cloud.host.dao.HostDao;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.SnapshotVO;
+import com.cloud.storage.VMTemplateVO;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.storage.dao.VMTemplateDao;
+import com.cloud.utils.Pair;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.vm.SecondaryStorageVm;
+import com.cloud.vm.SecondaryStorageVmVO;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.dao.SecondaryStorageVmDao;
+
+public class DataMigrationUtility {
+    @Inject
+    SecondaryStorageVmDao secStorageVmDao;
+    @Inject
+    TemplateDataStoreDao templateDataStoreDao;
+    @Inject
+    SnapshotDataStoreDao snapshotDataStoreDao;
+    @Inject
+    VolumeDataStoreDao volumeDataStoreDao;
+    @Inject
+    VMTemplateDao templateDao;
+    @Inject
+    VolumeDataFactory volumeFactory;
+    @Inject
+    TemplateDataFactory templateFactory;
+    @Inject
+    SnapshotDataFactory snapshotFactory;
+    @Inject
+    HostDao hostDao;
+    @Inject
+    SnapshotDao snapshotDao;
+
+    private static final Logger s_logger = 
Logger.getLogger(DataMigrationUtility.class);
+
+    /** This function verifies if the given image store comprises of data 
objects that are not in either the "Ready" or
+     * "Allocated" state - in such a case, if the migration policy is 
complete, the migration is terminated
+     */
+    private boolean filesReady(Long srcDataStoreId) {
+        String[] validStates = new String[]{"Ready", "Allocated"};
+        boolean isReady = true;
+        List<TemplateDataStoreVO> templates = 
templateDataStoreDao.listByStoreId(srcDataStoreId);
+        for (TemplateDataStoreVO template : templates) {
+            isReady &= 
(Arrays.asList(validStates).contains(template.getState().toString()));
+        }
+        List<SnapshotDataStoreVO> snapshots = 
snapshotDataStoreDao.listByStoreId(srcDataStoreId, DataStoreRole.Image);
+        for (SnapshotDataStoreVO snapshot : snapshots) {
+            isReady &= 
(Arrays.asList(validStates).contains(snapshot.getState().toString()));
+        }
+        List<VolumeDataStoreVO> volumes = 
volumeDataStoreDao.listByStoreId(srcDataStoreId);
+        for (VolumeDataStoreVO volume : volumes) {
+            isReady &= 
(Arrays.asList(validStates).contains(volume.getState().toString()));
+        }
+        return isReady;
+    }
+
+    protected void 
checkIfCompleteMigrationPossible(ImageStoreService.MigrationPolicy policy, Long 
srcDataStoreId) {
+        if (policy == ImageStoreService.MigrationPolicy.COMPLETE) {
+            if (!filesReady(srcDataStoreId)) {
+                throw new CloudRuntimeException("Complete migration failed as 
there are data objects which are not Ready");
+            }
+        }
+        return;
+    }
+
+    protected Long getFileSize(DataObject file, Map<DataObject, 
Pair<List<SnapshotInfo>, Long>> snapshotChain) {
+        Long size = file.getSize();
+        Pair<List<SnapshotInfo>, Long> chain = snapshotChain.get(file);
+        if (file instanceof SnapshotInfo && chain.first() != null) {
+            size = chain.second();
+        }
+        return size;
+    }
+
+    /**
+     * Sorts the datastores in decreasing order of their free capacities, so 
as to make
+     * an informed decision of picking the datastore with maximum free 
capactiy for migration
+     */
+    protected List<Long> sortDataStores(Map<Long, Pair<Long, Long>> 
storageCapacities) {
+        List<Map.Entry<Long, Pair<Long, Long>>> list =
+                new LinkedList<Map.Entry<Long, Pair<Long, 
Long>>>((storageCapacities.entrySet()));
+
+        Collections.sort(list, new Comparator<Map.Entry<Long, Pair<Long, 
Long>>>() {
+            @Override
+            public int compare(Map.Entry<Long, Pair<Long, Long>> e1, 
Map.Entry<Long, Pair<Long, Long>> e2) {
+                return e2.getValue().first() > e1.getValue().first() ? 1 : -1;
+            }
+        });
+        HashMap<Long, Pair<Long, Long>> temp = new LinkedHashMap<>();
+        for (Map.Entry<Long, Pair<Long, Long>> value : list) {
+            temp.put(value.getKey(), value.getValue());
+        }
+
+        return new ArrayList<>(temp.keySet());
+    }
+
+    protected List<DataObject> getSortedValidSourcesList(DataStore 
srcDataStore, Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains) {
+        List<DataObject> files = new ArrayList<>();
+        files.addAll(getAllValidTemplates(srcDataStore));
+        files.addAll(getAllValidSnapshotsAndChains(srcDataStore, 
snapshotChains));
+        files.addAll(getAllValidVolumes(srcDataStore));
+
+        files = sortFilesOnSize(files, snapshotChains);
+
+        return files;
+    }
+
+    protected List<DataObject> sortFilesOnSize(List<DataObject> files, 
Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains) {
+        Collections.sort(files, new Comparator<DataObject>() {
+            @Override
+            public int compare(DataObject o1, DataObject o2) {
+                Long size1 = o1.getSize();
+                Long size2 = o2.getSize();
+                if (o1 instanceof SnapshotInfo) {
+                    size1 = snapshotChains.get(o1).second();
+                }
+                if (o2 instanceof  SnapshotInfo) {
+                    size2 = snapshotChains.get(o2).second();
+                }
+                return (int) (size2 - size1);
+            }
+        });
+        return files;
+    }
+
+    // Gets list of all valid templates, i.e, templates in "Ready" state for 
migration
+    protected List<DataObject> getAllValidTemplates(DataStore srcDataStore) {
+
+        List<DataObject> files = new LinkedList<>();
+        List<TemplateDataStoreVO> templates = 
templateDataStoreDao.listByStoreId(srcDataStore.getId());
+        for (TemplateDataStoreVO template : templates) {
+            VMTemplateVO templateVO = 
templateDao.findById(template.getTemplateId());
+            if (template.getState() == 
ObjectInDataStoreStateMachine.State.Ready && !templateVO.isPublicTemplate()) {
+                
files.add(templateFactory.getTemplate(template.getTemplateId(), srcDataStore));
+            }
+        }
+        return files;
+    }
+
+    /** Returns parent snapshots and snapshots that do not have any children; 
snapshotChains comprises of the snapshot chain info
+     * for each parent snapshot and the cumulative size of the chain - this is 
done to ensure that all the snapshots in a chain
+     * are migrated to the same datastore
+     */
+    protected List<DataObject> getAllValidSnapshotsAndChains(DataStore 
srcDataStore, Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains) {
+        List<SnapshotInfo> files = new LinkedList<>();
+        List<SnapshotDataStoreVO> snapshots = 
snapshotDataStoreDao.listByStoreId(srcDataStore.getId(), DataStoreRole.Image);
+        for (SnapshotDataStoreVO snapshot : snapshots) {
+            SnapshotVO snapshotVO = 
snapshotDao.findById(snapshot.getSnapshotId());
+            if (snapshot.getState() == 
ObjectInDataStoreStateMachine.State.Ready && snapshot.getParentSnapshotId() == 
0 ) {
+                SnapshotInfo snap = 
snapshotFactory.getSnapshot(snapshotVO.getSnapshotId(), DataStoreRole.Image);
+                files.add(snap);
+            }
+        }
+
+        for (SnapshotInfo parent : files) {
+            List<SnapshotInfo> chain = new ArrayList<>();
+            chain.add(parent);
+            for (int i =0; i< chain.size(); i++) {
+                SnapshotInfo child = chain.get(i);
+                List<SnapshotInfo> children = child.getChildren();
+                if (children != null) {
+                    chain.addAll(children);
+                }
+            }
+            snapshotChains.put(parent, new Pair<List<SnapshotInfo>, 
Long>(chain, getSizeForChain(chain)));
+        }
+
+        return (List<DataObject>) (List<?>) files;
+    }
+
+    // Finds the cumulative file size for all data objects in the chain

Review comment:
       The method name says it all; this comment adds nothing and can be dropped.

##########
File path: 
engine/storage/src/main/java/org/apache/cloudstack/storage/datastore/ObjectInDataStoreManagerImpl.java
##########
@@ -104,6 +104,14 @@ public ObjectInDataStoreManagerImpl() {
         // TODO: further investigate why an extra event is sent when it is
         // alreay Ready for DownloadListener
         stateMachines.addTransition(State.Ready, Event.OperationSuccessed, 
State.Ready);
+        // State transitions for data object migration
+        stateMachines.addTransition(State.Ready, Event.MigrationRequested, 
State.Migrating);
+        stateMachines.addTransition(State.Ready, Event.CopyRequested, 
State.Copying);
+        stateMachines.addTransition(State.Allocated, Event.MigrationRequested, 
State.Migrating);
+        stateMachines.addTransition(State.Migrating, Event.MigrationFailed, 
State.Failed);
+        stateMachines.addTransition(State.Migrating, Event.MigrationSucceeded, 
State.Destroyed);
+        stateMachines.addTransition(State.Migrating, Event.OperationSuccessed, 
State.Ready);

Review comment:
       `Event.OperationSuccessed`? Or should this be `Event.OperationSucceeded`?

##########
File path: 
engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/DataMigrationUtility.java
##########
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.engine.orchestration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import 
org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.TemplateDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
+import org.apache.cloudstack.storage.ImageStoreService;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
+import org.apache.log4j.Logger;
+
+import com.cloud.host.HostVO;
+import com.cloud.host.Status;
+import com.cloud.host.dao.HostDao;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.SnapshotVO;
+import com.cloud.storage.VMTemplateVO;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.storage.dao.VMTemplateDao;
+import com.cloud.utils.Pair;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.vm.SecondaryStorageVm;
+import com.cloud.vm.SecondaryStorageVmVO;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.dao.SecondaryStorageVmDao;
+
+public class DataMigrationUtility {
+    @Inject
+    SecondaryStorageVmDao secStorageVmDao;
+    @Inject
+    TemplateDataStoreDao templateDataStoreDao;
+    @Inject
+    SnapshotDataStoreDao snapshotDataStoreDao;
+    @Inject
+    VolumeDataStoreDao volumeDataStoreDao;
+    @Inject
+    VMTemplateDao templateDao;
+    @Inject
+    VolumeDataFactory volumeFactory;
+    @Inject
+    TemplateDataFactory templateFactory;
+    @Inject
+    SnapshotDataFactory snapshotFactory;
+    @Inject
+    HostDao hostDao;
+    @Inject
+    SnapshotDao snapshotDao;
+
+    private static final Logger s_logger = 
Logger.getLogger(DataMigrationUtility.class);

Review comment:
       Can you give this static final an all-caps name, please?
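   For instance, just as a sketch of the rename (nothing else about the field would change):

```java
// hypothetical all-caps name for the class logger
private static final Logger LOGGER = Logger.getLogger(DataMigrationUtility.class);
```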

##########
File path: 
engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/TemplateServiceImpl.java
##########
@@ -253,7 +253,8 @@ public void downloadBootstrapSysTemplate(DataStore store) {
     @Override
     public void handleSysTemplateDownload(HypervisorType hostHyper, Long dcId) 
{
         Set<VMTemplateVO> toBeDownloaded = new HashSet<VMTemplateVO>();
-        List<DataStore> stores = _storeMgr.getImageStoresByScope(new 
ZoneScope(dcId));
+        //List<DataStore> stores = _storeMgr.getImageStoresByScope(new 
ZoneScope(dcId));

Review comment:
       Please remove the commented-out code.

##########
File path: 
engine/storage/volume/src/main/java/org/apache/cloudstack/storage/volume/VolumeObject.java
##########
@@ -392,7 +391,8 @@ public void 
processEvent(ObjectInDataStoreStateMachine.Event event) {
                 if (event == 
ObjectInDataStoreStateMachine.Event.CreateOnlyRequested) {
                     volEvent = Volume.Event.UploadRequested;
                 } else if (event == 
ObjectInDataStoreStateMachine.Event.MigrationRequested) {
-                    volEvent = Volume.Event.CopyRequested;
+                    //volEvent = Volume.Event.CopyRequested;

Review comment:
       Please do not keep commented-out code in the source. We have an RCS for keeping old code safe.

##########
File path: 
engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/StorageOrchestrator.java
##########
@@ -0,0 +1,629 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.engine.orchestration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+import javax.naming.ConfigurationException;
+
+import org.apache.cloudstack.api.response.MigrationResponse;
+import 
org.apache.cloudstack.engine.orchestration.service.StorageOrchestrationService;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
+import 
org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
+import 
org.apache.cloudstack.engine.subsystem.api.storage.SecondaryStorageService;
+import 
org.apache.cloudstack.engine.subsystem.api.storage.SecondaryStorageService.DataObjectResult;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.TemplateDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
+import org.apache.cloudstack.framework.async.AsyncCallFuture;
+import org.apache.cloudstack.framework.config.ConfigKey;
+import org.apache.cloudstack.framework.config.Configurable;
+import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
+import org.apache.cloudstack.framework.jobs.AsyncJobManager;
+import org.apache.cloudstack.storage.datastore.db.ImageStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
+import org.apache.commons.math3.stat.descriptive.moment.Mean;
+import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
+import org.apache.log4j.Logger;
+
+import com.cloud.configuration.Config;
+import com.cloud.host.HostVO;
+import com.cloud.host.Status;
+import com.cloud.host.dao.HostDao;
+import com.cloud.server.StatsCollector;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.ImageStoreService.MigrationPolicy;
+import com.cloud.storage.SnapshotVO;
+import com.cloud.storage.StorageService;
+import com.cloud.storage.StorageStats;
+import com.cloud.storage.VMTemplateVO;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.storage.dao.VMTemplateDao;
+import com.cloud.utils.NumbersUtil;
+import com.cloud.utils.Pair;
+import com.cloud.utils.StringUtils;
+import com.cloud.utils.component.ManagerBase;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.vm.SecondaryStorageVm;
+import com.cloud.vm.SecondaryStorageVmVO;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.dao.SecondaryStorageVmDao;
+
+public class StorageOrchestrator extends ManagerBase implements 
StorageOrchestrationService, Configurable {
+
+    private static final Logger s_logger = 
Logger.getLogger(StorageOrchestrator.class);
+    @Inject
+    TemplateDataStoreDao templateDataStoreDao;
+    @Inject
+    SnapshotDataStoreDao snapshotDataStoreDao;
+    @Inject
+    VolumeDataStoreDao volumeDataStoreDao;
+    @Inject
+    VolumeDataFactory volumeFactory;
+    @Inject
+    VMTemplateDao templateDao;
+    @Inject
+    TemplateDataFactory templateFactory;
+    @Inject
+    SnapshotDao snapshotDao;
+    @Inject
+    SnapshotDataFactory snapshotFactory;
+    @Inject
+    DataStoreManager dataStoreManager;
+    @Inject
+    ImageStoreDao imageStoreDao;
+    @Inject
+    StatsCollector statsCollector;
+    @Inject
+    public StorageService storageService;
+    @Inject
+    SecondaryStorageVmDao secStorageVmDao;
+    @Inject
+    ConfigurationDao configDao;
+    @Inject
+    HostDao hostDao;
+    @Inject
+    private AsyncJobManager jobMgr;
+    @Inject
+    private SecondaryStorageService secStgSrv;
+
+    ConfigKey<Double> ImageStoreImbalanceThreshold = new 
ConfigKey<>("Advanced", Double.class,
+            "image.store.imbalance.threshold",
+            "0.1",
+            "The storage imbalance threshold that is compared with the 
standard deviation percentage for a storage utilization metric. " +
+                    "The value is a percentage in decimal format.",
+            true, ConfigKey.Scope.Global);
+
+    Integer numConcurrentCopyTasksPerSSVM = 2;
+
+    private double imageStoreCapacityThreshold = 0.90;
+
+    @Override
+    public String getConfigComponentName() {
+        return StorageOrchestrationService.class.getName();
+    }
+
+    @Override
+    public ConfigKey<?>[] getConfigKeys() {
+        return new ConfigKey<?>[]{ImageStoreImbalanceThreshold};
+    }
+
+    static class MigrateBlockingQueue<T> extends ArrayBlockingQueue<T> {
+
+        MigrateBlockingQueue(int size) {
+            super(size);
+        }
+
+        public boolean offer(T task) {
+            try {
+                this.put(task);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            return true;
+        }
+    }
+
+    @Override
+    public boolean configure(String name, Map<String, Object> params) throws 
ConfigurationException {
+        numConcurrentCopyTasksPerSSVM = 
NumbersUtil.parseInt(configDao.getValue(Config.SecStorageMaxMigrateSessions.key()),
 2);
+        return true;
+    }
+
+    @Override
+    public MigrationResponse migrateData(Long srcDataStoreId, List<Long> 
destDatastores, MigrationPolicy migrationPolicy) {

Review comment:
       It is still 112 lines long, and a lot of the comments in it could serve as method names. Extracting those blocks would make the code read more like prose and thus easier to follow. This method could do with some modularisation; a rough sketch follows.
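   Something along these lines, purely as a sketch; the helper names below are invented here, not taken from the patch:

```java
// rough shape only; each hypothetical helper corresponds to one of the commented blocks in the current method
@Override
public MigrationResponse migrateData(Long srcDataStoreId, List<Long> destDatastores, MigrationPolicy migrationPolicy) {
    checkPreconditions(srcDataStoreId, destDatastores, migrationPolicy);                   // the up-front validation block
    Map<Long, Pair<Long, Long>> capacities = computeDestinationCapacities(destDatastores); // the "collect capacities" block
    List<DataObject> files = collectFilesToMigrate(srcDataStoreId);                        // the "gather templates/snapshots/volumes" block
    return dispatchMigrationTasks(files, capacities, migrationPolicy);                     // task submission and response assembly
}
```

   Each extracted method then carries its own name instead of a comment.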

##########
File path: 
engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/DataMigrationUtility.java
##########
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.engine.orchestration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import 
org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.TemplateDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
+import org.apache.cloudstack.storage.ImageStoreService;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
+import org.apache.log4j.Logger;
+
+import com.cloud.host.HostVO;
+import com.cloud.host.Status;
+import com.cloud.host.dao.HostDao;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.SnapshotVO;
+import com.cloud.storage.VMTemplateVO;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.storage.dao.VMTemplateDao;
+import com.cloud.utils.Pair;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.vm.SecondaryStorageVm;
+import com.cloud.vm.SecondaryStorageVmVO;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.dao.SecondaryStorageVmDao;
+
+public class DataMigrationUtility {
+    @Inject
+    SecondaryStorageVmDao secStorageVmDao;
+    @Inject
+    TemplateDataStoreDao templateDataStoreDao;
+    @Inject
+    SnapshotDataStoreDao snapshotDataStoreDao;
+    @Inject
+    VolumeDataStoreDao volumeDataStoreDao;
+    @Inject
+    VMTemplateDao templateDao;
+    @Inject
+    VolumeDataFactory volumeFactory;
+    @Inject
+    TemplateDataFactory templateFactory;
+    @Inject
+    SnapshotDataFactory snapshotFactory;
+    @Inject
+    HostDao hostDao;
+    @Inject
+    SnapshotDao snapshotDao;
+
+    private static final Logger s_logger = 
Logger.getLogger(DataMigrationUtility.class);
+
+    /** This function verifies if the given image store comprises of data 
objects that are not in either the "Ready" or
+     * "Allocated" state - in such a case, if the migration policy is 
complete, the migration is terminated
+     */
+    private boolean filesReady(Long srcDataStoreId) {
+        String[] validStates = new String[]{"Ready", "Allocated"};
+        boolean isReady = true;
+        List<TemplateDataStoreVO> templates = 
templateDataStoreDao.listByStoreId(srcDataStoreId);
+        for (TemplateDataStoreVO template : templates) {
+            isReady &= 
(Arrays.asList(validStates).contains(template.getState().toString()));
+        }
+        List<SnapshotDataStoreVO> snapshots = 
snapshotDataStoreDao.listByStoreId(srcDataStoreId, DataStoreRole.Image);
+        for (SnapshotDataStoreVO snapshot : snapshots) {
+            isReady &= 
(Arrays.asList(validStates).contains(snapshot.getState().toString()));
+        }
+        List<VolumeDataStoreVO> volumes = 
volumeDataStoreDao.listByStoreId(srcDataStoreId);
+        for (VolumeDataStoreVO volume : volumes) {
+            isReady &= 
(Arrays.asList(validStates).contains(volume.getState().toString()));
+        }
+        return isReady;
+    }
+
+    protected void 
checkIfCompleteMigrationPossible(ImageStoreService.MigrationPolicy policy, Long 
srcDataStoreId) {
+        if (policy == ImageStoreService.MigrationPolicy.COMPLETE) {
+            if (!filesReady(srcDataStoreId)) {
+                throw new CloudRuntimeException("Complete migration failed as 
there are data objects which are not Ready");
+            }
+        }
+        return;
+    }
+
+    protected Long getFileSize(DataObject file, Map<DataObject, 
Pair<List<SnapshotInfo>, Long>> snapshotChain) {
+        Long size = file.getSize();
+        Pair<List<SnapshotInfo>, Long> chain = snapshotChain.get(file);
+        if (file instanceof SnapshotInfo && chain.first() != null) {
+            size = chain.second();
+        }
+        return size;
+    }
+
+    /**
+     * Sorts the datastores in decreasing order of their free capacities, so 
as to make
+     * an informed decision of picking the datastore with maximum free 
capactiy for migration
+     */
+    protected List<Long> sortDataStores(Map<Long, Pair<Long, Long>> 
storageCapacities) {
+        List<Map.Entry<Long, Pair<Long, Long>>> list =
+                new LinkedList<Map.Entry<Long, Pair<Long, 
Long>>>((storageCapacities.entrySet()));
+
+        Collections.sort(list, new Comparator<Map.Entry<Long, Pair<Long, 
Long>>>() {
+            @Override
+            public int compare(Map.Entry<Long, Pair<Long, Long>> e1, 
Map.Entry<Long, Pair<Long, Long>> e2) {
+                return e2.getValue().first() > e1.getValue().first() ? 1 : -1;
+            }
+        });
+        HashMap<Long, Pair<Long, Long>> temp = new LinkedHashMap<>();
+        for (Map.Entry<Long, Pair<Long, Long>> value : list) {
+            temp.put(value.getKey(), value.getValue());
+        }
+
+        return new ArrayList<>(temp.keySet());
+    }
+
+    protected List<DataObject> getSortedValidSourcesList(DataStore 
srcDataStore, Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains) {
+        List<DataObject> files = new ArrayList<>();
+        files.addAll(getAllValidTemplates(srcDataStore));
+        files.addAll(getAllValidSnapshotsAndChains(srcDataStore, 
snapshotChains));
+        files.addAll(getAllValidVolumes(srcDataStore));
+
+        files = sortFilesOnSize(files, snapshotChains);
+
+        return files;
+    }
+
+    protected List<DataObject> sortFilesOnSize(List<DataObject> files, 
Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains) {
+        Collections.sort(files, new Comparator<DataObject>() {
+            @Override
+            public int compare(DataObject o1, DataObject o2) {
+                Long size1 = o1.getSize();
+                Long size2 = o2.getSize();
+                if (o1 instanceof SnapshotInfo) {
+                    size1 = snapshotChains.get(o1).second();
+                }
+                if (o2 instanceof  SnapshotInfo) {
+                    size2 = snapshotChains.get(o2).second();
+                }
+                return (int) (size2 - size1);

Review comment:
       I know I asked for this (or something like it), but the cast to int can overflow if the size difference is huge.
   In relation to the comment on the comparator above: is returning 0 for equal sizes useful? Might we want to reuse this in cases where it is? A possible alternative is sketched below.
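   One way to keep the descending order while avoiding the cast (a sketch, using the same anonymous comparator shape as in the patch):

```java
Collections.sort(files, new Comparator<DataObject>() {
    @Override
    public int compare(DataObject o1, DataObject o2) {
        long size1 = (o1 instanceof SnapshotInfo) ? snapshotChains.get(o1).second() : o1.getSize();
        long size2 = (o2 instanceof SnapshotInfo) ? snapshotChains.get(o2).second() : o2.getSize();
        // Long.compare works over the full long range (no int overflow) and returns 0 for equal sizes
        return Long.compare(size2, size1);
    }
});
```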

##########
File path: 
engine/orchestration/src/main/java/org/apache/cloudstack/engine/orchestration/DataMigrationUtility.java
##########
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.engine.orchestration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import 
org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.TemplateDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
+import org.apache.cloudstack.storage.ImageStoreService;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
+import org.apache.log4j.Logger;
+
+import com.cloud.host.HostVO;
+import com.cloud.host.Status;
+import com.cloud.host.dao.HostDao;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.SnapshotVO;
+import com.cloud.storage.VMTemplateVO;
+import com.cloud.storage.dao.SnapshotDao;
+import com.cloud.storage.dao.VMTemplateDao;
+import com.cloud.utils.Pair;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.vm.SecondaryStorageVm;
+import com.cloud.vm.SecondaryStorageVmVO;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.dao.SecondaryStorageVmDao;
+
+public class DataMigrationUtility {
+    @Inject
+    SecondaryStorageVmDao secStorageVmDao;
+    @Inject
+    TemplateDataStoreDao templateDataStoreDao;
+    @Inject
+    SnapshotDataStoreDao snapshotDataStoreDao;
+    @Inject
+    VolumeDataStoreDao volumeDataStoreDao;
+    @Inject
+    VMTemplateDao templateDao;
+    @Inject
+    VolumeDataFactory volumeFactory;
+    @Inject
+    TemplateDataFactory templateFactory;
+    @Inject
+    SnapshotDataFactory snapshotFactory;
+    @Inject
+    HostDao hostDao;
+    @Inject
+    SnapshotDao snapshotDao;
+
+    private static final Logger s_logger = 
Logger.getLogger(DataMigrationUtility.class);
+
+    /** This function verifies if the given image store comprises of data 
objects that are not in either the "Ready" or
+     * "Allocated" state - in such a case, if the migration policy is 
complete, the migration is terminated
+     */
+    private boolean filesReady(Long srcDataStoreId) {
+        String[] validStates = new String[]{"Ready", "Allocated"};
+        boolean isReady = true;
+        List<TemplateDataStoreVO> templates = 
templateDataStoreDao.listByStoreId(srcDataStoreId);
+        for (TemplateDataStoreVO template : templates) {
+            isReady &= 
(Arrays.asList(validStates).contains(template.getState().toString()));
+        }
+        List<SnapshotDataStoreVO> snapshots = 
snapshotDataStoreDao.listByStoreId(srcDataStoreId, DataStoreRole.Image);
+        for (SnapshotDataStoreVO snapshot : snapshots) {
+            isReady &= 
(Arrays.asList(validStates).contains(snapshot.getState().toString()));
+        }
+        List<VolumeDataStoreVO> volumes = 
volumeDataStoreDao.listByStoreId(srcDataStoreId);
+        for (VolumeDataStoreVO volume : volumes) {
+            isReady &= 
(Arrays.asList(validStates).contains(volume.getState().toString()));
+        }
+        return isReady;
+    }
+
+    protected void 
checkIfCompleteMigrationPossible(ImageStoreService.MigrationPolicy policy, Long 
srcDataStoreId) {
+        if (policy == ImageStoreService.MigrationPolicy.COMPLETE) {
+            if (!filesReady(srcDataStoreId)) {
+                throw new CloudRuntimeException("Complete migration failed as 
there are data objects which are not Ready");
+            }
+        }
+        return;
+    }
+
+    protected Long getFileSize(DataObject file, Map<DataObject, 
Pair<List<SnapshotInfo>, Long>> snapshotChain) {
+        Long size = file.getSize();
+        Pair<List<SnapshotInfo>, Long> chain = snapshotChain.get(file);
+        if (file instanceof SnapshotInfo && chain.first() != null) {
+            size = chain.second();
+        }
+        return size;
+    }
+
+    /**
+     * Sorts the datastores in decreasing order of their free capacities, so 
as to make
+     * an informed decision of picking the datastore with maximum free 
capactiy for migration
+     */
+    protected List<Long> sortDataStores(Map<Long, Pair<Long, Long>> 
storageCapacities) {
+        List<Map.Entry<Long, Pair<Long, Long>>> list =
+                new LinkedList<Map.Entry<Long, Pair<Long, 
Long>>>((storageCapacities.entrySet()));
+
+        Collections.sort(list, new Comparator<Map.Entry<Long, Pair<Long, 
Long>>>() {
+            @Override
+            public int compare(Map.Entry<Long, Pair<Long, Long>> e1, 
Map.Entry<Long, Pair<Long, Long>> e2) {
+                return e2.getValue().first() > e1.getValue().first() ? 1 : -1;
+            }
+        });
+        HashMap<Long, Pair<Long, Long>> temp = new LinkedHashMap<>();
+        for (Map.Entry<Long, Pair<Long, Long>> value : list) {
+            temp.put(value.getKey(), value.getValue());
+        }
+
+        return new ArrayList<>(temp.keySet());
+    }
+
+    protected List<DataObject> getSortedValidSourcesList(DataStore 
srcDataStore, Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains) {
+        List<DataObject> files = new ArrayList<>();
+        files.addAll(getAllValidTemplates(srcDataStore));
+        files.addAll(getAllValidSnapshotsAndChains(srcDataStore, 
snapshotChains));
+        files.addAll(getAllValidVolumes(srcDataStore));
+
+        files = sortFilesOnSize(files, snapshotChains);
+
+        return files;
+    }
+
+    protected List<DataObject> sortFilesOnSize(List<DataObject> files, 
Map<DataObject, Pair<List<SnapshotInfo>, Long>> snapshotChains) {
+        Collections.sort(files, new Comparator<DataObject>() {
+            @Override
+            public int compare(DataObject o1, DataObject o2) {
+                Long size1 = o1.getSize();
+                Long size2 = o2.getSize();
+                if (o1 instanceof SnapshotInfo) {
+                    size1 = snapshotChains.get(o1).second();
+                }
+                if (o2 instanceof  SnapshotInfo) {
+                    size2 = snapshotChains.get(o2).second();
+                }
+                return (int) (size2 - size1);
+            }
+        });
+        return files;
+    }
+
+    // Gets list of all valid templates, i.e, templates in "Ready" state for 
migration

Review comment:
       Not a useful comment given the clear method name. Extend the method name to be even clearer? Make it a javadoc? Or remove it altogether?
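   If anything is kept, a one-line javadoc might be the cleanest, e.g. (a sketch; note the body also skips public templates, which the current comment does not mention):

```java
/** Non-public templates in the "Ready" state on the given store, i.e. the ones valid for migration. */
protected List<DataObject> getAllValidTemplates(DataStore srcDataStore) {
    // body unchanged
}
```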

##########
File path: 
engine/storage/image/src/main/java/org/apache/cloudstack/storage/image/StorageServiceImpl.java
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.storage.image;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.engine.subsystem.api.storage.CopyCommandResult;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataMotionService;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import 
org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
+import 
org.apache.cloudstack.engine.subsystem.api.storage.SecondaryStorageService;
+import org.apache.cloudstack.engine.subsystem.api.storage.SnapshotInfo;
+import org.apache.cloudstack.engine.subsystem.api.storage.TemplateInfo;
+import org.apache.cloudstack.framework.async.AsyncCallFuture;
+import org.apache.cloudstack.framework.async.AsyncCallbackDispatcher;
+import org.apache.cloudstack.framework.async.AsyncCompletionCallback;
+import org.apache.cloudstack.framework.async.AsyncRpcContext;
+import org.apache.cloudstack.storage.command.CopyCmdAnswer;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreVO;
+import org.apache.log4j.Logger;
+
+import com.cloud.secstorage.CommandExecLogDao;
+import com.cloud.utils.Pair;
+
+public class StorageServiceImpl implements SecondaryStorageService {

Review comment:
       Are you sure this is the best naming convention here? A `StorageService` sounds more generic than a `SecondaryStorageService`. Is it intended to implement more `Interface`s in the future?
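   If it really is only meant for secondary storage, a more specific name would avoid the confusion, e.g. (hypothetical rename; nothing else about the class changes):

```java
public class SecondaryStorageServiceImpl implements SecondaryStorageService {
    // body unchanged
}
```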




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

