This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 04f6255295 HDDS-12127. RM should not expire pending deletes, but retry until delete is confirmed or node is dead (#7746)
04f6255295 is described below

commit 04f625529514fa7cdf47daa4420b4032b6be8983
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Tue Jan 28 10:21:36 2025 +0000

    HDDS-12127. RM should not expire pending deletes, but retry until delete is confirmed or node is dead (#7746)
---
 .../container/replication/ContainerReplicaOp.java  |  12 +-
 .../replication/ContainerReplicaPendingOps.java    |  36 +++--
 .../replication/ECUnderReplicationHandler.java     |   2 +-
 .../container/replication/ReplicationManager.java  |  33 +++-
 .../hdds/scm/server/StorageContainerManager.java   |   3 +
 .../scm/container/balancer/TestMoveManager.java    |  28 ++--
 .../TestContainerReplicaPendingOps.java            | 176 +++++++++++++--------
 .../replication/TestECContainerReplicaCount.java   |   4 +-
 .../replication/TestReplicationManager.java        |  38 ++++-
 .../TestReplicationManagerScenarios.java           |   4 +-
 .../commandhandler/TestBlockDeletion.java          |   4 +-
 11 files changed, 230 insertions(+), 110 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
index 34cc01eb89..99fcda97b2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.container.replication;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
 /**
  * Class to wrap details used to track pending replications.
@@ -34,19 +35,20 @@ public enum PendingOpType {
   private final PendingOpType opType;
   private final DatanodeDetails target;
   private final int replicaIndex;
+  private final SCMCommand<?> command;
   private final long deadlineEpochMillis;
 
   public static ContainerReplicaOp create(PendingOpType opType,
       DatanodeDetails target, int replicaIndex) {
-    return new ContainerReplicaOp(opType, target, replicaIndex,
-        System.currentTimeMillis());
+    return new ContainerReplicaOp(opType, target, replicaIndex, null, 
System.currentTimeMillis());
   }
 
   public ContainerReplicaOp(PendingOpType opType,
-      DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) {
+      DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long 
deadlineEpochMillis) {
     this.opType = opType;
     this.target = target;
     this.replicaIndex = replicaIndex;
+    this.command = command;
     this.deadlineEpochMillis = deadlineEpochMillis;
   }
 
@@ -62,6 +64,10 @@ public int getReplicaIndex() {
     return replicaIndex;
   }
 
+  public SCMCommand<?> getCommand() {
+    return command;
+  }
+
   public long getDeadlineEpochMillis() {
     return deadlineEpochMillis;
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java
index 4eef0a8a74..ad2f8b6bf4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
 import java.time.Clock;
 import java.util.ArrayList;
@@ -117,13 +118,14 @@ public List<ContainerReplicaOp> getPendingOps(ContainerID 
containerID) {
    * @param containerID ContainerID for which to add a replica
    * @param target The target datanode
    * @param replicaIndex The replica index (zero for Ratis, &gt; 0 for EC)
+   * @param command The command to send to the datanode
    * @param deadlineEpochMillis The time by which the replica should have been
    *                            added and reported by the datanode, or it will
    *                            be discarded.
    */
   public void scheduleAddReplica(ContainerID containerID,
-      DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) {
-    addReplica(ADD, containerID, target, replicaIndex, deadlineEpochMillis);
+      DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long 
deadlineEpochMillis) {
+    addReplica(ADD, containerID, target, replicaIndex, command, 
deadlineEpochMillis);
   }
 
   /**
@@ -131,13 +133,14 @@ public void scheduleAddReplica(ContainerID containerID,
    * @param containerID ContainerID for which to delete a replica
    * @param target The target datanode
    * @param replicaIndex The replica index (zero for Ratis, &gt; 0 for EC)
+   * @param command The command to send to the datanode
    * @param deadlineEpochMillis The time by which the replica should have been
    *                            deleted and reported by the datanode, or it 
will
    *                            be discarded.
    */
   public void scheduleDeleteReplica(ContainerID containerID,
-      DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) {
-    addReplica(DELETE, containerID, target, replicaIndex, deadlineEpochMillis);
+      DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long 
deadlineEpochMillis) {
+    addReplica(DELETE, containerID, target, replicaIndex, command, 
deadlineEpochMillis);
   }
 
   /**
@@ -150,7 +153,7 @@ public void scheduleDeleteReplica(ContainerID containerID,
    */
   public boolean completeAddReplica(ContainerID containerID,
       DatanodeDetails target, int replicaIndex) {
-    boolean completed = completeOp(ADD, containerID, target, replicaIndex);
+    boolean completed = completeOp(ADD, containerID, target, replicaIndex, 
true);
     if (isMetricsNotNull() && completed) {
       if (isEC(replicaIndex)) {
         replicationMetrics.incrEcReplicasCreatedTotal();
@@ -172,7 +175,7 @@ public boolean completeAddReplica(ContainerID containerID,
    */
   public boolean completeDeleteReplica(ContainerID containerID,
       DatanodeDetails target, int replicaIndex) {
-    boolean completed = completeOp(DELETE, containerID, target, replicaIndex);
+    boolean completed = completeOp(DELETE, containerID, target, replicaIndex, 
true);
     if (isMetricsNotNull() && completed) {
       if (isEC(replicaIndex)) {
         replicationMetrics.incrEcReplicasDeletedTotal();
@@ -192,7 +195,7 @@ public boolean completeDeleteReplica(ContainerID 
containerID,
   public boolean removeOp(ContainerID containerID,
       ContainerReplicaOp op) {
     return completeOp(op.getOpType(), containerID, op.getTarget(),
-        op.getReplicaIndex());
+        op.getReplicaIndex(), true);
   }
 
   /**
@@ -221,9 +224,13 @@ public void removeExpiredEntries() {
         while (iterator.hasNext()) {
           ContainerReplicaOp op = iterator.next();
           if (clock.millis() > op.getDeadlineEpochMillis()) {
-            iterator.remove();
+            if (op.getOpType() != DELETE) {
+              // For delete ops, we don't remove them from the list as RM must 
resend them, or they
+              // will be removed via a container report when they are 
confirmed as deleted.
+              iterator.remove();
+              decrementCounter(op.getOpType(), op.getReplicaIndex());
+            }
             expiredOps.add(op);
-            decrementCounter(op.getOpType(), op.getReplicaIndex());
             updateTimeoutMetrics(op);
           }
         }
@@ -258,15 +265,18 @@ private void updateTimeoutMetrics(ContainerReplicaOp op) {
   }
 
   private void addReplica(ContainerReplicaOp.PendingOpType opType,
-      ContainerID containerID, DatanodeDetails target, int replicaIndex,
+      ContainerID containerID, DatanodeDetails target, int replicaIndex, 
SCMCommand<?> command,
       long deadlineEpochMillis) {
     Lock lock = writeLock(containerID);
     lock(lock);
     try {
+      // Remove any existing duplicate op for the same target and replicaIndex 
before adding
+      // the new one. Especially for delete ops, they could be getting resent 
after expiry.
+      completeOp(opType, containerID, target, replicaIndex, false);
       List<ContainerReplicaOp> ops = pendingOps.computeIfAbsent(
           containerID, s -> new ArrayList<>());
       ops.add(new ContainerReplicaOp(opType,
-          target, replicaIndex, deadlineEpochMillis));
+          target, replicaIndex, command, deadlineEpochMillis));
       incrementCounter(opType, replicaIndex);
     } finally {
       unlock(lock);
@@ -274,7 +284,7 @@ private void addReplica(ContainerReplicaOp.PendingOpType 
opType,
   }
 
   private boolean completeOp(ContainerReplicaOp.PendingOpType opType,
-      ContainerID containerID, DatanodeDetails target, int replicaIndex) {
+      ContainerID containerID, DatanodeDetails target, int replicaIndex, 
boolean notifySubsribers) {
     boolean found = false;
     // List of completed ops that subscribers will be notified about
     List<ContainerReplicaOp> completedOps = new ArrayList<>();
@@ -303,7 +313,7 @@ private boolean completeOp(ContainerReplicaOp.PendingOpType 
opType,
       unlock(lock);
     }
 
-    if (found) {
+    if (found && notifySubsribers) {
       notifySubscribers(completedOps, containerID, false);
     }
     return found;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 2f77891046..09a757f577 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -621,7 +621,7 @@ private void createReplicateCommand(
   private void adjustPendingOps(ECContainerReplicaCount replicaCount,
                                 DatanodeDetails target, int replicaIndex) {
     replicaCount.addPendingOp(new ContainerReplicaOp(
-        ContainerReplicaOp.PendingOpType.ADD, target, replicaIndex,
+        ContainerReplicaOp.PendingOpType.ADD, target, replicaIndex, null,
         Long.MAX_VALUE));
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 1675da0fa8..07927ba689 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -103,7 +103,7 @@
  * that the containers are properly replicated. Replication Manager deals only
  * with Quasi Closed / Closed container.
  */
-public class ReplicationManager implements SCMService {
+public class ReplicationManager implements SCMService, 
ContainerReplicaPendingOpsSubscriber {
 
   public static final Logger LOG =
       LoggerFactory.getLogger(ReplicationManager.class);
@@ -673,8 +673,7 @@ private void adjustPendingOpsAndMetrics(ContainerInfo 
containerInfo,
     if (cmd.getType() == Type.deleteContainerCommand) {
       DeleteContainerCommand rcc = (DeleteContainerCommand) cmd;
       containerReplicaPendingOps.scheduleDeleteReplica(
-          containerInfo.containerID(), targetDatanode, rcc.getReplicaIndex(),
-          scmDeadlineEpochMs);
+          containerInfo.containerID(), targetDatanode, rcc.getReplicaIndex(), 
cmd, scmDeadlineEpochMs);
       if (rcc.getReplicaIndex() > 0) {
         getMetrics().incrEcDeletionCmdsSentTotal();
       } else if (rcc.getReplicaIndex() == 0) {
@@ -687,8 +686,7 @@ private void adjustPendingOpsAndMetrics(ContainerInfo 
containerInfo,
       final ByteString targetIndexes = rcc.getMissingContainerIndexes();
       for (int i = 0; i < targetIndexes.size(); i++) {
         containerReplicaPendingOps.scheduleAddReplica(
-            containerInfo.containerID(), targets.get(i), 
targetIndexes.byteAt(i),
-            scmDeadlineEpochMs);
+            containerInfo.containerID(), targets.get(i), 
targetIndexes.byteAt(i), cmd, scmDeadlineEpochMs);
       }
       getMetrics().incrEcReconstructionCmdsSentTotal();
     } else if (cmd.getType() == Type.replicateContainerCommand) {
@@ -702,7 +700,7 @@ private void adjustPendingOpsAndMetrics(ContainerInfo 
containerInfo,
          */
         containerReplicaPendingOps.scheduleAddReplica(
             containerInfo.containerID(),
-            targetDatanode, rcc.getReplicaIndex(), scmDeadlineEpochMs);
+            targetDatanode, rcc.getReplicaIndex(), cmd, scmDeadlineEpochMs);
       } else {
         /*
         This means the source will push replica to the target, so the op's
@@ -710,7 +708,7 @@ private void adjustPendingOpsAndMetrics(ContainerInfo 
containerInfo,
          */
         containerReplicaPendingOps.scheduleAddReplica(
             containerInfo.containerID(),
-            rcc.getTargetDatanode(), rcc.getReplicaIndex(), 
scmDeadlineEpochMs);
+            rcc.getTargetDatanode(), rcc.getReplicaIndex(), cmd, 
scmDeadlineEpochMs);
       }
 
       if (rcc.getReplicaIndex() > 0) {
@@ -1043,6 +1041,27 @@ ReplicationQueue getQueue() {
     return replicationQueue.get();
   }
 
+  @Override
+  public void opCompleted(ContainerReplicaOp op, ContainerID containerID, 
boolean timedOut) {
+    if (!(timedOut && op.getOpType() == 
ContainerReplicaOp.PendingOpType.DELETE)) {
+      // We only care about expired delete ops. All others should be ignored.
+      return;
+    }
+    try {
+      ContainerInfo containerInfo = containerManager.getContainer(containerID);
+      // Sending the command in this way is un-throttled, and the command will 
have its deadline
+      // adjusted to a new deadline as part of the sending process.
+      sendDatanodeCommand(op.getCommand(), containerInfo, op.getTarget());
+    } catch (ContainerNotFoundException e) {
+      // Should not happen, as even deleted containers are currently retained 
in the SCM container map
+      LOG.error("Container {} not found when processing expired delete", 
containerID, e);
+    } catch (NotLeaderException e) {
+      // If SCM leadership has changed, this is fine to ignore. All pending 
ops will be expired
+      // once SCM leadership switches.
+      LOG.warn("SCM is not leader when processing expired delete", e);
+    }
+  }
+
   /**
    * Configuration used by the Replication Manager.
    */
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 1a986f2f67..b6c41c7dcf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -836,6 +836,9 @@ private void initializeSystemManagers(OzoneConfiguration 
conf,
       reconfigurationHandler.register(replicationManager.getConfig());
     }
     serviceManager.register(replicationManager);
+    // RM gets notified of expired pending delete from 
containerReplicaPendingOps by subscribing to it
+    // so it can resend them.
+    containerReplicaPendingOps.registerSubscriber(replicationManager);
     if (configurator.getScmSafeModeManager() != null) {
       scmSafeModeManager = configurator.getScmSafeModeManager();
     } else {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java
index 0c5667d407..8cc62ab03c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java
@@ -199,13 +199,13 @@ public void testMovePendingOpsExist() throws Exception {
     nodes.put(src, NodeStatus.inServiceHealthy());
     nodes.put(tgt, NodeStatus.inServiceHealthy());
 
-    pendingOps.add(new ContainerReplicaOp(ADD, tgt, 0, clock.millis()));
+    pendingOps.add(new ContainerReplicaOp(ADD, tgt, 0, null, clock.millis()));
 
     assertMoveFailsWith(REPLICATION_FAIL_INFLIGHT_REPLICATION,
         containerInfo.containerID());
 
     pendingOps.clear();
-    pendingOps.add(new ContainerReplicaOp(DELETE, src, 0, clock.millis()));
+    pendingOps.add(new ContainerReplicaOp(DELETE, src, 0, null, 
clock.millis()));
     assertMoveFailsWith(REPLICATION_FAIL_INFLIGHT_DELETION,
         containerInfo.containerID());
   }
@@ -325,7 +325,7 @@ public void testDeleteCommandFails() throws Exception {
         .when(containerManager).getContainer(any(ContainerID.class));
 
     ContainerReplicaOp op = new ContainerReplicaOp(
-        ADD, tgt, 0, clock.millis() + 1000);
+        ADD, tgt, 0, null, clock.millis() + 1000);
     moveManager.opCompleted(op, containerInfo.containerID(), false);
 
     MoveManager.MoveResult moveResult = res.get();
@@ -337,14 +337,14 @@ public void testSuccessfulMove() throws Exception {
     CompletableFuture<MoveManager.MoveResult> res = setupSuccessfulMove();
 
     ContainerReplicaOp op = new ContainerReplicaOp(
-        ADD, tgt, 0, clock.millis() + 1000);
+        ADD, tgt, 0, null, clock.millis() + 1000);
     moveManager.opCompleted(op, containerInfo.containerID(), false);
 
     verify(replicationManager).sendDeleteCommand(
         eq(containerInfo), eq(0), eq(src), eq(true), anyLong());
 
     op = new ContainerReplicaOp(
-        DELETE, src, 0, clock.millis() + 1000);
+        DELETE, src, 0, null, clock.millis() + 1000);
     moveManager.opCompleted(op, containerInfo.containerID(), false);
 
     MoveManager.MoveResult finalResult = res.get();
@@ -374,7 +374,7 @@ public void testSuccessfulMoveNonZeroRepIndex() throws 
Exception {
         anyLong());
 
     ContainerReplicaOp op = new ContainerReplicaOp(
-        ADD, tgt, srcReplica.getReplicaIndex(), clock.millis() + 1000);
+        ADD, tgt, srcReplica.getReplicaIndex(), null, clock.millis() + 1000);
     moveManager.opCompleted(op, containerInfo.containerID(), false);
 
     verify(replicationManager).sendDeleteCommand(
@@ -382,7 +382,7 @@ public void testSuccessfulMoveNonZeroRepIndex() throws 
Exception {
         eq(true), anyLong());
 
     op = new ContainerReplicaOp(
-        DELETE, src, srcReplica.getReplicaIndex(), clock.millis() + 1000);
+        DELETE, src, srcReplica.getReplicaIndex(), null, clock.millis() + 
1000);
     moveManager.opCompleted(op, containerInfo.containerID(), false);
 
     MoveManager.MoveResult finalResult = res.get();
@@ -394,7 +394,7 @@ public void testMoveTimeoutOnAdd() throws Exception {
     CompletableFuture<MoveManager.MoveResult> res = setupSuccessfulMove();
 
     ContainerReplicaOp op = new ContainerReplicaOp(
-        ADD, tgt, 0, clock.millis() + 1000);
+        ADD, tgt, 0, null, clock.millis() + 1000);
     moveManager.opCompleted(op, containerInfo.containerID(), true);
 
     MoveManager.MoveResult finalResult = res.get();
@@ -406,14 +406,14 @@ public void testMoveTimeoutOnDelete() throws Exception {
     CompletableFuture<MoveManager.MoveResult> res = setupSuccessfulMove();
 
     ContainerReplicaOp op = new ContainerReplicaOp(
-        ADD, tgt, 0, clock.millis() + 1000);
+        ADD, tgt, 0, null, clock.millis() + 1000);
     moveManager.opCompleted(op, containerInfo.containerID(), false);
 
     verify(replicationManager).sendDeleteCommand(
         eq(containerInfo), eq(0), eq(src), eq(true), anyLong());
 
     op = new ContainerReplicaOp(
-        DELETE, src, 0, clock.millis() + 1000);
+        DELETE, src, 0, null, clock.millis() + 1000);
     moveManager.opCompleted(op, containerInfo.containerID(), true);
 
     MoveManager.MoveResult finalResult = res.get();
@@ -434,7 +434,7 @@ public void testMoveCompleteSrcNoLongerPresent() throws 
Exception {
       }
     }
     ContainerReplicaOp op = new ContainerReplicaOp(
-        ADD, tgt, 0, clock.millis() + 1000);
+        ADD, tgt, 0, null, clock.millis() + 1000);
     moveManager.opCompleted(op, containerInfo.containerID(), false);
 
     MoveManager.MoveResult finalResult = res.get();
@@ -450,7 +450,7 @@ public void testMoveCompleteSrcNotHealthy() throws 
Exception {
 
     nodes.put(src, NodeStatus.inServiceStale());
     ContainerReplicaOp op = new ContainerReplicaOp(
-        ADD, tgt, 0, clock.millis() + 1000);
+        ADD, tgt, 0, null, clock.millis() + 1000);
     moveManager.opCompleted(op, containerInfo.containerID(), false);
 
     MoveManager.MoveResult finalResult = res.get();
@@ -468,7 +468,7 @@ public void testMoveCompleteSrcNotInService() throws 
Exception {
         HddsProtos.NodeOperationalState.DECOMMISSIONING,
         HddsProtos.NodeState.HEALTHY));
     ContainerReplicaOp op = new ContainerReplicaOp(
-        ADD, tgt, 0, clock.millis() + 1000);
+        ADD, tgt, 0, null, clock.millis() + 1000);
     moveManager.opCompleted(op, containerInfo.containerID(), false);
 
     MoveManager.MoveResult finalResult = res.get();
@@ -487,7 +487,7 @@ public void testMoveCompleteFutureReplicasUnhealthy() 
throws Exception {
             .MisReplicatedHealthResult(containerInfo, false, null));
 
     ContainerReplicaOp op = new ContainerReplicaOp(
-        ADD, tgt, 0, clock.millis() + 1000);
+        ADD, tgt, 0, null, clock.millis() + 1000);
     moveManager.opCompleted(op, containerInfo.containerID(), false);
 
     MoveManager.MoveResult finalResult = res.get();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java
index 3775531d30..0326a95f6c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java
@@ -23,6 +23,9 @@
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.ozone.test.TestClock;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -56,6 +59,8 @@ public class TestContainerReplicaPendingOps {
   private DatanodeDetails dn3;
   private ReplicationManagerMetrics metrics;
   private long deadline;
+  private SCMCommand<?> addCmd;
+  private SCMCommand<?> deleteCmd;
 
   @BeforeEach
   public void setup() {
@@ -73,6 +78,9 @@ public void setup() {
     dn1 = MockDatanodeDetails.randomDatanodeDetails();
     dn2 = MockDatanodeDetails.randomDatanodeDetails();
     dn3 = MockDatanodeDetails.randomDatanodeDetails();
+
+    addCmd = ReplicateContainerCommand.toTarget(1, dn3);
+    deleteCmd =  new DeleteContainerCommand(1, false);
   }
 
   @AfterEach
@@ -91,8 +99,8 @@ public void testGetPendingOpsReturnsEmptyList() {
 
   @Test
   public void testClear() {
-    pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, deadline);
-    pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 0, deadline);
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, addCmd, 
deadline);
+    pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 0, deleteCmd, 
deadline);
 
     assertEquals(1, 
pendingOps.getPendingOpCount(ContainerReplicaOp.PendingOpType.ADD));
     assertEquals(1, 
pendingOps.getPendingOpCount(ContainerReplicaOp.PendingOpType.DELETE));
@@ -108,17 +116,30 @@ public void testClear() {
 
   @Test
   public void testCanAddReplicasForAdd() {
-    pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, deadline);
-    pendingOps.scheduleAddReplica(new ContainerID(1), dn2, 0, deadline);
-    pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, deadline);
-    pendingOps.scheduleAddReplica(new ContainerID(2), dn1, 1, deadline);
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, addCmd, 
deadline);
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn2, 0, addCmd, 
deadline);
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, addCmd, 
deadline);
+    // Duplicate for DN2
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn2, 0, addCmd, deadline 
+ 1);
+    // Not a duplicate for DN2 as different index. Should not happen in 
practice as it is not valid to have 2 indexes
+    // on the same node.
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn2, 1, addCmd, 
deadline);
+    pendingOps.scheduleAddReplica(new ContainerID(2), dn1, 1, addCmd, 
deadline);
+    pendingOps.scheduleAddReplica(new ContainerID(2), dn1, 1, addCmd, deadline 
+ 1);
 
     List<ContainerReplicaOp> ops =
         pendingOps.getPendingOps(new ContainerID(1));
