This is an automated email from the ASF dual-hosted git repository.

captainzmc pushed a commit to branch HDDS-4454 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit e186bfc26810ca606ba6ac971d3c4fbb147baba1 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Mon Nov 1 23:39:53 2021 +0800 HDDS-5763. Provide an Executor for each LocalStream in ContainerStateMachine (#2782) --- .../transport/server/ratis/ContainerStateMachine.java | 7 +++++-- .../common/transport/server/ratis/LocalStream.java | 10 +++++++++- .../transport/server/ratis/XceiverServerRatis.java | 5 ----- .../hadoop/hdds/conf/DatanodeRatisServerConfig.java | 17 ----------------- 4 files changed, 14 insertions(+), 25 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 121a6d6bdd..83255e0450 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -527,8 +527,11 @@ public class ContainerStateMachine extends BaseStateMachine { ContainerCommandResponseProto response = runCommand( requestProto, context); - String path = response.getMessage(); - return new LocalStream(new StreamDataChannel(Paths.get(path))); + final StreamDataChannel channel = new StreamDataChannel( + Paths.get(response.getMessage())); + final ExecutorService chunkExecutor = requestProto.hasWriteChunk() ? 
+ getChunkExecutor(requestProto.getWriteChunk()) : null; + return new LocalStream(channel, chunkExecutor); } catch (IOException e) { throw new CompletionException("Failed to create data stream", e); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java index baae013966..780f874398 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/LocalStream.java @@ -23,12 +23,15 @@ import org.apache.ratis.statemachine.StateMachine; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; class LocalStream implements StateMachine.DataStream { private final StateMachine.DataChannel dataChannel; + private final Executor executor; - LocalStream(StateMachine.DataChannel dataChannel) { + LocalStream(StateMachine.DataChannel dataChannel, Executor executor) { this.dataChannel = dataChannel; + this.executor = executor; } @Override @@ -47,4 +50,9 @@ class LocalStream implements StateMachine.DataStream { } }); } + + @Override + public Executor getExecutor() { + return executor; + } } \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 2fcc07fc23..6b0ad0e41e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -237,11 +237,6 @@ public final class XceiverServerRatis implements XceiverServerSpi { .getStreamRequestThreads(); RaftServerConfigKeys.DataStream.setAsyncRequestThreadPoolSize(properties, dataStreamAsyncRequestThreadPoolSize); - int dataStreamWriteRequestThreadPoolSize = - conf.getObject(DatanodeRatisServerConfig.class) - .getStreamWriteThreads(); - RaftServerConfigKeys.DataStream.setAsyncWriteThreadPoolSize(properties, - dataStreamWriteRequestThreadPoolSize); int dataStreamClientPoolSize = conf.getObject(DatanodeRatisServerConfig.class) .getClientPoolSize(); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java index 3132928abe..058932e769 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java @@ -141,23 +141,6 @@ public class DatanodeRatisServerConfig { this.streamRequestThreads = streamRequestThreads; } - @Config(key = "datastream.write.threads", - defaultValue = "20", - type = ConfigType.INT, - tags = {OZONE, DATANODE, RATIS, DATASTREAM}, - description = "Maximum number of threads in the thread pool for " + - "datastream write." - ) - private int streamWriteThreads; - - public int getStreamWriteThreads() { - return streamWriteThreads; - } - - public void setStreamWriteThreads(int streamWriteThreads) { - this.streamWriteThreads = streamWriteThreads; - } - @Config(key = "datastream.client.pool.size", defaultValue = "10", type = ConfigType.INT, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
