This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new c990899e40 HDDS-9913. Reduce number of times configuration is loaded in Ozone client (#5789)
c990899e40 is described below

commit c990899e40b4c69b600119b5c1c721726d2f4555
Author: XiChen <[email protected]>
AuthorDate: Sun Dec 17 00:56:28 2023 +0800

    HDDS-9913. Reduce number of times configuration is loaded in Ozone client (#5789)
---
 .../apache/hadoop/hdds/scm/OzoneClientConfig.java  |   5 +
 .../apache/hadoop/hdds/scm/StreamBufferArgs.java   | 136 +++++++++++++++++++++
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |  22 ++--
 .../hdds/scm/storage/ECBlockOutputStream.java      |   6 +-
 .../hdds/scm/storage/RatisBlockOutputStream.java   |   6 +-
 .../storage/TestBlockOutputStreamCorrectness.java  |   5 +-
 .../ECReconstructionCoordinator.java               |   5 +-
 .../ozone/client/io/BlockOutputStreamEntry.java    |  18 ++-
 .../client/io/BlockOutputStreamEntryPool.java      |  27 ++--
 .../ozone/client/io/ECBlockOutputStreamEntry.java  |  17 ++-
 .../client/io/ECBlockOutputStreamEntryPool.java    |   6 +-
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  |   8 +-
 .../hadoop/ozone/client/io/KeyOutputStream.java    |  31 +++--
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  20 +--
 .../hadoop/ozone/client/OzoneOutputStreamStub.java |  11 +-
 .../hadoop/ozone/freon/OmMetadataGenerator.java    |   4 +-
 16 files changed, 269 insertions(+), 58 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 80f86a677a..44af34cb91 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigTag;
@@ -254,6 +255,7 @@ public class OzoneClientConfig {
     return streamBufferFlushSize;
   }
 
+  @VisibleForTesting
   public void setStreamBufferFlushSize(long streamBufferFlushSize) {
     this.streamBufferFlushSize = streamBufferFlushSize;
   }
@@ -262,6 +264,7 @@ public class OzoneClientConfig {
     return streamBufferSize;
   }
 
+  @VisibleForTesting
   public void setStreamBufferSize(int streamBufferSize) {
     this.streamBufferSize = streamBufferSize;
   }
@@ -270,6 +273,7 @@ public class OzoneClientConfig {
     return streamBufferFlushDelay;
   }
 
+  @VisibleForTesting
   public void setStreamBufferFlushDelay(boolean streamBufferFlushDelay) {
     this.streamBufferFlushDelay = streamBufferFlushDelay;
   }
@@ -278,6 +282,7 @@ public class OzoneClientConfig {
     return streamBufferMaxSize;
   }
 
+  @VisibleForTesting
   public void setStreamBufferMaxSize(long streamBufferMaxSize) {
     this.streamBufferMaxSize = streamBufferMaxSize;
   }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/StreamBufferArgs.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/StreamBufferArgs.java
new file mode 100644
index 0000000000..4772cb90fb
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/StreamBufferArgs.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+/**
+ * This class encapsulates the arguments that are
+ * required for Ozone client StreamBuffer.
+ */
+public class StreamBufferArgs {
+
+  private int streamBufferSize;
+  private long streamBufferFlushSize;
+  private long streamBufferMaxSize;
+  private boolean streamBufferFlushDelay;
+
+  protected StreamBufferArgs(Builder builder) {
+    this.streamBufferSize = builder.bufferSize;
+    this.streamBufferFlushSize = builder.bufferFlushSize;
+    this.streamBufferMaxSize = builder.bufferMaxSize;
+    this.streamBufferFlushDelay = builder.streamBufferFlushDelay;
+  }
+
+  public int getStreamBufferSize() {
+    return streamBufferSize;
+  }
+
+  public long getStreamBufferFlushSize() {
+    return streamBufferFlushSize;
+  }
+
+  public long getStreamBufferMaxSize() {
+    return streamBufferMaxSize;
+  }
+
+  public boolean isStreamBufferFlushDelay() {
+    return streamBufferFlushDelay;
+  }
+
+  public void setStreamBufferFlushDelay(boolean streamBufferFlushDelay) {
+    this.streamBufferFlushDelay = streamBufferFlushDelay;
+  }
+
+  protected void setStreamBufferSize(int streamBufferSize) {
+    this.streamBufferSize = streamBufferSize;
+  }
+
+  protected void setStreamBufferFlushSize(long streamBufferFlushSize) {
+    this.streamBufferFlushSize = streamBufferFlushSize;
+  }
+
+  protected void setStreamBufferMaxSize(long streamBufferMaxSize) {
+    this.streamBufferMaxSize = streamBufferMaxSize;
+  }
+
+  /**
+   * Builder class for StreamBufferArgs.
+   */
+  public static class Builder {
+    private int bufferSize;
+    private long bufferFlushSize;
+    private long bufferMaxSize;
+    private boolean streamBufferFlushDelay;
+
+    public Builder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public Builder setBufferFlushSize(long bufferFlushSize) {
+      this.bufferFlushSize = bufferFlushSize;
+      return this;
+    }
+
+    public Builder setBufferMaxSize(long bufferMaxSize) {
+      this.bufferMaxSize = bufferMaxSize;
+      return this;
+    }
+
+    public Builder setStreamBufferFlushDelay(boolean streamBufferFlushDelay) {
+      this.streamBufferFlushDelay = streamBufferFlushDelay;
+      return this;
+    }
+
+    public StreamBufferArgs build() {
+      return new StreamBufferArgs(this);
+    }
+
+    public static Builder getNewBuilder() {
+      return new Builder();
+    }
+  }
+
+  public static StreamBufferArgs getDefaultStreamBufferArgs(
+      ReplicationConfig replicationConfig, OzoneClientConfig clientConfig) {
+    int bufferSize;
+    long flushSize;
+    long bufferMaxSize;
+    boolean streamBufferFlushDelay = clientConfig.isStreamBufferFlushDelay();
+    if (replicationConfig.getReplicationType() == HddsProtos.ReplicationType.EC) {
+      bufferSize = ((ECReplicationConfig) replicationConfig).getEcChunkSize();
+      flushSize = ((ECReplicationConfig) replicationConfig).getEcChunkSize();
+      bufferMaxSize = ((ECReplicationConfig) replicationConfig).getEcChunkSize();
+    } else {
+      bufferSize = clientConfig.getStreamBufferSize();
+      flushSize = clientConfig.getStreamBufferFlushSize();
+      bufferMaxSize = clientConfig.getStreamBufferMaxSize();
+    }
+
+    return Builder.getNewBuilder()
+        .setBufferSize(bufferSize)
+        .setBufferFlushSize(flushSize)
+        .setBufferMaxSize(bufferMaxSize)
+        .setStreamBufferFlushDelay(streamBufferFlushDelay)
+        .build();
+  }
+}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 9988fb258e..b97165084f 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
 import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
@@ -88,6 +89,7 @@ public class BlockOutputStream extends OutputStream {
   private XceiverClientFactory xceiverClientFactory;
   private XceiverClientSpi xceiverClient;
   private OzoneClientConfig config;
+  private StreamBufferArgs streamBufferArgs;
 
   private int chunkIndex;
   private final AtomicLong chunkOffset = new AtomicLong();
@@ -134,6 +136,7 @@ public class BlockOutputStream extends OutputStream {
    * @param pipeline             pipeline where block will be written
    * @param bufferPool           pool of buffers
    */
+  @SuppressWarnings("checkstyle:ParameterNumber")
   public BlockOutputStream(
       BlockID blockID,
       XceiverClientFactory xceiverClientManager,
@@ -141,7 +144,7 @@ public class BlockOutputStream extends OutputStream {
       BufferPool bufferPool,
       OzoneClientConfig config,
       Token<? extends TokenIdentifier> token,
-      ContainerClientMetrics clientMetrics
+      ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
   ) throws IOException {
     this.xceiverClientFactory = xceiverClientManager;
     this.config = config;
@@ -166,12 +169,12 @@ public class BlockOutputStream extends OutputStream {
 
     //number of buffers used before doing a flush
     refreshCurrentBuffer();
-    flushPeriod = (int) (config.getStreamBufferFlushSize() / config
+    flushPeriod = (int) (streamBufferArgs.getStreamBufferFlushSize() / streamBufferArgs
         .getStreamBufferSize());
 
     Preconditions
         .checkArgument(
-            (long) flushPeriod * config.getStreamBufferSize() == config
+            (long) flushPeriod * streamBufferArgs.getStreamBufferSize() == streamBufferArgs
                 .getStreamBufferFlushSize());
 
     // A single thread executor handle the responses of async requests
@@ -185,6 +188,7 @@ public class BlockOutputStream extends OutputStream {
         config.getBytesPerChecksum());
     this.clientMetrics = clientMetrics;
     this.pipeline = pipeline;
+    this.streamBufferArgs = streamBufferArgs;
   }
 
   void refreshCurrentBuffer() {
@@ -321,7 +325,7 @@ public class BlockOutputStream extends OutputStream {
   }
 
   private boolean isBufferPoolFull() {
-    return bufferPool.computeBufferData() == config.getStreamBufferMaxSize();
+    return bufferPool.computeBufferData() == streamBufferArgs.getStreamBufferMaxSize();
   }
 
   /**
@@ -339,7 +343,7 @@ public class BlockOutputStream extends OutputStream {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Retrying write length {} for blockID {}", len, blockID);
     }
-    Preconditions.checkArgument(len <= config.getStreamBufferMaxSize());
+    Preconditions.checkArgument(len <= streamBufferArgs.getStreamBufferMaxSize());
     int count = 0;
     while (len > 0) {
       ChunkBuffer buffer = bufferPool.getBuffer(count);
@@ -355,13 +359,13 @@ public class BlockOutputStream extends OutputStream {
       // the buffer. We should just validate
       // if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
       // call for handling full buffer/flush buffer condition.
-      if (writtenDataLength % config.getStreamBufferFlushSize() == 0) {
+      if (writtenDataLength % streamBufferArgs.getStreamBufferFlushSize() == 0) {
         // reset the position to zero as now we will be reading the
         // next buffer in the list
         updateFlushLength();
         executePutBlock(false, false);
       }
-      if (writtenDataLength == config.getStreamBufferMaxSize()) {
+      if (writtenDataLength == streamBufferArgs.getStreamBufferMaxSize()) {
         handleFullBuffer();
       }
     }
@@ -518,9 +522,9 @@ public class BlockOutputStream extends OutputStream {
   public void flush() throws IOException {
     if (xceiverClientFactory != null && xceiverClient != null
         && bufferPool != null && bufferPool.getSize() > 0
-        && (!config.isStreamBufferFlushDelay() ||
+        && (!streamBufferArgs.isStreamBufferFlushDelay() ||
             writtenDataLength - totalDataFlushedLength
-                >= config.getStreamBufferSize())) {
+                >= streamBufferArgs.getStreamBufferSize())) {
       handleFlush(false);
     }
   }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 90cf4743f8..1d7fdc1df6 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -66,6 +67,7 @@ public class ECBlockOutputStream extends BlockOutputStream {
    * @param pipeline             pipeline where block will be written
    * @param bufferPool           pool of buffers
    */
+  @SuppressWarnings("checkstyle:ParameterNumber")
   public ECBlockOutputStream(
       BlockID blockID,
       XceiverClientFactory xceiverClientManager,
@@ -73,10 +75,10 @@ public class ECBlockOutputStream extends BlockOutputStream {
       BufferPool bufferPool,
       OzoneClientConfig config,
       Token<? extends TokenIdentifier> token,
-      ContainerClientMetrics clientMetrics
+      ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
   ) throws IOException {
     super(blockID, xceiverClientManager,
-        pipeline, bufferPool, config, token, clientMetrics);
+        pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs);
     // In EC stream, there will be only one node in pipeline.
     this.datanodeDetails = pipeline.getClosestNode();
   }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
index ede7057496..ee708bf0de 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -71,6 +72,7 @@ public class RatisBlockOutputStream extends BlockOutputStream
    * @param blockID              block ID
    * @param bufferPool           pool of buffers
    */
+  @SuppressWarnings("checkstyle:ParameterNumber")
   public RatisBlockOutputStream(
       BlockID blockID,
       XceiverClientFactory xceiverClientManager,
@@ -78,10 +80,10 @@ public class RatisBlockOutputStream extends BlockOutputStream
       BufferPool bufferPool,
       OzoneClientConfig config,
       Token<? extends TokenIdentifier> token,
-      ContainerClientMetrics clientMetrics
+      ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
   ) throws IOException {
     super(blockID, xceiverClientManager, pipeline,
-        bufferPool, config, token, clientMetrics);
+        bufferPool, config, token, clientMetrics, streamBufferArgs);
     this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
   }
 
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
index a2a0832bdb..29c0798df7 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
@@ -103,6 +104,8 @@ public class TestBlockOutputStreamCorrectness {
     config.setStreamBufferFlushSize(16 * 1024 * 1024);
     config.setChecksumType(ChecksumType.NONE);
     config.setBytesPerChecksum(256 * 1024);
+    StreamBufferArgs streamBufferArgs =
+        StreamBufferArgs.getDefaultStreamBufferArgs(pipeline.getReplicationConfig(), config);
 
     return new RatisBlockOutputStream(
         new BlockID(1L, 1L),
@@ -111,7 +114,7 @@ public class TestBlockOutputStreamCorrectness {
         bufferPool,
         config,
         null,
-        ContainerClientMetrics.acquire());
+        ContainerClientMetrics.acquire(), streamBufferArgs);
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index 80348dbe45..24e76821f9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -223,13 +224,15 @@ public class ECReconstructionCoordinator implements Closeable {
       BlockLocationInfo blockLocationInfo, DatanodeDetails datanodeDetails,
       ECReplicationConfig repConfig, int replicaIndex,
       OzoneClientConfig configuration) throws IOException {
+    StreamBufferArgs streamBufferArgs =
+        StreamBufferArgs.getDefaultStreamBufferArgs(repConfig, configuration);
     return new ECBlockOutputStream(
         blockLocationInfo.getBlockID(),
         containerOperationClient.getXceiverClientManager(),
         containerOperationClient.singleNodePipeline(datanodeDetails,
             repConfig, replicaIndex),
         BufferPool.empty(), configuration,
-        blockLocationInfo.getToken(), clientMetrics);
+        blockLocationInfo.getToken(), clientMetrics, streamBufferArgs);
   }
 
   @VisibleForTesting
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 2501803fc0..9bdec27f53 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
@@ -61,6 +62,7 @@ public class BlockOutputStreamEntry extends OutputStream {
 
   private BufferPool bufferPool;
   private ContainerClientMetrics clientMetrics;
+  private StreamBufferArgs streamBufferArgs;
 
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   BlockOutputStreamEntry(
@@ -71,7 +73,7 @@ public class BlockOutputStreamEntry extends OutputStream {
       BufferPool bufferPool,
       Token<OzoneBlockTokenIdentifier> token,
       OzoneClientConfig config,
-      ContainerClientMetrics clientMetrics
+      ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
   ) {
     this.config = config;
     this.outputStream = null;
@@ -84,6 +86,7 @@ public class BlockOutputStreamEntry extends OutputStream {
     this.currentPosition = 0;
     this.bufferPool = bufferPool;
     this.clientMetrics = clientMetrics;
+    this.streamBufferArgs = streamBufferArgs;
   }
 
   /**
@@ -105,13 +108,17 @@ public class BlockOutputStreamEntry extends OutputStream {
    */
   void createOutputStream() throws IOException {
     outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager,
-        pipeline, bufferPool, config, token, clientMetrics);
+        pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs);
   }
 
   ContainerClientMetrics getClientMetrics() {
     return clientMetrics;
   }
 
+  StreamBufferArgs getStreamBufferArgs() {
+    return streamBufferArgs;
+  }
+
   @Override
   public void write(int b) throws IOException {
     checkStream();
@@ -353,6 +360,7 @@ public class BlockOutputStreamEntry extends OutputStream {
     private Token<OzoneBlockTokenIdentifier> token;
     private OzoneClientConfig config;
     private ContainerClientMetrics clientMetrics;
+    private StreamBufferArgs streamBufferArgs;
 
     public Builder setBlockID(BlockID bID) {
       this.blockID = bID;
@@ -398,6 +406,10 @@ public class BlockOutputStreamEntry extends OutputStream {
       this.clientMetrics = clientMetrics;
       return this;
     }
+    public Builder setStreamBufferArgs(StreamBufferArgs streamBufferArgs) {
+      this.streamBufferArgs = streamBufferArgs;
+      return this;
+    }
 
     public BlockOutputStreamEntry build() {
       return new BlockOutputStreamEntry(blockID,
@@ -406,7 +418,7 @@ public class BlockOutputStreamEntry extends OutputStream {
           pipeline,
           length,
           bufferPool,
-          token, config, clientMetrics);
+          token, config, clientMetrics, streamBufferArgs);
     }
   }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index 573e4a8dd3..d0f3b5728a 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -27,10 +27,10 @@ import java.util.ListIterator;
 import java.util.Map;
 
 import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -84,6 +84,7 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
   private final long openID;
   private final ExcludeList excludeList;
   private final ContainerClientMetrics clientMetrics;
+  private final StreamBufferArgs streamBufferArgs;
 
   @SuppressWarnings({"parameternumber", "squid:S00107"})
   public BlockOutputStreamEntryPool(
@@ -94,7 +95,7 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
       boolean isMultipart, OmKeyInfo info,
       boolean unsafeByteBufferConversion,
       XceiverClientFactory xceiverClientFactory, long openID,
-      ContainerClientMetrics clientMetrics
+      ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
   ) {
     this.config = config;
     this.xceiverClientFactory = xceiverClientFactory;
@@ -111,12 +112,13 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
     this.excludeList = createExcludeList();
 
     this.bufferPool =
-        new BufferPool(config.getStreamBufferSize(),
-            (int) (config.getStreamBufferMaxSize() / config
+        new BufferPool(streamBufferArgs.getStreamBufferSize(),
+            (int) (streamBufferArgs.getStreamBufferMaxSize() / streamBufferArgs
                 .getStreamBufferSize()),
             ByteStringConversion
                 .createByteBufferConversion(unsafeByteBufferConversion));
     this.clientMetrics = clientMetrics;
+    this.streamBufferArgs = streamBufferArgs;
   }
 
   ExcludeList createExcludeList() {
@@ -124,17 +126,14 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
         Clock.system(ZoneOffset.UTC));
   }
 
-  BlockOutputStreamEntryPool(ContainerClientMetrics clientMetrics) {
+  BlockOutputStreamEntryPool(ContainerClientMetrics clientMetrics,
+      OzoneClientConfig clientConfig, StreamBufferArgs streamBufferArgs) {
     streamEntries = new ArrayList<>();
     omClient = null;
     keyArgs = null;
     xceiverClientFactory = null;
-    config =
-        new OzoneConfiguration().getObject(OzoneClientConfig.class);
-    config.setStreamBufferSize(0);
-    config.setStreamBufferMaxSize(0);
-    config.setStreamBufferFlushSize(0);
-    config.setStreamBufferFlushDelay(false);
+    config = clientConfig;
+    streamBufferArgs.setStreamBufferFlushDelay(false);
     requestID = null;
     int chunkSize = 0;
     bufferPool = new BufferPool(chunkSize, 1);
@@ -143,6 +142,7 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
     openID = -1;
     excludeList = createExcludeList();
     this.clientMetrics = clientMetrics;
+    this.streamBufferArgs = null;
   }
 
   /**
@@ -189,6 +189,7 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
             .setBufferPool(bufferPool)
             .setToken(subKeyInfo.getToken())
             .setClientMetrics(clientMetrics)
+            .setStreamBufferArgs(streamBufferArgs)
             .build();
   }
 
@@ -255,6 +256,10 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
     return clientMetrics;
   }
 
+  StreamBufferArgs getStreamBufferArgs() {
+    return streamBufferArgs;
+  }
+
   /**
    * Discards the subsequent pre allocated blocks and removes the streamEntries
    * from the streamEntries list for the container which is closed.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 26d11f3d64..07d0f46069 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
@@ -78,9 +79,10 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry {
   ECBlockOutputStreamEntry(BlockID blockID, String key,
       XceiverClientFactory xceiverClientManager, Pipeline pipeline, long length,
       BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
-      OzoneClientConfig config, ContainerClientMetrics clientMetrics) {
+      OzoneClientConfig config, ContainerClientMetrics clientMetrics,
+      StreamBufferArgs streamBufferArgs) {
     super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
-        token, config, clientMetrics);
+        token, config, clientMetrics, streamBufferArgs);
     assertInstanceOf(
         pipeline.getReplicationConfig(), ECReplicationConfig.class);
     this.replicationConfig =
@@ -99,7 +101,7 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry {
         streams[i] =
             new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
                 createSingleECBlockPipeline(getPipeline(), nodes.get(i), i + 1),
-                getBufferPool(), getConf(), getToken(), getClientMetrics());
+                getBufferPool(), getConf(), getToken(), getClientMetrics(), getStreamBufferArgs());
       }
       blockOutputStreams = streams;
     }
@@ -441,6 +443,7 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry {
     private Token<OzoneBlockTokenIdentifier> token;
     private OzoneClientConfig config;
     private ContainerClientMetrics clientMetrics;
+    private StreamBufferArgs streamBufferArgs;
 
     public ECBlockOutputStreamEntry.Builder setBlockID(BlockID bID) {
       this.blockID = bID;
@@ -492,6 +495,12 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry {
       return this;
     }
 
+    public ECBlockOutputStreamEntry.Builder setStreamBufferArgs(
+        StreamBufferArgs args) {
+      this.streamBufferArgs = args;
+      return this;
+    }
+
     public ECBlockOutputStreamEntry build() {
       return new ECBlockOutputStreamEntry(blockID,
           key,
@@ -499,7 +508,7 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry {
           pipeline,
           length,
           bufferPool,
-          token, config, clientMetrics);
+          token, config, clientMetrics, streamBufferArgs);
     }
   }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
index acc70d0dda..e551605d84 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -56,10 +57,10 @@ public class ECBlockOutputStreamEntryPool extends BlockOutputStreamEntryPool {
       boolean unsafeByteBufferConversion,
       XceiverClientFactory xceiverClientFactory,
       long openID,
-      ContainerClientMetrics clientMetrics) {
+      ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs) {
     super(config, omClient, requestId, replicationConfig, uploadID, partNumber,
         isMultipart, info, unsafeByteBufferConversion, xceiverClientFactory,
-        openID, clientMetrics);
+        openID, clientMetrics, streamBufferArgs);
     assert replicationConfig instanceof ECReplicationConfig;
   }
 
@@ -82,6 +83,7 @@ public class ECBlockOutputStreamEntryPool extends BlockOutputStreamEntryPool {
             .setBufferPool(getBufferPool())
             .setToken(subKeyInfo.getToken())
             .setClientMetrics(getClientMetrics())
+            .setStreamBufferArgs(getStreamBufferArgs())
             .build();
   }
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 15ebccda28..b5c36474ff 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -128,14 +128,12 @@ public final class ECKeyOutputStream extends KeyOutputStream
   }
 
   private ECKeyOutputStream(Builder builder) {
-    super(builder.getReplicationConfig(), builder.getClientMetrics());
+    super(builder.getReplicationConfig(), builder.getClientMetrics(),
+        builder.getClientConfig(), builder.getStreamBufferArgs());
     this.config = builder.getClientConfig();
     this.bufferPool = builder.getByteBufferPool();
     // For EC, cell/chunk size and buffer size can be same for now.
     ecChunkSize = builder.getReplicationConfig().getEcChunkSize();
-    this.config.setStreamBufferMaxSize(ecChunkSize);
-    this.config.setStreamBufferFlushSize(ecChunkSize);
-    this.config.setStreamBufferSize(ecChunkSize);
     this.numDataBlks = builder.getReplicationConfig().getData();
     this.numParityBlks = builder.getReplicationConfig().getParity();
     ecChunkBufferCache = new ECChunkBuffers(
@@ -151,7 +149,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
             builder.isMultipartKey(),
             info, builder.isUnsafeByteBufferConversionEnabled(),
             builder.getXceiverManager(), builder.getOpenHandler().getId(),
-            builder.getClientMetrics());
+            builder.getClientMetrics(), builder.getStreamBufferArgs());
 
     this.writeOffset = 0;
     this.encoder = CodecUtil.createRawEncoderWithFallback(
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 4e0c4c91fa..8b128e9cd9 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -94,6 +95,7 @@ public class KeyOutputStream extends OutputStream
   private final BlockOutputStreamEntryPool blockOutputStreamEntryPool;
 
   private long clientID;
+  private StreamBufferArgs streamBufferArgs;
 
   /**
    * Indicates if an atomic write is required. When set to true,
@@ -104,8 +106,10 @@ public class KeyOutputStream extends OutputStream
   private boolean atomicKeyCreation;
 
   public KeyOutputStream(ReplicationConfig replicationConfig,
-      ContainerClientMetrics clientMetrics) {
+      ContainerClientMetrics clientMetrics, OzoneClientConfig clientConfig,
+      StreamBufferArgs streamBufferArgs) {
     this.replication = replicationConfig;
+    this.config = clientConfig;
     closed = false;
     this.retryPolicyMap = HddsClientUtils.getExceptionList()
         .stream()
@@ -113,7 +117,8 @@ public class KeyOutputStream extends OutputStream
             e -> RetryPolicies.TRY_ONCE_THEN_FAIL));
     retryCount = 0;
     offset = 0;
-    blockOutputStreamEntryPool = new BlockOutputStreamEntryPool(clientMetrics);
+    blockOutputStreamEntryPool =
+        new BlockOutputStreamEntryPool(clientMetrics, clientConfig, streamBufferArgs);
   }
 
   @VisibleForTesting
@@ -151,7 +156,7 @@ public class KeyOutputStream extends OutputStream
       String uploadID, int partNumber, boolean isMultipart,
       boolean unsafeByteBufferConversion,
       ContainerClientMetrics clientMetrics,
-      boolean atomicKeyCreation
+      boolean atomicKeyCreation, StreamBufferArgs streamBufferArgs
   ) {
     this.config = config;
     this.replication = replicationConfig;
@@ -165,7 +170,7 @@ public class KeyOutputStream extends OutputStream
             unsafeByteBufferConversion,
             xceiverClientManager,
             handler.getId(),
-            clientMetrics);
+            clientMetrics, streamBufferArgs);
     this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
         config.getMaxRetryCount(), config.getRetryInterval());
     this.retryCount = 0;
@@ -173,6 +178,7 @@ public class KeyOutputStream extends OutputStream
     this.writeOffset = 0;
     this.clientID = handler.getId();
     this.atomicKeyCreation = atomicKeyCreation;
+    this.streamBufferArgs = streamBufferArgs;
   }
 
   /**
@@ -279,7 +285,7 @@ public class KeyOutputStream extends OutputStream
       // to or less than the max length of the buffer allocated.
       // The len specified here is the combined sum of the data length of
       // the buffers
-      Preconditions.checkState(!retry || len <= config
+      Preconditions.checkState(!retry || len <= streamBufferArgs
           .getStreamBufferMaxSize());
       int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
       writeLen = retry ? (int) len : dataWritten;
@@ -330,7 +336,7 @@ public class KeyOutputStream extends OutputStream
           pipeline, totalSuccessfulFlushedData, bufferedDataLen, retryCount);
     }
     Preconditions.checkArgument(
-        bufferedDataLen <= config.getStreamBufferMaxSize());
+        bufferedDataLen <= streamBufferArgs.getStreamBufferMaxSize());
     Preconditions.checkArgument(
         offset - blockOutputStreamEntryPool.getKeyLength() == bufferedDataLen);
     long containerId = streamEntry.getBlockID().getContainerID();
@@ -608,6 +614,7 @@ public class KeyOutputStream extends OutputStream
     private ReplicationConfig replicationConfig;
     private ContainerClientMetrics clientMetrics;
     private boolean atomicKeyCreation = false;
+    private StreamBufferArgs streamBufferArgs;
 
     public String getMultipartUploadID() {
       return multipartUploadID;
@@ -676,6 +683,15 @@ public class KeyOutputStream extends OutputStream
       return this;
     }
 
+    public StreamBufferArgs getStreamBufferArgs() {
+      return streamBufferArgs;
+    }
+
+    public Builder setStreamBufferArgs(StreamBufferArgs streamBufferArgs) {
+      this.streamBufferArgs = streamBufferArgs;
+      return this;
+    }
+
     public boolean isUnsafeByteBufferConversionEnabled() {
       return unsafeByteBufferConversion;
     }
@@ -725,7 +741,8 @@ public class KeyOutputStream extends OutputStream
           isMultipartKey,
           unsafeByteBufferConversion,
           clientMetrics,
-          atomicKeyCreation);
+          atomicKeyCreation,
+          streamBufferArgs);
     }
 
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 5302c88520..850ae0d193 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
@@ -207,6 +208,7 @@ public class RpcClient implements ClientProtocol {
   private final boolean topologyAwareReadEnabled;
   private final boolean checkKeyNameEnabled;
   private final OzoneClientConfig clientConfig;
+  private final ReplicationConfigValidator replicationConfigValidator;
   private final Cache<URI, KeyProvider> keyProviderCache;
   private final boolean getLatestVersionLocation;
   private final ByteBufferPool byteBufferPool;
@@ -230,6 +232,8 @@ public class RpcClient implements ClientProtocol {
     this.ugi = UserGroupInformation.getCurrentUser();
     // Get default acl rights for user and group.
     OzoneAclConfig aclConfig = this.conf.getObject(OzoneAclConfig.class);
+    replicationConfigValidator =
+        this.conf.getObject(ReplicationConfigValidator.class);
     this.userRights = aclConfig.getUserDefaultRights();
     this.groupRights = aclConfig.getGroupDefaultRights();
 
@@ -1343,9 +1347,7 @@ public class RpcClient implements ClientProtocol {
     }
 
     if (replicationConfig != null) {
-      ReplicationConfigValidator validator =
-              this.conf.getObject(ReplicationConfigValidator.class);
-      validator.validate(replicationConfig);
+      replicationConfigValidator.validate(replicationConfig);
     }
 
     OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
@@ -1854,7 +1856,7 @@ public class RpcClient implements ClientProtocol {
             .setMultipartUploadID(uploadID)
             .setIsMultipartKey(true)
             .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
-            .setConfig(conf.getObject(OzoneClientConfig.class))
+            .setConfig(clientConfig)
             .setAtomicKeyCreation(isS3GRequest.get())
             .build();
     keyOutputStream
@@ -2269,7 +2271,7 @@ public class RpcClient implements ClientProtocol {
             .setOmClient(ozoneManagerClient)
             .setReplicationConfig(replicationConfig)
             .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
-            .setConfig(conf.getObject(OzoneClientConfig.class))
+            .setConfig(clientConfig)
             .setAtomicKeyCreation(isS3GRequest.get())
             .build();
     keyOutputStream
@@ -2279,6 +2281,7 @@ public class RpcClient implements ClientProtocol {
         openKey, keyOutputStream, null);
     return new OzoneDataStreamOutput(out != null ? out : keyOutputStream);
   }
+
   private OzoneOutputStream createOutputStream(OpenKeySession openKey)
       throws IOException {
     KeyOutputStream keyOutputStream = createKeyOutputStream(openKey)
@@ -2336,6 +2339,8 @@ public class RpcClient implements ClientProtocol {
 
     ReplicationConfig replicationConfig =
         openKey.getKeyInfo().getReplicationConfig();
+    StreamBufferArgs streamBufferArgs = StreamBufferArgs.getDefaultStreamBufferArgs(
+        replicationConfig, clientConfig);
     if (replicationConfig.getReplicationType() ==
         HddsProtos.ReplicationType.EC) {
       builder = new ECKeyOutputStream.Builder()
@@ -2351,9 +2356,10 @@ public class RpcClient implements ClientProtocol {
         .setXceiverClientManager(xceiverClientManager)
         .setOmClient(ozoneManagerClient)
         .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
-        .setConfig(conf.getObject(OzoneClientConfig.class))
+        .setConfig(clientConfig)
         .setAtomicKeyCreation(isS3GRequest.get())
-        .setClientMetrics(clientMetrics);
+        .setClientMetrics(clientMetrics)
+        .setStreamBufferArgs(streamBufferArgs);
   }
 
   @Override
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
index 00a7ba5574..9835160029 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
@@ -22,6 +22,8 @@ package org.apache.hadoop.ozone.client;
 
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
@@ -74,8 +76,13 @@ public class OzoneOutputStreamStub extends OzoneOutputStream {
 
   @Override
   public KeyOutputStream getKeyOutputStream() {
-    return new KeyOutputStream(
-        ReplicationConfig.getDefault(new OzoneConfiguration()), null) {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    ReplicationConfig replicationConfig =
+        ReplicationConfig.getDefault(conf);
+    OzoneClientConfig ozoneClientConfig = conf.getObject(OzoneClientConfig.class);
+    StreamBufferArgs streamBufferArgs =
+        StreamBufferArgs.getDefaultStreamBufferArgs(replicationConfig, ozoneClientConfig);
+    return new KeyOutputStream(replicationConfig, null, ozoneClientConfig, streamBufferArgs) {
       @Override
       public synchronized OmMultipartCommitUploadPartInfo
           getCommitUploadPartInfo() {
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmMetadataGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmMetadataGenerator.java
index b214b0968a..c94048e00d 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmMetadataGenerator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmMetadataGenerator.java
@@ -326,8 +326,8 @@ public class OmMetadataGenerator extends BaseFreonGenerator
     case CREATE_KEY:
       keyName = getPath(counter);
       getMetrics().timer(operation.name()).time(() -> {
-        try (OutputStream stream = bucket.createKey(keyName,
-            dataSize.toBytes())) {
+        try (OutputStream stream = bucket.createStreamKey(keyName,
+            dataSize.toBytes(), replicationConfig, new HashMap<>())) {
           contentGenerator.write(stream);
         }
         return null;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to