This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 43c9565d9d HDDS-8888. Consider Datanode queue capacity when sending 
DeleteBlocks command (#4939)
43c9565d9d is described below

commit 43c9565d9d0250af5694d10c02f7634b79206ab6
Author: XiChen <[email protected]>
AuthorDate: Sun Jan 7 05:38:30 2024 +0800

    HDDS-8888. Consider Datanode queue capacity when sending DeleteBlocks 
command (#4939)
---
 .../common/statemachine/DatanodeConfiguration.java |   6 +-
 .../hdds/scm/block/SCMBlockDeletingService.java    |  41 ++++-
 .../scm/block/TestSCMBlockDeletingService.java     | 177 +++++++++++++++++++++
 3 files changed, 215 insertions(+), 9 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index 3272aedb66..12ac40cabc 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -185,7 +185,11 @@ public class DatanodeConfiguration extends 
ReconfigurableConfig {
       defaultValue = "5",
       tags = {DATANODE},
       description = "The maximum number of block delete commands queued on " +
-          " a datanode"
+          " a datanode, This configuration is also used by the SCM to " +
+          "control whether to send delete commands to the DN. If the DN" +
+          " has more commands waiting in the queue than this value, " +
+          "the SCM will not send any new block delete commands. until the " +
+          "DN has processed some commands and the queue length is reduced."
   )
   private int blockDeleteQueueLimit = 5;
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 8677baf33b..7271d9dcba 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -42,11 +42,13 @@ import org.apache.hadoop.hdds.scm.ha.SCMService;
 import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.utils.BackgroundService;
 import org.apache.hadoop.hdds.utils.BackgroundTask;
 import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
 import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.util.Time;
@@ -90,6 +92,7 @@ public class SCMBlockDeletingService extends BackgroundService
 
   private long safemodeExitMillis = 0;
   private final long safemodeExitRunDelayMillis;
+  private final long deleteBlocksPendingCommandLimit;
   private final Clock clock;
 
   @SuppressWarnings("parameternumber")
@@ -110,6 +113,9 @@ public class SCMBlockDeletingService extends 
BackgroundService
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
         TimeUnit.MILLISECONDS);
+    DatanodeConfiguration dnConf =
+        conf.getObject(DatanodeConfiguration.class);
+    this.deleteBlocksPendingCommandLimit = dnConf.getBlockDeleteQueueLimit();
     this.clock = clock;
     this.deletedBlockLog = deletedBlockLog;
     this.nodeManager = nodeManager;
@@ -155,13 +161,12 @@ public class SCMBlockDeletingService extends 
BackgroundService
       List<DatanodeDetails> datanodes =
           nodeManager.getNodes(NodeStatus.inServiceHealthy());
       if (datanodes != null) {
-        // When DN node is healthy and in-service, and previous commands 
-        // are handled for deleteBlocks Type, then it will be considered
-        // in this iteration
-        final Set<DatanodeDetails> included = datanodes.stream().filter(
-            dn -> nodeManager.getCommandQueueCount(dn.getUuid(),
-                Type.deleteBlocksCommand) == 0).collect(Collectors.toSet());
         try {
+          // When a DN is healthy and in-service, and its number of
+          // pending 'deleteBlocks' type commands is below the limit,
+          // the node will be considered for this iteration.
+          final Set<DatanodeDetails> included =
+              getDatanodesWithinCommandLimit(datanodes);
           DatanodeDeletedBlockTransactions transactions =
               deletedBlockLog.getTransactions(getBlockDeleteTXNum(), included);
 
@@ -205,7 +210,8 @@ public class SCMBlockDeletingService extends 
BackgroundService
           deletedBlockLog.incrementCount(new ArrayList<>(processedTxIDs));
         } catch (NotLeaderException nle) {
           LOG.warn("Skip current run, since not leader any more.", nle);
-          return EmptyTaskResult.newResult();
+        } catch (NodeNotFoundException e) {
+          LOG.error("Datanode not found in NodeManager. Should not happen", e);
         } catch (IOException e) {
           // We may tolerate a number of failures for sometime
           // but if it continues to fail, at some point we need to raise
@@ -213,7 +219,6 @@ public class SCMBlockDeletingService extends 
BackgroundService
           // continues to retry the scanning.
           LOG.error("Failed to get block deletion transactions from delTX log",
               e);
-          return EmptyTaskResult.newResult();
         }
       }
 
@@ -283,4 +288,24 @@ public class SCMBlockDeletingService extends 
BackgroundService
   public ScmBlockDeletingServiceMetrics getMetrics() {
     return this.metrics;
   }
+
+  /**
+   * Filters and returns a set of healthy datanodes that have not exceeded
+   * the deleteBlocksPendingCommandLimit.
+   *
+   * @param datanodes a list of DatanodeDetails
+   * @return a set of filtered DatanodeDetails
+   */
+  @VisibleForTesting
+  protected Set<DatanodeDetails> getDatanodesWithinCommandLimit(
+      List<DatanodeDetails> datanodes) throws NodeNotFoundException {
+    final Set<DatanodeDetails> included = new HashSet<>();
+    for (DatanodeDetails dn : datanodes) {
+      if (nodeManager.getTotalDatanodeCommandCount(dn, 
Type.deleteBlocksCommand) < deleteBlocksPendingCommandLimit
+          && nodeManager.getCommandQueueCount(dn.getUuid(), 
Type.deleteBlocksCommand) < 2) {
+        included.add(dn);
+      }
+    }
+    return included;
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
new file mode 100644
index 0000000000..3bd7ad00f6
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestSCMBlockDeletingService.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.ReconfigurationHandler;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.time.Clock;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test SCMBlockDeletingService.
+ */
+public class TestSCMBlockDeletingService {
+  private SCMBlockDeletingService service;
+  private EventPublisher eventPublisher;
+  private List<DatanodeDetails> datanodeDetails;
+  private OzoneConfiguration conf;
+  private NodeManager nodeManager;
+  private ScmBlockDeletingServiceMetrics metrics;
+
+  @BeforeEach
+  public void setup() throws Exception {
+    nodeManager = mock(NodeManager.class);
+    eventPublisher = mock(EventPublisher.class);
+    conf = new OzoneConfiguration();
+    metrics = ScmBlockDeletingServiceMetrics.create();
+    when(nodeManager.getTotalDatanodeCommandCount(any(),
+        any())).thenReturn(0);
+    SCMServiceManager scmServiceManager = mock(SCMServiceManager.class);
+    SCMContext scmContext = mock(SCMContext.class);
+
+    DatanodeDeletedBlockTransactions ddbt =
+        new DatanodeDeletedBlockTransactions();
+    DatanodeDetails datanode1 = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails datanode2 = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails datanode3 = MockDatanodeDetails.randomDatanodeDetails();
+    datanodeDetails = Arrays.asList(datanode1, datanode2, datanode3);
+    when(nodeManager.getNodes(NodeStatus.inServiceHealthy())).thenReturn(
+        datanodeDetails);
+    DeletedBlocksTransaction tx1 = createTestDeleteTxn(1, Arrays.asList(1L), 
1);
+    ddbt.addTransactionToDN(datanode1.getUuid(), tx1);
+    ddbt.addTransactionToDN(datanode2.getUuid(), tx1);
+    ddbt.addTransactionToDN(datanode3.getUuid(), tx1);
+    DeletedBlockLog mockDeletedBlockLog = mock(DeletedBlockLog.class);
+    when(mockDeletedBlockLog.getTransactions(
+        anyInt(), anySet())).thenReturn(ddbt);
+
+    service = spy(new SCMBlockDeletingService(
+        mockDeletedBlockLog, nodeManager, eventPublisher, scmContext,
+        scmServiceManager, conf, conf.getObject(ScmConfig.class), metrics, 
Clock.system(
+        ZoneOffset.UTC), mock(ReconfigurationHandler.class)));
+    when(service.shouldRun()).thenReturn(true);
+  }
+
+  @AfterEach
+  public void stop() {
+    service.stop();
+    ScmBlockDeletingServiceMetrics.unRegister();
+  }
+
+  @Test
+  public void testCall() throws Exception {
+    callDeletedBlockTransactionScanner();
+
+    ArgumentCaptor<CommandForDatanode> argumentCaptor =
+        ArgumentCaptor.forClass(CommandForDatanode.class);
+
+    // Three Datanodes are healthy and in-service, and the task queue is empty,
+    // so the transaction will be sent to all three Datanodes
+    verify(eventPublisher, times(3)).fireEvent(
+        eq(SCMEvents.DATANODE_COMMAND), argumentCaptor.capture());
+    List<CommandForDatanode> actualCommands = argumentCaptor.getAllValues();
+    List<UUID> actualDnIds = actualCommands.stream()
+        .map(CommandForDatanode::getDatanodeId)
+        .collect(Collectors.toList());
+    Set<UUID> expectedDnIdsSet = datanodeDetails.stream()
+        .map(DatanodeDetails::getUuid).collect(Collectors.toSet());
+
+    assertEquals(expectedDnIdsSet, new HashSet<>(actualDnIds));
+    assertEquals(datanodeDetails.size(),
+        metrics.getNumBlockDeletionCommandSent());
+    // Each Command has one Transaction
+    assertEquals(datanodeDetails.size() * 1,
+        metrics.getNumBlockDeletionTransactionSent());
+  }
+
+  private void callDeletedBlockTransactionScanner() throws Exception {
+    service.getTasks().poll().call();
+  }
+
+  @Test
+  public void testLimitCommandSending() throws Exception {
+    DatanodeConfiguration dnConf =
+        conf.getObject(DatanodeConfiguration.class);
+    int pendingCommandLimit = dnConf.getBlockDeleteQueueLimit();
+
+    // The number of commands pending on all Datanodes has reached the limit.
+    when(nodeManager.getTotalDatanodeCommandCount(any(),
+        any())).thenReturn(pendingCommandLimit);
+    assertEquals(0,
+        service.getDatanodesWithinCommandLimit(datanodeDetails).size());
+
+    // The number of commands pending on all Datanodes is 0
+    when(nodeManager.getTotalDatanodeCommandCount(any(),
+        any())).thenReturn(0);
+    assertEquals(datanodeDetails.size(),
+        service.getDatanodesWithinCommandLimit(datanodeDetails).size());
+
+    // The number of commands pending on the first Datanode has reached the limit.
+    DatanodeDetails fullDatanode = datanodeDetails.get(0);
+    when(nodeManager.getTotalDatanodeCommandCount(fullDatanode,
+        Type.deleteBlocksCommand)).thenReturn(pendingCommandLimit);
+    Set<DatanodeDetails> includeNodes =
+        service.getDatanodesWithinCommandLimit(datanodeDetails);
+    assertEquals(datanodeDetails.size() - 1,
+        includeNodes.size());
+    assertFalse(includeNodes.contains(fullDatanode));
+  }
+
+  private DeletedBlocksTransaction createTestDeleteTxn(
+      long txnID, List<Long> blocks, long containerID) {
+    return DeletedBlocksTransaction.newBuilder().setTxID(txnID)
+        .setContainerID(containerID).addAllLocalID(blocks).setCount(0).build();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to