This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new c6724537d7 HDDS-10387. Fix parameter number warning in KeyOutputStream and related classes (#6225)
c6724537d7 is described below
commit c6724537d7413d5244bd3843fe7170d954d5d77e
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Feb 19 21:59:47 2024 -0800
HDDS-10387. Fix parameter number warning in KeyOutputStream and related classes (#6225)
---
.../ozone/client/io/BlockOutputStreamEntry.java | 62 +++++-----
.../client/io/BlockOutputStreamEntryPool.java | 65 +++--------
.../ozone/client/io/ECBlockOutputStreamEntry.java | 101 +----------------
.../client/io/ECBlockOutputStreamEntryPool.java | 49 ++------
.../hadoop/ozone/client/io/ECKeyOutputStream.java | 125 ++++++---------------
.../hadoop/ozone/client/io/KeyOutputStream.java | 72 +++---------
.../client/io/TestECBlockOutputStreamEntry.java | 16 +--
.../hadoop/ozone/client/OzoneOutputStreamStub.java | 7 +-
8 files changed, 117 insertions(+), 380 deletions(-)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 9bdec27f53..c0221d07a5 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.util.JavaUtils;
/**
* A BlockOutputStreamEntry manages the data writes into the DataNodes.
@@ -60,33 +61,28 @@ public class BlockOutputStreamEntry extends OutputStream {
private long currentPosition;
private final Token<OzoneBlockTokenIdentifier> token;
- private BufferPool bufferPool;
- private ContainerClientMetrics clientMetrics;
- private StreamBufferArgs streamBufferArgs;
-
- @SuppressWarnings({"parameternumber", "squid:S00107"})
- BlockOutputStreamEntry(
- BlockID blockID, String key,
- XceiverClientFactory xceiverClientManager,
- Pipeline pipeline,
- long length,
- BufferPool bufferPool,
- Token<OzoneBlockTokenIdentifier> token,
- OzoneClientConfig config,
- ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
- ) {
- this.config = config;
+ private final BufferPool bufferPool;
+ private final ContainerClientMetrics clientMetrics;
+ private final StreamBufferArgs streamBufferArgs;
+
+ BlockOutputStreamEntry(Builder b) {
+ this.config = b.config;
this.outputStream = null;
- this.blockID = blockID;
- this.key = key;
- this.xceiverClientManager = xceiverClientManager;
- this.pipeline = pipeline;
- this.token = token;
- this.length = length;
+ this.blockID = b.blockID;
+ this.key = b.key;
+ this.xceiverClientManager = b.xceiverClientManager;
+ this.pipeline = b.pipeline;
+ this.token = b.token;
+ this.length = b.length;
this.currentPosition = 0;
- this.bufferPool = bufferPool;
- this.clientMetrics = clientMetrics;
- this.streamBufferArgs = streamBufferArgs;
+ this.bufferPool = b.bufferPool;
+ this.clientMetrics = b.clientMetrics;
+ this.streamBufferArgs = b.streamBufferArgs;
+ }
+
+ @Override
+ public String toString() {
+ return JavaUtils.getClassSimpleName(getClass()) + ":" + key + " " + blockID;
}
/**
@@ -362,6 +358,14 @@ public class BlockOutputStreamEntry extends OutputStream {
private ContainerClientMetrics clientMetrics;
private StreamBufferArgs streamBufferArgs;
+ public Pipeline getPipeline() {
+ return pipeline;
+ }
+
+ public long getLength() {
+ return length;
+ }
+
public Builder setBlockID(BlockID bID) {
this.blockID = bID;
return this;
@@ -412,13 +416,7 @@ public class BlockOutputStreamEntry extends OutputStream {
}
public BlockOutputStreamEntry build() {
- return new BlockOutputStreamEntry(blockID,
- key,
- xceiverClientManager,
- pipeline,
- length,
- bufferPool,
- token, config, clientMetrics, streamBufferArgs);
+ return new BlockOutputStreamEntry(this);
}
}
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index d0f3b5728a..4d6026f925 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Map;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
@@ -62,7 +61,7 @@ public class BlockOutputStreamEntryPool implements KeyMetadataAware {
/**
* List of stream entries that are used to write a block of data.
*/
- private final List<BlockOutputStreamEntry> streamEntries;
+ private final List<BlockOutputStreamEntry> streamEntries = new ArrayList<>();
private final OzoneClientConfig config;
/**
* The actual stream entry we are writing into. Note that a stream entry is
@@ -73,7 +72,6 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
private final OzoneManagerProtocol omClient;
private final OmKeyArgs keyArgs;
private final XceiverClientFactory xceiverClientFactory;
- private final String requestID;
/**
* A {@link BufferPool} shared between all
* {@link org.apache.hadoop.hdds.scm.storage.BlockOutputStream}s managed by
@@ -86,39 +84,31 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
private final ContainerClientMetrics clientMetrics;
private final StreamBufferArgs streamBufferArgs;
- @SuppressWarnings({"parameternumber", "squid:S00107"})
- public BlockOutputStreamEntryPool(
- OzoneClientConfig config,
- OzoneManagerProtocol omClient,
- String requestId, ReplicationConfig replicationConfig,
- String uploadID, int partNumber,
- boolean isMultipart, OmKeyInfo info,
- boolean unsafeByteBufferConversion,
- XceiverClientFactory xceiverClientFactory, long openID,
- ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
- ) {
- this.config = config;
- this.xceiverClientFactory = xceiverClientFactory;
- streamEntries = new ArrayList<>();
+ public BlockOutputStreamEntryPool(KeyOutputStream.Builder b) {
+ this.config = b.getClientConfig();
+ this.xceiverClientFactory = b.getXceiverManager();
currentStreamIndex = 0;
- this.omClient = omClient;
+ this.omClient = b.getOmClient();
+ final OmKeyInfo info = b.getOpenHandler().getKeyInfo();
this.keyArgs = new OmKeyArgs.Builder().setVolumeName(info.getVolumeName())
.setBucketName(info.getBucketName()).setKeyName(info.getKeyName())
- .setReplicationConfig(replicationConfig).setDataSize(info.getDataSize())
- .setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID)
- .setMultipartUploadPartNumber(partNumber).build();
- this.requestID = requestId;
- this.openID = openID;
+ .setReplicationConfig(b.getReplicationConfig())
+ .setDataSize(info.getDataSize())
+ .setIsMultipartKey(b.isMultipartKey())
+ .setMultipartUploadID(b.getMultipartUploadID())
+ .setMultipartUploadPartNumber(b.getMultipartNumber())
+ .build();
+ this.openID = b.getOpenHandler().getId();
this.excludeList = createExcludeList();
+ this.streamBufferArgs = b.getStreamBufferArgs();
this.bufferPool =
new BufferPool(streamBufferArgs.getStreamBufferSize(),
(int) (streamBufferArgs.getStreamBufferMaxSize() / streamBufferArgs
.getStreamBufferSize()),
ByteStringConversion
- .createByteBufferConversion(unsafeByteBufferConversion));
- this.clientMetrics = clientMetrics;
- this.streamBufferArgs = streamBufferArgs;
+ .createByteBufferConversion(b.isUnsafeByteBufferConversionEnabled()));
+ this.clientMetrics = b.getClientMetrics();
}
ExcludeList createExcludeList() {
@@ -126,25 +116,6 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
Clock.system(ZoneOffset.UTC));
}
- BlockOutputStreamEntryPool(ContainerClientMetrics clientMetrics,
- OzoneClientConfig clientConfig, StreamBufferArgs streamBufferArgs) {
- streamEntries = new ArrayList<>();
- omClient = null;
- keyArgs = null;
- xceiverClientFactory = null;
- config = clientConfig;
- streamBufferArgs.setStreamBufferFlushDelay(false);
- requestID = null;
- int chunkSize = 0;
- bufferPool = new BufferPool(chunkSize, 1);
-
- currentStreamIndex = 0;
- openID = -1;
- excludeList = createExcludeList();
- this.clientMetrics = clientMetrics;
- this.streamBufferArgs = null;
- }
-
/**
* When a key is opened, it is possible that there are some blocks already
* allocated to it for this open session. In this case, to make use of these
@@ -156,10 +127,8 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
*
* @param version the set of blocks that are pre-allocated.
* @param openVersion the version corresponding to the pre-allocation.
- * @throws IOException
*/
- public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
- long openVersion) throws IOException {
+ public void addPreallocateBlocks(OmKeyLocationInfoGroup version, long openVersion) {
// server may return any number of blocks, (0 to any)
// only the blocks allocated in this open session (block createVersion
// equals to open session version)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 07d0f46069..7f6ce87d60 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -23,17 +23,10 @@ import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.StreamBufferArgs;
-import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
-import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
-import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.hdds.utils.IOUtils;
-import org.apache.hadoop.security.token.Token;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,19 +68,10 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry {
private int currentStreamIdx = 0;
private long successfulBlkGrpAckedLen;
- @SuppressWarnings({"parameternumber", "squid:S00107"})
- ECBlockOutputStreamEntry(BlockID blockID, String key,
- XceiverClientFactory xceiverClientManager, Pipeline pipeline, long
length,
- BufferPool bufferPool, Token<OzoneBlockTokenIdentifier> token,
- OzoneClientConfig config, ContainerClientMetrics clientMetrics,
- StreamBufferArgs streamBufferArgs) {
- super(blockID, key, xceiverClientManager, pipeline, length, bufferPool,
- token, config, clientMetrics, streamBufferArgs);
- assertInstanceOf(
- pipeline.getReplicationConfig(), ECReplicationConfig.class);
- this.replicationConfig =
- (ECReplicationConfig) pipeline.getReplicationConfig();
- this.length = replicationConfig.getData() * length;
+ ECBlockOutputStreamEntry(Builder b) {
+ super(b);
+ this.replicationConfig = assertInstanceOf(b.getPipeline().getReplicationConfig(), ECReplicationConfig.class);
+ this.length = replicationConfig.getData() * b.getLength();
}
@Override
@@ -433,82 +417,9 @@ public class ECBlockOutputStreamEntry extends
BlockOutputStreamEntry {
/**
* Builder class for ChunkGroupOutputStreamEntry.
* */
- public static class Builder {
- private BlockID blockID;
- private String key;
- private XceiverClientFactory xceiverClientManager;
- private Pipeline pipeline;
- private long length;
- private BufferPool bufferPool;
- private Token<OzoneBlockTokenIdentifier> token;
- private OzoneClientConfig config;
- private ContainerClientMetrics clientMetrics;
- private StreamBufferArgs streamBufferArgs;
-
- public ECBlockOutputStreamEntry.Builder setBlockID(BlockID bID) {
- this.blockID = bID;
- return this;
- }
-
- public ECBlockOutputStreamEntry.Builder setKey(String keys) {
- this.key = keys;
- return this;
- }
-
- public ECBlockOutputStreamEntry.Builder setXceiverClientManager(
- XceiverClientFactory
- xClientManager) {
- this.xceiverClientManager = xClientManager;
- return this;
- }
-
- public ECBlockOutputStreamEntry.Builder setPipeline(Pipeline ppln) {
- this.pipeline = ppln;
- return this;
- }
-
- public ECBlockOutputStreamEntry.Builder setLength(long len) {
- this.length = len;
- return this;
- }
-
- public ECBlockOutputStreamEntry.Builder setBufferPool(BufferPool pool) {
- this.bufferPool = pool;
- return this;
- }
-
- public ECBlockOutputStreamEntry.Builder setConfig(
- OzoneClientConfig clientConfig) {
- this.config = clientConfig;
- return this;
- }
-
- public ECBlockOutputStreamEntry.Builder setToken(
- Token<OzoneBlockTokenIdentifier> bToken) {
- this.token = bToken;
- return this;
- }
-
- public ECBlockOutputStreamEntry.Builder setClientMetrics(
- ContainerClientMetrics containerClientMetrics) {
- this.clientMetrics = containerClientMetrics;
- return this;
- }
-
- public ECBlockOutputStreamEntry.Builder setStreamBufferArgs(
- StreamBufferArgs args) {
- this.streamBufferArgs = args;
- return this;
- }
-
+ public static class Builder extends BlockOutputStreamEntry.Builder {
public ECBlockOutputStreamEntry build() {
- return new ECBlockOutputStreamEntry(blockID,
- key,
- xceiverClientManager,
- pipeline,
- length,
- bufferPool,
- token, config, clientMetrics, streamBufferArgs);
+ return new ECBlockOutputStreamEntry(this);
}
}
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
index e551605d84..e278097a49 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
@@ -17,19 +17,7 @@
*/
package org.apache.hadoop.ozone.client.io;
-import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.StreamBufferArgs;
-import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
-
-import java.time.Clock;
-import java.time.ZoneOffset;
/**
* {@link BlockOutputStreamEntryPool} is responsible to manage OM communication
@@ -44,37 +32,14 @@ import java.time.ZoneOffset;
* @see ECBlockOutputStreamEntry
*/
public class ECBlockOutputStreamEntryPool extends BlockOutputStreamEntryPool {
-
- @SuppressWarnings({"parameternumber", "squid:S00107"})
- public ECBlockOutputStreamEntryPool(OzoneClientConfig config,
- OzoneManagerProtocol omClient,
- String requestId,
- ReplicationConfig replicationConfig,
- String uploadID,
- int partNumber,
- boolean isMultipart,
- OmKeyInfo info,
- boolean unsafeByteBufferConversion,
- XceiverClientFactory xceiverClientFactory,
- long openID,
- ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs)
{
- super(config, omClient, requestId, replicationConfig, uploadID, partNumber,
- isMultipart, info, unsafeByteBufferConversion, xceiverClientFactory,
- openID, clientMetrics, streamBufferArgs);
- assert replicationConfig instanceof ECReplicationConfig;
- }
-
- @Override
- ExcludeList createExcludeList() {
- return new ExcludeList(getConfig().getExcludeNodesExpiryTime(),
- Clock.system(ZoneOffset.UTC));
+ public ECBlockOutputStreamEntryPool(ECKeyOutputStream.Builder builder) {
+ super(builder);
}
@Override
- BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
- return
- new ECBlockOutputStreamEntry.Builder()
- .setBlockID(subKeyInfo.getBlockID())
+ ECBlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
+ final ECBlockOutputStreamEntry.Builder b = new ECBlockOutputStreamEntry.Builder();
+ b.setBlockID(subKeyInfo.getBlockID())
.setKey(getKeyName())
.setXceiverClientManager(getXceiverClientFactory())
.setPipeline(subKeyInfo.getPipeline())
@@ -83,8 +48,8 @@ public class ECBlockOutputStreamEntryPool extends
BlockOutputStreamEntryPool {
.setBufferPool(getBufferPool())
.setToken(subKeyInfo.getToken())
.setClientMetrics(getClientMetrics())
- .setStreamBufferArgs(getStreamBufferArgs())
- .build();
+ .setStreamBufferArgs(getStreamBufferArgs());
+ return b.build();
}
@Override
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index b5c36474ff..878558073f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -17,12 +17,28 @@
*/
package org.apache.hadoop.ozone.client.io;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.ozone.om.protocol.S3Auth;
+import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.ozone.erasurecode.rawcoder.util.CodecUtil;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -35,30 +51,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
-import org.apache.hadoop.io.ByteBufferPool;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
-import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.om.protocol.S3Auth;
-import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
-import org.apache.ozone.erasurecode.rawcoder.util.CodecUtil;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* ECKeyOutputStream handles the EC writes by writing the data into underlying
* block output streams chunk by chunk.
@@ -100,22 +92,6 @@ public final class ECKeyOutputStream extends KeyOutputStream
private long offset;
// how much data has been ingested into the stream
private long writeOffset;
- private final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool;
-
- @VisibleForTesting
- public List<BlockOutputStreamEntry> getStreamEntries() {
- return blockOutputStreamEntryPool.getStreamEntries();
- }
-
- @VisibleForTesting
- public XceiverClientFactory getXceiverClientFactory() {
- return blockOutputStreamEntryPool.getXceiverClientFactory();
- }
-
- @VisibleForTesting
- public List<OmKeyLocationInfo> getLocationInfoList() {
- return blockOutputStreamEntryPool.getLocationInfoList();
- }
@VisibleForTesting
public void insertFlushCheckpoint(long version) throws IOException {
@@ -128,8 +104,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
}
private ECKeyOutputStream(Builder builder) {
- super(builder.getReplicationConfig(), builder.getClientMetrics(),
- builder.getClientConfig(), builder.getStreamBufferArgs());
+ super(builder.getReplicationConfig(), new ECBlockOutputStreamEntryPool(builder));
this.config = builder.getClientConfig();
this.bufferPool = builder.getByteBufferPool();
// For EC, cell/chunk size and buffer size can be same for now.
@@ -140,16 +115,6 @@ public final class ECKeyOutputStream extends
KeyOutputStream
ecChunkSize, numDataBlks, numParityBlks, bufferPool);
chunkIndex = 0;
ecStripeQueue = new ArrayBlockingQueue<>(config.getEcStripeQueueSize());
- OmKeyInfo info = builder.getOpenHandler().getKeyInfo();
- blockOutputStreamEntryPool =
- new ECBlockOutputStreamEntryPool(config,
- builder.getOmClient(), builder.getRequestID(),
- builder.getReplicationConfig(),
- builder.getMultipartUploadID(), builder.getMultipartNumber(),
- builder.isMultipartKey(),
- info, builder.isUnsafeByteBufferConversionEnabled(),
- builder.getXceiverManager(), builder.getOpenHandler().getId(),
- builder.getClientMetrics(), builder.getStreamBufferArgs());
this.writeOffset = 0;
this.encoder = CodecUtil.createRawEncoderWithFallback(
@@ -164,22 +129,9 @@ public final class ECKeyOutputStream extends
KeyOutputStream
this.atomicKeyCreation = builder.getAtomicKeyCreation();
}
- /**
- * When a key is opened, it is possible that there are some blocks already
- * allocated to it for this open session. In this case, to make use of these
- * blocks, we need to add these blocks to stream entries. But, a key's
version
- * also includes blocks from previous versions, we need to avoid adding these
- * old blocks to stream entries, because these old blocks should not be
picked
- * for write. To do this, the following method checks that, only those
- * blocks created in this particular open version are added to stream
entries.
- *
- * @param version the set of blocks that are pre-allocated.
- * @param openVersion the version corresponding to the pre-allocation.
- * @throws IOException
- */
- public void addPreallocateBlocks(OmKeyLocationInfoGroup version,
- long openVersion) throws IOException {
- blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
+ @Override
+ protected ECBlockOutputStreamEntryPool getBlockOutputStreamEntryPool() {
+ return (ECBlockOutputStreamEntryPool)
super.getBlockOutputStreamEntryPool();
}
/**
@@ -218,6 +170,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
final ByteBuffer[] dataBuffers = stripe.getDataBuffers();
offset -= Arrays.stream(dataBuffers).mapToInt(Buffer::limit).sum();
+ final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool =
getBlockOutputStreamEntryPool();
final ECBlockOutputStreamEntry failedStreamEntry =
blockOutputStreamEntryPool.getCurrentStreamEntry();
failedStreamEntry.resetToFirstEntry();
@@ -256,8 +209,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
private StripeWriteStatus commitStripeWrite(ECChunkBuffers stripe)
throws IOException {
- ECBlockOutputStreamEntry streamEntry =
- blockOutputStreamEntryPool.getCurrentStreamEntry();
+ final ECBlockOutputStreamEntry streamEntry =
getBlockOutputStreamEntryPool().getCurrentStreamEntry();
List<ECBlockOutputStream> failedStreams =
streamEntry.streamsWithWriteFailure();
if (!failedStreams.isEmpty()) {
@@ -297,6 +249,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
List<ECBlockOutputStream> failedStreams) {
// Exclude the failed pipeline
+ final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool =
getBlockOutputStreamEntryPool();
blockOutputStreamEntryPool.getExcludeList().addPipeline(pipeline.getId());
// If the failure is NOT caused by other reasons (e.g. container full),
@@ -362,6 +315,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
}
private void writeDataCells(ECChunkBuffers stripe) throws IOException {
+ final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool =
getBlockOutputStreamEntryPool();
blockOutputStreamEntryPool.allocateBlockIfNeeded();
ByteBuffer[] dataCells = stripe.getDataBuffers();
for (int i = 0; i < numDataBlks; i++) {
@@ -374,6 +328,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
private void writeParityCells(ECChunkBuffers stripe) {
// Move the stream entry cursor to parity block index
+ final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool =
getBlockOutputStreamEntryPool();
blockOutputStreamEntryPool
.getCurrentStreamEntry().forceToFirstParityBlock();
ByteBuffer[] parityCells = stripe.getParityBuffers();
@@ -413,7 +368,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
// The len cannot be bigger than cell buffer size.
assert buffer.limit() <= ecChunkSize : "The buffer size: " +
buffer.limit() + " should not exceed EC chunk size: " + ecChunkSize;
- writeToOutputStream(blockOutputStreamEntryPool.getCurrentStreamEntry(),
+ writeToOutputStream(getBlockOutputStreamEntryPool().getCurrentStreamEntry(),
buffer.array(), buffer.limit(), 0, isParity);
} catch (Exception e) {
markStreamAsFailed(e);
@@ -449,8 +404,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
Preconditions.checkNotNull(t);
boolean containerExclusionException = checkIfContainerToExclude(t);
if (containerExclusionException) {
- blockOutputStreamEntryPool.getExcludeList()
- .addPipeline(streamEntry.getPipeline().getId());
+ getBlockOutputStreamEntryPool().getExcludeList().addPipeline(streamEntry.getPipeline().getId());
}
markStreamAsFailed(exception);
}
@@ -460,7 +414,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
}
private void markStreamAsFailed(Exception e) {
- blockOutputStreamEntryPool.getCurrentStreamEntry().markFailed(e);
+ getBlockOutputStreamEntryPool().getCurrentStreamEntry().markFailed(e);
}
@Override
@@ -470,6 +424,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
private void closeCurrentStreamEntry()
throws IOException {
+ final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool =
getBlockOutputStreamEntryPool();
if (!blockOutputStreamEntryPool.isEmpty()) {
while (true) {
try {
@@ -503,6 +458,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
return;
}
closed = true;
+ final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool =
getBlockOutputStreamEntryPool();
try {
if (!closing) {
// If stripe buffer is not empty, encode and flush the stripe.
@@ -614,20 +570,6 @@ public final class ECKeyOutputStream extends
KeyOutputStream
buf.position(limit);
}
- public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
- return blockOutputStreamEntryPool.getCommitUploadPartInfo();
- }
-
- @VisibleForTesting
- public ExcludeList getExcludeList() {
- return blockOutputStreamEntryPool.getExcludeList();
- }
-
- @Override
- public Map<String, String> getMetadata() {
- return this.blockOutputStreamEntryPool.getMetadata();
- }
-
/**
* Builder class of ECKeyOutputStream.
*/
@@ -682,9 +624,8 @@ public final class ECKeyOutputStream extends KeyOutputStream
*/
private void checkNotClosed() throws IOException {
if (closing || closed) {
- throw new IOException(
- ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
- + blockOutputStreamEntryPool.getKeyName());
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
+ + getBlockOutputStreamEntryPool().getKeyName());
}
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 8b128e9cd9..9ea17cf8b2 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -69,7 +69,6 @@ import org.slf4j.LoggerFactory;
public class KeyOutputStream extends OutputStream
implements Syncable, KeyMetadataAware {
- private OzoneClientConfig config;
private final ReplicationConfig replication;
/**
@@ -105,11 +104,8 @@ public class KeyOutputStream extends OutputStream
*/
private boolean atomicKeyCreation;
- public KeyOutputStream(ReplicationConfig replicationConfig,
- ContainerClientMetrics clientMetrics, OzoneClientConfig clientConfig,
- StreamBufferArgs streamBufferArgs) {
+ public KeyOutputStream(ReplicationConfig replicationConfig, BlockOutputStreamEntryPool blockOutputStreamEntryPool) {
this.replication = replicationConfig;
- this.config = clientConfig;
closed = false;
this.retryPolicyMap = HddsClientUtils.getExceptionList()
.stream()
@@ -117,18 +113,16 @@ public class KeyOutputStream extends OutputStream
e -> RetryPolicies.TRY_ONCE_THEN_FAIL));
retryCount = 0;
offset = 0;
- blockOutputStreamEntryPool =
- new BlockOutputStreamEntryPool(clientMetrics, clientConfig,
streamBufferArgs);
+ this.blockOutputStreamEntryPool = blockOutputStreamEntryPool;
}
- @VisibleForTesting
- public List<BlockOutputStreamEntry> getStreamEntries() {
- return blockOutputStreamEntryPool.getStreamEntries();
+ protected BlockOutputStreamEntryPool getBlockOutputStreamEntryPool() {
+ return blockOutputStreamEntryPool;
}
@VisibleForTesting
- public XceiverClientFactory getXceiverClientFactory() {
- return blockOutputStreamEntryPool.getXceiverClientFactory();
+ public List<BlockOutputStreamEntry> getStreamEntries() {
+ return blockOutputStreamEntryPool.getStreamEntries();
}
@VisibleForTesting
@@ -146,39 +140,18 @@ public class KeyOutputStream extends OutputStream
return clientID;
}
- @SuppressWarnings({"parameternumber", "squid:S00107"})
- public KeyOutputStream(
- OzoneClientConfig config,
- OpenKeySession handler,
- XceiverClientFactory xceiverClientManager,
- OzoneManagerProtocol omClient,
- String requestId, ReplicationConfig replicationConfig,
- String uploadID, int partNumber, boolean isMultipart,
- boolean unsafeByteBufferConversion,
- ContainerClientMetrics clientMetrics,
- boolean atomicKeyCreation, StreamBufferArgs streamBufferArgs
- ) {
- this.config = config;
- this.replication = replicationConfig;
- blockOutputStreamEntryPool =
- new BlockOutputStreamEntryPool(
- config,
- omClient,
- requestId, replicationConfig,
- uploadID, partNumber,
- isMultipart, handler.getKeyInfo(),
- unsafeByteBufferConversion,
- xceiverClientManager,
- handler.getId(),
- clientMetrics, streamBufferArgs);
+ public KeyOutputStream(Builder b) {
+ this.replication = b.replicationConfig;
+ this.blockOutputStreamEntryPool = new BlockOutputStreamEntryPool(b);
+ final OzoneClientConfig config = b.getClientConfig();
this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
config.getMaxRetryCount(), config.getRetryInterval());
this.retryCount = 0;
this.isException = false;
this.writeOffset = 0;
- this.clientID = handler.getId();
- this.atomicKeyCreation = atomicKeyCreation;
- this.streamBufferArgs = streamBufferArgs;
+ this.clientID = b.getOpenHandler().getId();
+ this.atomicKeyCreation = b.getAtomicKeyCreation();
+ this.streamBufferArgs = b.getStreamBufferArgs();
}
/**
@@ -192,10 +165,8 @@ public class KeyOutputStream extends OutputStream
*
* @param version the set of blocks that are pre-allocated.
* @param openVersion the version corresponding to the pre-allocation.
- * @throws IOException
*/
- public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup version,
- long openVersion) throws IOException {
+ public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup version, long openVersion) {
blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
}
@@ -729,20 +700,7 @@ public class KeyOutputStream extends OutputStream
}
public KeyOutputStream build() {
- return new KeyOutputStream(
- clientConfig,
- openHandler,
- xceiverManager,
- omClient,
- requestID,
- replicationConfig,
- multipartUploadID,
- multipartNumber,
- isMultipartKey,
- unsafeByteBufferConversion,
- clientMetrics,
- atomicKeyCreation,
- streamBufferArgs);
+ return new KeyOutputStream(this);
}
}
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java
index 7760e88e48..718e724e58 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockOutputStreamEntry.java
@@ -63,10 +63,10 @@ public class TestECBlockOutputStreamEntry {
try (XceiverClientManager manager =
new XceiverClientManager(new OzoneConfiguration())) {
HashSet<XceiverClientSpi> clients = new HashSet<>();
- ECBlockOutputStreamEntry entry = new ECBlockOutputStreamEntry.Builder()
- .setXceiverClientManager(manager)
- .setPipeline(anECPipeline)
- .build();
+ final ECBlockOutputStreamEntry.Builder b = new ECBlockOutputStreamEntry.Builder();
+ b.setXceiverClientManager(manager)
+ .setPipeline(anECPipeline);
+ final ECBlockOutputStreamEntry entry = b.build();
for (int i = 0; i < nodes.size(); i++) {
clients.add(
manager.acquireClient(
@@ -101,10 +101,10 @@ public class TestECBlockOutputStreamEntry {
try (XceiverClientManager manager =
new XceiverClientManager(new OzoneConfiguration())) {
HashSet<XceiverClientSpi> clients = new HashSet<>();
- ECBlockOutputStreamEntry entry = new ECBlockOutputStreamEntry.Builder()
- .setXceiverClientManager(manager)
- .setPipeline(anECPipeline)
- .build();
+ final ECBlockOutputStreamEntry.Builder b = new ECBlockOutputStreamEntry.Builder();
+ b.setXceiverClientManager(manager)
+ .setPipeline(anECPipeline);
+ final ECBlockOutputStreamEntry entry = b.build();
for (int i = 0; i < nodes.size(); i++) {
clients.add(
manager.acquireClient(
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
index da2fb26ec8..ca3caa4ee7 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
@@ -24,8 +24,6 @@ import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.io.KeyMetadataAware;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
@@ -81,10 +79,7 @@ public class OzoneOutputStreamStub extends OzoneOutputStream {
OzoneConfiguration conf = new OzoneConfiguration();
ReplicationConfig replicationConfig =
ReplicationConfig.getDefault(conf);
- OzoneClientConfig ozoneClientConfig = conf.getObject(OzoneClientConfig.class);
- StreamBufferArgs streamBufferArgs =
- StreamBufferArgs.getDefaultStreamBufferArgs(replicationConfig, ozoneClientConfig);
- return new KeyOutputStream(replicationConfig, null, ozoneClientConfig, streamBufferArgs) {
+ return new KeyOutputStream(replicationConfig, null) {
@Override
public synchronized OmMultipartCommitUploadPartInfo
getCommitUploadPartInfo() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]