-    assertEquals(3, ops.size());
+    assertEquals(4, ops.size());
     for (ContainerReplicaOp op : ops) {
-      assertEquals(0, op.getReplicaIndex());
+      if (!op.getTarget().equals(dn2)) {
+        assertEquals(0, op.getReplicaIndex());
+      }
       assertEquals(ADD, op.getOpType());
+      if (op.getTarget().equals(dn2) && op.getReplicaIndex() == 0) {
+        assertEquals(deadline + 1, op.getDeadlineEpochMillis());
+      } else {
+        assertEquals(deadline, op.getDeadlineEpochMillis());
+      }
     }
     List<DatanodeDetails> allDns = ops.stream()
         .map(ContainerReplicaOp::getTarget).collect(Collectors.toList());
@@ -131,14 +152,15 @@ public void testCanAddReplicasForAdd() {
     assertEquals(1, ops.get(0).getReplicaIndex());
     assertEquals(ADD, ops.get(0).getOpType());
     assertEquals(dn1, ops.get(0).getTarget());
+    assertEquals(deadline + 1, ops.get(0).getDeadlineEpochMillis());
   }
 
   @Test
   public void testCanAddReplicasForDelete() {
-    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deadline);
-    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deadline);
-    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn3, 0, deadline);
-    pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deadline);
+    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deleteCmd, 
deadline);
+    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deleteCmd, 
deadline);
+    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn3, 0, deleteCmd, 
deadline);
+    pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deleteCmd, 
deadline);
 
     List<ContainerReplicaOp> ops =
         pendingOps.getPendingOps(new ContainerID(1));
