This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 812d23f070 HDDS-8624. Process pipeline commands asynchronously in datanode (#4713)
812d23f070 is described below
commit 812d23f0708d6c93945620e812f02665a60e1ba3
Author: hao guo <[email protected]>
AuthorDate: Thu May 25 17:37:30 2023 +0800
HDDS-8624. Process pipeline commands asynchronously in datanode (#4713)
---
.../common/statemachine/DatanodeStateMachine.java | 45 ++++++---
.../ClosePipelineCommandHandler.java | 58 ++++++-----
.../CreatePipelineCommandHandler.java | 108 ++++++++++++---------
.../TestCreatePipelineCommandHandler.java | 7 +-
4 files changed, 128 insertions(+), 90 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 73d064f04b..1cfaa44da7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -92,6 +92,7 @@ public class DatanodeStateMachine implements Closeable {
static final Logger LOG =
LoggerFactory.getLogger(DatanodeStateMachine.class);
private final ExecutorService executorService;
+ private final ExecutorService pipelineCommandExecutorService;
private final ConfigurationSource conf;
private final SCMConnectionManager connectionManager;
private final ECReconstructionCoordinator ecReconstructionCoordinator;
@@ -216,6 +217,11 @@ public class DatanodeStateMachine implements Closeable {
reconstructECContainersCommandHandler =
new ReconstructECContainersCommandHandler(conf, supervisor,
ecReconstructionCoordinator);
+
+ pipelineCommandExecutorService = Executors
+ .newSingleThreadExecutor(new ThreadFactoryBuilder()
+ .setNameFormat("PipelineCommandHandlerThread-%d").build());
+
// When we add new handlers just adding a new handler here should do the
// trick.
commandDispatcher = CommandDispatcher.newBuilder()
@@ -229,8 +235,10 @@ public class DatanodeStateMachine implements Closeable {
.addHandler(new DeleteContainerCommandHandler(
dnConf.getContainerDeleteThreads(), clock,
dnConf.getCommandQueueLimit()))
- .addHandler(new ClosePipelineCommandHandler())
- .addHandler(new CreatePipelineCommandHandler(conf))
+ .addHandler(
+ new ClosePipelineCommandHandler(pipelineCommandExecutorService))
+ .addHandler(new CreatePipelineCommandHandler(conf,
+ pipelineCommandExecutorService))
.addHandler(new SetNodeOperationalStateCommandHandler(conf,
supervisor::nodeStateUpdated))
.addHandler(new FinalizeNewLayoutVersionCommandHandler())
@@ -410,20 +418,8 @@ public class DatanodeStateMachine implements Closeable {
context.setState(DatanodeStates.getLastState());
replicationSupervisorMetrics.unRegister();
ecReconstructionMetrics.unRegister();
- executorService.shutdown();
- try {
- if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
- executorService.shutdownNow();
- }
-
- if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
- LOG.error("Unable to shutdown state machine properly.");
- }
- } catch (InterruptedException e) {
- LOG.error("Error attempting to shutdown.", e);
- executorService.shutdownNow();
- Thread.currentThread().interrupt();
- }
+ executorServiceShutdownGraceful(executorService);
+ executorServiceShutdownGraceful(pipelineCommandExecutorService);
if (connectionManager != null) {
connectionManager.close();
@@ -446,6 +442,23 @@ public class DatanodeStateMachine implements Closeable {
}
}
+ private void executorServiceShutdownGraceful(ExecutorService executor) {
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ executor.shutdownNow();
+ }
+
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ LOG.error("Unable to shutdown state machine properly.");
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Error attempting to shutdown.", e);
+ executor.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
+
/**
* States that a datanode can be in. GetNextState will move this enum from
* getInitState to getLastState.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
index 4efd55de55..0116721ba1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -34,6 +34,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -44,13 +47,16 @@ public class ClosePipelineCommandHandler implements CommandHandler {
private static final Logger LOG =
LoggerFactory.getLogger(ClosePipelineCommandHandler.class);
- private AtomicLong invocationCount = new AtomicLong(0);
+ private final AtomicLong invocationCount = new AtomicLong(0);
+ private final AtomicInteger queuedCount = new AtomicInteger(0);
private long totalTime;
+ private final Executor executor;
/**
* Constructs a closePipelineCommand handler.
*/
- public ClosePipelineCommandHandler() {
+ public ClosePipelineCommandHandler(Executor executor) {
+ this.executor = executor;
}
/**
@@ -64,29 +70,33 @@ public class ClosePipelineCommandHandler implements CommandHandler {
@Override
public void handle(SCMCommand command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
- invocationCount.incrementAndGet();
- final long startTime = Time.monotonicNow();
- final DatanodeDetails dn = context.getParent().getDatanodeDetails();
- ClosePipelineCommand closePipelineCommand = (ClosePipelineCommand) command;
- final PipelineID pipelineID = closePipelineCommand.getPipelineID();
- final HddsProtos.PipelineID pipelineIdProto = pipelineID.getProtobuf();
+ queuedCount.incrementAndGet();
+ CompletableFuture.runAsync(() -> {
+ invocationCount.incrementAndGet();
+ final long startTime = Time.monotonicNow();
+ final DatanodeDetails dn = context.getParent().getDatanodeDetails();
+ ClosePipelineCommand closePipelineCommand =
+ (ClosePipelineCommand) command;
+ final PipelineID pipelineID = closePipelineCommand.getPipelineID();
+ final HddsProtos.PipelineID pipelineIdProto = pipelineID.getProtobuf();
- try {
- XceiverServerSpi server = ozoneContainer.getWriteChannel();
- if (server.isExist(pipelineIdProto)) {
- server.removeGroup(pipelineIdProto);
- LOG.info("Close Pipeline {} command on datanode {}.", pipelineID,
- dn.getUuidString());
- } else {
- LOG.debug("Ignoring close pipeline command for pipeline {} " +
- "as it does not exist", pipelineID);
+ try {
+ XceiverServerSpi server = ozoneContainer.getWriteChannel();
+ if (server.isExist(pipelineIdProto)) {
+ server.removeGroup(pipelineIdProto);
+ LOG.info("Close Pipeline {} command on datanode {}.", pipelineID,
+ dn.getUuidString());
+ } else {
+ LOG.debug("Ignoring close pipeline command for pipeline {} " +
+ "as it does not exist", pipelineID);
+ }
+ } catch (IOException e) {
+ LOG.error("Can't close pipeline {}", pipelineID, e);
+ } finally {
+ long endTime = Time.monotonicNow();
+ totalTime += endTime - startTime;
}
- } catch (IOException e) {
- LOG.error("Can't close pipeline {}", pipelineID, e);
- } finally {
- long endTime = Time.monotonicNow();
- totalTime += endTime - startTime;
- }
+ }, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
}
/**
@@ -124,6 +134,6 @@ public class ClosePipelineCommandHandler implements CommandHandler {
@Override
public int getQueuedCount() {
- return 0;
+ return queuedCount.get();
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
index ae22af92b3..d55e410db2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
@@ -17,6 +17,9 @@
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.List;
import java.util.function.BiFunction;
@@ -53,20 +56,25 @@ public class CreatePipelineCommandHandler implements CommandHandler {
LoggerFactory.getLogger(CreatePipelineCommandHandler.class);
private final AtomicLong invocationCount = new AtomicLong(0);
+ private final AtomicInteger queuedCount = new AtomicInteger(0);
private final BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient;
private long totalTime;
+ private final Executor executor;
/**
* Constructs a createPipelineCommand handler.
*/
- public CreatePipelineCommandHandler(ConfigurationSource conf) {
- this(RatisHelper.newRaftClient(conf));
+ public CreatePipelineCommandHandler(ConfigurationSource conf,
+ Executor executor) {
+ this(RatisHelper.newRaftClient(conf), executor);
}
CreatePipelineCommandHandler(
- BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient) {
+ BiFunction<RaftPeer, GrpcTlsConfig, RaftClient> newRaftClient,
+ Executor executor) {
this.newRaftClient = newRaftClient;
+ this.executor = executor;
}
/**
@@ -80,52 +88,56 @@ public class CreatePipelineCommandHandler implements CommandHandler {
@Override
public void handle(SCMCommand command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
- invocationCount.incrementAndGet();
- final long startTime = Time.monotonicNow();
- final DatanodeDetails dn = context.getParent()
- .getDatanodeDetails();
- final CreatePipelineCommand createCommand = (CreatePipelineCommand)
command;
- final PipelineID pipelineID = createCommand.getPipelineID();
- final HddsProtos.PipelineID pipelineIdProto = pipelineID.getProtobuf();
- final List<DatanodeDetails> peers = createCommand.getNodeList();
- final List<Integer> priorityList = createCommand.getPriorityList();
+ queuedCount.incrementAndGet();
+ CompletableFuture.runAsync(() -> {
+ invocationCount.incrementAndGet();
+ final long startTime = Time.monotonicNow();
+ final DatanodeDetails dn = context.getParent()
+ .getDatanodeDetails();
+ final CreatePipelineCommand createCommand =
+ (CreatePipelineCommand) command;
+ final PipelineID pipelineID = createCommand.getPipelineID();
+ final HddsProtos.PipelineID pipelineIdProto = pipelineID.getProtobuf();
+ final List<DatanodeDetails> peers = createCommand.getNodeList();
+ final List<Integer> priorityList = createCommand.getPriorityList();
- try {
- XceiverServerSpi server = ozoneContainer.getWriteChannel();
- if (!server.isExist(pipelineIdProto)) {
- final RaftGroupId groupId = RaftGroupId.valueOf(pipelineID.getId());
- final RaftGroup group =
- RatisHelper.newRaftGroup(groupId, peers, priorityList);
- server.addGroup(pipelineIdProto, peers, priorityList);
- peers.stream().filter(
- d -> !d.getUuid().equals(dn.getUuid()))
- .forEach(d -> {
- final RaftPeer peer = RatisHelper.toRaftPeer(d);
- try (RaftClient client = newRaftClient.apply(peer,
- ozoneContainer.getTlsClientConfig())) {
- client.getGroupManagementApi(peer.getId()).add(group);
- } catch (AlreadyExistsException ae) {
- // do not log
- } catch (IOException ioe) {
- LOG.warn("Add group failed for {}", d, ioe);
- }
- });
- LOG.info("Created Pipeline {} {} {}.",
- createCommand.getReplicationType(), createCommand.getFactor(),
- pipelineID);
+ try {
+ XceiverServerSpi server = ozoneContainer.getWriteChannel();
+ if (!server.isExist(pipelineIdProto)) {
+ final RaftGroupId groupId = RaftGroupId.valueOf(pipelineID.getId());
+ final RaftGroup group =
+ RatisHelper.newRaftGroup(groupId, peers, priorityList);
+ server.addGroup(pipelineIdProto, peers, priorityList);
+ peers.stream().filter(
+ d -> !d.getUuid().equals(dn.getUuid()))
+ .forEach(d -> {
+ final RaftPeer peer = RatisHelper.toRaftPeer(d);
+ try (RaftClient client = newRaftClient.apply(peer,
+ ozoneContainer.getTlsClientConfig())) {
+ client.getGroupManagementApi(peer.getId()).add(group);
+ } catch (AlreadyExistsException ae) {
+ // do not log
+ } catch (IOException ioe) {
+ LOG.warn("Add group failed for {}", d, ioe);
+ }
+ });
+ LOG.info("Created Pipeline {} {} {}.",
+ createCommand.getReplicationType(), createCommand.getFactor(),
+ pipelineID);
+ }
+ } catch (IOException e) {
+ // The server.addGroup may exec after a getGroupManagementApi call
+ // from another peer, so we may got an AlreadyExistsException.
+ if (!(e.getCause() instanceof AlreadyExistsException)) {
+ LOG.error("Can't create pipeline {} {} {}",
+ createCommand.getReplicationType(),
+ createCommand.getFactor(), pipelineID, e);
+ }
+ } finally {
+ long endTime = Time.monotonicNow();
+ totalTime += endTime - startTime;
}
- } catch (IOException e) {
- // The server.addGroup may exec after a getGroupManagementApi call
- // from another peer, so we may got an AlreadyExistsException.
- if (!(e.getCause() instanceof AlreadyExistsException)) {
- LOG.error("Can't create pipeline {} {} {}",
- createCommand.getReplicationType(),
- createCommand.getFactor(), pipelineID, e);
- }
- } finally {
- long endTime = Time.monotonicNow();
- totalTime += endTime - startTime;
- }
+ }, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
}
/**
@@ -163,6 +175,6 @@ public class CreatePipelineCommandHandler implements CommandHandler {
@Override
public int getQueuedCount() {
- return 0;
+ return queuedCount.get();
}
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
index 350be67e8d..681d63d92b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
@@ -92,7 +93,8 @@ public class TestCreatePipelineCommandHandler {
.thenReturn(false);
final CreatePipelineCommandHandler commandHandler =
- new CreatePipelineCommandHandler((leader, tls) -> raftClient);
+ new CreatePipelineCommandHandler((leader, tls) -> raftClient,
+ MoreExecutors.directExecutor());
commandHandler.handle(command, ozoneContainer, stateContext,
connectionManager);
@@ -124,7 +126,8 @@ public class TestCreatePipelineCommandHandler {
.thenReturn(true);
final CreatePipelineCommandHandler commandHandler =
- new CreatePipelineCommandHandler(new OzoneConfiguration());
+ new CreatePipelineCommandHandler(new OzoneConfiguration(),
+ MoreExecutors.directExecutor());
commandHandler.handle(command, ozoneContainer, stateContext,
connectionManager);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]