This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 812d23f070 HDDS-8624. Process pipeline commands asynchronously in datanode (#4713)
812d23f070 is described below

commit 812d23f0708d6c93945620e812f02665a60e1ba3
Author: hao guo <[email protected]>
AuthorDate: Thu May 25 17:37:30 2023 +0800

    HDDS-8624. Process pipeline commands asynchronously in datanode (#4713)
---
 .../common/statemachine/DatanodeStateMachine.java  |  45 ++++++---
 .../ClosePipelineCommandHandler.java               |  58 ++++++-----
 .../CreatePipelineCommandHandler.java              | 108 ++++++++++++---------
 .../TestCreatePipelineCommandHandler.java          |   7 +-
 4 files changed, 128 insertions(+), 90 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 73d064f04b..1cfaa44da7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -92,6 +92,7 @@ public class DatanodeStateMachine implements Closeable {
   static final Logger LOG =
       LoggerFactory.getLogger(DatanodeStateMachine.class);
   private final ExecutorService executorService;
+  private final ExecutorService pipelineCommandExecutorService;
   private final ConfigurationSource conf;
   private final SCMConnectionManager connectionManager;
   private final ECReconstructionCoordinator ecReconstructionCoordinator;
@@ -216,6 +217,11 @@ public class DatanodeStateMachine implements Closeable {
     reconstructECContainersCommandHandler =
         new ReconstructECContainersCommandHandler(conf, supervisor,
         ecReconstructionCoordinator);
+
+    pipelineCommandExecutorService = Executors
+        .newSingleThreadExecutor(new ThreadFactoryBuilder()
+            .setNameFormat("PipelineCommandHandlerThread-%d").build());
+
     // When we add new handlers just adding a new handler here should do the
     // trick.
     commandDispatcher = CommandDispatcher.newBuilder()
@@ -229,8 +235,10 @@ public class DatanodeStateMachine implements Closeable {
         .addHandler(new DeleteContainerCommandHandler(
             dnConf.getContainerDeleteThreads(), clock,
             dnConf.getCommandQueueLimit()))
-        .addHandler(new ClosePipelineCommandHandler())
-        .addHandler(new CreatePipelineCommandHandler(conf))
+        .addHandler(
+            new ClosePipelineCommandHandler(pipelineCommandExecutorService))
+        .addHandler(new CreatePipelineCommandHandler(conf,
+            pipelineCommandExecutorService))
         .addHandler(new SetNodeOperationalStateCommandHandler(conf,
             supervisor::nodeStateUpdated))
         .addHandler(new FinalizeNewLayoutVersionCommandHandler())
@@ -410,20 +418,8 @@ public class DatanodeStateMachine implements Closeable {
     context.setState(DatanodeStates.getLastState());
     replicationSupervisorMetrics.unRegister();
     ecReconstructionMetrics.unRegister();
-    executorService.shutdown();
-    try {
-      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
-        executorService.shutdownNow();
-      }
-
-      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
-        LOG.error("Unable to shutdown state machine properly.");
-      }
-    } catch (InterruptedException e) {
-      LOG.error("Error attempting to shutdown.", e);
-      executorService.shutdownNow();
-      Thread.currentThread().interrupt();
-    }
+    executorServiceShutdownGraceful(executorService);
+    executorServiceShutdownGraceful(pipelineCommandExecutorService);
 
     if (connectionManager != null) {
       connectionManager.close();
@@ -446,6 +442,23 @@ public class DatanodeStateMachine implements Closeable {
     }
   }
 
+  private void executorServiceShutdownGraceful(ExecutorService executor) {
+    executor.shutdown();
+    try {
+      if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+        executor.shutdownNow();
+      }
+
+      if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown state machine properly.");
+      }
+    } catch (InterruptedException e) {
+      LOG.error("Error attempting to shutdown.", e);
+      executor.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
   /**
    * States that a datanode  can be in. GetNextState will move this enum from
    * getInitState to getLastState.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
index 4efd55de55..0116721ba1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -34,6 +34,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -44,13 +47,16 @@ public class ClosePipelineCommandHandler implements CommandHandler {
   private static final Logger LOG =
       LoggerFactory.getLogger(ClosePipelineCommandHandler.class);
 
-  private AtomicLong invocationCount = new AtomicLong(0);
+  private final AtomicLong invocationCount = new AtomicLong(0);
+  private final AtomicInteger queuedCount = new AtomicInteger(0);
   private long totalTime;
+  private final Executor executor;
 
   /**
    * Constructs a closePipelineCommand handler.
    */
-  public ClosePipelineCommandHandler() {
+  public ClosePipelineCommandHandler(Executor executor) {
+    this.executor = executor;
   }
 
   /**
@@ -64,29 +70,33 @@ public class ClosePipelineCommandHandler implements CommandHandler {
   @Override
   public void handle(SCMCommand command, OzoneContainer ozoneContainer,
       StateContext context, SCMConnectionManager connectionManager) {
-    invocationCount.incrementAndGet();
-    final long startTime = Time.monotonicNow();
-    final DatanodeDetails dn = context.getParent().getDatanodeDetails();
-    ClosePipelineCommand closePipelineCommand = (ClosePipelineCommand) command;
-    final PipelineID pipelineID = closePipelineCommand.getPipelineID();
-    final HddsProtos.PipelineID pipelineIdProto = pipelineID.getProtobuf();
+    queuedCount.incrementAndGet();
+    CompletableFuture.runAsync(() -> {
+      invocationCount.incrementAndGet();
+      final long startTime = Time.monotonicNow();
+      final DatanodeDetails dn = context.getParent().getDatanodeDetails();
+      ClosePipelineCommand closePipelineCommand =
+          (ClosePipelineCommand) command;
+      final PipelineID pipelineID = closePipelineCommand.getPipelineID();
+      final HddsProtos.PipelineID pipelineIdProto = pipelineID.getProtobuf();
 
-    try {
-      XceiverServerSpi server = ozoneContainer.getWriteChannel();
-      if (server.isExist(pipelineIdProto)) {
-        server.removeGroup(pipelineIdProto);
-        LOG.info("Close Pipeline {} command on datanode {}.", pipelineID,
-            dn.getUuidString());
-      } else {
-        LOG.debug("Ignoring close pipeline command for pipeline {} " +
-            "as it does not exist", pipelineID);
+      try {
+        XceiverServerSpi server = ozoneContainer.getWriteChannel();
+        if (server.isExist(pipelineIdProto)) {
+          server.removeGroup(pipelineIdProto);
+          LOG.info("Close Pipeline {} command on datanode {}.", pipelineID,
+              dn.getUuidString());
+        } else {
+          LOG.debug("Ignoring close pipeline command for pipeline {} " +
+              "as it does not exist", pipelineID);
+        }
+      } catch (IOException e) {
+        LOG.error("Can't close pipeline {}", pipelineID, e);
+      } finally {
+        long endTime = Time.monotonicNow();
+        totalTime += endTime - startTime;
       }
-    } catch (IOException e) {
-      LOG.error("Can't close pipeline {}", pipelineID, e);
-    } finally {
-      long endTime = Time.monotonicNow();
-      totalTime += endTime - startTime;
-    }
+    }, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
   }
 
   /**
@@ -124,6 +134,6 @@ public class ClosePipelineCommandHandler implements CommandHandler {
 
   @Override
   public int getQueuedCount() {
-    return 0;
+    return queuedCount.get();
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
index ae22af92b3..d55e410db2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
@@ -17,6 +17,9 @@
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.List;
 import java.util.function.BiFunction;
@@ -53,20 +56,25 @@ public class CreatePipelineCommandHandler implements CommandHandler {
       LoggerFactory.getLogger(CreatePipelineCommandHandler.class);
 
   private final AtomicLong invocationCount = new AtomicLong(0);
+  private final AtomicInteger queuedCount = new AtomicInteger(0);
   private final BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient;
 
   private long totalTime;
+  private final Executor executor;
 
   /**
    * Constructs a createPipelineCommand handler.
    */
-  public CreatePipelineCommandHandler(ConfigurationSource conf) {
-    this(RatisHelper.newRaftClient(conf));
+  public CreatePipelineCommandHandler(ConfigurationSource conf,
+                                      Executor executor) {
+    this(RatisHelper.newRaftClient(conf), executor);
   }
 
   CreatePipelineCommandHandler(
-      BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient) {
+      BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient,
+      Executor executor) {
     this.newRaftClient = newRaftClient;
+    this.executor = executor;
   }
 
   /**
@@ -80,52 +88,56 @@ public class CreatePipelineCommandHandler implements CommandHandler {
   @Override
   public void handle(SCMCommand command, OzoneContainer ozoneContainer,
       StateContext context, SCMConnectionManager connectionManager) {
-    invocationCount.incrementAndGet();
-    final long startTime = Time.monotonicNow();
-    final DatanodeDetails dn = context.getParent()
-        .getDatanodeDetails();
-    final CreatePipelineCommand createCommand = (CreatePipelineCommand) 
command;
-    final PipelineID pipelineID = createCommand.getPipelineID();
-    final HddsProtos.PipelineID pipelineIdProto = pipelineID.getProtobuf();
-    final List<DatanodeDetails> peers = createCommand.getNodeList();
-    final List<Integer> priorityList = createCommand.getPriorityList();
+    queuedCount.incrementAndGet();
+    CompletableFuture.runAsync(() -> {
+      invocationCount.incrementAndGet();
+      final long startTime = Time.monotonicNow();
+      final DatanodeDetails dn = context.getParent()
+          .getDatanodeDetails();
+      final CreatePipelineCommand createCommand =
+          (CreatePipelineCommand) command;
+      final PipelineID pipelineID = createCommand.getPipelineID();
+      final HddsProtos.PipelineID pipelineIdProto = pipelineID.getProtobuf();
+      final List<DatanodeDetails> peers = createCommand.getNodeList();
+      final List<Integer> priorityList = createCommand.getPriorityList();
 
-    try {
-      XceiverServerSpi server = ozoneContainer.getWriteChannel();
-      if (!server.isExist(pipelineIdProto)) {
-        final RaftGroupId groupId = RaftGroupId.valueOf(pipelineID.getId());
-        final RaftGroup group =
-            RatisHelper.newRaftGroup(groupId, peers, priorityList);
-        server.addGroup(pipelineIdProto, peers, priorityList);
-        peers.stream().filter(
-            d -> !d.getUuid().equals(dn.getUuid()))
-            .forEach(d -> {
-              final RaftPeer peer = RatisHelper.toRaftPeer(d);
-              try (RaftClient client = newRaftClient.apply(peer,
-                  ozoneContainer.getTlsClientConfig())) {
-                client.getGroupManagementApi(peer.getId()).add(group);
-              } catch (AlreadyExistsException ae) {
-                // do not log
-              } catch (IOException ioe) {
-                LOG.warn("Add group failed for {}", d, ioe);
-              }
-            });
-        LOG.info("Created Pipeline {} {} {}.",
-            createCommand.getReplicationType(), createCommand.getFactor(),
-            pipelineID);
+      try {
+        XceiverServerSpi server = ozoneContainer.getWriteChannel();
+        if (!server.isExist(pipelineIdProto)) {
+          final RaftGroupId groupId = RaftGroupId.valueOf(pipelineID.getId());
+          final RaftGroup group =
+              RatisHelper.newRaftGroup(groupId, peers, priorityList);
+          server.addGroup(pipelineIdProto, peers, priorityList);
+          peers.stream().filter(
+              d -> !d.getUuid().equals(dn.getUuid()))
+              .forEach(d -> {
+                final RaftPeer peer = RatisHelper.toRaftPeer(d);
+                try (RaftClient client = newRaftClient.apply(peer,
+                    ozoneContainer.getTlsClientConfig())) {
+                  client.getGroupManagementApi(peer.getId()).add(group);
+                } catch (AlreadyExistsException ae) {
+                  // do not log
+                } catch (IOException ioe) {
+                  LOG.warn("Add group failed for {}", d, ioe);
+                }
+              });
+          LOG.info("Created Pipeline {} {} {}.",
+              createCommand.getReplicationType(), createCommand.getFactor(),
+              pipelineID);
+        }
+      } catch (IOException e) {
+        // The server.addGroup may exec after a getGroupManagementApi call
+        // from another peer, so we may got an AlreadyExistsException.
+        if (!(e.getCause() instanceof AlreadyExistsException)) {
+          LOG.error("Can't create pipeline {} {} {}",
+              createCommand.getReplicationType(),
+              createCommand.getFactor(), pipelineID, e);
+        }
+      } finally {
+        long endTime = Time.monotonicNow();
+        totalTime += endTime - startTime;
       }
-    } catch (IOException e) {
-      // The server.addGroup may exec after a getGroupManagementApi call
-      // from another peer, so we may got an AlreadyExistsException.
-      if (!(e.getCause() instanceof AlreadyExistsException)) {
-        LOG.error("Can't create pipeline {} {} {}",
-            createCommand.getReplicationType(),
-            createCommand.getFactor(), pipelineID, e);
-      }
-    } finally {
-      long endTime = Time.monotonicNow();
-      totalTime += endTime - startTime;
-    }
+    }, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
   }
 
   /**
@@ -163,6 +175,6 @@ public class CreatePipelineCommandHandler implements CommandHandler {
 
   @Override
   public int getQueuedCount() {
-    return 0;
+    return queuedCount.get();
   }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
index 350be67e8d..681d63d92b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
+import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
@@ -92,7 +93,8 @@ public class TestCreatePipelineCommandHandler {
         .thenReturn(false);
 
     final CreatePipelineCommandHandler commandHandler =
-        new CreatePipelineCommandHandler((leader, tls) -> raftClient);
+        new CreatePipelineCommandHandler((leader, tls) -> raftClient,
+            MoreExecutors.directExecutor());
     commandHandler.handle(command, ozoneContainer, stateContext,
         connectionManager);
 
@@ -124,7 +126,8 @@ public class TestCreatePipelineCommandHandler {
         .thenReturn(true);
 
     final CreatePipelineCommandHandler commandHandler =
-        new CreatePipelineCommandHandler(new OzoneConfiguration());
+        new CreatePipelineCommandHandler(new OzoneConfiguration(),
+            MoreExecutors.directExecutor());
     commandHandler.handle(command, ozoneContainer, stateContext,
         connectionManager);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to