@@ -162,11 +184,11 @@ public void testCanAddReplicasForDelete() {
 
   @Test
   public void testCompletingOps() {
-    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deadline);
-    pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, deadline);
-    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deadline);
-    pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, deadline);
-    pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deadline);
+    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deleteCmd, 
deadline);
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, addCmd, 
deadline);
+    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deleteCmd, 
deadline);
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, addCmd, 
deadline);
+    pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deleteCmd, 
deadline);
 
     List<ContainerReplicaOp> ops =
         pendingOps.getPendingOps(new ContainerID(1));
@@ -195,11 +217,11 @@ public void testCompletingOps() {
 
   @Test
   public void testRemoveSpecificOp() {
-    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deadline);
-    pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, deadline);
-    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deadline);
-    pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, deadline);
-    pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deadline);
+    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deleteCmd, 
deadline);
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, addCmd, 
deadline);
+    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deleteCmd, 
deadline);
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, addCmd, 
deadline);
+    pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deleteCmd, 
deadline);
 
     ContainerID cid = new ContainerID(1);
     List<ContainerReplicaOp> ops = pendingOps.getPendingOps(cid);
@@ -218,17 +240,18 @@ public void testRemoveExpiredEntries() {
     long expiry = clock.millis() + 1000;
     long laterExpiry =  clock.millis() + 2000;
     long latestExpiry = clock.millis() + 3000;
-    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, expiry);
-    pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, expiry);
-    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, laterExpiry);
-    pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, laterExpiry);
-    pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, latestExpiry);
+    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deleteCmd, 
expiry);
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, addCmd, expiry);
+    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deleteCmd, 
laterExpiry);
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, addCmd, 
laterExpiry);
+    pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deleteCmd, 
latestExpiry);
+    pendingOps.scheduleAddReplica(new ContainerID(2), dn1, 1, addCmd, 
latestExpiry);
 
     List<ContainerReplicaOp> ops =
         pendingOps.getPendingOps(new ContainerID(1));
     assertEquals(4, ops.size());
     ops = pendingOps.getPendingOps(new ContainerID(2));
