This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-5763 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 8aeb983d7c1e9f61f7ec1a41133a326dc4b6828e Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Thu Oct 28 15:29:01 2021 +0800 HDDS-5763. Set raft.server.data-stream.async.write.thread.pool.size conf in datanode. --- .../transport/server/ratis/ContainerStateMachine.java | 7 +++++-- .../common/transport/server/ratis/LocalStream.java | 10 +++++++++- .../transport/server/ratis/XceiverServerRatis.java | 5 +++++ .../hadoop/hdds/conf/DatanodeRatisServerConfig.java | 16 ++++++++++++++++ 4 files changed, 35 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index dda1fb3..acccac1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -515,8 +515,11 @@ public class ContainerStateMachine extends BaseStateMachine { ContainerCommandResponseProto response = runCommand( requestProto, context); - String path = response.getMessage(); - return new LocalStream(new StreamDataChannel(Paths.get(path))); + final StreamDataChannel channel = new StreamDataChannel( + Paths.get(response.getMessage())); + final ExecutorService executor = requestProto.hasWriteChunk() ? 
+ getChunkExecutor(requestProto.getWriteChunk()) : null; + return new LocalStream(channel, executor); } catch (IOException e) { throw new CompletionException("Failed to create data stream", e); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java index baae013..780f874 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java @@ -23,12 +23,15 @@ import org.apache.ratis.statemachine.StateMachine; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; class LocalStream implements StateMachine.DataStream { private final StateMachine.DataChannel dataChannel; + private final Executor executor; - LocalStream(StateMachine.DataChannel dataChannel) { + LocalStream(StateMachine.DataChannel dataChannel, Executor executor) { this.dataChannel = dataChannel; + this.executor = executor; } @Override @@ -47,4 +50,9 @@ class LocalStream implements StateMachine.DataStream { } }); } + + @Override + public Executor getExecutor() { + return executor; + } } \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 9f7b03f..189c517 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -249,6 +249,11 @@ public final class XceiverServerRatis implements XceiverServerSpi { .getClientPoolSize(); RaftServerConfigKeys.DataStream.setClientPoolSize(properties, dataStreamClientPoolSize); + final int asyncWriteThreadPoolPoolSize = + conf.getObject(DatanodeRatisServerConfig.class) + .getAsyncWriteThreadPoolPoolSize(); + RaftServerConfigKeys.DataStream.setAsyncWriteThreadPoolSize(properties, + asyncWriteThreadPoolPoolSize); } @SuppressWarnings("checkstyle:methodlength") diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java index 3132928..c0538bb 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java @@ -175,6 +175,22 @@ public class DatanodeRatisServerConfig { this.clientPoolSize = clientPoolSize; } + @Config(key = "datastream.async.write.thread.pool.size", + defaultValue = "16", + type = ConfigType.INT, + tags = {OZONE, DATANODE, RATIS, DATASTREAM}, + description = "The number of threads for handling datastream write." + ) + private int asyncWriteThreadPoolPoolSize; + + public int getAsyncWriteThreadPoolPoolSize() { + return asyncWriteThreadPoolPoolSize; + } + + public void setAsyncWriteThreadPoolPoolSize(int asyncWriteThreadPoolPoolSize) { + this.asyncWriteThreadPoolPoolSize = asyncWriteThreadPoolPoolSize; + } + @Config(key = "delete.ratis.log.directory", defaultValue = "true", type = ConfigType.BOOLEAN, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
