This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new fefc405577 HDDS-9122. Make the Datanode CloseContainerCommandHandler async by queuing commands in an executor (#5277)
fefc405577 is described below

commit fefc405577c63896e347724c5ab0db70370061ea
Author: Sumit Agrawal <[email protected]>
AuthorDate: Mon Sep 18 15:52:19 2023 +0530

    HDDS-9122. Make the Datanode CloseContainerCommandHandler async by queuing commands in an executor (#5277)
---
 .../common/statemachine/DatanodeConfiguration.java |  31 +++++
 .../common/statemachine/DatanodeStateMachine.java  |   4 +-
 .../CloseContainerCommandHandler.java              | 155 ++++++++++++---------
 .../commandhandler/CommandDispatcher.java          |   6 +-
 .../TestCloseContainerCommandHandler.java          |  23 ++-
 5 files changed, 146 insertions(+), 73 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index 1127fa69d0..bbcbcf12fe 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -40,6 +40,8 @@ public class DatanodeConfiguration {
 
   static final String CONTAINER_DELETE_THREADS_MAX_KEY =
       "hdds.datanode.container.delete.threads.max";
+  static final String CONTAINER_CLOSE_THREADS_MAX_KEY =
+      "hdds.datanode.container.close.threads.max";
   static final String PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY =
       "hdds.datanode.periodic.disk.check.interval.minutes";
   public static final String DISK_CHECK_FILE_SIZE_KEY =
@@ -115,6 +117,7 @@ public class DatanodeConfiguration {
   private int numReadThreadPerVolume = 10;
 
   static final int CONTAINER_DELETE_THREADS_DEFAULT = 2;
+  static final int CONTAINER_CLOSE_THREADS_DEFAULT = 3;
   static final int BLOCK_DELETE_THREADS_DEFAULT = 5;
 
   /**
@@ -130,6 +133,19 @@ public class DatanodeConfiguration {
   )
   private int containerDeleteThreads = CONTAINER_DELETE_THREADS_DEFAULT;
 
+  /**
+   * The maximum number of threads used to close containers on a datanode
+   * simultaneously.
+   */
+  @Config(key = "container.close.threads.max",
+      type = ConfigType.INT,
+      defaultValue = "3",
+      tags = {DATANODE},
+      description = "The maximum number of threads used to close containers " +
+          "on a datanode"
+  )
+  private int containerCloseThreads = CONTAINER_CLOSE_THREADS_DEFAULT;
+
   /**
    * The maximum number of threads used to handle delete block commands.
    * It takes about 200ms to open a RocksDB with HDD media, so basically DN
@@ -503,6 +519,13 @@ public class DatanodeConfiguration {
       containerDeleteThreads = CONTAINER_DELETE_THREADS_DEFAULT;
     }
 
+    if (containerCloseThreads < 1) {
+      LOG.warn(CONTAINER_CLOSE_THREADS_MAX_KEY + " must be greater than zero" +
+              " and was set to {}. Defaulting to {}",
+          containerCloseThreads, CONTAINER_CLOSE_THREADS_DEFAULT);
+      containerCloseThreads = CONTAINER_CLOSE_THREADS_DEFAULT;
+    }
+
     if (periodicDiskCheckIntervalMinutes < 1) {
       LOG.warn(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY +
               " must be greater than zero and was set to {}. Defaulting to {}",
@@ -621,6 +644,14 @@ public class DatanodeConfiguration {
     return containerDeleteThreads;
   }
 
+  public void setContainerCloseThreads(int containerCloseThreads) {
+    this.containerCloseThreads = containerCloseThreads;
+  }
+
+  public int getContainerCloseThreads() {
+    return containerCloseThreads;
+  }
+
   public long getPeriodicDiskCheckIntervalMinutes() {
     return periodicDiskCheckIntervalMinutes;
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index a3724e0c47..5c18e86bd3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -225,7 +225,9 @@ public class DatanodeStateMachine implements Closeable {
     // When we add new handlers just adding a new handler here should do the
     // trick.
     commandDispatcher = CommandDispatcher.newBuilder()
-        .addHandler(new CloseContainerCommandHandler())
+        .addHandler(new CloseContainerCommandHandler(
+            dnConf.getContainerCloseThreads(),
+            dnConf.getCommandQueueLimit()))
         .addHandler(new DeleteBlocksCommandHandler(getContainer(),
             conf, dnConf.getBlockDeleteThreads(),
             dnConf.getBlockDeleteQueueLimit()))
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index e7a0b2e9f8..b34082550f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -16,6 +16,13 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto
@@ -49,13 +56,22 @@ public class CloseContainerCommandHandler implements CommandHandler {
   private static final Logger LOG =
       LoggerFactory.getLogger(CloseContainerCommandHandler.class);
 
-  private AtomicLong invocationCount = new AtomicLong(0);
+  private final AtomicLong invocationCount = new AtomicLong(0);
+  private final AtomicInteger queuedCount = new AtomicInteger(0);
+  private final ExecutorService executor;
   private long totalTime;
 
   /**
    * Constructs a ContainerReport handler.
    */
-  public CloseContainerCommandHandler() {
+  public CloseContainerCommandHandler(
+      int threadPoolSize, int queueSize) {
+    executor = new ThreadPoolExecutor(
+            threadPoolSize, threadPoolSize,
+            0L, TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(queueSize),
+            new ThreadFactoryBuilder()
+                .setNameFormat("CloseContainerThread-%d").build());
   }
 
   /**
@@ -69,73 +85,78 @@ public class CloseContainerCommandHandler implements CommandHandler {
   @Override
   public void handle(SCMCommand command, OzoneContainer ozoneContainer,
       StateContext context, SCMConnectionManager connectionManager) {
-    invocationCount.incrementAndGet();
-    final long startTime = Time.monotonicNow();
-    final DatanodeDetails datanodeDetails = context.getParent()
-        .getDatanodeDetails();
-    final CloseContainerCommandProto closeCommand =
-        ((CloseContainerCommand)command).getProto();
-    final ContainerController controller = ozoneContainer.getController();
-    final long containerId = closeCommand.getContainerID();
-    LOG.debug("Processing Close Container command container #{}",
-        containerId);
-    try {
-      final Container container = controller.getContainer(containerId);
-
-      if (container == null) {
-        LOG.error("Container #{} does not exist in datanode. "
-            + "Container close failed.", containerId);
-        return;
-      }
-
-      // move the container to CLOSING if in OPEN state
-      controller.markContainerForClose(containerId);
-
-      switch (container.getContainerState()) {
-      case OPEN:
-      case CLOSING:
-        // If the container is part of open pipeline, close it via write channel
-        if (ozoneContainer.getWriteChannel()
-            .isExist(closeCommand.getPipelineID())) {
-          ContainerCommandRequestProto request =
-              getContainerCommandRequestProto(datanodeDetails,
-                  closeCommand.getContainerID(),
-                  command.getEncodedToken());
-          ozoneContainer.getWriteChannel()
-              .submitRequest(request, closeCommand.getPipelineID());
-        } else if (closeCommand.getForce()) {
-          // Non-RATIS containers should have the force close flag set, so they
-          // are moved to CLOSED immediately rather than going to quasi-closed.
-          controller.closeContainer(containerId);
-        } else {
-          controller.quasiCloseContainer(containerId,
-              "Ratis pipeline does not exist");
-          LOG.info("Marking Container {} quasi closed", containerId);
+    queuedCount.incrementAndGet();
+    CompletableFuture.runAsync(() -> {
+      invocationCount.incrementAndGet();
+      final long startTime = Time.monotonicNow();
+      final DatanodeDetails datanodeDetails = context.getParent()
+          .getDatanodeDetails();
+      final CloseContainerCommandProto closeCommand =
+          ((CloseContainerCommand) command).getProto();
+      final ContainerController controller = ozoneContainer.getController();
+      final long containerId = closeCommand.getContainerID();
+      LOG.debug("Processing Close Container command container #{}",
+          containerId);
+      try {
+        final Container container = controller.getContainer(containerId);
+
+        if (container == null) {
+          LOG.error("Container #{} does not exist in datanode. "
+              + "Container close failed.", containerId);
+          return;
         }
-        break;
-      case QUASI_CLOSED:
-        if (closeCommand.getForce()) {
-          controller.closeContainer(containerId);
+
+        // move the container to CLOSING if in OPEN state
+        controller.markContainerForClose(containerId);
+
+        switch (container.getContainerState()) {
+        case OPEN:
+        case CLOSING:
+          // If the container is part of open pipeline, close it via
+          // write channel
+          if (ozoneContainer.getWriteChannel()
+              .isExist(closeCommand.getPipelineID())) {
+            ContainerCommandRequestProto request =
+                getContainerCommandRequestProto(datanodeDetails,
+                    closeCommand.getContainerID(),
+                    command.getEncodedToken());
+            ozoneContainer.getWriteChannel()
+                .submitRequest(request, closeCommand.getPipelineID());
+          } else if (closeCommand.getForce()) {
+            // Non-RATIS containers should have the force close flag set, so
+            // they are moved to CLOSED immediately rather than going to
+            // quasi-closed
+            controller.closeContainer(containerId);
+          } else {
+            controller.quasiCloseContainer(containerId,
+                "Ratis pipeline does not exist");
+            LOG.info("Marking Container {} quasi closed", containerId);
+          }
+          break;
+        case QUASI_CLOSED:
+          if (closeCommand.getForce()) {
+            controller.closeContainer(containerId);
+          }
+          break;
+        case CLOSED:
+          break;
+        case UNHEALTHY:
+        case INVALID:
+          LOG.debug("Cannot close the container #{}, the container is"
+              + " in {} state.", containerId, container.getContainerState());
+          break;
+        default:
+          break;
         }
-        break;
-      case CLOSED:
-        break;
-      case UNHEALTHY:
-      case INVALID:
-        LOG.debug("Cannot close the container #{}, the container is"
-            + " in {} state.", containerId, container.getContainerState());
-        break;
-      default:
-        break;
+      } catch (NotLeaderException e) {
+        LOG.debug("Follower cannot close container #{}.", containerId);
+      } catch (IOException e) {
+        LOG.error("Can't close container #{}", containerId, e);
+      } finally {
+        long endTime = Time.monotonicNow();
+        totalTime += endTime - startTime;
       }
-    } catch (NotLeaderException e) {
-      LOG.debug("Follower cannot close container #{}.", containerId);
-    } catch (IOException e) {
-      LOG.error("Can't close container #{}", containerId, e);
-    } finally {
-      long endTime = Time.monotonicNow();
-      totalTime += endTime - startTime;
-    }
+    }, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
   }
 
   private ContainerCommandRequestProto getContainerCommandRequestProto(
@@ -195,6 +216,6 @@ public class CloseContainerCommandHandler implements CommandHandler {
 
   @Override
   public int getQueuedCount() {
-    return 0;
+    return queuedCount.get();
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
index ebd862ca52..b6ec70b2ef 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
@@ -100,7 +100,11 @@ public final class CommandDispatcher {
     CommandHandler handler = handlerMap.get(command.getType());
     if (handler != null) {
       commandHandlerMetrics.increaseCommandCount(command.getType());
-      handler.handle(command, container, context, connectionManager);
+      try {
+        handler.handle(command, container, context, connectionManager);
+      } catch (Exception ex) {
+        LOG.error("Exception while handle command, ", ex);
+      }
     } else {
       LOG.error("Unknown SCM Command queued. There is no handler for this " +
           "command. Command: {}", command.getType().getDescriptorForType()
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
index 4d3d435ed2..e6267550be 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
+import java.util.concurrent.TimeoutException;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -70,7 +71,7 @@ public class TestCloseContainerCommandHandler {
   private ContainerController controller;
   private ContainerSet containerSet;
   private CloseContainerCommandHandler subject =
-      new CloseContainerCommandHandler();
+      new CloseContainerCommandHandler(1, 1000);
 
   private final ContainerLayoutVersion layout;
 
@@ -120,7 +121,7 @@ public class TestCloseContainerCommandHandler {
   public void closeContainerWithPipeline() throws Exception {
     // close a container that's associated with an existing pipeline
     subject.handle(closeWithKnownPipeline(), ozoneContainer, context, null);
-
+    waitTillFinishExecution(subject);
     verify(containerHandler)
         .markContainerForClose(container);
     verify(writeChannel)
@@ -130,9 +131,10 @@ public class TestCloseContainerCommandHandler {
   }
 
   @Test
-  public void closeContainerWithoutPipeline() throws IOException {
+  public void closeContainerWithoutPipeline() throws Exception {
     // close a container that's NOT associated with an open pipeline
     subject.handle(closeWithUnknownPipeline(), ozoneContainer, context, null);
+    waitTillFinishExecution(subject);
 
     verify(containerHandler)
         .markContainerForClose(container);
@@ -145,9 +147,10 @@ public class TestCloseContainerCommandHandler {
   }
 
   @Test
-  public void closeContainerWithForceFlagSet() throws IOException {
+  public void closeContainerWithForceFlagSet() throws Exception {
     // close a container that's associated with an existing pipeline
     subject.handle(forceCloseWithoutPipeline(), ozoneContainer, context, null);
+    waitTillFinishExecution(subject);
 
     verify(containerHandler)
         .markContainerForClose(container);
@@ -162,6 +165,7 @@ public class TestCloseContainerCommandHandler {
         .setState(ContainerProtos.ContainerDataProto.State.QUASI_CLOSED);
 
     subject.handle(forceCloseWithoutPipeline(), ozoneContainer, context, null);
+    waitTillFinishExecution(subject);
 
     verify(writeChannel, never())
         .submitRequest(any(), any());
@@ -173,6 +177,7 @@ public class TestCloseContainerCommandHandler {
   public void forceCloseOpenContainer() throws Exception {
     // force-close a container that's NOT associated with an open pipeline
     subject.handle(forceCloseWithoutPipeline(), ozoneContainer, context, null);
+    waitTillFinishExecution(subject);
 
     verify(writeChannel, never())
         .submitRequest(any(), any());
@@ -186,6 +191,7 @@ public class TestCloseContainerCommandHandler {
   public void forceCloseOpenContainerWithPipeline() throws Exception {
     // force-close a container that's associated with an existing pipeline
     subject.handle(forceCloseWithPipeline(), ozoneContainer, context, null);
+    waitTillFinishExecution(subject);
 
     verify(containerHandler)
         .markContainerForClose(container);
@@ -205,7 +211,9 @@ public class TestCloseContainerCommandHandler {
     // Since the container is already closed, these commands should do nothing,
     // neither should they fail
     subject.handle(closeWithUnknownPipeline(), ozoneContainer, context, null);
+    waitTillFinishExecution(subject);
     subject.handle(closeWithKnownPipeline(), ozoneContainer, context, null);
+    waitTillFinishExecution(subject);
 
     verify(containerHandler, never())
         .markContainerForClose(container);
@@ -279,4 +287,11 @@ public class TestCloseContainerCommandHandler {
         .addPort(restPort);
     return builder.build();
   }
+
+  private void waitTillFinishExecution(
+      CloseContainerCommandHandler closeHandler)
+      throws InterruptedException, TimeoutException {
+    GenericTestUtils.waitFor(()
+        -> closeHandler.getQueuedCount() <= 0, 10, 3000);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to