abh1sar commented on code in PR #13074:
URL: https://github.com/apache/cloudstack/pull/13074#discussion_r3310539625


##########
plugins/backup/nas/src/main/java/org/apache/cloudstack/backup/NASBackupProvider.java:
##########
@@ -168,6 +200,305 @@ protected Host getVMHypervisorHost(VirtualMachine vm) {
         return 
resourceManager.findOneRandomRunningHostByHypervisor(Hypervisor.HypervisorType.KVM,
 vm.getDataCenterId());
     }
 
+    /**
+     * Returned by {@link #decideChain(VirtualMachine)} to describe the next 
backup's place in
+     * the chain: full vs incremental, the bitmap name to create, and (for 
incrementals) the
+     * parent bitmap and parent file path.
+     */
+    static final class ChainDecision {
+        final String mode;            // "full" or "incremental"
+        final String bitmapNew;
+        final String bitmapParent;    // null for full
+        // Per-volume parent backup file paths, one per current VM volume in 
deviceId order.
+        // null/empty for full. Replaces the single parentPath field — each 
volume needs its
+        // own parent file because backup files are named after each volume's 
own UUID
+        // (root.<uuid>.qcow2 / datadisk.<uuid>.qcow2), abh1sar review at line 
340.

Review Comment:
   Please remove any code comments that mention GitHub usernames or review 
comments.
   



##########
plugins/backup/nas/src/main/java/org/apache/cloudstack/backup/NASBackupProvider.java:
##########
@@ -168,6 +200,305 @@ protected Host getVMHypervisorHost(VirtualMachine vm) {
         return 
resourceManager.findOneRandomRunningHostByHypervisor(Hypervisor.HypervisorType.KVM,
 vm.getDataCenterId());
     }
 
+    /**
+     * Returned by {@link #decideChain(VirtualMachine)} to describe the next 
backup's place in
+     * the chain: full vs incremental, the bitmap name to create, and (for 
incrementals) the
+     * parent bitmap and parent file path.
+     */
+    static final class ChainDecision {
+        final String mode;            // "full" or "incremental"
+        final String bitmapNew;
+        final String bitmapParent;    // null for full
+        // Per-volume parent backup file paths, one per current VM volume in 
deviceId order.
+        // null/empty for full. Replaces the single parentPath field — each 
volume needs its
+        // own parent file because backup files are named after each volume's 
own UUID
+        // (root.<uuid>.qcow2 / datadisk.<uuid>.qcow2), abh1sar review at line 
340.
+        final List<String> parentPaths;
+        final String chainId;         // chain identifier this backup belongs 
to
+        final int chainPosition;      // 0 for full, N for the Nth incremental 
in the chain
+
+        private ChainDecision(String mode, String bitmapNew, String 
bitmapParent, List<String> parentPaths,
+                              String chainId, int chainPosition) {
+            this.mode = mode;
+            this.bitmapNew = bitmapNew;
+            this.bitmapParent = bitmapParent;
+            this.parentPaths = parentPaths;
+            this.chainId = chainId;
+            this.chainPosition = chainPosition;
+        }
+
+        static ChainDecision fullStart(String bitmapName) {
+            return new ChainDecision(NASBackupChainKeys.TYPE_FULL, bitmapName, 
null, null,
+                    UUID.randomUUID().toString(), 0);
+        }
+
+        static ChainDecision incremental(String bitmapNew, String 
bitmapParent, List<String> parentPaths,
+                                         String chainId, int chainPosition) {
+            return new ChainDecision(NASBackupChainKeys.TYPE_INCREMENTAL, 
bitmapNew, bitmapParent,
+                    parentPaths, chainId, chainPosition);
+        }
+
+        boolean isIncremental() {
+            return NASBackupChainKeys.TYPE_INCREMENTAL.equals(mode);
+        }
+    }
+
+    /**
+     * Decides whether the next backup for {@code vm} should be a fresh full 
or an incremental
+     * appended to the existing chain. Stopped VMs are always full (libvirt 
{@code backup-begin}
+     * requires a running QEMU process). The {@code nas.backup.full.every} 
ConfigKey controls
+     * how many backups (full + incrementals) form one chain before a new full 
is forced.
+     *
+     * <p>The decision is anchored on the VM's {@code 
nas.active_checkpoint_id} detail, which
+     * records the bitmap that currently exists on the running QEMU. After a 
restore that
+     * detail is cleared, so the next backup is automatically full — even 
though there may be
+     * a more recent "last backup taken" row in the database. This matches the 
prescription in
+     * the PR review (avoid relying on "last backup" because that breaks after 
restore).</p>
+     */
+    protected ChainDecision decideChain(VirtualMachine vm) {
+        final String newBitmap = "backup-" + System.currentTimeMillis() / 
1000L;
+
+        // Master switch — when the operator disables incrementals at the zone 
level every
+        // backup is taken as a fresh full. Existing chains stay restorable 
because each
+        // backup's metadata is kept independently; restoring an incremental 
still walks its
+        // own chain (the per-backup chain_id / parent_backup_id details 
persist regardless
+        // of the live config). The next backup with this flag back on starts 
a new chain.
+        Boolean incrementalEnabled = 
NASBackupIncrementalEnabled.valueIn(vm.getDataCenterId());
+        if (incrementalEnabled == null || !incrementalEnabled) {
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        // Stopped VMs cannot do incrementals — script will also fall back, 
but we make the
+        // decision here so we register the right type up-front.
+        if (VirtualMachine.State.Stopped.equals(vm.getState())) {
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        Integer fullEvery = NASBackupFullEvery.valueIn(vm.getDataCenterId());
+        if (fullEvery == null || fullEvery <= 1) {
+            // Disabled or every-backup-is-full mode.
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        // 1. If the VM has no active_checkpoint_id, there is no bitmap on the 
host to use as
+        //    a parent. This is the case after restore (we clear it), after VM 
was just assigned
+        //    to the offering, or on the very first backup.
+        String activeCheckpoint = readVmActiveCheckpoint(vm.getId());
+        if (activeCheckpoint == null) {
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        // 2. Find the latest BackedUp backup of this VM whose BITMAP_NAME 
matches the VM's
+        //    active_checkpoint_id. Only that backup is a safe parent — any 
earlier backup
+        //    would have a bitmap that QEMU may have rotated out. Per the 
review:
+        //      "The latest backup should have the bitmap_name equal to the 
VM's
+        //       active_checkpoint_id which will become the parent backup. If 
not, force full."
+        Backup parent = findLatestBackedUpBackupWithBitmap(vm.getId(), 
activeCheckpoint);

Review Comment:
   It's not - Get the latest backup to have the bitmap_name equal to the VM's.
   
   It's actually - Get the latest backup -> If it has bitmap_name equal to VM's 
take incremental backup otherwise take full.
   
   After restore the next backup is always full. That full backup will then 
become the latest backup. An incremental backup will always be on top of the 
latest backup taken.
   So we don't need to get all backups sort them and search for backup matching 
the bitmap.
   
   If the latest backup doesn't match the VM's bitmap then it is a full backup.
   



##########
plugins/backup/nas/src/main/java/org/apache/cloudstack/backup/NASBackupProvider.java:
##########
@@ -168,6 +200,305 @@ protected Host getVMHypervisorHost(VirtualMachine vm) {
         return 
resourceManager.findOneRandomRunningHostByHypervisor(Hypervisor.HypervisorType.KVM,
 vm.getDataCenterId());
     }
 
+    /**
+     * Returned by {@link #decideChain(VirtualMachine)} to describe the next 
backup's place in
+     * the chain: full vs incremental, the bitmap name to create, and (for 
incrementals) the
+     * parent bitmap and parent file path.
+     */
+    static final class ChainDecision {
+        final String mode;            // "full" or "incremental"
+        final String bitmapNew;
+        final String bitmapParent;    // null for full
+        // Per-volume parent backup file paths, one per current VM volume in 
deviceId order.
+        // null/empty for full. Replaces the single parentPath field — each 
volume needs its
+        // own parent file because backup files are named after each volume's 
own UUID
+        // (root.<uuid>.qcow2 / datadisk.<uuid>.qcow2), abh1sar review at line 
340.
+        final List<String> parentPaths;
+        final String chainId;         // chain identifier this backup belongs 
to
+        final int chainPosition;      // 0 for full, N for the Nth incremental 
in the chain
+
+        private ChainDecision(String mode, String bitmapNew, String 
bitmapParent, List<String> parentPaths,
+                              String chainId, int chainPosition) {
+            this.mode = mode;
+            this.bitmapNew = bitmapNew;
+            this.bitmapParent = bitmapParent;
+            this.parentPaths = parentPaths;
+            this.chainId = chainId;
+            this.chainPosition = chainPosition;
+        }
+
+        static ChainDecision fullStart(String bitmapName) {
+            return new ChainDecision(NASBackupChainKeys.TYPE_FULL, bitmapName, 
null, null,
+                    UUID.randomUUID().toString(), 0);
+        }
+
+        static ChainDecision incremental(String bitmapNew, String 
bitmapParent, List<String> parentPaths,
+                                         String chainId, int chainPosition) {
+            return new ChainDecision(NASBackupChainKeys.TYPE_INCREMENTAL, 
bitmapNew, bitmapParent,
+                    parentPaths, chainId, chainPosition);
+        }
+
+        boolean isIncremental() {
+            return NASBackupChainKeys.TYPE_INCREMENTAL.equals(mode);
+        }
+    }
+
+    /**
+     * Decides whether the next backup for {@code vm} should be a fresh full 
or an incremental
+     * appended to the existing chain. Stopped VMs are always full (libvirt 
{@code backup-begin}
+     * requires a running QEMU process). The {@code nas.backup.full.every} 
ConfigKey controls
+     * how many backups (full + incrementals) form one chain before a new full 
is forced.
+     *
+     * <p>The decision is anchored on the VM's {@code 
nas.active_checkpoint_id} detail, which
+     * records the bitmap that currently exists on the running QEMU. After a 
restore that
+     * detail is cleared, so the next backup is automatically full — even 
though there may be
+     * a more recent "last backup taken" row in the database. This matches the 
prescription in
+     * the PR review (avoid relying on "last backup" because that breaks after 
restore).</p>
+     */
+    protected ChainDecision decideChain(VirtualMachine vm) {
+        final String newBitmap = "backup-" + System.currentTimeMillis() / 
1000L;
+
+        // Master switch — when the operator disables incrementals at the zone 
level every
+        // backup is taken as a fresh full. Existing chains stay restorable 
because each
+        // backup's metadata is kept independently; restoring an incremental 
still walks its
+        // own chain (the per-backup chain_id / parent_backup_id details 
persist regardless
+        // of the live config). The next backup with this flag back on starts 
a new chain.
+        Boolean incrementalEnabled = 
NASBackupIncrementalEnabled.valueIn(vm.getDataCenterId());
+        if (incrementalEnabled == null || !incrementalEnabled) {
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        // Stopped VMs cannot do incrementals — script will also fall back, 
but we make the
+        // decision here so we register the right type up-front.
+        if (VirtualMachine.State.Stopped.equals(vm.getState())) {
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        Integer fullEvery = NASBackupFullEvery.valueIn(vm.getDataCenterId());
+        if (fullEvery == null || fullEvery <= 1) {
+            // Disabled or every-backup-is-full mode.
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        // 1. If the VM has no active_checkpoint_id, there is no bitmap on the 
host to use as
+        //    a parent. This is the case after restore (we clear it), after VM 
was just assigned
+        //    to the offering, or on the very first backup.
+        String activeCheckpoint = readVmActiveCheckpoint(vm.getId());
+        if (activeCheckpoint == null) {
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        // 2. Find the latest BackedUp backup of this VM whose BITMAP_NAME 
matches the VM's
+        //    active_checkpoint_id. Only that backup is a safe parent — any 
earlier backup
+        //    would have a bitmap that QEMU may have rotated out. Per the 
review:
+        //      "The latest backup should have the bitmap_name equal to the 
VM's
+        //       active_checkpoint_id which will become the parent backup. If 
not, force full."
+        Backup parent = findLatestBackedUpBackupWithBitmap(vm.getId(), 
activeCheckpoint);
+        if (parent == null) {
+            LOG.debug("VM {} has active_checkpoint_id={} but no matching 
backup found — forcing full",
+                    vm.getInstanceName(), activeCheckpoint);
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        String parentChainId = readDetail(parent, NASBackupChainKeys.CHAIN_ID);
+        int parentChainPosition = chainPosition(parent);
+        if (parentChainId == null || parentChainPosition == Integer.MAX_VALUE) 
{
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        // Force a fresh full when the chain has reached the configured length.
+        if (parentChainPosition + 1 >= fullEvery) {
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        // The script needs the parent backup's on-NAS file path PER VOLUME so 
it can rebase
+        // each new qcow2 onto the matching parent. The paths are stored 
relative to the NAS
+        // mount root — the script resolves them inside its mount session. 
When alignment
+        // fails (volume count changed, etc.) compose returns null and we fall 
back to full
+        // so we don't risk corrupting the chain.
+        List<String> parentPaths = composeParentBackupPaths(parent, 
vm.getId());
+        if (parentPaths == null) {
+            LOG.debug("VM {} parent backup {} volume layout no longer matches 
current VM — forcing full",
+                    vm.getInstanceName(), parent.getUuid());
+            return ChainDecision.fullStart(newBitmap);
+        }
+        return ChainDecision.incremental(newBitmap, activeCheckpoint, 
parentPaths,
+                parentChainId, parentChainPosition + 1);
+    }
+
+    /**
+     * Read the {@code nas.active_checkpoint_id} VM detail. Returns {@code 
null} when no detail
+     * exists (post-restore, first backup, or after explicit reset).
+     */
+    private String readVmActiveCheckpoint(long vmId) {
+        VMInstanceDetailVO d = vmInstanceDetailsDao.findDetail(vmId, 
NASBackupChainKeys.VM_ACTIVE_CHECKPOINT_ID);
+        if (d == null) {
+            return null;
+        }
+        String v = d.getValue();
+        return (v == null || v.isEmpty()) ? null : v;
+    }
+
+    /**
+     * Locate the most-recent {@code BackedUp} backup for {@code vmId} whose 
bitmap name equals
+     * {@code bitmapName}. Used by {@link #decideChain} to anchor the next 
incremental.
+     */
+    private Backup findLatestBackedUpBackupWithBitmap(long vmId, String 
bitmapName) {
+        List<Backup> history = backupDao.listByVmId(null, vmId);
+        if (history == null || history.isEmpty()) {
+            return null;
+        }
+        history.sort(Comparator.comparing(Backup::getDate).reversed());
+        for (Backup b : history) {
+            if (!Backup.Status.BackedUp.equals(b.getStatus())) {
+                continue;
+            }
+            if (bitmapName.equals(readDetail(b, 
NASBackupChainKeys.BITMAP_NAME))) {
+                return b;
+            }
+        }
+        return null;
+    }
+
+    private String readDetail(Backup backup, String key) {
+        BackupDetailVO d = backupDetailsDao.findDetail(backup.getId(), key);
+        return d == null ? null : d.getValue();
+    }
+
+    /**
+     * Compose the on-NAS path of EVERY parent backup file (one per VM volume) 
in the same
+     * order the script will iterate the current VM's disks (deviceId asc). 
Relative to the
+     * NAS mount, matches the layout written by {@code nasbackup.sh}:
+     *   first  disk -> {@code <backupPath>/root.<volUuid>.qcow2}
+     *   others      -> {@code <backupPath>/datadisk.<volUuid>.qcow2}
+     *
+     * Returns {@code null} if the parent's stored volume list can't be 
aligned with the
+     * current VM's volumes (count mismatch / different volume identities). In 
that case the
+     * caller should force a fresh full so we don't accidentally rebase a data 
disk onto the
+     * root parent — exactly the bug abh1sar flagged at line 340.

Review Comment:
   remove user and review comment mentions



##########
plugins/backup/nas/src/main/java/org/apache/cloudstack/backup/NASBackupProvider.java:
##########
@@ -239,9 +578,35 @@ public Pair<Boolean, Backup> takeBackup(final 
VirtualMachine vm, Boolean quiesce
             backupVO.setDate(new Date());
             backupVO.setSize(answer.getSize());
             backupVO.setStatus(Backup.Status.BackedUp);
+            // If the agent fell back to full (stopped VM mid-incremental 
cycle), record this
+            // backup as a full and start a new chain.
+            ChainDecision effective = decision;
+            if (answer.getIncrementalFallback()) {
+                effective = ChainDecision.fullStart(decision.bitmapNew);
+                backupVO.setType("FULL");
+            }
             List<Volume> volumes = new 
ArrayList<>(volumeDao.findByInstance(vm.getId()));
             
backupVO.setBackedUpVolumes(backupManager.createVolumeInfoFromVolumes(volumes));
             if (backupDao.update(backupVO.getId(), backupVO)) {
+                persistChainMetadata(backupVO, effective, 
answer.getBitmapCreated());
+                if (answer.getBitmapRecreated() != null) {
+                    backupDetailsDao.persist(new 
BackupDetailVO(backupVO.getId(),
+                            NASBackupChainKeys.BITMAP_RECREATED, 
answer.getBitmapRecreated(), true));

Review Comment:
   I don't see `BITMAP_RECREATED` used anywhere and also don't see the value of 
storing it.
   Can you remove all occurrence of it (from this file and nasbackup.sh)
   
   And it's not actually bitmap_recreated but checkpoint redefined. 
bitmap_recreated suggests that bitmap was recreated on the disks. Anyway I 
don't think we need it.



##########
plugins/backup/nas/src/main/java/org/apache/cloudstack/backup/NASBackupProvider.java:
##########
@@ -506,13 +909,162 @@ public boolean deleteBackup(Backup backup, boolean 
forced) {
         } catch (OperationTimedoutException e) {
             throw new CloudRuntimeException("Operation to delete backup timed 
out, please try again");
         }
+        if (answer == null || !answer.getResult()) {
+            logger.warn("Failed to delete backup file for {} ({}); leaving DB 
row intact",
+                    backup.getUuid(), backup.getExternalId());
+            return false;
+        }
+        backupDao.remove(backup.getId());
+        return true;
+    }
 
-        if (answer != null && answer.getResult()) {
-            return backupDao.remove(backup.getId());
+    /**
+     * Mark {@code backup} as delete-pending in {@code backup_details}. 
Idempotent.
+     */
+    private void markDeletePending(Backup backup) {
+        BackupDetailVO existing = backupDetailsDao.findDetail(backup.getId(), 
NASBackupChainKeys.DELETE_PENDING);
+        if (existing == null) {
+            backupDetailsDao.persist(new BackupDetailVO(backup.getId(),
+                    NASBackupChainKeys.DELETE_PENDING, "true", true));
         }
+    }
 
-        logger.debug("There was an error removing the backup with id {}", 
backup.getId());
-        return false;
+    /**
+     * @return true if this backup carries the delete-pending tombstone.
+     */
+    private boolean isDeletePending(Backup backup) {
+        BackupDetailVO d = backupDetailsDao.findDetail(backup.getId(), 
NASBackupChainKeys.DELETE_PENDING);
+        return d != null && "true".equalsIgnoreCase(d.getValue());
+    }
+
+    /**
+     * Return the live (not delete-pending, not Removed) children of {@code 
parent} within the
+     * same chain. Equivalent to "incrementals whose parent_backup_id points 
at parent".
+     */
+    private List<Backup> findLiveChildren(Backup parent) {
+        String parentUuid = parent.getUuid();
+        String chainId = readDetail(parent, NASBackupChainKeys.CHAIN_ID);
+        if (parentUuid == null || chainId == null) {
+            return Collections.emptyList();
+        }
+        List<Backup> children = new ArrayList<>();
+        for (Backup b : backupDao.listByVmId(null, parent.getVmId())) {
+            if (b.getId() == parent.getId()) {
+                continue;
+            }
+            if (!chainId.equals(readDetail(b, NASBackupChainKeys.CHAIN_ID))) {
+                continue;
+            }
+            if (!parentUuid.equals(readDetail(b, 
NASBackupChainKeys.PARENT_BACKUP_ID))) {
+                continue;
+            }
+            if (isDeletePending(b)) {
+                // Tombstoned children don't keep us alive — they're already 
on the way out.
+                continue;
+            }
+            children.add(b);
+        }
+        return children;
+    }
+
+    /**
+     * Look up this backup's immediate parent in the chain (by {@code 
PARENT_BACKUP_ID}).
+     * Returns {@code null} if this is the full (no parent) or the parent row 
is gone.
+     */
+    private Backup findChainParent(Backup backup) {
+        String parentUuid = readDetail(backup, 
NASBackupChainKeys.PARENT_BACKUP_ID);
+        if (parentUuid == null || parentUuid.isEmpty()) {
+            return null;
+        }
+        for (Backup b : backupDao.listByVmId(null, backup.getVmId())) {
+            if (parentUuid.equals(b.getUuid())) {
+                return b;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Physically delete {@code backup}, then walk up the chain while each 
ancestor is in
+     * delete-pending state with no other live children. Mirrors the snapshot 
subsystem
+     * pattern: once a leaf is gone, garbage-collect any tombstoned parents.
+     */
+    private boolean deleteBackupAndSweepPendingAncestors(Backup backup, 
BackupRepository repo, Host host) {
+        if (!deleteBackupFileAndRow(backup, repo, host)) {
+            return false;
+        }
+        Backup parent = findChainParent(backup);
+        while (parent != null && isDeletePending(parent) && 
findLiveChildren(parent).isEmpty()) {

Review Comment:
   We don't need the check `findLiveChildren(parent).isEmpty()` - it will 
always be true.
   
   We come here when backup doesn't have any live children and we delete the 
backup at line 993 so its parent should also have no live children.
   
   You can rename the method `deleteLeafBackupAndSweepPendingAncestors` to be 
more clear
   
   Also call `Backup parent = findChainParent(backup);` before
   ```
   
   if (!deleteBackupFileAndRow(backup, repo, host)) {
               return false;
           }
   ```
   as backup is getting deleted inside `deleteBackupFileAndRow`



##########
plugins/backup/nas/src/main/java/org/apache/cloudstack/backup/NASBackupProvider.java:
##########
@@ -239,9 +578,35 @@ public Pair<Boolean, Backup> takeBackup(final 
VirtualMachine vm, Boolean quiesce
             backupVO.setDate(new Date());
             backupVO.setSize(answer.getSize());
             backupVO.setStatus(Backup.Status.BackedUp);
+            // If the agent fell back to full (stopped VM mid-incremental 
cycle), record this
+            // backup as a full and start a new chain.
+            ChainDecision effective = decision;
+            if (answer.getIncrementalFallback()) {
+                effective = ChainDecision.fullStart(decision.bitmapNew);
+                backupVO.setType("FULL");

Review Comment:
   We update backupVO.type here to FULL but haven't updated the backup-details 
table which still says Incremental (as initially decided).
   `
   backupDetailsDao.persist(new BackupDetailVO(backup.getId(), 
NASBackupChainKeys.TYPE, decision.mode, true));`
   
   We are having two sources of truth here. I recommend not to persist 
`NASBackupChainKeys.TYPE` in backup details, but use backupVO.type as the 
single source of truth to say if the backup is full or incremental.



##########
plugins/backup/nas/src/main/java/org/apache/cloudstack/backup/NASBackupProvider.java:
##########
@@ -168,6 +200,305 @@ protected Host getVMHypervisorHost(VirtualMachine vm) {
         return 
resourceManager.findOneRandomRunningHostByHypervisor(Hypervisor.HypervisorType.KVM,
 vm.getDataCenterId());
     }
 
+    /**
+     * Returned by {@link #decideChain(VirtualMachine)} to describe the next 
backup's place in
+     * the chain: full vs incremental, the bitmap name to create, and (for 
incrementals) the
+     * parent bitmap and parent file path.
+     */
+    static final class ChainDecision {
+        final String mode;            // "full" or "incremental"
+        final String bitmapNew;
+        final String bitmapParent;    // null for full
+        // Per-volume parent backup file paths, one per current VM volume in 
deviceId order.
+        // null/empty for full. Replaces the single parentPath field — each 
volume needs its
+        // own parent file because backup files are named after each volume's 
own UUID
+        // (root.<uuid>.qcow2 / datadisk.<uuid>.qcow2), abh1sar review at line 
340.
+        final List<String> parentPaths;
+        final String chainId;         // chain identifier this backup belongs 
to
+        final int chainPosition;      // 0 for full, N for the Nth incremental 
in the chain
+
+        private ChainDecision(String mode, String bitmapNew, String 
bitmapParent, List<String> parentPaths,
+                              String chainId, int chainPosition) {
+            this.mode = mode;
+            this.bitmapNew = bitmapNew;
+            this.bitmapParent = bitmapParent;
+            this.parentPaths = parentPaths;
+            this.chainId = chainId;
+            this.chainPosition = chainPosition;
+        }
+
+        static ChainDecision fullStart(String bitmapName) {
+            return new ChainDecision(NASBackupChainKeys.TYPE_FULL, bitmapName, 
null, null,
+                    UUID.randomUUID().toString(), 0);
+        }
+
+        static ChainDecision incremental(String bitmapNew, String 
bitmapParent, List<String> parentPaths,
+                                         String chainId, int chainPosition) {
+            return new ChainDecision(NASBackupChainKeys.TYPE_INCREMENTAL, 
bitmapNew, bitmapParent,
+                    parentPaths, chainId, chainPosition);
+        }
+
+        boolean isIncremental() {
+            return NASBackupChainKeys.TYPE_INCREMENTAL.equals(mode);
+        }
+    }
+
+    /**
+     * Decides whether the next backup for {@code vm} should be a fresh full 
or an incremental
+     * appended to the existing chain. Stopped VMs are always full (libvirt 
{@code backup-begin}
+     * requires a running QEMU process). The {@code nas.backup.full.every} 
ConfigKey controls
+     * how many backups (full + incrementals) form one chain before a new full 
is forced.
+     *
+     * <p>The decision is anchored on the VM's {@code 
nas.active_checkpoint_id} detail, which
+     * records the bitmap that currently exists on the running QEMU. After a 
restore that
+     * detail is cleared, so the next backup is automatically full — even 
though there may be
+     * a more recent "last backup taken" row in the database. This matches the 
prescription in
+     * the PR review (avoid relying on "last backup" because that breaks after 
restore).</p>
+     */
+    protected ChainDecision decideChain(VirtualMachine vm) {
+        final String newBitmap = "backup-" + System.currentTimeMillis() / 
1000L;
+
+        // Master switch — when the operator disables incrementals at the zone 
level every
+        // backup is taken as a fresh full. Existing chains stay restorable 
because each
+        // backup's metadata is kept independently; restoring an incremental 
still walks its
+        // own chain (the per-backup chain_id / parent_backup_id details 
persist regardless
+        // of the live config). The next backup with this flag back on starts 
a new chain.
+        Boolean incrementalEnabled = 
NASBackupIncrementalEnabled.valueIn(vm.getDataCenterId());
+        if (incrementalEnabled == null || !incrementalEnabled) {
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        // Stopped VMs cannot do incrementals — script will also fall back, 
but we make the
+        // decision here so we register the right type up-front.
+        if (VirtualMachine.State.Stopped.equals(vm.getState())) {
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        Integer fullEvery = NASBackupFullEvery.valueIn(vm.getDataCenterId());
+        if (fullEvery == null || fullEvery <= 1) {
+            // Disabled or every-backup-is-full mode.
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        // 1. If the VM has no active_checkpoint_id, there is no bitmap on the 
host to use as
+        //    a parent. This is the case after restore (we clear it), after VM 
was just assigned
+        //    to the offering, or on the very first backup.
+        String activeCheckpoint = readVmActiveCheckpoint(vm.getId());
+        if (activeCheckpoint == null) {
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        // 2. Find the latest BackedUp backup of this VM whose BITMAP_NAME 
matches the VM's
+        //    active_checkpoint_id. Only that backup is a safe parent — any 
earlier backup
+        //    would have a bitmap that QEMU may have rotated out. Per the 
review:
+        //      "The latest backup should have the bitmap_name equal to the 
VM's
+        //       active_checkpoint_id which will become the parent backup. If 
not, force full."
+        Backup parent = findLatestBackedUpBackupWithBitmap(vm.getId(), 
activeCheckpoint);
+        if (parent == null) {
+            LOG.debug("VM {} has active_checkpoint_id={} but no matching 
backup found — forcing full",
+                    vm.getInstanceName(), activeCheckpoint);
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        String parentChainId = readDetail(parent, NASBackupChainKeys.CHAIN_ID);
+        int parentChainPosition = chainPosition(parent);
+        if (parentChainId == null || parentChainPosition == Integer.MAX_VALUE) 
{
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        // Force a fresh full when the chain has reached the configured length.
+        if (parentChainPosition + 1 >= fullEvery) {
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        // The script needs the parent backup's on-NAS file path PER VOLUME so 
it can rebase
+        // each new qcow2 onto the matching parent. The paths are stored 
relative to the NAS
+        // mount root — the script resolves them inside its mount session. 
When alignment
+        // fails (volume count changed, etc.) compose returns null and we fall 
back to full
+        // so we don't risk corrupting the chain.
+        List<String> parentPaths = composeParentBackupPaths(parent, 
vm.getId());
+        if (parentPaths == null) {
+            LOG.debug("VM {} parent backup {} volume layout no longer matches 
current VM — forcing full",
+                    vm.getInstanceName(), parent.getUuid());
+            return ChainDecision.fullStart(newBitmap);
+        }
+        return ChainDecision.incremental(newBitmap, activeCheckpoint, 
parentPaths,
+                parentChainId, parentChainPosition + 1);
+    }
+
+    /**
+     * Read the {@code nas.active_checkpoint_id} VM detail. Returns {@code 
null} when no detail
+     * exists (post-restore, first backup, or after explicit reset).
+     */
+    private String readVmActiveCheckpoint(long vmId) {
+        VMInstanceDetailVO d = vmInstanceDetailsDao.findDetail(vmId, 
NASBackupChainKeys.VM_ACTIVE_CHECKPOINT_ID);
+        if (d == null) {
+            return null;
+        }
+        String v = d.getValue();
+        return (v == null || v.isEmpty()) ? null : v;
+    }
+
+    /**
+     * Locate the most-recent {@code BackedUp} backup for {@code vmId} whose 
bitmap name equals
+     * {@code bitmapName}. Used by {@link #decideChain} to anchor the next 
incremental.
+     */
+    private Backup findLatestBackedUpBackupWithBitmap(long vmId, String 
bitmapName) {
+        List<Backup> history = backupDao.listByVmId(null, vmId);
+        if (history == null || history.isEmpty()) {
+            return null;
+        }
+        history.sort(Comparator.comparing(Backup::getDate).reversed());
+        for (Backup b : history) {
+            if (!Backup.Status.BackedUp.equals(b.getStatus())) {
+                continue;
+            }
+            if (bitmapName.equals(readDetail(b, 
NASBackupChainKeys.BITMAP_NAME))) {
+                return b;
+            }
+        }
+        return null;
+    }
+
+    private String readDetail(Backup backup, String key) {
+        BackupDetailVO d = backupDetailsDao.findDetail(backup.getId(), key);
+        return d == null ? null : d.getValue();
+    }
+
+    /**
+     * Compose the on-NAS path of EVERY parent backup file (one per VM volume) 
in the same
+     * order the script will iterate the current VM's disks (deviceId asc). 
Relative to the
+     * NAS mount, matches the layout written by {@code nasbackup.sh}:
+     *   first  disk -> {@code <backupPath>/root.<volUuid>.qcow2}
+     *   others      -> {@code <backupPath>/datadisk.<volUuid>.qcow2}
+     *
+     * Returns {@code null} if the parent's stored volume list can't be 
aligned with the
+     * current VM's volumes (count mismatch / different volume identities). In 
that case the
+     * caller should force a fresh full so we don't accidentally rebase a data 
disk onto the
+     * root parent — exactly the bug abh1sar flagged at line 340.
+     */
+    private List<String> composeParentBackupPaths(Backup parent, long vmId) {
+        // backupPath is stored as externalId by createBackupObject — e.g.
+        // "i-2-1234-VM/2026.04.27.13.45.00".
+        String dir = parent.getExternalId();
+        if (dir == null || dir.isEmpty()) {
+            return null;
+        }
+
+        // Read the parent's backed-up volume list (uuid order = deviceId 
order at the time
+        // the parent was taken). The script names files as root.<uuid>.qcow2 
for the first
+        // entry and datadisk.<uuid>.qcow2 for subsequent entries — match that 
here.
+        List<Backup.VolumeInfo> parentVols = parent.getBackedUpVolumes();
+        if (parentVols == null || parentVols.isEmpty()) {
+            return null;
+        }
+
+        // Sanity 1: the current VM must have the same number of volumes. If 
it doesn't (volume
+        // added or removed since the parent), positional alignment is unsafe 
— caller falls back to full.
+        List<VolumeVO> currentVols = volumeDao.findByInstance(vmId);
+        if (currentVols == null || currentVols.size() != parentVols.size()) {
+            return null;
+        }
+
+        // Sanity 2: VolumeDao.findByInstance() has no ordering guarantee. 
Sort the current
+        // volumes by deviceId so positional comparison against parentVols 
(also deviceId-ordered
+        // at backup time) is meaningful.
+        List<VolumeVO> currentSorted = new ArrayList<>(currentVols);
+        currentSorted.sort(Comparator.comparing(Volume::getDeviceId));
+
+        // Sanity 3: verify each current volume's UUID matches the parent's 
recorded UUID at the
+        // same position. If any disk was detached + a different one attached 
in its place, the
+        // chain cannot be safely continued — force a full instead of silently 
rebasing onto the
+        // wrong parent file.
+        for (int i = 0; i < parentVols.size(); i++) {
+            String parentUuid = parentVols.get(i).getUuid();
+            String currentUuid = currentSorted.get(i).getUuid();
+            if (parentUuid == null || parentUuid.isEmpty()
+                    || currentUuid == null || !parentUuid.equals(currentUuid)) 
{
+                LOG.debug("Volume identity mismatch at position {} for VM {}: 
parent uuid {} vs current uuid {}. " +
+                        "Forcing a full backup to start a fresh chain.",
+                        i, vmId, parentUuid, currentUuid);
+                return null;
+            }
+        }
+
+        List<String> paths = new ArrayList<>(parentVols.size());

Review Comment:
   Can we just call getVolumePoolsAndPaths(parentVols) here?
   It serves the same purpose and handles all supported storage pools
   



##########
plugins/backup/nas/src/main/java/org/apache/cloudstack/backup/NASBackupProvider.java:
##########
@@ -506,13 +909,162 @@ public boolean deleteBackup(Backup backup, boolean 
forced) {
         } catch (OperationTimedoutException e) {
             throw new CloudRuntimeException("Operation to delete backup timed 
out, please try again");
         }
+        if (answer == null || !answer.getResult()) {
+            logger.warn("Failed to delete backup file for {} ({}); leaving DB 
row intact",
+                    backup.getUuid(), backup.getExternalId());
+            return false;
+        }
+        backupDao.remove(backup.getId());
+        return true;
+    }
 
-        if (answer != null && answer.getResult()) {
-            return backupDao.remove(backup.getId());
+    /**
+     * Mark {@code backup} as delete-pending in {@code backup_details}. 
Idempotent.
+     */
+    private void markDeletePending(Backup backup) {
+        BackupDetailVO existing = backupDetailsDao.findDetail(backup.getId(), 
NASBackupChainKeys.DELETE_PENDING);
+        if (existing == null) {
+            backupDetailsDao.persist(new BackupDetailVO(backup.getId(),
+                    NASBackupChainKeys.DELETE_PENDING, "true", true));
         }
+    }
 
-        logger.debug("There was an error removing the backup with id {}", 
backup.getId());
-        return false;
+    /**
+     * @return true if this backup carries the delete-pending tombstone.
+     */
+    private boolean isDeletePending(Backup backup) {
+        BackupDetailVO d = backupDetailsDao.findDetail(backup.getId(), 
NASBackupChainKeys.DELETE_PENDING);
+        return d != null && "true".equalsIgnoreCase(d.getValue());
+    }
+
+    /**
+     * Return the live (not delete-pending, not Removed) children of {@code 
parent} within the
+     * same chain. Equivalent to "incrementals whose parent_backup_id points 
at parent".
+     */
+    private List<Backup> findLiveChildren(Backup parent) {
+        String parentUuid = parent.getUuid();
+        String chainId = readDetail(parent, NASBackupChainKeys.CHAIN_ID);
+        if (parentUuid == null || chainId == null) {
+            return Collections.emptyList();
+        }
+        List<Backup> children = new ArrayList<>();
+        for (Backup b : backupDao.listByVmId(null, parent.getVmId())) {
+            if (b.getId() == parent.getId()) {
+                continue;
+            }
+            if (!chainId.equals(readDetail(b, NASBackupChainKeys.CHAIN_ID))) {
+                continue;
+            }
+            if (!parentUuid.equals(readDetail(b, 
NASBackupChainKeys.PARENT_BACKUP_ID))) {
+                continue;
+            }
+            if (isDeletePending(b)) {
+                // Tombstoned children don't keep us alive — they're already 
on the way out.
+                continue;
+            }
+            children.add(b);
+        }
+        return children;
+    }
+
+    /**
+     * Look up this backup's immediate parent in the chain (by {@code 
PARENT_BACKUP_ID}).
+     * Returns {@code null} if this is the full (no parent) or the parent row 
is gone.
+     */
+    private Backup findChainParent(Backup backup) {
+        String parentUuid = readDetail(backup, 
NASBackupChainKeys.PARENT_BACKUP_ID);
+        if (parentUuid == null || parentUuid.isEmpty()) {
+            return null;
+        }
+        for (Backup b : backupDao.listByVmId(null, backup.getVmId())) {
+            if (parentUuid.equals(b.getUuid())) {
+                return b;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Physically delete {@code backup}, then walk up the chain while each 
ancestor is in
+     * delete-pending state with no other live children. Mirrors the snapshot 
subsystem
+     * pattern: once a leaf is gone, garbage-collect any tombstoned parents.
+     */
+    private boolean deleteBackupAndSweepPendingAncestors(Backup backup, 
BackupRepository repo, Host host) {
+        if (!deleteBackupFileAndRow(backup, repo, host)) {
+            return false;
+        }
+        Backup parent = findChainParent(backup);
+        while (parent != null && isDeletePending(parent) && 
findLiveChildren(parent).isEmpty()) {
+            Backup nextParent = findChainParent(parent);
+            if (!deleteBackupFileAndRow(parent, repo, host)) {
+                // Stop the sweep; the rest of the tombstoned chain will be 
collected on a
+                // future delete that re-runs the sweep.
+                return true;
+            }
+            parent = nextParent;
+        }
+        return true;
+    }
+
+    /**
+     * Forced delete: remove this backup plus every descendant, leaf-first. 
Used when the
+     * caller explicitly passes {@code forced=true} and wants the whole 
subtree gone now.
+     */
+    private boolean cascadeDeleteSubtree(Backup root, BackupRepository repo, 
Host host) {
+        List<Backup> subtree = collectSubtree(root);
+        // Sort by chain_position descending so leaves go first — this keeps 
every rm
+        // operating on a chain that still resolves, even if the user is 
watching mid-flight.
+        subtree.sort(Comparator.comparingInt((Backup b) -> 
chainPosition(b)).reversed());
+        boolean ok = true;
+        for (Backup victim : subtree) {
+            if (!deleteBackupFileAndRow(victim, repo, host)) {
+                ok = false;
+                break;
+            }
+        }
+        return ok;
+    }
+
+    /**
+     * Collect {@code root} plus every transitive child in the same chain. 
BFS-style.
+     */
+    private List<Backup> collectSubtree(Backup root) {

Review Comment:
   We don't need BFS style sweep here as any backup can be part of a single 
chain only.
   
   just look up all the backups and find the backup with the highest 
`CHAIN_POSITION` with the same `CHAIN_ID`.
   return that backup and call `deleteBackupAndSweepPendingAncestors()` with 
that backup in cascadeDeleteSubtree()



##########
plugins/backup/nas/src/main/java/org/apache/cloudstack/backup/NASBackupProvider.java:
##########
@@ -168,6 +200,305 @@ protected Host getVMHypervisorHost(VirtualMachine vm) {
         return 
resourceManager.findOneRandomRunningHostByHypervisor(Hypervisor.HypervisorType.KVM,
 vm.getDataCenterId());
     }
 
+    /**
+     * Returned by {@link #decideChain(VirtualMachine)} to describe the next 
backup's place in
+     * the chain: full vs incremental, the bitmap name to create, and (for 
incrementals) the
+     * parent bitmap and parent file path.
+     */
+    static final class ChainDecision {
+        final String mode;            // "full" or "incremental"
+        final String bitmapNew;
+        final String bitmapParent;    // null for full
+        // Per-volume parent backup file paths, one per current VM volume in 
deviceId order.
+        // null/empty for full. Replaces the single parentPath field — each 
volume needs its
+        // own parent file because backup files are named after each volume's 
own UUID
+        // (root.<uuid>.qcow2 / datadisk.<uuid>.qcow2), abh1sar review at line 
340.
+        final List<String> parentPaths;
+        final String chainId;         // chain identifier this backup belongs 
to
+        final int chainPosition;      // 0 for full, N for the Nth incremental 
in the chain
+
+        private ChainDecision(String mode, String bitmapNew, String 
bitmapParent, List<String> parentPaths,
+                              String chainId, int chainPosition) {
+            this.mode = mode;
+            this.bitmapNew = bitmapNew;
+            this.bitmapParent = bitmapParent;
+            this.parentPaths = parentPaths;
+            this.chainId = chainId;
+            this.chainPosition = chainPosition;
+        }
+
+        static ChainDecision fullStart(String bitmapName) {
+            return new ChainDecision(NASBackupChainKeys.TYPE_FULL, bitmapName, 
null, null,
+                    UUID.randomUUID().toString(), 0);
+        }
+
+        static ChainDecision incremental(String bitmapNew, String 
bitmapParent, List<String> parentPaths,
+                                         String chainId, int chainPosition) {
+            return new ChainDecision(NASBackupChainKeys.TYPE_INCREMENTAL, 
bitmapNew, bitmapParent,
+                    parentPaths, chainId, chainPosition);
+        }
+
+        boolean isIncremental() {
+            return NASBackupChainKeys.TYPE_INCREMENTAL.equals(mode);
+        }
+    }
+
+    /**
+     * Decides whether the next backup for {@code vm} should be a fresh full 
or an incremental
+     * appended to the existing chain. Stopped VMs are always full (libvirt 
{@code backup-begin}
+     * requires a running QEMU process). The {@code nas.backup.full.every} 
ConfigKey controls
+     * how many backups (full + incrementals) form one chain before a new full 
is forced.
+     *
+     * <p>The decision is anchored on the VM's {@code 
nas.active_checkpoint_id} detail, which
+     * records the bitmap that currently exists on the running QEMU. After a 
restore that
+     * detail is cleared, so the next backup is automatically full — even 
though there may be
+     * a more recent "last backup taken" row in the database. This matches the 
prescription in
+     * the PR review (avoid relying on "last backup" because that breaks after 
restore).</p>
+     */
+    protected ChainDecision decideChain(VirtualMachine vm) {
+        final String newBitmap = "backup-" + System.currentTimeMillis() / 
1000L;
+
+        // Master switch — when the operator disables incrementals at the zone 
level every
+        // backup is taken as a fresh full. Existing chains stay restorable 
because each
+        // backup's metadata is kept independently; restoring an incremental 
still walks its
+        // own chain (the per-backup chain_id / parent_backup_id details 
persist regardless
+        // of the live config). The next backup with this flag back on starts 
a new chain.
+        Boolean incrementalEnabled = 
NASBackupIncrementalEnabled.valueIn(vm.getDataCenterId());
+        if (incrementalEnabled == null || !incrementalEnabled) {
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        // Stopped VMs cannot do incrementals — script will also fall back, 
but we make the
+        // decision here so we register the right type up-front.
+        if (VirtualMachine.State.Stopped.equals(vm.getState())) {
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        Integer fullEvery = NASBackupFullEvery.valueIn(vm.getDataCenterId());
+        if (fullEvery == null || fullEvery <= 1) {
+            // Disabled or every-backup-is-full mode.
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        // 1. If the VM has no active_checkpoint_id, there is no bitmap on the 
host to use as
+        //    a parent. This is the case after restore (we clear it), after VM 
was just assigned
+        //    to the offering, or on the very first backup.
+        String activeCheckpoint = readVmActiveCheckpoint(vm.getId());
+        if (activeCheckpoint == null) {
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        // 2. Find the latest BackedUp backup of this VM whose BITMAP_NAME 
matches the VM's
+        //    active_checkpoint_id. Only that backup is a safe parent — any 
earlier backup
+        //    would have a bitmap that QEMU may have rotated out. Per the 
review:
+        //      "The latest backup should have the bitmap_name equal to the 
VM's
+        //       active_checkpoint_id which will become the parent backup. If 
not, force full."
+        Backup parent = findLatestBackedUpBackupWithBitmap(vm.getId(), 
activeCheckpoint);
+        if (parent == null) {
+            LOG.debug("VM {} has active_checkpoint_id={} but no matching 
backup found — forcing full",
+                    vm.getInstanceName(), activeCheckpoint);
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        String parentChainId = readDetail(parent, NASBackupChainKeys.CHAIN_ID);
+        int parentChainPosition = chainPosition(parent);
+        if (parentChainId == null || parentChainPosition == Integer.MAX_VALUE) 
{
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        // Force a fresh full when the chain has reached the configured length.
+        if (parentChainPosition + 1 >= fullEvery) {
+            return ChainDecision.fullStart(newBitmap);
+        }
+
+        // The script needs the parent backup's on-NAS file path PER VOLUME so 
it can rebase
+        // each new qcow2 onto the matching parent. The paths are stored 
relative to the NAS
+        // mount root — the script resolves them inside its mount session. 
When alignment
+        // fails (volume count changed, etc.) compose returns null and we fall 
back to full
+        // so we don't risk corrupting the chain.
+        List<String> parentPaths = composeParentBackupPaths(parent, 
vm.getId());
+        if (parentPaths == null) {
+            LOG.debug("VM {} parent backup {} volume layout no longer matches 
current VM — forcing full",
+                    vm.getInstanceName(), parent.getUuid());
+            return ChainDecision.fullStart(newBitmap);
+        }
+        return ChainDecision.incremental(newBitmap, activeCheckpoint, 
parentPaths,
+                parentChainId, parentChainPosition + 1);
+    }
+
+    /**
+     * Read the {@code nas.active_checkpoint_id} VM detail. Returns {@code 
null} when no detail
+     * exists (post-restore, first backup, or after explicit reset).
+     */
+    private String readVmActiveCheckpoint(long vmId) {
+        VMInstanceDetailVO d = vmInstanceDetailsDao.findDetail(vmId, 
NASBackupChainKeys.VM_ACTIVE_CHECKPOINT_ID);
+        if (d == null) {
+            return null;
+        }
+        String v = d.getValue();
+        return (v == null || v.isEmpty()) ? null : v;
+    }
+
+    /**
+     * Locate the most-recent {@code BackedUp} backup for {@code vmId} whose 
bitmap name equals
+     * {@code bitmapName}. Used by {@link #decideChain} to anchor the next 
incremental.
+     */
+    private Backup findLatestBackedUpBackupWithBitmap(long vmId, String 
bitmapName) {
+        List<Backup> history = backupDao.listByVmId(null, vmId);
+        if (history == null || history.isEmpty()) {
+            return null;
+        }
+        history.sort(Comparator.comparing(Backup::getDate).reversed());
+        for (Backup b : history) {
+            if (!Backup.Status.BackedUp.equals(b.getStatus())) {
+                continue;
+            }
+            if (bitmapName.equals(readDetail(b, 
NASBackupChainKeys.BITMAP_NAME))) {
+                return b;
+            }
+        }
+        return null;
+    }
+
+    private String readDetail(Backup backup, String key) {
+        BackupDetailVO d = backupDetailsDao.findDetail(backup.getId(), key);
+        return d == null ? null : d.getValue();
+    }
+
+    /**
+     * Compose the on-NAS path of EVERY parent backup file (one per VM volume) 
in the same
+     * order the script will iterate the current VM's disks (deviceId asc). 
Relative to the
+     * NAS mount, matches the layout written by {@code nasbackup.sh}:
+     *   first  disk -> {@code <backupPath>/root.<volUuid>.qcow2}
+     *   others      -> {@code <backupPath>/datadisk.<volUuid>.qcow2}
+     *
+     * Returns {@code null} if the parent's stored volume list can't be 
aligned with the
+     * current VM's volumes (count mismatch / different volume identities). In 
that case the
+     * caller should force a fresh full so we don't accidentally rebase a data 
disk onto the
+     * root parent — exactly the bug abh1sar flagged at line 340.
+     */
+    private List<String> composeParentBackupPaths(Backup parent, long vmId) {
+        // backupPath is stored as externalId by createBackupObject — e.g.
+        // "i-2-1234-VM/2026.04.27.13.45.00".
+        String dir = parent.getExternalId();
+        if (dir == null || dir.isEmpty()) {
+            return null;
+        }
+
+        // Read the parent's backed-up volume list (uuid order = deviceId 
order at the time
+        // the parent was taken). The script names files as root.<uuid>.qcow2 
for the first
+        // entry and datadisk.<uuid>.qcow2 for subsequent entries — match that 
here.
+        List<Backup.VolumeInfo> parentVols = parent.getBackedUpVolumes();
+        if (parentVols == null || parentVols.isEmpty()) {
+            return null;
+        }
+
+        // Sanity 1: the current VM must have the same number of volumes. If 
it doesn't (volume
+        // added or removed since the parent), positional alignment is unsafe 
— caller falls back to full.
+        List<VolumeVO> currentVols = volumeDao.findByInstance(vmId);
+        if (currentVols == null || currentVols.size() != parentVols.size()) {
+            return null;
+        }
+
+        // Sanity 2: VolumeDao.findByInstance() has no ordering guarantee. 
Sort the current
+        // volumes by deviceId so positional comparison against parentVols 
(also deviceId-ordered
+        // at backup time) is meaningful.
+        List<VolumeVO> currentSorted = new ArrayList<>(currentVols);
+        currentSorted.sort(Comparator.comparing(Volume::getDeviceId));
+
+        // Sanity 3: verify each current volume's UUID matches the parent's 
recorded UUID at the
+        // same position. If any disk was detached + a different one attached 
in its place, the
+        // chain cannot be safely continued — force a full instead of silently 
rebasing onto the
+        // wrong parent file.

Review Comment:
   This is not possible. When an instance is assigned to a backup offering, 
detach and attach volume operations are not allowed. It can happen that the 
backup offering is removed from the VM, volume is detached and  new volume 
attached, and then backup offering is assigned back.
   
   We should avoid doing incremental in this case by setting VM's active 
checkpoint id to null whenever backup offering is removed from the VM. This 
will make sure that the next backup is not incremental.
   And we don't need to explicitly check all the volumes.
   
   You can add the change to reset VM's active checkpoint Id in 
`NASBackupProvider.removeVMFromBackupOffering()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to