This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 04f6255295 HDDS-12127. RM should not expire pending deletes, but retry
until delete is confirmed or node is dead (#7746)
04f6255295 is described below
commit 04f625529514fa7cdf47daa4420b4032b6be8983
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Tue Jan 28 10:21:36 2025 +0000
HDDS-12127. RM should not expire pending deletes, but retry until delete is
confirmed or node is dead (#7746)
---
.../container/replication/ContainerReplicaOp.java | 12 +-
.../replication/ContainerReplicaPendingOps.java | 36 +++--
.../replication/ECUnderReplicationHandler.java | 2 +-
.../container/replication/ReplicationManager.java | 33 +++-
.../hdds/scm/server/StorageContainerManager.java | 3 +
.../scm/container/balancer/TestMoveManager.java | 28 ++--
.../TestContainerReplicaPendingOps.java | 176 +++++++++++++--------
.../replication/TestECContainerReplicaCount.java | 4 +-
.../replication/TestReplicationManager.java | 38 ++++-
.../TestReplicationManagerScenarios.java | 4 +-
.../commandhandler/TestBlockDeletion.java | 4 +-
11 files changed, 230 insertions(+), 110 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
index 34cc01eb89..99fcda97b2 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaOp.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.container.replication;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
/**
* Class to wrap details used to track pending replications.
@@ -34,19 +35,20 @@ public enum PendingOpType {
private final PendingOpType opType;
private final DatanodeDetails target;
private final int replicaIndex;
+ private final SCMCommand<?> command;
private final long deadlineEpochMillis;
public static ContainerReplicaOp create(PendingOpType opType,
DatanodeDetails target, int replicaIndex) {
- return new ContainerReplicaOp(opType, target, replicaIndex,
- System.currentTimeMillis());
+ return new ContainerReplicaOp(opType, target, replicaIndex, null,
System.currentTimeMillis());
}
public ContainerReplicaOp(PendingOpType opType,
- DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) {
+ DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long
deadlineEpochMillis) {
this.opType = opType;
this.target = target;
this.replicaIndex = replicaIndex;
+ this.command = command;
this.deadlineEpochMillis = deadlineEpochMillis;
}
@@ -62,6 +64,10 @@ public int getReplicaIndex() {
return replicaIndex;
}
+  /**
+   * Returns the command that was sent to the datanode for this op, so that it
+   * can be resent later if required. May be null: {@code create(...)} and some
+   * callers construct ops without a command.
+   */
+ public SCMCommand<?> getCommand() {
+ return command;
+ }
+
public long getDeadlineEpochMillis() {
return deadlineEpochMillis;
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java
index 4eef0a8a74..ad2f8b6bf4 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import java.time.Clock;
import java.util.ArrayList;
@@ -117,13 +118,14 @@ public List<ContainerReplicaOp> getPendingOps(ContainerID
containerID) {
* @param containerID ContainerID for which to add a replica
* @param target The target datanode
* @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
+ * @param command The command to send to the datanode
* @param deadlineEpochMillis The time by which the replica should have been
* added and reported by the datanode, or it will
* be discarded.
*/
public void scheduleAddReplica(ContainerID containerID,
- DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) {
- addReplica(ADD, containerID, target, replicaIndex, deadlineEpochMillis);
+ DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long
deadlineEpochMillis) {
+ addReplica(ADD, containerID, target, replicaIndex, command,
deadlineEpochMillis);
}
/**
@@ -131,13 +133,14 @@ public void scheduleAddReplica(ContainerID containerID,
* @param containerID ContainerID for which to delete a replica
* @param target The target datanode
* @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
+ * @param command The command to send to the datanode
* @param deadlineEpochMillis The time by which the replica should have been
* deleted and reported by the datanode, or it
will
* be discarded.
*/
public void scheduleDeleteReplica(ContainerID containerID,
- DatanodeDetails target, int replicaIndex, long deadlineEpochMillis) {
- addReplica(DELETE, containerID, target, replicaIndex, deadlineEpochMillis);
+ DatanodeDetails target, int replicaIndex, SCMCommand<?> command, long
deadlineEpochMillis) {
+ addReplica(DELETE, containerID, target, replicaIndex, command,
deadlineEpochMillis);
}
/**
@@ -150,7 +153,7 @@ public void scheduleDeleteReplica(ContainerID containerID,
*/
public boolean completeAddReplica(ContainerID containerID,
DatanodeDetails target, int replicaIndex) {
- boolean completed = completeOp(ADD, containerID, target, replicaIndex);
+ boolean completed = completeOp(ADD, containerID, target, replicaIndex,
true);
if (isMetricsNotNull() && completed) {
if (isEC(replicaIndex)) {
replicationMetrics.incrEcReplicasCreatedTotal();
@@ -172,7 +175,7 @@ public boolean completeAddReplica(ContainerID containerID,
*/
public boolean completeDeleteReplica(ContainerID containerID,
DatanodeDetails target, int replicaIndex) {
- boolean completed = completeOp(DELETE, containerID, target, replicaIndex);
+ boolean completed = completeOp(DELETE, containerID, target, replicaIndex,
true);
if (isMetricsNotNull() && completed) {
if (isEC(replicaIndex)) {
replicationMetrics.incrEcReplicasDeletedTotal();
@@ -192,7 +195,7 @@ public boolean completeDeleteReplica(ContainerID
containerID,
public boolean removeOp(ContainerID containerID,
ContainerReplicaOp op) {
return completeOp(op.getOpType(), containerID, op.getTarget(),
- op.getReplicaIndex());
+ op.getReplicaIndex(), true);
}
/**
@@ -221,9 +224,13 @@ public void removeExpiredEntries() {
while (iterator.hasNext()) {
ContainerReplicaOp op = iterator.next();
if (clock.millis() > op.getDeadlineEpochMillis()) {
- iterator.remove();
+ if (op.getOpType() != DELETE) {
+ // For delete ops, we don't remove them from the list as RM must
resend them, or they
+ // will be removed via a container report when they are
confirmed as deleted.
+ iterator.remove();
+ decrementCounter(op.getOpType(), op.getReplicaIndex());
+ }
expiredOps.add(op);
- decrementCounter(op.getOpType(), op.getReplicaIndex());
updateTimeoutMetrics(op);
}
}
@@ -258,15 +265,18 @@ private void updateTimeoutMetrics(ContainerReplicaOp op) {
}
private void addReplica(ContainerReplicaOp.PendingOpType opType,
- ContainerID containerID, DatanodeDetails target, int replicaIndex,
+ ContainerID containerID, DatanodeDetails target, int replicaIndex,
SCMCommand<?> command,
long deadlineEpochMillis) {
Lock lock = writeLock(containerID);
lock(lock);
try {
+ // Remove any existing duplicate op for the same target and replicaIndex
before adding
+ // the new one. Especially for delete ops, they could be getting resent
after expiry.
+ completeOp(opType, containerID, target, replicaIndex, false);
List<ContainerReplicaOp> ops = pendingOps.computeIfAbsent(
containerID, s -> new ArrayList<>());
ops.add(new ContainerReplicaOp(opType,
- target, replicaIndex, deadlineEpochMillis));
+ target, replicaIndex, command, deadlineEpochMillis));
incrementCounter(opType, replicaIndex);
} finally {
unlock(lock);
@@ -274,7 +284,7 @@ private void addReplica(ContainerReplicaOp.PendingOpType
opType,
}
private boolean completeOp(ContainerReplicaOp.PendingOpType opType,
- ContainerID containerID, DatanodeDetails target, int replicaIndex) {
+ ContainerID containerID, DatanodeDetails target, int replicaIndex,
boolean notifySubsribers) {
boolean found = false;
// List of completed ops that subscribers will be notified about
List<ContainerReplicaOp> completedOps = new ArrayList<>();
@@ -303,7 +313,7 @@ private boolean completeOp(ContainerReplicaOp.PendingOpType
opType,
unlock(lock);
}
- if (found) {
+ if (found && notifySubsribers) {
notifySubscribers(completedOps, containerID, false);
}
return found;
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 2f77891046..09a757f577 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -621,7 +621,7 @@ private void createReplicateCommand(
private void adjustPendingOps(ECContainerReplicaCount replicaCount,
DatanodeDetails target, int replicaIndex) {
replicaCount.addPendingOp(new ContainerReplicaOp(
- ContainerReplicaOp.PendingOpType.ADD, target, replicaIndex,
+ ContainerReplicaOp.PendingOpType.ADD, target, replicaIndex, null,
Long.MAX_VALUE));
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 1675da0fa8..07927ba689 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -103,7 +103,7 @@
* that the containers are properly replicated. Replication Manager deals only
* with Quasi Closed / Closed container.
*/
-public class ReplicationManager implements SCMService {
+public class ReplicationManager implements SCMService,
ContainerReplicaPendingOpsSubscriber {
public static final Logger LOG =
LoggerFactory.getLogger(ReplicationManager.class);
@@ -673,8 +673,7 @@ private void adjustPendingOpsAndMetrics(ContainerInfo
containerInfo,
if (cmd.getType() == Type.deleteContainerCommand) {
DeleteContainerCommand rcc = (DeleteContainerCommand) cmd;
containerReplicaPendingOps.scheduleDeleteReplica(
- containerInfo.containerID(), targetDatanode, rcc.getReplicaIndex(),
- scmDeadlineEpochMs);
+ containerInfo.containerID(), targetDatanode, rcc.getReplicaIndex(),
cmd, scmDeadlineEpochMs);
if (rcc.getReplicaIndex() > 0) {
getMetrics().incrEcDeletionCmdsSentTotal();
} else if (rcc.getReplicaIndex() == 0) {
@@ -687,8 +686,7 @@ private void adjustPendingOpsAndMetrics(ContainerInfo
containerInfo,
final ByteString targetIndexes = rcc.getMissingContainerIndexes();
for (int i = 0; i < targetIndexes.size(); i++) {
containerReplicaPendingOps.scheduleAddReplica(
- containerInfo.containerID(), targets.get(i),
targetIndexes.byteAt(i),
- scmDeadlineEpochMs);
+ containerInfo.containerID(), targets.get(i),
targetIndexes.byteAt(i), cmd, scmDeadlineEpochMs);
}
getMetrics().incrEcReconstructionCmdsSentTotal();
} else if (cmd.getType() == Type.replicateContainerCommand) {
@@ -702,7 +700,7 @@ private void adjustPendingOpsAndMetrics(ContainerInfo
containerInfo,
*/
containerReplicaPendingOps.scheduleAddReplica(
containerInfo.containerID(),
- targetDatanode, rcc.getReplicaIndex(), scmDeadlineEpochMs);
+ targetDatanode, rcc.getReplicaIndex(), cmd, scmDeadlineEpochMs);
} else {
/*
This means the source will push replica to the target, so the op's
@@ -710,7 +708,7 @@ private void adjustPendingOpsAndMetrics(ContainerInfo
containerInfo,
*/
containerReplicaPendingOps.scheduleAddReplica(
containerInfo.containerID(),
- rcc.getTargetDatanode(), rcc.getReplicaIndex(),
scmDeadlineEpochMs);
+ rcc.getTargetDatanode(), rcc.getReplicaIndex(), cmd,
scmDeadlineEpochMs);
}
if (rcc.getReplicaIndex() > 0) {
@@ -1043,6 +1041,27 @@ ReplicationQueue getQueue() {
return replicationQueue.get();
}
+  /**
+   * Subscriber callback from ContainerReplicaPendingOps. Expired DELETE ops are
+   * no longer dropped from the pending list, so the original delete command is
+   * resent here; it keeps being retried until the replica is confirmed deleted
+   * via a container report. All other notifications are ignored.
+   *
+   * @param op the pending op that completed or expired
+   * @param containerID the container the op belongs to
+   * @param timedOut true if the op expired rather than being confirmed
+   */
+  @Override
+  public void opCompleted(ContainerReplicaOp op, ContainerID containerID, boolean timedOut) {
+    if (!(timedOut && op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE)) {
+      // We only care about expired delete ops. All others should be ignored.
+      return;
+    }
+    if (op.getCommand() == null) {
+      // The op was recorded without a command (ContainerReplicaOp.create and
+      // some callers pass null), so there is nothing to resend. Without this
+      // guard sendDatanodeCommand would be handed a null command.
+      LOG.warn("Expired delete for container {} on {} has no command to resend",
+          containerID, op.getTarget());
+      return;
+    }
+    try {
+      ContainerInfo containerInfo = containerManager.getContainer(containerID);
+      // Sending the command in this way is un-throttled, and the command will have its deadline
+      // adjusted to a new deadline as part of the sending process.
+      sendDatanodeCommand(op.getCommand(), containerInfo, op.getTarget());
+    } catch (ContainerNotFoundException e) {
+      // Should not happen, as even deleted containers are currently retained in the SCM container map
+      LOG.error("Container {} not found when processing expired delete", containerID, e);
+    } catch (NotLeaderException e) {
+      // If SCM leadership has changed, this is fine to ignore. All pending ops will be expired
+      // once SCM leadership switches.
+      LOG.warn("SCM is not leader when processing expired delete", e);
+    }
+  }
+
/**
* Configuration used by the Replication Manager.
*/
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 1a986f2f67..b6c41c7dcf 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -836,6 +836,9 @@ private void initializeSystemManagers(OzoneConfiguration
conf,
reconfigurationHandler.register(replicationManager.getConfig());
}
serviceManager.register(replicationManager);
+ // RM gets notified of expired pending delete from
containerReplicaPendingOps by subscribing to it
+ // so it can resend them.
+ containerReplicaPendingOps.registerSubscriber(replicationManager);
if (configurator.getScmSafeModeManager() != null) {
scmSafeModeManager = configurator.getScmSafeModeManager();
} else {
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java
index 0c5667d407..8cc62ab03c 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/balancer/TestMoveManager.java
@@ -199,13 +199,13 @@ public void testMovePendingOpsExist() throws Exception {
nodes.put(src, NodeStatus.inServiceHealthy());
nodes.put(tgt, NodeStatus.inServiceHealthy());
- pendingOps.add(new ContainerReplicaOp(ADD, tgt, 0, clock.millis()));
+ pendingOps.add(new ContainerReplicaOp(ADD, tgt, 0, null, clock.millis()));
assertMoveFailsWith(REPLICATION_FAIL_INFLIGHT_REPLICATION,
containerInfo.containerID());
pendingOps.clear();
- pendingOps.add(new ContainerReplicaOp(DELETE, src, 0, clock.millis()));
+ pendingOps.add(new ContainerReplicaOp(DELETE, src, 0, null,
clock.millis()));
assertMoveFailsWith(REPLICATION_FAIL_INFLIGHT_DELETION,
containerInfo.containerID());
}
@@ -325,7 +325,7 @@ public void testDeleteCommandFails() throws Exception {
.when(containerManager).getContainer(any(ContainerID.class));
ContainerReplicaOp op = new ContainerReplicaOp(
- ADD, tgt, 0, clock.millis() + 1000);
+ ADD, tgt, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);
MoveManager.MoveResult moveResult = res.get();
@@ -337,14 +337,14 @@ public void testSuccessfulMove() throws Exception {
CompletableFuture<MoveManager.MoveResult> res = setupSuccessfulMove();
ContainerReplicaOp op = new ContainerReplicaOp(
- ADD, tgt, 0, clock.millis() + 1000);
+ ADD, tgt, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);
verify(replicationManager).sendDeleteCommand(
eq(containerInfo), eq(0), eq(src), eq(true), anyLong());
op = new ContainerReplicaOp(
- DELETE, src, 0, clock.millis() + 1000);
+ DELETE, src, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);
MoveManager.MoveResult finalResult = res.get();
@@ -374,7 +374,7 @@ public void testSuccessfulMoveNonZeroRepIndex() throws
Exception {
anyLong());
ContainerReplicaOp op = new ContainerReplicaOp(
- ADD, tgt, srcReplica.getReplicaIndex(), clock.millis() + 1000);
+ ADD, tgt, srcReplica.getReplicaIndex(), null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);
verify(replicationManager).sendDeleteCommand(
@@ -382,7 +382,7 @@ public void testSuccessfulMoveNonZeroRepIndex() throws
Exception {
eq(true), anyLong());
op = new ContainerReplicaOp(
- DELETE, src, srcReplica.getReplicaIndex(), clock.millis() + 1000);
+ DELETE, src, srcReplica.getReplicaIndex(), null, clock.millis() +
1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);
MoveManager.MoveResult finalResult = res.get();
@@ -394,7 +394,7 @@ public void testMoveTimeoutOnAdd() throws Exception {
CompletableFuture<MoveManager.MoveResult> res = setupSuccessfulMove();
ContainerReplicaOp op = new ContainerReplicaOp(
- ADD, tgt, 0, clock.millis() + 1000);
+ ADD, tgt, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), true);
MoveManager.MoveResult finalResult = res.get();
@@ -406,14 +406,14 @@ public void testMoveTimeoutOnDelete() throws Exception {
CompletableFuture<MoveManager.MoveResult> res = setupSuccessfulMove();
ContainerReplicaOp op = new ContainerReplicaOp(
- ADD, tgt, 0, clock.millis() + 1000);
+ ADD, tgt, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);
verify(replicationManager).sendDeleteCommand(
eq(containerInfo), eq(0), eq(src), eq(true), anyLong());
op = new ContainerReplicaOp(
- DELETE, src, 0, clock.millis() + 1000);
+ DELETE, src, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), true);
MoveManager.MoveResult finalResult = res.get();
@@ -434,7 +434,7 @@ public void testMoveCompleteSrcNoLongerPresent() throws
Exception {
}
}
ContainerReplicaOp op = new ContainerReplicaOp(
- ADD, tgt, 0, clock.millis() + 1000);
+ ADD, tgt, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);
MoveManager.MoveResult finalResult = res.get();
@@ -450,7 +450,7 @@ public void testMoveCompleteSrcNotHealthy() throws
Exception {
nodes.put(src, NodeStatus.inServiceStale());
ContainerReplicaOp op = new ContainerReplicaOp(
- ADD, tgt, 0, clock.millis() + 1000);
+ ADD, tgt, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);
MoveManager.MoveResult finalResult = res.get();
@@ -468,7 +468,7 @@ public void testMoveCompleteSrcNotInService() throws
Exception {
HddsProtos.NodeOperationalState.DECOMMISSIONING,
HddsProtos.NodeState.HEALTHY));
ContainerReplicaOp op = new ContainerReplicaOp(
- ADD, tgt, 0, clock.millis() + 1000);
+ ADD, tgt, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);
MoveManager.MoveResult finalResult = res.get();
@@ -487,7 +487,7 @@ public void testMoveCompleteFutureReplicasUnhealthy()
throws Exception {
.MisReplicatedHealthResult(containerInfo, false, null));
ContainerReplicaOp op = new ContainerReplicaOp(
- ADD, tgt, 0, clock.millis() + 1000);
+ ADD, tgt, 0, null, clock.millis() + 1000);
moveManager.opCompleted(op, containerInfo.containerID(), false);
MoveManager.MoveResult finalResult = res.get();
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java
index 3775531d30..0326a95f6c 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestContainerReplicaPendingOps.java
@@ -23,6 +23,9 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.ozone.test.TestClock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -56,6 +59,8 @@ public class TestContainerReplicaPendingOps {
private DatanodeDetails dn3;
private ReplicationManagerMetrics metrics;
private long deadline;
+ private SCMCommand<?> addCmd;
+ private SCMCommand<?> deleteCmd;
@BeforeEach
public void setup() {
@@ -73,6 +78,9 @@ public void setup() {
dn1 = MockDatanodeDetails.randomDatanodeDetails();
dn2 = MockDatanodeDetails.randomDatanodeDetails();
dn3 = MockDatanodeDetails.randomDatanodeDetails();
+
+ addCmd = ReplicateContainerCommand.toTarget(1, dn3);
+ deleteCmd = new DeleteContainerCommand(1, false);
}
@AfterEach
@@ -91,8 +99,8 @@ public void testGetPendingOpsReturnsEmptyList() {
@Test
public void testClear() {
- pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, deadline);
- pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 0, deadline);
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, addCmd,
deadline);
+ pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 0, deleteCmd,
deadline);
assertEquals(1,
pendingOps.getPendingOpCount(ContainerReplicaOp.PendingOpType.ADD));
assertEquals(1,
pendingOps.getPendingOpCount(ContainerReplicaOp.PendingOpType.DELETE));
@@ -108,17 +116,30 @@ public void testClear() {
@Test
public void testCanAddReplicasForAdd() {
- pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, deadline);
- pendingOps.scheduleAddReplica(new ContainerID(1), dn2, 0, deadline);
- pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, deadline);
- pendingOps.scheduleAddReplica(new ContainerID(2), dn1, 1, deadline);
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, addCmd,
deadline);
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn2, 0, addCmd,
deadline);
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, addCmd,
deadline);
+ // Duplicate for DN2
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn2, 0, addCmd, deadline
+ 1);
+ // Not a duplicate for DN2 as different index. Should not happen in
practice as it is not valid to have 2 indexes
+ // on the same node.
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn2, 1, addCmd,
deadline);
+ pendingOps.scheduleAddReplica(new ContainerID(2), dn1, 1, addCmd,
deadline);
+ pendingOps.scheduleAddReplica(new ContainerID(2), dn1, 1, addCmd, deadline
+ 1);
List<ContainerReplicaOp> ops =
pendingOps.getPendingOps(new ContainerID(1));
- assertEquals(3, ops.size());
+ assertEquals(4, ops.size());
for (ContainerReplicaOp op : ops) {
- assertEquals(0, op.getReplicaIndex());
+ if (!op.getTarget().equals(dn2)) {
+ assertEquals(0, op.getReplicaIndex());
+ }
assertEquals(ADD, op.getOpType());
+ if (op.getTarget().equals(dn2) && op.getReplicaIndex() == 0) {
+ assertEquals(deadline + 1, op.getDeadlineEpochMillis());
+ } else {
+ assertEquals(deadline, op.getDeadlineEpochMillis());
+ }
}
List<DatanodeDetails> allDns = ops.stream()
.map(ContainerReplicaOp::getTarget).collect(Collectors.toList());
@@ -131,14 +152,15 @@ public void testCanAddReplicasForAdd() {
assertEquals(1, ops.get(0).getReplicaIndex());
assertEquals(ADD, ops.get(0).getOpType());
assertEquals(dn1, ops.get(0).getTarget());
+ assertEquals(deadline + 1, ops.get(0).getDeadlineEpochMillis());
}
@Test
public void testCanAddReplicasForDelete() {
- pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deadline);
- pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deadline);
- pendingOps.scheduleDeleteReplica(new ContainerID(1), dn3, 0, deadline);
- pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deadline);
+ pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deleteCmd,
deadline);
+ pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deleteCmd,
deadline);
+ pendingOps.scheduleDeleteReplica(new ContainerID(1), dn3, 0, deleteCmd,
deadline);
+ pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deleteCmd,
deadline);
List<ContainerReplicaOp> ops =
pendingOps.getPendingOps(new ContainerID(1));
@@ -162,11 +184,11 @@ public void testCanAddReplicasForDelete() {
@Test
public void testCompletingOps() {
- pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deadline);
- pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, deadline);
- pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deadline);
- pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, deadline);
- pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deadline);
+ pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deleteCmd,
deadline);
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, addCmd,
deadline);
+ pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deleteCmd,
deadline);
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, addCmd,
deadline);
+ pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deleteCmd,
deadline);
List<ContainerReplicaOp> ops =
pendingOps.getPendingOps(new ContainerID(1));
@@ -195,11 +217,11 @@ public void testCompletingOps() {
@Test
public void testRemoveSpecificOp() {
- pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deadline);
- pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, deadline);
- pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deadline);
- pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, deadline);
- pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deadline);
+ pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deleteCmd,
deadline);
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, addCmd,
deadline);
+ pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deleteCmd,
deadline);
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, addCmd,
deadline);
+ pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deleteCmd,
deadline);
ContainerID cid = new ContainerID(1);
List<ContainerReplicaOp> ops = pendingOps.getPendingOps(cid);
@@ -218,17 +240,18 @@ public void testRemoveExpiredEntries() {
long expiry = clock.millis() + 1000;
long laterExpiry = clock.millis() + 2000;
long latestExpiry = clock.millis() + 3000;
- pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, expiry);
- pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, expiry);
- pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, laterExpiry);
- pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, laterExpiry);
- pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, latestExpiry);
+ pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 0, deleteCmd,
expiry);
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 0, addCmd, expiry);
+ pendingOps.scheduleDeleteReplica(new ContainerID(1), dn2, 0, deleteCmd,
laterExpiry);
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn3, 0, addCmd,
laterExpiry);
+ pendingOps.scheduleDeleteReplica(new ContainerID(2), dn1, 1, deleteCmd,
latestExpiry);
+ pendingOps.scheduleAddReplica(new ContainerID(2), dn1, 1, addCmd,
latestExpiry);
List<ContainerReplicaOp> ops =
pendingOps.getPendingOps(new ContainerID(1));
assertEquals(4, ops.size());
ops = pendingOps.getPendingOps(new ContainerID(2));
- assertEquals(1, ops.size());
+ assertEquals(2, ops.size());
// Some entries expire at "start + 1000" some at start + 2000 and
// start + 3000. Clock is currently at "start"
@@ -240,42 +263,50 @@ public void testRemoveExpiredEntries() {
clock.fastForward(1000);
pendingOps.removeExpiredEntries();
- // Those with deadline + 1000 should be removed.
+ // Those ADD with deadline + 1000 should be removed, but deletes are
retained
ops = pendingOps.getPendingOps(new ContainerID(1));
- assertEquals(2, ops.size());
+ assertEquals(3, ops.size());
// We should lose the entries for DN1
- List<DatanodeDetails> dns = ops.stream()
- .map(ContainerReplicaOp::getTarget)
- .collect(Collectors.toList());
- assertThat(dns).doesNotContain(dn1);
- assertThat(dns).contains(dn2);
- assertThat(dns).contains(dn3);
+ assertFalse(isOpPresent(ops, dn1, 0, ADD));
+ assertTrue(isOpPresent(ops, dn1, 0, DELETE));
+ assertTrue(isOpPresent(ops, dn2, 0, DELETE));
clock.fastForward(1000);
pendingOps.removeExpiredEntries();
- // Now should only have entries for container 2
+ // Now should only have entries for container 2 and the deletes for
container 1
ops = pendingOps.getPendingOps(new ContainerID(1));
- assertEquals(0, ops.size());
+ assertEquals(2, ops.size());
+
+ assertTrue(isOpPresent(ops, dn1, 0, DELETE));
+ assertTrue(isOpPresent(ops, dn2, 0, DELETE));
+
ops = pendingOps.getPendingOps(new ContainerID(2));
- assertEquals(1, ops.size());
+ assertEquals(2, ops.size());
- // Advance the clock again and all should be removed
+ // Advance the clock again and all should be removed except deletes
clock.fastForward(1000);
pendingOps.removeExpiredEntries();
ops = pendingOps.getPendingOps(new ContainerID(2));
- assertEquals(0, ops.size());
+ assertTrue(isOpPresent(ops, dn1, 1, DELETE));
+ assertEquals(1, ops.size());
+ }
+
+ private boolean isOpPresent(List<ContainerReplicaOp> ops, DatanodeDetails dn,
+ int index, ContainerReplicaOp.PendingOpType type) {
+ return ops.stream().anyMatch(op -> op.getTarget().equals(dn) &&
+ op.getReplicaIndex() == index && op.getOpType() == type);
}
@Test
public void testReplicationMetrics() {
long expiry = clock.millis() + 1000;
- pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 1, expiry);
- pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 2, expiry);
- pendingOps.scheduleDeleteReplica(new ContainerID(2), dn2, 1, expiry);
- pendingOps.scheduleAddReplica(new ContainerID(2), dn3, 1, expiry);
- pendingOps.scheduleAddReplica(new ContainerID(3), dn3, 0, expiry);
- pendingOps.scheduleDeleteReplica(new ContainerID(4), dn3, 0, expiry);
+ pendingOps.scheduleDeleteReplica(new ContainerID(1), dn1, 1, deleteCmd,
expiry);
+ pendingOps.scheduleAddReplica(new ContainerID(1), dn1, 2, addCmd, expiry);
+ pendingOps.scheduleDeleteReplica(new ContainerID(2), dn2, 1, deleteCmd,
expiry);
+ pendingOps.scheduleAddReplica(new ContainerID(2), dn3, 1, addCmd, expiry);
+ pendingOps.scheduleAddReplica(new ContainerID(3), dn3, 0, addCmd, expiry);
+ pendingOps.scheduleDeleteReplica(new ContainerID(4), dn3, 0, deleteCmd,
expiry);
// InFlight Replication and Deletion
assertEquals(3, pendingOps.getPendingOpCount(ADD));
@@ -296,17 +327,17 @@ public void testReplicationMetrics() {
assertEquals(metrics.getReplicaDeleteTimeoutTotal(), 1);
expiry = clock.millis() + 1000;
- pendingOps.scheduleDeleteReplica(new ContainerID(3), dn1, 2, expiry);
- pendingOps.scheduleAddReplica(new ContainerID(3), dn1, 3, expiry);
- pendingOps.scheduleDeleteReplica(new ContainerID(4), dn2, 2, expiry);
- pendingOps.scheduleAddReplica(new ContainerID(4), dn3, 4, expiry);
- pendingOps.scheduleAddReplica(new ContainerID(5), dn3, 0, expiry);
- pendingOps.scheduleDeleteReplica(new ContainerID(6), dn3, 0, expiry);
+ pendingOps.scheduleDeleteReplica(new ContainerID(3), dn1, 2, deleteCmd,
expiry);
+ pendingOps.scheduleAddReplica(new ContainerID(3), dn1, 3, addCmd, expiry);
+ pendingOps.scheduleDeleteReplica(new ContainerID(4), dn2, 2, deleteCmd,
expiry);
+ pendingOps.scheduleAddReplica(new ContainerID(4), dn3, 4, addCmd, expiry);
+ pendingOps.scheduleAddReplica(new ContainerID(5), dn3, 0, addCmd, expiry);
+ pendingOps.scheduleDeleteReplica(new ContainerID(6), dn3, 0, deleteCmd,
expiry);
// InFlight Replication and Deletion. Previous Inflight should be
- // removed as they were timed out.
+ // removed as they were timed out, but deletes are retained
assertEquals(3, pendingOps.getPendingOpCount(ADD));
- assertEquals(3, pendingOps.getPendingOpCount(DELETE));
+ assertEquals(6, pendingOps.getPendingOpCount(DELETE));
pendingOps.completeDeleteReplica(new ContainerID(3), dn1, 2);
pendingOps.completeAddReplica(new ContainerID(3), dn1, 3);
@@ -325,7 +356,7 @@ public void testReplicationMetrics() {
// Checking pendingOpCount doesn't go below zero
assertEquals(0, pendingOps.getPendingOpCount(ADD));
- assertEquals(0, pendingOps.getPendingOpCount(DELETE));
+ assertEquals(3, pendingOps.getPendingOpCount(DELETE));
}
/**
@@ -344,9 +375,9 @@ public void testNotifySubscribers() {
// schedule an ADD and a DELETE
ContainerID containerID = new ContainerID(1);
- pendingOps.scheduleAddReplica(containerID, dn1, 0, deadline);
+ pendingOps.scheduleAddReplica(containerID, dn1, 0, addCmd, deadline);
ContainerReplicaOp addOp = pendingOps.getPendingOps(containerID).get(0);
- pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deadline);
+ pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deleteCmd, deadline);
// complete the ADD and verify that subscribers were notified
pendingOps.completeAddReplica(containerID, dn1, 0);
@@ -360,8 +391,8 @@ public void testNotifySubscribers() {
verify(subscriber2, times(1)).opCompleted(deleteOp, containerID, false);
// now, test notification on expiration
- pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deadline);
- pendingOps.scheduleAddReplica(containerID, dn2, 0, deadline);
+ pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deleteCmd, deadline);
+ pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline);
for (ContainerReplicaOp op : pendingOps.getPendingOps(containerID)) {
if (op.getOpType() == ADD) {
addOp = op;
@@ -383,8 +414,8 @@ public void
subscribersShouldNotBeNotifiedWhenOpsHaveNotExpired() {
ContainerID containerID = new ContainerID(1);
// schedule ops
- pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deadline);
- pendingOps.scheduleAddReplica(containerID, dn2, 0, deadline);
+ pendingOps.scheduleDeleteReplica(containerID, dn1, 0, deleteCmd, deadline);
+ pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline);
// register subscriber
ContainerReplicaPendingOpsSubscriber subscriber1 = mock(
@@ -397,4 +428,25 @@ public void
subscribersShouldNotBeNotifiedWhenOpsHaveNotExpired() {
// subscriber
verifyNoMoreInteractions(subscriber1);
}
+
+ @Test
+ public void subscribersShouldNotBeNotifiedWhenReplacingAnOpWithDuplicate() {
+ ContainerID containerID = new ContainerID(1);
+
+ // schedule ops
+ pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline);
+
+ // register subscriber
+ ContainerReplicaPendingOpsSubscriber subscriber1 = mock(
+ ContainerReplicaPendingOpsSubscriber.class);
+ pendingOps.registerSubscriber(subscriber1);
+
+ clock.fastForward(1000);
+ pendingOps.scheduleAddReplica(containerID, dn2, 0, addCmd, deadline + 1);
+ // no entries have expired, so there should be zero interactions with the
+ // subscriber
+ verifyNoMoreInteractions(subscriber1);
+ }
+
+
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerReplicaCount.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerReplicaCount.java
index ff0b838bd8..da7a41de28 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerReplicaCount.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECContainerReplicaCount.java
@@ -92,7 +92,7 @@ public void testContainerMissingReplica() {
// appears missing
ContainerReplicaOp op = new ContainerReplicaOp(
ContainerReplicaOp.PendingOpType.ADD,
- MockDatanodeDetails.randomDatanodeDetails(), 5, Long.MAX_VALUE);
+ MockDatanodeDetails.randomDatanodeDetails(), 5, null, Long.MAX_VALUE);
rcnt.addPendingOp(op);
assertTrue(rcnt.isSufficientlyReplicated(true));
assertEquals(0, rcnt.unavailableIndexes(true).size());
@@ -213,7 +213,7 @@ public void testOverReplicatedContainer() {
// as not over replicated.
rcnt.addPendingOp(new ContainerReplicaOp(
ContainerReplicaOp.PendingOpType.DELETE,
- MockDatanodeDetails.randomDatanodeDetails(), 2, Long.MAX_VALUE));
+ MockDatanodeDetails.randomDatanodeDetails(), 2, null, Long.MAX_VALUE));
assertFalse(rcnt.isOverReplicated(true));
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index d335394451..34da3a077f 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -92,6 +92,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -224,9 +225,9 @@ private void enableProcessAll() {
@Test
public void testPendingOpsClearedWhenStarting() {
containerReplicaPendingOps.scheduleAddReplica(ContainerID.valueOf(1),
- MockDatanodeDetails.randomDatanodeDetails(), 1, Integer.MAX_VALUE);
+ MockDatanodeDetails.randomDatanodeDetails(), 1, null,
Integer.MAX_VALUE);
containerReplicaPendingOps.scheduleDeleteReplica(ContainerID.valueOf(2),
- MockDatanodeDetails.randomDatanodeDetails(), 1, Integer.MAX_VALUE);
+ MockDatanodeDetails.randomDatanodeDetails(), 1, null,
Integer.MAX_VALUE);
assertEquals(1, containerReplicaPendingOps
.getPendingOpCount(ContainerReplicaOp.PendingOpType.ADD));
assertEquals(1, containerReplicaPendingOps
@@ -733,7 +734,7 @@ public void testUnderReplicatedContainerFixedByPending()
HddsProtos.LifeCycleState.CLOSED);
addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4);
containerReplicaPendingOps.scheduleAddReplica(container.containerID(),
- MockDatanodeDetails.randomDatanodeDetails(), 5,
+ MockDatanodeDetails.randomDatanodeDetails(), 5, null,
clock.millis() + 10000);
replicationManager.processContainer(
@@ -1024,7 +1025,7 @@ public void testOverReplicatedFixByPending()
addReplicas(container, ContainerReplicaProto.State.CLOSED,
1, 2, 3, 4, 5, 5);
containerReplicaPendingOps.scheduleDeleteReplica(container.containerID(),
- MockDatanodeDetails.randomDatanodeDetails(), 5,
+ MockDatanodeDetails.randomDatanodeDetails(), 5, null,
clock.millis() + 10000);
replicationManager.processContainer(
container, repQueue, repReport);
@@ -1739,6 +1740,35 @@ public void testInflightReplicationLimit() throws
IOException {
rm.getReplicationInFlightLimit());
}
+ @Test
+ public void testPendingOpExpiry() throws ContainerNotFoundException {
+ when(containerManager.getContainer(any()))
+ .thenReturn(ReplicationTestUtil.createContainerInfo(repConfig, 1,
+ HddsProtos.LifeCycleState.CLOSED, 10, 20));
+ // This is just some arbitrary epoch time in the past
+ long commandDeadline = 1000;
+ SCMCommand<?> command = new DeleteContainerCommand(1L, true);
+
+ DatanodeDetails dn1 = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails dn2 = MockDatanodeDetails.randomDatanodeDetails();
+
+ ContainerReplicaOp addOp =
ContainerReplicaOp.create(ContainerReplicaOp.PendingOpType.ADD, dn1, 1);
+ ContainerReplicaOp delOp = new ContainerReplicaOp(
+ ContainerReplicaOp.PendingOpType.DELETE, dn2, 1, command,
commandDeadline);
+
+ replicationManager.opCompleted(addOp, new ContainerID(1L), false);
+ replicationManager.opCompleted(delOp, new ContainerID(1L), false);
+ // No commands should be sent for either of the above ops.
+ assertEquals(0, commandsSent.size());
+
+ replicationManager.opCompleted(delOp, new ContainerID(1L), true);
+ assertEquals(1, commandsSent.size());
+ Pair<UUID, SCMCommand<?>> sentCommand = commandsSent.iterator().next();
+ // The target should be DN2 and the deadline should have been updated from
the value set in commandDeadline above
+ assertEquals(dn2.getUuid(), sentCommand.getLeft());
+ assertNotEquals(commandDeadline, sentCommand.getRight().getDeadline());
+ }
+
@SafeVarargs
private final Set<ContainerReplica> addReplicas(ContainerInfo container,
ContainerReplicaProto.State replicaState,
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java
index d540118599..3267569684 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerScenarios.java
@@ -250,10 +250,10 @@ private void loadPendingOps(ContainerInfo container,
Scenario scenario) {
for (PendingReplica r : scenario.getPendingReplicas()) {
if (r.getType() == ContainerReplicaOp.PendingOpType.ADD) {
containerReplicaPendingOps.scheduleAddReplica(container.containerID(),
r.getDatanodeDetails(),
- r.getReplicaIndex(), Long.MAX_VALUE);
+ r.getReplicaIndex(), null, Long.MAX_VALUE);
} else if (r.getType() == ContainerReplicaOp.PendingOpType.DELETE) {
containerReplicaPendingOps.scheduleDeleteReplica(container.containerID(),
r.getDatanodeDetails(),
- r.getReplicaIndex(), Long.MAX_VALUE);
+ r.getReplicaIndex(), null, Long.MAX_VALUE);
}
}
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index df5f3ec0d2..732c75ce68 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -404,6 +404,8 @@ public void testContainerStatisticsAfterDelete() throws
Exception {
});
});
+ LogCapturer logCapturer = LogCapturer.captureLogs(ReplicationManager.LOG);
+ logCapturer.clearOutput();
cluster.shutdownHddsDatanode(0);
replicationManager.processAll();
((EventQueue)scm.getEventQueue()).processAll(1000);
@@ -411,8 +413,6 @@ public void testContainerStatisticsAfterDelete() throws
Exception {
containerInfos.stream().forEach(container ->
assertEquals(HddsProtos.LifeCycleState.DELETING,
container.getState()));
- LogCapturer logCapturer = LogCapturer.captureLogs(ReplicationManager.LOG);
- logCapturer.clearOutput();
Thread.sleep(5000);
replicationManager.processAll();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]