This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new d9a48cc95d HDDS-8155. SCM Block deleting service add transaction blocks for stale/dead/decommissioning DNs (#4403)
d9a48cc95d is described below

commit d9a48cc95d2ac22f54e20385b0ba9384e021a24f
Author: Sumit Agrawal <[email protected]>
AuthorDate: Thu Mar 23 13:21:03 2023 +0530

    HDDS-8155. SCM Block deleting service add transaction blocks for stale/dead/decommissioning DNs (#4403)
---
 .../apache/hadoop/hdds/scm/block/DeletedBlockLog.java   |  7 ++++++-
 .../hadoop/hdds/scm/block/DeletedBlockLogImpl.java      | 14 ++++++++++----
 .../hadoop/hdds/scm/block/SCMBlockDeletingService.java  | 16 ++++++++--------
 .../apache/hadoop/hdds/scm/node/DeadNodeHandler.java    |  8 ++++++++
 .../hadoop/hdds/scm/block/TestDeletedBlockLog.java      | 17 +++++++++++++++--
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java       |  7 ++++++-
 .../hadoop/ozone/container/TestECContainerRecovery.java |  7 +++++--
 7 files changed, 58 insertions(+), 18 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
index ea2013917d..20a68e5010 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdds.scm.block;
 
+import java.util.Set;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
     .DeleteBlockTransactionResult;
@@ -43,11 +45,14 @@ public interface DeletedBlockLog extends Closeable {
    * Scan entire log once and returns TXs to DatanodeDeletedBlockTransactions.
    * Once DatanodeDeletedBlockTransactions is full, the scan behavior will
    * stop.
+   *
    * @param blockDeletionLimit Maximum number of blocks to fetch
+   * @param dnList set of healthy datanodes to consider
    * @return Mapping from containerId to latest transactionId for the container.
    * @throws IOException
    */
-  DatanodeDeletedBlockTransactions getTransactions(int blockDeletionLimit)
+  DatanodeDeletedBlockTransactions getTransactions(
+      int blockDeletionLimit, Set<DatanodeDetails> dnList)
       throws IOException, TimeoutException;
 
   /**
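
For context, a minimal caller-side sketch of the new getTransactions(...)
signature (illustrative, not part of the patch; it assumes the nodeManager
and deletedBlockLog fields available in SCMBlockDeletingService, and "limit"
is an illustrative name for the block deletion limit):

    // Collect only healthy, in-service DNs. NodeStatus.inServiceHealthy()
    // matches IN_SERVICE + HEALTHY nodes, so stale, dead and decommissioning
    // DNs are excluded from the set.
    Set<DatanodeDetails> healthyDns =
        new HashSet<>(nodeManager.getNodes(NodeStatus.inServiceHealthy()));

    // Only transactions whose container replicas live on DNs in this set
    // are assembled and returned to the caller.
    DatanodeDeletedBlockTransactions transactions =
        deletedBlockLog.getTransactions(limit, healthyDns);
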
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 589a9f3f76..52b8be9d64 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -401,8 +401,10 @@ public class DeletedBlockLogImpl
   public void close() throws IOException {
   }
 
-  private void getTransaction(DeletedBlocksTransaction tx,
-      DatanodeDeletedBlockTransactions transactions) {
+  private void getTransaction(
+      DeletedBlocksTransaction tx,
+      DatanodeDeletedBlockTransactions transactions,
+      Set<DatanodeDetails> dnList) {
     try {
       DeletedBlocksTransaction updatedTxn = DeletedBlocksTransaction
           .newBuilder(tx)
@@ -413,6 +415,9 @@ public class DeletedBlockLogImpl
               ContainerID.valueOf(updatedTxn.getContainerID()));
       for (ContainerReplica replica : replicas) {
         UUID dnID = replica.getDatanodeDetails().getUuid();
+        if (!dnList.contains(replica.getDatanodeDetails())) {
+          continue;
+        }
         Set<UUID> dnsWithTransactionCommitted =
             transactionToDNsCommitMap.get(updatedTxn.getTxID());
         if (dnsWithTransactionCommitted == null || !dnsWithTransactionCommitted
@@ -429,7 +434,8 @@ public class DeletedBlockLogImpl
 
   @Override
   public DatanodeDeletedBlockTransactions getTransactions(
-      int blockDeletionLimit) throws IOException, TimeoutException {
+      int blockDeletionLimit, Set<DatanodeDetails> dnList)
+      throws IOException, TimeoutException {
     lock.lock();
     try {
       DatanodeDeletedBlockTransactions transactions =
@@ -455,7 +461,7 @@ public class DeletedBlockLogImpl
               txIDs.add(txn.getTxID());
             } else if (txn.getCount() > -1 && txn.getCount() <= maxRetry
                 && !containerManager.getContainer(id).isOpen()) {
-              getTransaction(txn, transactions);
+              getTransaction(txn, transactions, dnList);
               transactionToDNsCommitMap
                   .putIfAbsent(txn.getTxID(), new LinkedHashSet<>());
             }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 1b587c7b4b..abb77abefd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -134,14 +135,18 @@ public class SCMBlockDeletingService extends BackgroundService
       if (LOG.isDebugEnabled()) {
         LOG.debug("Running DeletedBlockTransactionScanner");
       }
-      // TODO - DECOMM - should we be deleting blocks from decom nodes
-      //        and what about entering maintenance.
       List<DatanodeDetails> datanodes =
           nodeManager.getNodes(NodeStatus.inServiceHealthy());
       if (datanodes != null) {
+        // A DN is considered in this iteration only when it is healthy and
+        // in-service, and all previously sent deleteBlocksCommand commands
+        // for it have been handled (its command queue count is zero).
+        final Set<DatanodeDetails> included = datanodes.stream().filter(
+            dn -> nodeManager.getCommandQueueCount(dn.getUuid(),
+                Type.deleteBlocksCommand) == 0).collect(Collectors.toSet());
         try {
           DatanodeDeletedBlockTransactions transactions =
-              deletedBlockLog.getTransactions(blockDeleteLimitSize);
+              deletedBlockLog.getTransactions(blockDeleteLimitSize, included);
 
           if (transactions.isEmpty()) {
             return EmptyTaskResult.newResult();
@@ -156,10 +161,6 @@ public class SCMBlockDeletingService extends BackgroundService
               processedTxIDs.addAll(dnTXs.stream()
                   .map(DeletedBlocksTransaction::getTxID)
                   .collect(Collectors.toSet()));
-              // TODO commandQueue needs a cap.
-              // We should stop caching new commands if num of un-processed
-              // command is bigger than a limit, e.g 50. In case datanode goes
-              // offline for sometime, the cached commands be flooded.
               SCMCommand<?> command = new DeleteBlocksCommand(dnTXs);
               command.setTerm(scmContext.getTermOfLeader());
               eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
@@ -176,7 +177,6 @@ public class SCMBlockDeletingService extends BackgroundService
               }
             }
           }
-          // TODO: Fix ME!!!
           LOG.info("Totally added {} blocks to be deleted for"
                   + " {} datanodes, task elapsed time: {}ms",
               transactions.getBlocksDeleted(),
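
For context, the filter added above picks up what the removed TODO asked for:
instead of capping a growing queue, DNs that still have unprocessed
deleteBlocksCommand entries are simply skipped for this run. A sketch of the
same filter with the intent spelled out (illustrative, not part of the patch;
datanodes and nodeManager are as in the hunk above):

    // Skip DNs whose SCM-side command queue still holds unprocessed
    // deleteBlocksCommand entries, so commands do not pile up for a DN that
    // is slow to process them or temporarily unreachable.
    final Set<DatanodeDetails> included = datanodes.stream()
        .filter(dn -> nodeManager.getCommandQueueCount(dn.getUuid(),
            Type.deleteBlocksCommand) == 0)
        .collect(Collectors.toSet());
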
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index 55ae6d1b0f..dd332913b2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.node;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeoutException;
 
@@ -35,6 +36,7 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,6 +89,12 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
       if (!nodeManager.getNodeStatus(datanodeDetails).isInMaintenance()) {
         removeContainerReplicas(datanodeDetails);
       }
+      
+      // Remove the commands queued for this DN from the command queue.
+      final List<SCMCommand> cmdList = nodeManager.getCommandQueue(
+          datanodeDetails.getUuid());
+      LOG.info("Clearing command queue of size {} for DN {}",
+          cmdList.size(), datanodeDetails);
 
       //move dead datanode out of ClusterNetworkTopology
       NetworkTopology nt = nodeManager.getClusterNetworkTopologyMap();
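
Illustrative only (not part of the patch): the queue drained here is the same
one SCMBlockDeletingService consults before handing out new transactions. A
test-style sketch of the expected effect, mirroring the TestDeadNodeHandler
change further below (nodeManager, deadNodeHandler, publisher and datanode as
set up in that test):

    // Queue a delete-blocks command for the DN, then report the DN as dead.
    DeleteBlocksCommand cmd = new DeleteBlocksCommand(Collections.emptyList());
    nodeManager.addDatanodeCommand(datanode.getUuid(), cmd);
    deadNodeHandler.onMessage(datanode, publisher);

    // DeadNodeHandler drained the DN's command queue, so the count consulted
    // via getCommandQueueCount(...) is back to zero and the DN becomes
    // eligible again once it re-registers as healthy.
    Assertions.assertEquals(0,
        nodeManager.getCommandQueueCount(datanode.getUuid(), cmd.getType()));
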
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
index 033c1a0745..e519d7c56e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java
@@ -296,7 +296,8 @@ public class TestDeletedBlockLog {
   private List<DeletedBlocksTransaction> getTransactions(
       int maximumAllowedBlocksNum) throws IOException, TimeoutException {
     DatanodeDeletedBlockTransactions transactions =
-        deletedBlockLog.getTransactions(maximumAllowedBlocksNum);
+        deletedBlockLog.getTransactions(maximumAllowedBlocksNum,
+            dnList.stream().collect(Collectors.toSet()));
     List<DeletedBlocksTransaction> txns = new LinkedList<>();
     for (DatanodeDetails dn : dnList) {
       txns.addAll(Optional.ofNullable(
@@ -440,6 +441,17 @@ public class TestDeletedBlockLog {
     Assertions.assertEquals(0, blocks.size());
   }
 
+  @Test
+  public void testDNOnlyOneNodeHealthy() throws Exception {
+    Map<Long, List<Long>> deletedBlocks = generateData(50);
+    addTransactions(deletedBlocks, true);
+    DatanodeDeletedBlockTransactions transactions
+        = deletedBlockLog.getTransactions(
+        30 * BLOCKS_PER_TXN * THREE,
+        dnList.subList(0, 1).stream().collect(Collectors.toSet()));
+    Assertions.assertEquals(1, transactions.getDatanodeTransactionMap().size());
+  }
+
   @Test
   public void testInadequateReplicaCommit() throws Exception {
     Map<Long, List<Long>> deletedBlocks = generateData(50);
@@ -461,7 +473,8 @@ public class TestDeletedBlockLog {
     // For the rest txn, txn will be got from all dns.
     // Committed txn will be: 1-40. 1-40. 31-40
     commitTransactions(deletedBlockLog.getTransactions(
-        30 * BLOCKS_PER_TXN * THREE));
+        30 * BLOCKS_PER_TXN * THREE,
+        dnList.stream().collect(Collectors.toSet())));
 
     // The rest txn shall be: 41-50. 41-50. 41-50
     List<DeletedBlocksTransaction> blocks = getAllTransactions();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 0b29853339..168fdd11a5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -25,6 +25,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLU
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -63,6 +64,7 @@ import org.apache.hadoop.hdds.server.events.EventPublisher;
 
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.security.authentication.client
     .AuthenticationException;
 import org.apache.ozone.test.GenericTestUtils;
@@ -244,9 +246,10 @@ public class TestDeadNodeHandler {
                 ContainerID.valueOf(container3.getContainerID()));
     Assertions.assertEquals(1, container3Replicas.size());
 
-
     // Now set the node to anything other than IN_MAINTENANCE and the relevant
     // replicas should be removed
+    DeleteBlocksCommand cmd = new DeleteBlocksCommand(Collections.emptyList());
+    nodeManager.addDatanodeCommand(datanode1.getUuid(), cmd);
     nodeManager.setNodeOperationalState(datanode1,
         HddsProtos.NodeOperationalState.IN_SERVICE);
     deadNodeHandler.onMessage(datanode1, publisher);
@@ -254,6 +257,8 @@ public class TestDeadNodeHandler {
     //deadNodeHandler.onMessage call will not change this
     Assertions.assertFalse(
         nodeManager.getClusterNetworkTopologyMap().contains(datanode1));
+    Assertions.assertEquals(0, 
+        nodeManager.getCommandQueueCount(datanode1.getUuid(), cmd.getType()));
 
     container1Replicas = containerManager
         .getContainerReplicas(ContainerID.valueOf(container1.getContainerID()));
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java
index 5cccdf716a..73d9252e57 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java
@@ -248,12 +248,15 @@ public class TestECContainerRecovery {
     cluster.restartHddsDatanode(pipeline.getFirstNode(), true);
     // Check container is over replicated.
     waitForContainerCount(6, container.containerID(), scm);
+
+    // Resume RM and wait for the over-replicated replica to be deleted.
+    // ReplicationManager fixes the container replica state if it differs.
+    scm.getReplicationManager().start();
+
     // Wait for all the replicas to be closed.
     container = scm.getContainerInfo(container.getContainerID());
     waitForDNContainerState(container, scm);
 
-    // Resume RM and wait the over replicated replica deleted.
-    scm.getReplicationManager().start();
     waitForContainerCount(5, container.containerID(), scm);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