-    assertEquals(1, ops.size());
+    assertEquals(2, ops.size());
 
     // Some entries expire at "start + 1000" some at start + 2000 and
     // start + 3000. Clock is currently at "start"
@@ -240,42 +263,50 @@ public void testRemoveExpiredEntries() {
 
     clock.fastForward(1000);
     pendingOps.removeExpiredEntries();
-    // Those with deadline + 1000 should be removed.
+    // Those ADD with deadline + 1000 should be removed, but deletes are 
retained
     ops = pendingOps.getPendingOps(new ContainerID(1));
-    assertEquals(2, ops.size());
+    assertEquals(3, ops.size());
     // We should lose the entries for DN1
-    List<DatanodeDetails> dns = ops.stream()
-        .map(ContainerReplicaOp::getTarget)
-        .collect(Collectors.toList());
-    assertThat(dns).doesNotContain(dn1);
-    assertThat(dns).contains(dn2);
-    assertThat(dns).contains(dn3);
+    assertFalse(isOpPresent(ops, dn1, 0, ADD));
+    assertTrue(isOpPresent(ops, dn1, 0, DELETE));
+    assertTrue(isOpPresent(ops, dn2, 0, DELETE));
 
     clock.fastForward(1000);
     pendingOps.removeExpiredEntries();
 
-    // Now should only have entries for container 2
+    // Now should only have entries for container 2 and the deletes for 
container 1
     ops = pendingOps.getPendingOps(new ContainerID(1));
-    assertEquals(0, ops.size());
+    assertEquals(2, ops.size());
+
+    assertTrue(isOpPresent(ops, dn1, 0, DELETE));
+    assertTrue(isOpPresent(ops, dn2, 0, DELETE));
+
     ops = pendingOps.getPendingOps(new ContainerID(2));
-    assertEquals(1, ops.size());
+    assertEquals(2, ops.size());
 
-    // Advance the clock again and all should be removed
+    // Advance the clock again and all should be removed except deletes
     clock.fastForward(1000);
     pendingOps.removeExpiredEntries();
     ops = pendingOps.getPendingOps(new ContainerID(2));
-    assertEquals(0, ops.size());
+    assertTrue(isOpPresent(ops, dn1, 1, DELETE));
+    assertEquals(1, ops.size());
+  }
+
+  /**
+   * Tells whether the given list contains at least one pending op that
+   * matches the supplied datanode, replica index and op type.
+   *
+   * @param ops pending ops to search
+   * @param dn datanode the op must target
+   * @param index replica index the op must carry
+   * @param type op type the op must have
+   * @return true if some op matches all three criteria, false otherwise
+   */
+  private boolean isOpPresent(List<ContainerReplicaOp> ops, DatanodeDetails dn,
+      int index, ContainerReplicaOp.PendingOpType type) {
+    for (ContainerReplicaOp candidate : ops) {
+      // Checks are evaluated in the order target, replica index, op type.
+      if (candidate.getTarget().equals(dn)
+          && candidate.getReplicaIndex() == index
+          && candidate.getOpType() == type) {
+        return true;
+      }
+    }
+    return false;
+  }
 
   @Test
   public void testReplicationMetrics() {
     long expiry = clock.millis() + 1000;
-    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 1, expiry);
-    pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 2, expiry);
-    pendingOps.scheduleDeleteReplica(new ContainerID(2), dn2, 1, expiry);
-    pendingOps.scheduleAddReplica(new ContainerID(2), dn3, 1, expiry);
-    pendingOps.scheduleAddReplica(new ContainerID(3), dn3, 0, expiry);
-    pendingOps.scheduleDeleteReplica(new ContainerID(4), dn3, 0, expiry);
+    pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 1, deleteCmd, 
expiry);
+    pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 2, addCmd, expiry);
+    pendingOps.scheduleDeleteReplica(new ContainerID(2), dn2, 1, deleteCmd, 
expiry);
+    pendingOps.scheduleAddReplica(new ContainerID(2), dn3, 1, addCmd, expiry);
+    pendingOps.scheduleAddReplica(new ContainerID(3), dn3, 0, addCmd, expiry);
+    pendingOps.scheduleDeleteReplica(new ContainerID(4), dn3, 0, deleteCmd, 
expiry);
 
     // InFlight Replication and Deletion
     assertEquals(3, pendingOps.getPendingOpCount(ADD));
