This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-5763
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 8aeb983d7c1e9f61f7ec1a41133a326dc4b6828e
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Oct 28 15:29:01 2021 +0800

    HDDS-5763. Set raft.server.data-stream.async.write.thread.pool.size conf in datanode.
---
 .../transport/server/ratis/ContainerStateMachine.java    |  7 +++++--
 .../common/transport/server/ratis/LocalStream.java       | 10 +++++++++-
 .../transport/server/ratis/XceiverServerRatis.java       |  5 +++++
 .../hadoop/hdds/conf/DatanodeRatisServerConfig.java      | 16 ++++++++++++++++
 4 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index dda1fb3..acccac1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -515,8 +515,11 @@ public class ContainerStateMachine extends BaseStateMachine {
 
         ContainerCommandResponseProto response = runCommand(
             requestProto, context);
-        String path = response.getMessage();
-        return new LocalStream(new StreamDataChannel(Paths.get(path)));
+        final StreamDataChannel channel = new StreamDataChannel(
+            Paths.get(response.getMessage()));
+        final ExecutorService executor = requestProto.hasWriteChunk() ?
+            getChunkExecutor(requestProto.getWriteChunk()) : null;
+        return new LocalStream(channel, executor);
       } catch (IOException e) {
         throw new CompletionException("Failed to create data stream", e);
       }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
index baae013..780f874 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java
@@ -23,12 +23,15 @@ import org.apache.ratis.statemachine.StateMachine;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
 
 class LocalStream implements StateMachine.DataStream {
   private final StateMachine.DataChannel dataChannel;
+  private final Executor executor;
 
-  LocalStream(StateMachine.DataChannel dataChannel) {
+  LocalStream(StateMachine.DataChannel dataChannel, Executor executor) {
     this.dataChannel = dataChannel;
+    this.executor = executor;
   }
 
   @Override
@@ -47,4 +50,9 @@ class LocalStream implements StateMachine.DataStream {
       }
     });
   }
+
+  @Override
+  public Executor getExecutor() {
+    return executor;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 9f7b03f..189c517 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -249,6 +249,11 @@ public final class XceiverServerRatis implements XceiverServerSpi {
             .getClientPoolSize();
     RaftServerConfigKeys.DataStream.setClientPoolSize(properties,
         dataStreamClientPoolSize);
+    final int asyncWriteThreadPoolPoolSize =
+        conf.getObject(DatanodeRatisServerConfig.class)
+            .getAsyncWriteThreadPoolPoolSize();
+    RaftServerConfigKeys.DataStream.setAsyncWriteThreadPoolSize(properties,
+        asyncWriteThreadPoolPoolSize);
   }
 
   @SuppressWarnings("checkstyle:methodlength")
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
index 3132928..c0538bb 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
@@ -175,6 +175,22 @@ public class DatanodeRatisServerConfig {
     this.clientPoolSize = clientPoolSize;
   }
 
+  @Config(key = "datastream.async.write.thread.pool.size",
+      defaultValue = "16",
+      type = ConfigType.INT,
+      tags = {OZONE, DATANODE, RATIS, DATASTREAM},
+      description = "The number of threads for handling datastream write."
+  )
+  private int asyncWriteThreadPoolPoolSize;
+
+  public int getAsyncWriteThreadPoolPoolSize() {
+    return asyncWriteThreadPoolPoolSize;
+  }
+
+  public void setAsyncWriteThreadPoolPoolSize(int asyncWriteThreadPoolPoolSize) {
+    this.asyncWriteThreadPoolPoolSize = asyncWriteThreadPoolPoolSize;
+  }
+
   @Config(key = "delete.ratis.log.directory",
           defaultValue = "true",
           type = ConfigType.BOOLEAN,

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to