aswinshakil commented on code in PR #3990:
URL: https://github.com/apache/ozone/pull/3990#discussion_r1029863048
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java:
##########
@@ -401,22 +403,54 @@ public void sendCloseContainerEvent(ContainerID
containerID) {
*/
public void sendDeleteCommand(final ContainerInfo container, int
replicaIndex,
final DatanodeDetails datanode) throws NotLeaderException {
- LOG.info("Sending delete container command for container {}" +
- " to datanode {}", container.containerID(), datanode);
-
final DeleteContainerCommand deleteCommand =
new DeleteContainerCommand(container.containerID(), false);
- deleteCommand.setTerm(getScmTerm());
-
- final CommandForDatanode<DeleteContainerCommandProto> datanodeCommand =
- new CommandForDatanode<>(datanode.getUuid(), deleteCommand);
+ deleteCommand.setReplicaIndex(replicaIndex);
+ sendDatanodeCommand(deleteCommand, container, datanode);
+ }
+
+ public void sendDatanodeCommand(SCMCommand<?> command,
+ ContainerInfo containerInfo, DatanodeDetails target)
+ throws NotLeaderException {
+ LOG.info("Sending command of type {} for container {} to {}",
+ command.getType(), containerInfo, target);
+ command.setTerm(getScmTerm());
+ final CommandForDatanode<?> datanodeCommand =
+ new CommandForDatanode<>(target.getUuid(), command);
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
- containerReplicaPendingOps.scheduleDeleteReplica(container.containerID(),
- datanode, replicaIndex);
-
- synchronized (this) {
- metrics.incrNumDeletionCmdsSent();
- metrics.incrNumDeletionBytesTotal(container.getUsedBytes());
+ adjustPendingOpsAndMetrics(containerInfo, command, target);
+ }
+
+ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo,
+ SCMCommand<?> cmd, DatanodeDetails targetDatanode) {
+ if (cmd.getType() == Type.deleteContainerCommand) {
+ DeleteContainerCommand rcc = (DeleteContainerCommand) cmd;
+ containerReplicaPendingOps.scheduleDeleteReplica(
+ containerInfo.containerID(), targetDatanode, rcc.getReplicaIndex());
+ if (rcc.getReplicaIndex() > 0) {
+ getMetrics().incrEcDeletionCmdsSentTotal();
+ } else if (rcc.getReplicaIndex() == 0) {
+ getMetrics().incrNumDeletionCmdsSent();
+ getMetrics().incrNumDeletionBytesTotal(containerInfo.getUsedBytes());
+ }
+ } else if (cmd.getType() == Type.reconstructECContainersCommand) {
+ ReconstructECContainersCommand rcc = (ReconstructECContainersCommand)
cmd;
+ List<DatanodeDetails> targets = rcc.getTargetDatanodes();
+ byte[] targetIndexes = rcc.getMissingContainerIndexes();
+ for (int i = 0; i < targetIndexes.length; i++) {
+ containerReplicaPendingOps.scheduleAddReplica(
+ containerInfo.containerID(), targets.get(i), targetIndexes[i]);
+ }
+ getMetrics().incrEcReconstructionCmdsSentTotal();
Review Comment:
Shouldn't the metric be incremented inside the `for` loop, since we are sending
a reconstruction command for each replica?
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java:
##########
@@ -401,22 +403,54 @@ public void sendCloseContainerEvent(ContainerID
containerID) {
*/
public void sendDeleteCommand(final ContainerInfo container, int
replicaIndex,
final DatanodeDetails datanode) throws NotLeaderException {
- LOG.info("Sending delete container command for container {}" +
- " to datanode {}", container.containerID(), datanode);
-
final DeleteContainerCommand deleteCommand =
new DeleteContainerCommand(container.containerID(), false);
- deleteCommand.setTerm(getScmTerm());
-
- final CommandForDatanode<DeleteContainerCommandProto> datanodeCommand =
- new CommandForDatanode<>(datanode.getUuid(), deleteCommand);
+ deleteCommand.setReplicaIndex(replicaIndex);
+ sendDatanodeCommand(deleteCommand, container, datanode);
+ }
+
+ public void sendDatanodeCommand(SCMCommand<?> command,
+ ContainerInfo containerInfo, DatanodeDetails target)
+ throws NotLeaderException {
+ LOG.info("Sending command of type {} for container {} to {}",
+ command.getType(), containerInfo, target);
+ command.setTerm(getScmTerm());
+ final CommandForDatanode<?> datanodeCommand =
+ new CommandForDatanode<>(target.getUuid(), command);
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
- containerReplicaPendingOps.scheduleDeleteReplica(container.containerID(),
- datanode, replicaIndex);
-
- synchronized (this) {
- metrics.incrNumDeletionCmdsSent();
- metrics.incrNumDeletionBytesTotal(container.getUsedBytes());
+ adjustPendingOpsAndMetrics(containerInfo, command, target);
+ }
+
+ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo,
+ SCMCommand<?> cmd, DatanodeDetails targetDatanode) {
+ if (cmd.getType() == Type.deleteContainerCommand) {
+ DeleteContainerCommand rcc = (DeleteContainerCommand) cmd;
+ containerReplicaPendingOps.scheduleDeleteReplica(
+ containerInfo.containerID(), targetDatanode, rcc.getReplicaIndex());
+ if (rcc.getReplicaIndex() > 0) {
+ getMetrics().incrEcDeletionCmdsSentTotal();
+ } else if (rcc.getReplicaIndex() == 0) {
+ getMetrics().incrNumDeletionCmdsSent();
+ getMetrics().incrNumDeletionBytesTotal(containerInfo.getUsedBytes());
+ }
+ } else if (cmd.getType() == Type.reconstructECContainersCommand) {
+ ReconstructECContainersCommand rcc = (ReconstructECContainersCommand)
cmd;
+ List<DatanodeDetails> targets = rcc.getTargetDatanodes();
+ byte[] targetIndexes = rcc.getMissingContainerIndexes();
+ for (int i = 0; i < targetIndexes.length; i++) {
+ containerReplicaPendingOps.scheduleAddReplica(
+ containerInfo.containerID(), targets.get(i), targetIndexes[i]);
+ }
+ getMetrics().incrEcReconstructionCmdsSentTotal();
+ } else if (cmd.getType() == Type.replicateContainerCommand) {
+ ReplicateContainerCommand rcc = (ReplicateContainerCommand) cmd;
+ containerReplicaPendingOps.scheduleAddReplica(
+ containerInfo.containerID(), targetDatanode, rcc.getReplicaIndex());
+ if (rcc.getReplicaIndex() > 0) {
+ getMetrics().incrEcReplicationCmdsSentTotal();
+ } else if (rcc.getReplicaIndex() == 0) {
+ getMetrics().incrNumReplicationCmdsSent();
+ }
Review Comment:
Can we throw an exception when the command type doesn't match any of these
conditions, as the previous code did?
```
else {
throw new IOException("Unexpected command type " + cmd.getType());
}
```
##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java:
##########
@@ -341,6 +352,168 @@ public void testUnderReplicationQueuePopulated() {
Assert.assertNull(res);
}
+ @Test
+ public void testSendDatanodeDeleteCommand() throws NotLeaderException {
+ ECReplicationConfig ecRepConfig = new ECReplicationConfig(3, 2);
+ ContainerInfo containerInfo =
+ ReplicationTestUtil.createContainerInfo(ecRepConfig, 1,
+ HddsProtos.LifeCycleState.CLOSED, 10, 20);
+ DatanodeDetails target = MockDatanodeDetails.randomDatanodeDetails();
+
+ DeleteContainerCommand deleteContainerCommand = new DeleteContainerCommand(
+ containerInfo.getContainerID());
+ deleteContainerCommand.setReplicaIndex(1);
+
+ replicationManager.sendDatanodeCommand(deleteContainerCommand,
+ containerInfo, target);
+
+ List<ContainerReplicaOp> ops = containerReplicaPendingOps.getPendingOps(
+ containerInfo.containerID());
+ Mockito.verify(eventPublisher).fireEvent(any(), any());
+ Assertions.assertEquals(1, ops.size());
+ Assertions.assertEquals(ContainerReplicaOp.PendingOpType.DELETE,
+ ops.get(0).getOpType());
+ Assertions.assertEquals(target, ops.get(0).getTarget());
+ Assertions.assertEquals(1, ops.get(0).getReplicaIndex());
+ Assertions.assertEquals(1, replicationManager.getMetrics()
+ .getEcDeletionCmdsSentTotal());
+ Assertions.assertEquals(0, replicationManager.getMetrics()
+ .getNumDeletionCmdsSent());
+
+ // Repeat with Ratis container, as different metrics should be incremented
+ Mockito.clearInvocations(eventPublisher);
+ RatisReplicationConfig ratisRepConfig =
+ RatisReplicationConfig.getInstance(THREE);
+ containerInfo = ReplicationTestUtil.createContainerInfo(ratisRepConfig, 2,
+ HddsProtos.LifeCycleState.CLOSED, 10, 20);
+
+ deleteContainerCommand = new DeleteContainerCommand(
+ containerInfo.getContainerID());
+ replicationManager.sendDatanodeCommand(deleteContainerCommand,
+ containerInfo, target);
+
+ ops =
containerReplicaPendingOps.getPendingOps(containerInfo.containerID());
+ Mockito.verify(eventPublisher).fireEvent(any(), any());
+ Assertions.assertEquals(1, ops.size());
+ Assertions.assertEquals(ContainerReplicaOp.PendingOpType.DELETE,
+ ops.get(0).getOpType());
+ Assertions.assertEquals(target, ops.get(0).getTarget());
+ Assertions.assertEquals(0, ops.get(0).getReplicaIndex());
+ Assertions.assertEquals(1, replicationManager.getMetrics()
+ .getEcDeletionCmdsSentTotal());
+ Assertions.assertEquals(1, replicationManager.getMetrics()
+ .getNumDeletionCmdsSent());
+ Assertions.assertEquals(20, replicationManager.getMetrics()
+ .getNumDeletionBytesTotal());
+ }
+
+ @Test
+ public void testSendDatanodeReconstructCommand() throws NotLeaderException {
+ ECReplicationConfig ecRepConfig = new ECReplicationConfig(3, 2);
+ ContainerInfo containerInfo =
+ ReplicationTestUtil.createContainerInfo(ecRepConfig, 1,
+ HddsProtos.LifeCycleState.CLOSED, 10, 20);
+
+ List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex>
+ sourceNodes = new ArrayList<>();
+ for (int i = 1; i <= 3; i++) {
+ sourceNodes.add(
+ new ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex(
+ MockDatanodeDetails.randomDatanodeDetails(), i));
+ }
+ List<DatanodeDetails> targetNodes = new ArrayList<>();
+ DatanodeDetails target4 = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails target5 = MockDatanodeDetails.randomDatanodeDetails();
+ targetNodes.add(target4);
+ targetNodes.add(target5);
+ byte[] missingIndexes = {4, 5};
+
+ ReconstructECContainersCommand command = new
ReconstructECContainersCommand(
+ containerInfo.getContainerID(), sourceNodes, targetNodes,
+ missingIndexes, ecRepConfig);
+
+ replicationManager.sendDatanodeCommand(command, containerInfo, target4);
+
+ List<ContainerReplicaOp> ops = containerReplicaPendingOps.getPendingOps(
+ containerInfo.containerID());
+ Mockito.verify(eventPublisher).fireEvent(any(), any());
+ Assertions.assertEquals(2, ops.size());
+ Set<DatanodeDetails> cmdTargets = new HashSet<>();
+ Set<Integer> cmdIndexes = new HashSet<>();
+ for (ContainerReplicaOp op : ops) {
+ Assertions.assertEquals(ADD, op.getOpType());
+ cmdTargets.add(op.getTarget());
+ cmdIndexes.add(op.getReplicaIndex());
+ }
+ Assertions.assertEquals(2, cmdTargets.size());
+ for (DatanodeDetails dn : targetNodes) {
+ Assertions.assertTrue(cmdTargets.contains(dn));
+ }
+
+ Assertions.assertEquals(2, cmdIndexes.size());
+ for (int i : missingIndexes) {
+ Assertions.assertTrue(cmdIndexes.contains(i));
+ }
+ Assertions.assertEquals(1, replicationManager.getMetrics()
+ .getEcReconstructionCmdsSentTotal());
+ }
+
+ @Test
+ public void testSendDatanodeRewplicateCommand() throws NotLeaderException {
Review Comment:
```suggestion
public void testSendDatanodeReplicateCommand() throws NotLeaderException {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]