@@ -296,17 +327,17 @@ public void testReplicationMetrics() {
     assertEquals(metrics.getReplicaDeleteTimeoutTotal(), 1);
 
     expiry = clock.millis() + 1000;
-    pendingOps.scheduleDeleteReplica(new ContainerID(3), dn1, 2, expiry);
-    pendingOps.scheduleAddReplica(new ContainerID(3), dn1, 3, expiry);
-    pendingOps.scheduleDeleteReplica(new ContainerID(4), dn2, 2, expiry);
-    pendingOps.scheduleAddReplica(new ContainerID(4), dn3, 4, expiry);
-    pendingOps.scheduleAddReplica(new ContainerID(5), dn3, 0, expiry);
-    pendingOps.scheduleDeleteReplica(new ContainerID(6), dn3, 0, expiry);
+    pendingOps.scheduleDeleteReplica(new ContainerID(3), dn1, 2, deleteCmd, 
expiry);
+    pendingOps.scheduleAddReplica(new ContainerID(3), dn1, 3, addCmd, expiry);
+    pendingOps.scheduleDeleteReplica(new ContainerID(4), dn2, 2, deleteCmd, 
expiry);
+    pendingOps.scheduleAddReplica(new ContainerID(4), dn3, 4, addCmd, expiry);
+    pendingOps.scheduleAddReplica(new ContainerID(5), dn3, 0, addCmd, expiry);
+    pendingOps.scheduleDeleteReplica(new ContainerID(6), dn3, 0, deleteCmd, 
expiry);
 
     // InFlight Replication and Deletion. Previous Inflight should be
-    // removed as they were timed out.
+    // removed as they were timed out, but deletes are retained
     assertEquals(3, pendingOps.getPendingOpCount(ADD));
-    assertEquals(3, pendingOps.getPendingOpCount(DELETE));
+    assertEquals(6, pendingOps.getPendingOpCount(DELETE));
 
     pendingOps.completeDeleteReplica(new ContainerID(3), dn1, 2);
     pendingOps.completeAddReplica(new ContainerID(3), dn1, 3);
@@ -325,7 +356,7 @@ public void testReplicationMetrics() {
 
     // Checking pendingOpCount doesn't go below zero
     assertEquals(0, pendingOps.getPendingOpCount(ADD));
-    assertEquals(0, pendingOps.getPendingOpCount(DELETE));
+    assertEquals(3, pendingOps.getPendingOpCount(DELETE));
   }
 
   /**
@@ -344,9 +375,9 @@ public void testNotifySubscribers() {
 
     // schedule an ADD and a DELETE
     ContainerID containerID = new ContainerID(1);
-    pendingOps.scheduleAddReplica(containerID, dn1, 0, deadline);
+    pendingOps.scheduleAddReplica(containerID, dn1, 0, addCmd, deadline);
     ContainerReplicaOp addOp = pendingOps.getPendingOps(containerID).get(0);
-    pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deadline);
+    pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deleteCmd, deadline);
 
     // complete the ADD and verify that subscribers were notified
     pendingOps.completeAddReplica(containerID, dn1, 0);
@@ -360,8 +391,8 @@ public void testNotifySubscribers() {
     verify(subscriber2, times(1)).opCompleted(deleteOp, containerID, false);
 
     // now, test notification on expiration
-    pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deadline);
-    pendingOps.scheduleAddReplica(containerID, dn2, 0, deadline);
+    pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deleteCmd, deadline);
+    pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline);
     for (ContainerReplicaOp op : pendingOps.getPendingOps(containerID)) {
       if (op.getOpType() == ADD) {
         addOp = op;
@@ -383,8 +414,8 @@ public void 
subscribersShouldNotBeNotifiedWhenOpsHaveNotExpired() {
     ContainerID containerID = new ContainerID(1);
 
     // schedule ops
-    pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deadline);
-    pendingOps.scheduleAddReplica(containerID, dn2, 0, deadline);
+    pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deleteCmd, deadline);
+    pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline);
 
     // register subscriber
     ContainerReplicaPendingOpsSubscriber subscriber1 = mock(
@@ -397,4 +428,25 @@ public void 
subscribersShouldNotBeNotifiedWhenOpsHaveNotExpired() {
     // subscriber
     verifyNoMoreInteractions(subscriber1);
   }
+
+  /**
+   * Scheduling an ADD for the same container, datanode and replica index as
+   * an ADD that is already pending must not produce any subscriber callback.
+   */
+  @Test
+  public void subscribersShouldNotBeNotifiedWhenReplacingAnOpWithDuplicate() {
+    ContainerID id = new ContainerID(1);
+
+    // The first ADD is scheduled before any subscriber is registered.
+    pendingOps.scheduleAddReplica(id, dn2, 0, addCmd, deadline);
+
+    ContainerReplicaPendingOpsSubscriber listener =
+        mock(ContainerReplicaPendingOpsSubscriber.class);
+    pendingOps.registerSubscriber(listener);
+
+    // Advance the clock, then schedule the same ADD again with a deadline
+    // one millisecond later than the first one.
+    clock.fastForward(1000);
+    pendingOps.scheduleAddReplica(id, dn2, 0, addCmd, deadline + 1);
+
+    // The duplicate must not have caused any call on the subscriber.
+    verifyNoMoreInteractions(listener);
+  }
+
+
 }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerReplicaCount.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerReplicaCount.java
