This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new c990899e40 HDDS-9913. Reduce number of times configuration is loaded
in Ozone client (#5789)
c990899e40 is described below
commit c990899e40b4c69b600119b5c1c721726d2f4555
Author: XiChen <[email protected]>
AuthorDate: Sun Dec 17 00:56:28 2023 +0800
HDDS-9913. Reduce number of times configuration is loaded in Ozone client
(#5789)
---
.../apache/hadoop/hdds/scm/OzoneClientConfig.java | 5 +
.../apache/hadoop/hdds/scm/StreamBufferArgs.java | 136 +++++++++++++++++++++
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 22 ++--
.../hdds/scm/storage/ECBlockOutputStream.java | 6 +-
.../hdds/scm/storage/RatisBlockOutputStream.java | 6 +-
.../storage/TestBlockOutputStreamCorrectness.java | 5 +-
.../ECReconstructionCoordinator.java | 5 +-
.../ozone/client/io/BlockOutputStreamEntry.java | 18 ++-
.../client/io/BlockOutputStreamEntryPool.java | 27 ++--
.../ozone/client/io/ECBlockOutputStreamEntry.java | 17 ++-
.../client/io/ECBlockOutputStreamEntryPool.java | 6 +-
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 8 +-
.../hadoop/ozone/client/io/KeyOutputStream.java | 31 +++--
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 20 +--
.../hadoop/ozone/client/OzoneOutputStreamStub.java | 11 +-
.../hadoop/ozone/freon/OmMetadataGenerator.java | 4 +-
16 files changed, 269 insertions(+), 58 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 80f86a677a..44af34cb91 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
@@ -254,6 +255,7 @@ public class OzoneClientConfig {
return streamBufferFlushSize;
}
+ @VisibleForTesting
public void setStreamBufferFlushSize(long streamBufferFlushSize) {
this.streamBufferFlushSize = streamBufferFlushSize;
}
@@ -262,6 +264,7 @@ public class OzoneClientConfig {
return streamBufferSize;
}
+ @VisibleForTesting
public void setStreamBufferSize(int streamBufferSize) {
this.streamBufferSize = streamBufferSize;
}
@@ -270,6 +273,7 @@ public class OzoneClientConfig {
return streamBufferFlushDelay;
}
+ @VisibleForTesting
public void setStreamBufferFlushDelay(boolean streamBufferFlushDelay) {
this.streamBufferFlushDelay = streamBufferFlushDelay;
}
@@ -278,6 +282,7 @@ public class OzoneClientConfig {
return streamBufferMaxSize;
}
+ @VisibleForTesting
public void setStreamBufferMaxSize(long streamBufferMaxSize) {
this.streamBufferMaxSize = streamBufferMaxSize;
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/StreamBufferArgs.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/StreamBufferArgs.java
new file mode 100644
index 0000000000..4772cb90fb
--- /dev/null
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/StreamBufferArgs.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+/**
+ * Stream buffer size, flush size, max size and flush-delay flag for Ozone
+ * client output streams; build via Builder or getDefaultStreamBufferArgs.
+ */
+public class StreamBufferArgs {
+
+ private int streamBufferSize;
+ private long streamBufferFlushSize;
+ private long streamBufferMaxSize;
+ private boolean streamBufferFlushDelay;
+
+ protected StreamBufferArgs(Builder builder) { // copies builder values
+ this.streamBufferSize = builder.bufferSize;
+ this.streamBufferFlushSize = builder.bufferFlushSize;
+ this.streamBufferMaxSize = builder.bufferMaxSize;
+ this.streamBufferFlushDelay = builder.streamBufferFlushDelay;
+ }
+
+ public int getStreamBufferSize() {
+ return streamBufferSize;
+ }
+
+ public long getStreamBufferFlushSize() {
+ return streamBufferFlushSize;
+ }
+
+ public long getStreamBufferMaxSize() {
+ return streamBufferMaxSize;
+ }
+
+ public boolean isStreamBufferFlushDelay() {
+ return streamBufferFlushDelay;
+ }
+
+ public void setStreamBufferFlushDelay(boolean streamBufferFlushDelay) {
+ this.streamBufferFlushDelay = streamBufferFlushDelay; // sole public setter
+ }
+
+ protected void setStreamBufferSize(int streamBufferSize) {
+ this.streamBufferSize = streamBufferSize;
+ }
+
+ protected void setStreamBufferFlushSize(long streamBufferFlushSize) {
+ this.streamBufferFlushSize = streamBufferFlushSize;
+ }
+
+ protected void setStreamBufferMaxSize(long streamBufferMaxSize) {
+ this.streamBufferMaxSize = streamBufferMaxSize;
+ }
+
+ /**
+ * Builder for StreamBufferArgs; values never set stay at 0 / false.
+ */
+ public static class Builder {
+ private int bufferSize;
+ private long bufferFlushSize;
+ private long bufferMaxSize;
+ private boolean streamBufferFlushDelay;
+
+ public Builder setBufferSize(int bufferSize) {
+ this.bufferSize = bufferSize;
+ return this;
+ }
+
+ public Builder setBufferFlushSize(long bufferFlushSize) {
+ this.bufferFlushSize = bufferFlushSize;
+ return this;
+ }
+
+ public Builder setBufferMaxSize(long bufferMaxSize) {
+ this.bufferMaxSize = bufferMaxSize;
+ return this;
+ }
+
+ public Builder setStreamBufferFlushDelay(boolean streamBufferFlushDelay) {
+ this.streamBufferFlushDelay = streamBufferFlushDelay;
+ return this;
+ }
+
+ public StreamBufferArgs build() {
+ return new StreamBufferArgs(this); // values copied as-is, unvalidated
+ }
+
+ public static Builder getNewBuilder() {
+ return new Builder();
+ }
+ }
+
+ public static StreamBufferArgs getDefaultStreamBufferArgs(
+ ReplicationConfig replicationConfig, OzoneClientConfig clientConfig) {
+ int bufferSize;
+ long flushSize;
+ long bufferMaxSize;
+ boolean streamBufferFlushDelay = clientConfig.isStreamBufferFlushDelay();
+ if (replicationConfig.getReplicationType() ==
HddsProtos.ReplicationType.EC) { // EC: all three sizes = EC chunk size; casts assume an ECReplicationConfig instance — TODO confirm
+ bufferSize = ((ECReplicationConfig) replicationConfig).getEcChunkSize();
+ flushSize = ((ECReplicationConfig) replicationConfig).getEcChunkSize();
+ bufferMaxSize = ((ECReplicationConfig)
replicationConfig).getEcChunkSize();
+ } else { // non-EC: take all three sizes from the client config
+ bufferSize = clientConfig.getStreamBufferSize();
+ flushSize = clientConfig.getStreamBufferFlushSize();
+ bufferMaxSize = clientConfig.getStreamBufferMaxSize();
+ }
+
+ return Builder.getNewBuilder()
+ .setBufferSize(bufferSize)
+ .setBufferFlushSize(flushSize)
+ .setBufferMaxSize(bufferMaxSize)
+ .setStreamBufferFlushDelay(streamBufferFlushDelay)
+ .build();
+ }
+}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 9988fb258e..b97165084f 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -38,6 +38,7 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
@@ -88,6 +89,7 @@ public class BlockOutputStream extends OutputStream {
private XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
private OzoneClientConfig config;
+ private StreamBufferArgs streamBufferArgs;
private int chunkIndex;
private final AtomicLong chunkOffset = new AtomicLong();
@@ -134,6 +136,7 @@ public class BlockOutputStream extends OutputStream {
* @param pipeline pipeline where block will be written
* @param bufferPool pool of buffers
*/
+ @SuppressWarnings("checkstyle:ParameterNumber")
public BlockOutputStream(
BlockID blockID,
XceiverClientFactory xceiverClientManager,
@@ -141,7 +144,7 @@ public class BlockOutputStream extends OutputStream {
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
- ContainerClientMetrics clientMetrics
+ ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
) throws IOException {
this.xceiverClientFactory = xceiverClientManager;
this.config = config;
@@ -166,12 +169,12 @@ public class BlockOutputStream extends OutputStream {
//number of buffers used before doing a flush
refreshCurrentBuffer();
- flushPeriod = (int) (config.getStreamBufferFlushSize() / config
+ flushPeriod = (int) (streamBufferArgs.getStreamBufferFlushSize() /
streamBufferArgs
.getStreamBufferSize());
Preconditions
.checkArgument(
- (long) flushPeriod * config.getStreamBufferSize() == config
+ (long) flushPeriod * streamBufferArgs.getStreamBufferSize() ==
streamBufferArgs
.getStreamBufferFlushSize());
// A single thread executor handle the responses of async requests
@@ -185,6 +188,7 @@ public class BlockOutputStream extends OutputStream {
config.getBytesPerChecksum());
this.clientMetrics = clientMetrics;
this.pipeline = pipeline;
+ this.streamBufferArgs = streamBufferArgs;
}
void refreshCurrentBuffer() {
@@ -321,7 +325,7 @@ public class BlockOutputStream extends OutputStream {
}
private boolean isBufferPoolFull() {
- return bufferPool.computeBufferData() == config.getStreamBufferMaxSize();
+ return bufferPool.computeBufferData() ==
streamBufferArgs.getStreamBufferMaxSize();
}
/**
@@ -339,7 +343,7 @@ public class BlockOutputStream extends OutputStream {
if (LOG.isDebugEnabled()) {
LOG.debug("Retrying write length {} for blockID {}", len, blockID);
}
- Preconditions.checkArgument(len <= config.getStreamBufferMaxSize());
+ Preconditions.checkArgument(len <=
streamBufferArgs.getStreamBufferMaxSize());
int count = 0;
while (len > 0) {
ChunkBuffer buffer = bufferPool.getBuffer(count);
@@ -355,13 +359,13 @@ public class BlockOutputStream extends OutputStream {
// the buffer. We should just validate
// if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
// call for handling full buffer/flush buffer condition.
- if (writtenDataLength % config.getStreamBufferFlushSize() == 0) {
+ if (writtenDataLength % streamBufferArgs.getStreamBufferFlushSize() ==
0) {
// reset the position to zero as now we will be reading the
// next buffer in the list
updateFlushLength();
executePutBlock(false, false);
}
- if (writtenDataLength == config.getStreamBufferMaxSize()) {
+ if (writtenDataLength == streamBufferArgs.getStreamBufferMaxSize()) {
handleFullBuffer();
}
}
@@ -518,9 +522,9 @@ public class BlockOutputStream extends OutputStream {
public void flush() throws IOException {
if (xceiverClientFactory != null && xceiverClient != null
&& bufferPool != null && bufferPool.getSize() > 0
- && (!config.isStreamBufferFlushDelay() ||
+ && (!streamBufferArgs.isStreamBufferFlushDelay() ||
writtenDataLength - totalDataFlushedLength
- >= config.getStreamBufferSize())) {
+ >= streamBufferArgs.getStreamBufferSize())) {
handleFlush(false);
}
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 90cf4743f8..1d7fdc1df6 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -25,6 +25,7 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -66,6 +67,7 @@ public class ECBlockOutputStream extends BlockOutputStream {
* @param pipeline pipeline where block will be written
* @param bufferPool pool of buffers
*/
+ @SuppressWarnings("checkstyle:ParameterNumber")
public ECBlockOutputStream(
BlockID blockID,
XceiverClientFactory xceiverClientManager,
@@ -73,10 +75,10 @@ public class ECBlockOutputStream extends BlockOutputStream {
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
- ContainerClientMetrics clientMetrics
+ ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
) throws IOException {
super(blockID, xceiverClientManager,
- pipeline, bufferPool, config, token, clientMetrics);
+ pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs);
// In EC stream, there will be only one node in pipeline.
this.datanodeDetails = pipeline.getClosestNode();
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
index ede7057496..ee708bf0de 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.client.BlockID;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -71,6 +72,7 @@ public class RatisBlockOutputStream extends BlockOutputStream
* @param blockID block ID
* @param bufferPool pool of buffers
*/
+ @SuppressWarnings("checkstyle:ParameterNumber")
public RatisBlockOutputStream(
BlockID blockID,
XceiverClientFactory xceiverClientManager,
@@ -78,10 +80,10 @@ public class RatisBlockOutputStream extends
BlockOutputStream
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
- ContainerClientMetrics clientMetrics
+ ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
) throws IOException {
super(blockID, xceiverClientManager, pipeline,
- bufferPool, config, token, clientMetrics);
+ bufferPool, config, token, clientMetrics, streamBufferArgs);
this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
}
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
index a2a0832bdb..29c0798df7 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
@@ -36,6 +36,7 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
@@ -103,6 +104,8 @@ public class TestBlockOutputStreamCorrectness {
config.setStreamBufferFlushSize(16 * 1024 * 1024);
config.setChecksumType(ChecksumType.NONE);
config.setBytesPerChecksum(256 * 1024);
+ StreamBufferArgs streamBufferArgs =
+
StreamBufferArgs.getDefaultStreamBufferArgs(pipeline.getReplicationConfig(),
config);
return new RatisBlockOutputStream(
new BlockID(1L, 1L),
@@ -111,7 +114,7 @@ public class TestBlockOutputStreamCorrectness {
bufferPool,
config,
null,
- ContainerClientMetrics.acquire());
+ ContainerClientMetrics.acquire(), streamBufferArgs);
}
/**
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index 80348dbe45..24e76821f9 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -223,13 +224,15 @@ public class ECReconstructionCoordinator implements
Closeable {
BlockLocationInfo blockLocationInfo, DatanodeDetails datanodeDetails,
ECReplicationConfig repConfig, int replicaIndex,
OzoneClientConfig configuration) throws IOException {
+ StreamBufferArgs streamBufferArgs =
+ StreamBufferArgs.getDefaultStreamBufferArgs(repConfig, configuration);
return new ECBlockOutputStream(
blockLocationInfo.getBlockID(),
containerOperationClient.getXceiverClientManager(),
containerOperationClient.singleNodePipeline(datanodeDetails,
repConfig, replicaIndex),
BufferPool.empty(), configuration,
- blockLocationInfo.getToken(), clientMetrics);
+ blockLocationInfo.getToken(), clientMetrics, streamBufferArgs);
}
@VisibleForTesting
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 2501803fc0..9bdec27f53 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
@@ -61,6 +62,7 @@ public class BlockOutputStreamEntry extends OutputStream {
private BufferPool bufferPool;
private ContainerClientMetrics clientMetrics;
+ private StreamBufferArgs streamBufferArgs;
@SuppressWarnings({"parameternumber", "squid:S00107"})
BlockOutputStreamEntry(
@@ -71,7 +73,7 @@ public class BlockOutputStreamEntry extends OutputStream {
BufferPool bufferPool,
Token<OzoneBlockTokenIdentifier> token,
OzoneClientConfig config,
- ContainerClientMetrics clientMetrics
+ ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
) {
this.config = config;
this.outputStream = null;
@@ -84,6 +86,7 @@ public class BlockOutputStreamEntry extends OutputStream {
this.currentPosition = 0;
this.bufferPool = bufferPool;
this.clientMetrics = clientMetrics;
+ this.streamBufferArgs = streamBufferArgs;
}
/**
@@ -105,13 +108,17 @@ public class BlockOutputStreamEntry extends OutputStream {
*/
void createOutputStream() throws IOException {
outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager,
- pipeline, bufferPool, config, token, clientMetrics);
+ pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs);
}
ContainerClientMetrics getClientMetrics() {
return clientMetrics;
}
+ StreamBufferArgs getStreamBufferArgs() {
+ return streamBufferArgs;
+ }
+
@Override
public void write(int b) throws IOException {
checkStream();
@@ -353,6 +360,7 @@ public class BlockOutputStreamEntry extends OutputStream {
private Token<OzoneBlockTokenIdentifier> token;
private OzoneClientConfig config;
private ContainerClientMetrics clientMetrics;
+ private StreamBufferArgs streamBufferArgs;
public Builder setBlockID(BlockID bID) {
this.blockID = bID;
@@ -398,6 +406,10 @@ public class BlockOutputStreamEntry extends OutputStream {
this.clientMetrics = clientMetrics;
return this;
}
+ public Builder setStreamBufferArgs(StreamBufferArgs streamBufferArgs) {
+ this.streamBufferArgs = streamBufferArgs;
+ return this;
+ }
public BlockOutputStreamEntry build() {
return new BlockOutputStreamEntry(blockID,
@@ -406,7 +418,7 @@ public class BlockOutputStreamEntry extends OutputStream {
pipeline,
length,
bufferPool,
- token, config, clientMetrics);
+ token, config, clientMetrics, streamBufferArgs);
}
}
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index 573e4a8dd3..d0f3b5728a 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -27,10 +27,10 @@ import java.util.ListIterator;
import java.util.Map;
import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -84,6 +84,7 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
private final long openID;
private final ExcludeList excludeList;
private final ContainerClientMetrics clientMetrics;
+ private final StreamBufferArgs streamBufferArgs;
@SuppressWarnings({"parameternumber", "squid:S00107"})
public BlockOutputStreamEntryPool(
@@ -94,7 +95,7 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
boolean isMultipart, OmKeyInfo info,
boolean unsafeByteBufferConversion,
XceiverClientFactory xceiverClientFactory, long openID,
- ContainerClientMetrics clientMetrics
+ ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
) {
this.config = config;
this.xceiverClientFactory = xceiverClientFactory;
@@ -111,12 +112,13 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
this.excludeList = createExcludeList();
this.bufferPool =
- new BufferPool(config.getStreamBufferSize(),
- (int) (config.getStreamBufferMaxSize() / config
+ new BufferPool(streamBufferArgs.getStreamBufferSize(),
+ (int) (streamBufferArgs.getStreamBufferMaxSize() / streamBufferArgs
.getStreamBufferSize()),
ByteStringConversion
.createByteBufferConversion(unsafeByteBufferConversion));
this.clientMetrics = clientMetrics;
+ this.streamBufferArgs = streamBufferArgs;
}
ExcludeList createExcludeList() {
@@ -124,17 +126,14 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
Clock.system(ZoneOffset.UTC));
}
- BlockOutputStreamEntryPool(ContainerClientMetrics clientMetrics) {
+ BlockOutputStreamEntryPool(ContainerClientMetrics clientMetrics,
+ OzoneClientConfig clientConfig, StreamBufferArgs streamBufferArgs) {
streamEntries = new ArrayList<>();
omClient = null;
keyArgs = null;
xceiverClientFactory = null;
- config =
- new OzoneConfiguration().getObject(OzoneClientConfig.class);
- config.setStreamBufferSize(0);
- config.setStreamBufferMaxSize(0);
- config.setStreamBufferFlushSize(0);
- config.setStreamBufferFlushDelay(false);
+ config = clientConfig;
+ streamBufferArgs.setStreamBufferFlushDelay(false);
requestID = null;
int chunkSize = 0;
bufferPool = new BufferPool(chunkSize, 1);
@@ -143,6 +142,7 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
openID = -1;
excludeList = createExcludeList();
this.clientMetrics = clientMetrics;
+ this.streamBufferArgs = null;
}
/**
@@ -189,6 +189,7 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
.setBufferPool(bufferPool)
.setToken(subKeyInfo.getToken())
.setClientMetrics(clientMetrics)
+ .setStreamBufferArgs(streamBufferArgs)
.build();
}
@@ -255,6 +256,10 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
return clientMetrics;
}
+ StreamBufferArgs getStreamBufferArgs() {
+ return streamBufferArgs;
+ }
+
/**
* Discards the subsequent pre allocated blocks and removes the streamEntries
* from the streamEntries list for the container which is closed.
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 26d11f3d64..07d0f46069 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
@@ -78,9 +79,10 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry {
ECBlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientFactory xceiverClientManager, Pipeline pipeline, long
length,
BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
- OzoneClientConfig config, ContainerClientMetrics clientMetrics) {
+ OzoneClientConfig config, ContainerClientMetrics clientMetrics,
+ StreamBufferArgs streamBufferArgs) {
super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
- token, config, clientMetrics);
+ token, config, clientMetrics, streamBufferArgs);
assertInstanceOf(
pipeline.getReplicationConfig(), ECReplicationConfig.class);
this.replicationConfig =
@@ -99,7 +101,7 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry {
streams[i] =
new ECBlockOutputStream(getBlockID(), getXceiverClientManager(),
createSingleECBlockPipeline(getPipeline(), nodes.get(i), i +
1),
- getBufferPool(), getConf(), getToken(), getClientMetrics());
+ getBufferPool(), getConf(), getToken(), getClientMetrics(),
getStreamBufferArgs());
}
blockOutputStreams = streams;
}
@@ -441,6 +443,7 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry {
private Token<OzoneBlockTokenIdentifier> token;
private OzoneClientConfig config;
private ContainerClientMetrics clientMetrics;
+ private StreamBufferArgs streamBufferArgs;
public ECBlockOutputStreamEntry.Builder setBlockID(BlockID bID) {
this.blockID = bID;
@@ -492,6 +495,12 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry {
return this;
}
+ public ECBlockOutputStreamEntry.Builder setStreamBufferArgs(
+ StreamBufferArgs args) {
+ this.streamBufferArgs = args;
+ return this;
+ }
+
public ECBlockOutputStreamEntry build() {
return new ECBlockOutputStreamEntry(blockID,
key,
@@ -499,7 +508,7 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry {
pipeline,
length,
bufferPool,
- token, config, clientMetrics);
+ token, config, clientMetrics, streamBufferArgs);
}
}
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
index acc70d0dda..e551605d84 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -56,10 +57,10 @@ public class ECBlockOutputStreamEntryPool extends
BlockOutputStreamEntryPool {
boolean unsafeByteBufferConversion,
XceiverClientFactory xceiverClientFactory,
long openID,
- ContainerClientMetrics clientMetrics) {
+ ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs)
{
super(config, omClient, requestId, replicationConfig, uploadID, partNumber,
isMultipart, info, unsafeByteBufferConversion, xceiverClientFactory,
- openID, clientMetrics);
+ openID, clientMetrics, streamBufferArgs);
assert replicationConfig instanceof ECReplicationConfig;
}
@@ -82,6 +83,7 @@ public class ECBlockOutputStreamEntryPool extends
BlockOutputStreamEntryPool {
.setBufferPool(getBufferPool())
.setToken(subKeyInfo.getToken())
.setClientMetrics(getClientMetrics())
+ .setStreamBufferArgs(getStreamBufferArgs())
.build();
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 15ebccda28..b5c36474ff 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -128,14 +128,12 @@ public final class ECKeyOutputStream extends
KeyOutputStream
}
private ECKeyOutputStream(Builder builder) {
- super(builder.getReplicationConfig(), builder.getClientMetrics());
+ super(builder.getReplicationConfig(), builder.getClientMetrics(),
+ builder.getClientConfig(), builder.getStreamBufferArgs());
this.config = builder.getClientConfig();
this.bufferPool = builder.getByteBufferPool();
// For EC, cell/chunk size and buffer size can be same for now.
ecChunkSize = builder.getReplicationConfig().getEcChunkSize();
- this.config.setStreamBufferMaxSize(ecChunkSize);
- this.config.setStreamBufferFlushSize(ecChunkSize);
- this.config.setStreamBufferSize(ecChunkSize);
this.numDataBlks = builder.getReplicationConfig().getData();
this.numParityBlks = builder.getReplicationConfig().getParity();
ecChunkBufferCache = new ECChunkBuffers(
@@ -151,7 +149,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
builder.isMultipartKey(),
info, builder.isUnsafeByteBufferConversionEnabled(),
builder.getXceiverManager(), builder.getOpenHandler().getId(),
- builder.getClientMetrics());
+ builder.getClientMetrics(), builder.getStreamBufferArgs());
this.writeOffset = 0;
this.encoder = CodecUtil.createRawEncoderWithFallback(
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 4e0c4c91fa..8b128e9cd9 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -94,6 +95,7 @@ public class KeyOutputStream extends OutputStream
private final BlockOutputStreamEntryPool blockOutputStreamEntryPool;
private long clientID;
+ private StreamBufferArgs streamBufferArgs;
/**
* Indicates if an atomic write is required. When set to true,
@@ -104,8 +106,10 @@ public class KeyOutputStream extends OutputStream
private boolean atomicKeyCreation;
public KeyOutputStream(ReplicationConfig replicationConfig,
- ContainerClientMetrics clientMetrics) {
+ ContainerClientMetrics clientMetrics, OzoneClientConfig clientConfig,
+ StreamBufferArgs streamBufferArgs) {
this.replication = replicationConfig;
+ this.config = clientConfig;
closed = false;
this.retryPolicyMap = HddsClientUtils.getExceptionList()
.stream()
@@ -113,7 +117,8 @@ public class KeyOutputStream extends OutputStream
e -> RetryPolicies.TRY_ONCE_THEN_FAIL));
retryCount = 0;
offset = 0;
- blockOutputStreamEntryPool = new BlockOutputStreamEntryPool(clientMetrics);
+ blockOutputStreamEntryPool =
+ new BlockOutputStreamEntryPool(clientMetrics, clientConfig, streamBufferArgs);
}
@VisibleForTesting
@@ -151,7 +156,7 @@ public class KeyOutputStream extends OutputStream
String uploadID, int partNumber, boolean isMultipart,
boolean unsafeByteBufferConversion,
ContainerClientMetrics clientMetrics,
- boolean atomicKeyCreation
+ boolean atomicKeyCreation, StreamBufferArgs streamBufferArgs
) {
this.config = config;
this.replication = replicationConfig;
@@ -165,7 +170,7 @@ public class KeyOutputStream extends OutputStream
unsafeByteBufferConversion,
xceiverClientManager,
handler.getId(),
- clientMetrics);
+ clientMetrics, streamBufferArgs);
this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
config.getMaxRetryCount(), config.getRetryInterval());
this.retryCount = 0;
@@ -173,6 +178,7 @@ public class KeyOutputStream extends OutputStream
this.writeOffset = 0;
this.clientID = handler.getId();
this.atomicKeyCreation = atomicKeyCreation;
+ this.streamBufferArgs = streamBufferArgs;
}
/**
@@ -279,7 +285,7 @@ public class KeyOutputStream extends OutputStream
// to or less than the max length of the buffer allocated.
// The len specified here is the combined sum of the data length of
// the buffers
- Preconditions.checkState(!retry || len <= config
+ Preconditions.checkState(!retry || len <= streamBufferArgs
.getStreamBufferMaxSize());
int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
writeLen = retry ? (int) len : dataWritten;
@@ -330,7 +336,7 @@ public class KeyOutputStream extends OutputStream
pipeline, totalSuccessfulFlushedData, bufferedDataLen, retryCount);
}
Preconditions.checkArgument(
- bufferedDataLen <= config.getStreamBufferMaxSize());
+ bufferedDataLen <= streamBufferArgs.getStreamBufferMaxSize());
Preconditions.checkArgument(
offset - blockOutputStreamEntryPool.getKeyLength() == bufferedDataLen);
long containerId = streamEntry.getBlockID().getContainerID();
@@ -608,6 +614,7 @@ public class KeyOutputStream extends OutputStream
private ReplicationConfig replicationConfig;
private ContainerClientMetrics clientMetrics;
private boolean atomicKeyCreation = false;
+ private StreamBufferArgs streamBufferArgs;
public String getMultipartUploadID() {
return multipartUploadID;
@@ -676,6 +683,15 @@ public class KeyOutputStream extends OutputStream
return this;
}
+ public StreamBufferArgs getStreamBufferArgs() {
+ return streamBufferArgs;
+ }
+
+ public Builder setStreamBufferArgs(StreamBufferArgs streamBufferArgs) {
+ this.streamBufferArgs = streamBufferArgs;
+ return this;
+ }
+
public boolean isUnsafeByteBufferConversionEnabled() {
return unsafeByteBufferConversion;
}
@@ -725,7 +741,8 @@ public class KeyOutputStream extends OutputStream
isMultipartKey,
unsafeByteBufferConversion,
clientMetrics,
- atomicKeyCreation);
+ atomicKeyCreation,
+ streamBufferArgs);
}
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 5302c88520..850ae0d193 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
@@ -207,6 +208,7 @@ public class RpcClient implements ClientProtocol {
private final boolean topologyAwareReadEnabled;
private final boolean checkKeyNameEnabled;
private final OzoneClientConfig clientConfig;
+ private final ReplicationConfigValidator replicationConfigValidator;
private final Cache<URI, KeyProvider> keyProviderCache;
private final boolean getLatestVersionLocation;
private final ByteBufferPool byteBufferPool;
@@ -230,6 +232,8 @@ public class RpcClient implements ClientProtocol {
this.ugi = UserGroupInformation.getCurrentUser();
// Get default acl rights for user and group.
OzoneAclConfig aclConfig = this.conf.getObject(OzoneAclConfig.class);
+ replicationConfigValidator =
+ this.conf.getObject(ReplicationConfigValidator.class);
this.userRights = aclConfig.getUserDefaultRights();
this.groupRights = aclConfig.getGroupDefaultRights();
@@ -1343,9 +1347,7 @@ public class RpcClient implements ClientProtocol {
}
if (replicationConfig != null) {
- ReplicationConfigValidator validator =
- this.conf.getObject(ReplicationConfigValidator.class);
- validator.validate(replicationConfig);
+ replicationConfigValidator.validate(replicationConfig);
}
OmKeyArgs.Builder builder = new OmKeyArgs.Builder()
@@ -1854,7 +1856,7 @@ public class RpcClient implements ClientProtocol {
.setMultipartUploadID(uploadID)
.setIsMultipartKey(true)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
- .setConfig(conf.getObject(OzoneClientConfig.class))
+ .setConfig(clientConfig)
.setAtomicKeyCreation(isS3GRequest.get())
.build();
keyOutputStream
@@ -2269,7 +2271,7 @@ public class RpcClient implements ClientProtocol {
.setOmClient(ozoneManagerClient)
.setReplicationConfig(replicationConfig)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
- .setConfig(conf.getObject(OzoneClientConfig.class))
+ .setConfig(clientConfig)
.setAtomicKeyCreation(isS3GRequest.get())
.build();
keyOutputStream
@@ -2279,6 +2281,7 @@ public class RpcClient implements ClientProtocol {
openKey, keyOutputStream, null);
return new OzoneDataStreamOutput(out != null ? out : keyOutputStream);
}
+
private OzoneOutputStream createOutputStream(OpenKeySession openKey)
throws IOException {
KeyOutputStream keyOutputStream = createKeyOutputStream(openKey)
@@ -2336,6 +2339,8 @@ public class RpcClient implements ClientProtocol {
ReplicationConfig replicationConfig =
openKey.getKeyInfo().getReplicationConfig();
+ StreamBufferArgs streamBufferArgs = StreamBufferArgs.getDefaultStreamBufferArgs(
+ replicationConfig, clientConfig);
if (replicationConfig.getReplicationType() ==
HddsProtos.ReplicationType.EC) {
builder = new ECKeyOutputStream.Builder()
@@ -2351,9 +2356,10 @@ public class RpcClient implements ClientProtocol {
.setXceiverClientManager(xceiverClientManager)
.setOmClient(ozoneManagerClient)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
- .setConfig(conf.getObject(OzoneClientConfig.class))
+ .setConfig(clientConfig)
.setAtomicKeyCreation(isS3GRequest.get())
- .setClientMetrics(clientMetrics);
+ .setClientMetrics(clientMetrics)
+ .setStreamBufferArgs(streamBufferArgs);
}
@Override
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
index 00a7ba5574..9835160029 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
@@ -22,6 +22,8 @@ package org.apache.hadoop.ozone.client;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
@@ -74,8 +76,13 @@ public class OzoneOutputStreamStub extends OzoneOutputStream {
@Override
public KeyOutputStream getKeyOutputStream() {
- return new KeyOutputStream(
- ReplicationConfig.getDefault(new OzoneConfiguration()), null) {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ ReplicationConfig replicationConfig =
+ ReplicationConfig.getDefault(conf);
+ OzoneClientConfig ozoneClientConfig = conf.getObject(OzoneClientConfig.class);
+ StreamBufferArgs streamBufferArgs =
+ StreamBufferArgs.getDefaultStreamBufferArgs(replicationConfig, ozoneClientConfig);
+ return new KeyOutputStream(replicationConfig, null, ozoneClientConfig, streamBufferArgs) {
@Override
public synchronized OmMultipartCommitUploadPartInfo
getCommitUploadPartInfo() {
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmMetadataGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmMetadataGenerator.java
index b214b0968a..c94048e00d 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmMetadataGenerator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmMetadataGenerator.java
@@ -326,8 +326,8 @@ public class OmMetadataGenerator extends BaseFreonGenerator
case CREATE_KEY:
keyName = getPath(counter);
getMetrics().timer(operation.name()).time(() -> {
- try (OutputStream stream = bucket.createKey(keyName,
- dataSize.toBytes())) {
+ try (OutputStream stream = bucket.createStreamKey(keyName,
+ dataSize.toBytes(), replicationConfig, new HashMap<>())) {
contentGenerator.write(stream);
}
return null;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]