index ff0b838bd8..da7a41de28 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerReplicaCount.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerReplicaCount.java
@@ -92,7 +92,7 @@ public void testContainerMissingReplica() {
     // appears missing
     ContainerReplicaOp op = new ContainerReplicaOp(
         ContainerReplicaOp.PendingOpType.ADD,
-        MockDatanodeDetails.randomDatanodeDetails(), 5, Long.MAX_VALUE);
+        MockDatanodeDetails.randomDatanodeDetails(), 5, null, Long.MAX_VALUE);
     rcnt.addPendingOp(op);
     assertTrue(rcnt.isSufficientlyReplicated(true));
     assertEquals(0, rcnt.unavailableIndexes(true).size());
@@ -213,7 +213,7 @@ public void testOverReplicatedContainer() {
     // as not over replicated.
     rcnt.addPendingOp(new ContainerReplicaOp(
         ContainerReplicaOp.PendingOpType.DELETE,
-        MockDatanodeDetails.randomDatanodeDetails(), 2, Long.MAX_VALUE));
+        MockDatanodeDetails.randomDatanodeDetails(), 2, null, Long.MAX_VALUE));
     assertFalse(rcnt.isOverReplicated(true));
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index d335394451..34da3a077f 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -92,6 +92,7 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -224,9 +225,9 @@ private void enableProcessAll() {
   @Test
   public void testPendingOpsClearedWhenStarting() {
     containerReplicaPendingOps.scheduleAddReplica(ContainerID.valueOf(1),
-        MockDatanodeDetails.randomDatanodeDetails(), 1, Integer.MAX_VALUE);
+        MockDatanodeDetails.randomDatanodeDetails(), 1, null, 
Integer.MAX_VALUE);
     containerReplicaPendingOps.scheduleDeleteReplica(ContainerID.valueOf(2),
-        MockDatanodeDetails.randomDatanodeDetails(), 1, Integer.MAX_VALUE);
+        MockDatanodeDetails.randomDatanodeDetails(), 1, null, 
Integer.MAX_VALUE);
     assertEquals(1, containerReplicaPendingOps
         .getPendingOpCount(ContainerReplicaOp.PendingOpType.ADD));
     assertEquals(1, containerReplicaPendingOps
@@ -733,7 +734,7 @@ public void testUnderReplicatedContainerFixedByPending()
         HddsProtos.LifeCycleState.CLOSED);
     addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4);
     containerReplicaPendingOps.scheduleAddReplica(container.containerID(),
-        MockDatanodeDetails.randomDatanodeDetails(), 5,
+        MockDatanodeDetails.randomDatanodeDetails(), 5, null,
         clock.millis() + 10000);
 
     replicationManager.processContainer(
@@ -1024,7 +1025,7 @@ public void testOverReplicatedFixByPending()
     addReplicas(container, ContainerReplicaProto.State.CLOSED,
         1, 2, 3, 4, 5, 5);
     containerReplicaPendingOps.scheduleDeleteReplica(container.containerID(),
-        MockDatanodeDetails.randomDatanodeDetails(), 5,
+        MockDatanodeDetails.randomDatanodeDetails(), 5, null,
         clock.millis() + 10000);
     replicationManager.processContainer(
         container, repQueue, repReport);
@@ -1739,6 +1740,35 @@ public void testInflightReplicationLimit() throws 
IOException {
         rm.getReplicationInFlightLimit());
   }
 
+  /**
+   * Exercises {@code ReplicationManager#opCompleted} for pending ops.
+   *
+   * With the boolean flag set to false, neither an ADD nor a DELETE op causes
+   * a command to be sent. With the flag set to true for the DELETE op,
+   * exactly one command is sent to the op's target datanode, and its deadline
+   * differs from the deadline stored on the op.
+   *
+   * NOTE(review): the boolean flag presumably means "timed out / expired";
+   * confirm against the subscriber interface.
+   *
+   * @throws ContainerNotFoundException declared by the mocked
+   *     {@code containerManager.getContainer} call
+   */
+  @Test
+  public void testPendingOpExpiry() throws ContainerNotFoundException {
+    when(containerManager.getContainer(any()))
+        .thenReturn(ReplicationTestUtil.createContainerInfo(repConfig, 1,
+            HddsProtos.LifeCycleState.CLOSED, 10, 20));
+    // This is just some arbitrary epoch time in the past
+    long commandDeadline = 1000;
+    SCMCommand<?> command = new DeleteContainerCommand(1L, true);
+
+    DatanodeDetails dn1 = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails dn2 = MockDatanodeDetails.randomDatanodeDetails();
+
+    ContainerReplicaOp addOp = ContainerReplicaOp.create(
+        ContainerReplicaOp.PendingOpType.ADD, dn1, 1);
+    ContainerReplicaOp delOp = new ContainerReplicaOp(
+        ContainerReplicaOp.PendingOpType.DELETE, dn2, 1, command,
+        commandDeadline);
+
+    replicationManager.opCompleted(addOp, new ContainerID(1L), false);
+    replicationManager.opCompleted(delOp, new ContainerID(1L), false);
+    // No commands should be sent for either of the above ops.
+    assertEquals(0, commandsSent.size());
+
+    replicationManager.opCompleted(delOp, new ContainerID(1L), true);
+    assertEquals(1, commandsSent.size());
+    Pair<UUID, SCMCommand<?>> sentCommand = commandsSent.iterator().next();
+    // The target should be DN2 and the deadline should have been updated
+    // from the value set in commandDeadline above
+    assertEquals(dn2.getUuid(), sentCommand.getLeft());
+    assertNotEquals(commandDeadline, sentCommand.getRight().getDeadline());
+  }
+
   @SafeVarargs
   private final Set<ContainerReplica>  addReplicas(ContainerInfo container,
       ContainerReplicaProto.State replicaState,
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java
index d540118599..3267569684 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java
@@ -250,10 +250,10 @@ private void loadPendingOps(ContainerInfo container, 
Scenario scenario) {
     for (PendingReplica r : scenario.getPendingReplicas()) {
       if (r.getType() == ContainerReplicaOp.PendingOpType.ADD) {
         containerReplicaPendingOps.scheduleAddReplica(container.containerID(), 
r.getDatanodeDetails(),
-            r.getReplicaIndex(), Long.MAX_VALUE);
+            r.getReplicaIndex(), null, Long.MAX_VALUE);
       } else if (r.getType() == ContainerReplicaOp.PendingOpType.DELETE) {
         
containerReplicaPendingOps.scheduleDeleteReplica(container.containerID(), 
r.getDatanodeDetails(),
-            r.getReplicaIndex(), Long.MAX_VALUE);
+            r.getReplicaIndex(), null, Long.MAX_VALUE);
       }
     }
   }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index df5f3ec0d2..732c75ce68 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -404,6 +404,8 @@ public void testContainerStatisticsAfterDelete() throws 
Exception {
       });
     });
 
+    LogCapturer logCapturer = LogCapturer.captureLogs(ReplicationManager.LOG);
+    logCapturer.clearOutput();
     cluster.shutdownHddsDatanode(0);
     replicationManager.processAll();
     ((EventQueue)scm.getEventQueue()).processAll(1000);
@@ -411,8 +413,6 @@ public void testContainerStatisticsAfterDelete() throws 
Exception {
     containerInfos.stream().forEach(container ->
         assertEquals(HddsProtos.LifeCycleState.DELETING,
             container.getState()));
-    LogCapturer logCapturer = LogCapturer.captureLogs(ReplicationManager.LOG);
-    logCapturer.clearOutput();
 
     Thread.sleep(5000);
     replicationManager.processAll();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to