This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new c4dc6a072f HDDS-5865. Make read retry interval and attempts in
BlockInputStream configurable (#6408)
c4dc6a072f is described below
commit c4dc6a072f1171a9a2962ee9a81b91b9d7a6d0ba
Author: SaketaChalamchala <[email protected]>
AuthorDate: Thu Mar 21 04:56:14 2024 -0700
HDDS-5865. Make read retry interval and attempts in BlockInputStream
configurable (#6408)
---
.../apache/hadoop/hdds/scm/OzoneClientConfig.java | 33 +++++++
.../hadoop/hdds/scm/storage/BlockInputStream.java | 23 +++--
.../ozone/client/io/BlockInputStreamFactory.java | 6 +-
.../client/io/BlockInputStreamFactoryImpl.java | 13 ++-
.../hadoop/ozone/client/io/ECBlockInputStream.java | 16 +--
.../ozone/client/io/ECBlockInputStreamFactory.java | 6 +-
.../client/io/ECBlockInputStreamFactoryImpl.java | 13 ++-
.../ozone/client/io/ECBlockInputStreamProxy.java | 14 +--
.../io/ECBlockReconstructedStripeInputStream.java | 10 +-
.../hdds/scm/storage/DummyBlockInputStream.java | 9 +-
.../storage/DummyBlockInputStreamWithRetry.java | 9 +-
.../hdds/scm/storage/TestBlockInputStream.java | 24 ++++-
.../hadoop/ozone/client/io/ECStreamTestUtil.java | 6 +-
.../client/io/TestBlockInputStreamFactoryImpl.java | 14 ++-
.../ozone/client/io/TestECBlockInputStream.java | 110 ++++++++++++++++-----
.../client/io/TestECBlockInputStreamProxy.java | 13 ++-
.../io/TestECBlockReconstructedInputStream.java | 10 +-
.../TestECBlockReconstructedStripeInputStream.java | 12 ++-
.../ECReconstructionCoordinator.java | 7 +-
.../hadoop/ozone/client/io/KeyInputStream.java | 34 ++++---
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 15 ++-
.../ozone/client/io/TestKeyInputStreamEC.java | 12 ++-
.../org/apache/hadoop/ozone/TestBlockTokens.java | 7 +-
.../apache/hadoop/ozone/om/TestChunkStreams.java | 9 +-
24 files changed, 304 insertions(+), 121 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 3042b4d847..549735438a 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -144,6 +144,23 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private int retryInterval = 0;
+ @Config(key = "read.max.retries",
+ defaultValue = "3",
+ description = "Maximum number of retries by Ozone Client on "
+ + "encountering connectivity exception when reading a key.",
+ tags = ConfigTag.CLIENT)
+ private int maxReadRetryCount = 3;
+
+ @Config(key = "read.retry.interval",
+ defaultValue = "1",
+ description =
+ "Indicates the time duration in seconds a client will wait "
+ + "before retrying a read key request on encountering "
+ + "a connectivity excepetion from Datanodes . "
+ + "By default the interval is 1 second",
+ tags = ConfigTag.CLIENT)
+ private int readRetryInterval = 1;
+
@Config(key = "checksum.type",
defaultValue = "CRC32",
description = "The checksum type [NONE/ CRC32/ CRC32C/ SHA256/ MD5] "
@@ -326,6 +343,22 @@ public class OzoneClientConfig {
this.retryInterval = retryInterval;
}
+ public int getMaxReadRetryCount() {
+ return maxReadRetryCount;
+ }
+
+ public void setMaxReadRetryCount(int maxReadRetryCount) {
+ this.maxReadRetryCount = maxReadRetryCount;
+ }
+
+ public int getReadRetryInterval() {
+ return readRetryInterval;
+ }
+
+ public void setReadRetryInterval(int readRetryInterval) {
+ this.readRetryInterval = readRetryInterval;
+ }
+
public ChecksumType getChecksumType() {
return ChecksumType.valueOf(checksumType);
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 1fb4bf954c..b62415395d 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -36,6 +36,7 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator;
@@ -76,8 +77,8 @@ public class BlockInputStream extends
BlockExtendedInputStream {
private XceiverClientSpi xceiverClient;
private boolean initialized = false;
// TODO: do we need to change retrypolicy based on exception.
- private final RetryPolicy retryPolicy =
- HddsClientUtils.createRetryPolicy(3, TimeUnit.SECONDS.toMillis(1));
+ private final RetryPolicy retryPolicy;
+
private int retries;
// List of ChunkInputStreams, one for each chunk in the block
@@ -112,25 +113,29 @@ public class BlockInputStream extends
BlockExtendedInputStream {
private final Function<BlockID, BlockLocationInfo> refreshFunction;
public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
- Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+ Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverClientFactory,
- Function<BlockID, BlockLocationInfo> refreshFunction) {
+ Function<BlockID, BlockLocationInfo> refreshFunction,
+ OzoneClientConfig config) {
this.blockID = blockId;
this.length = blockLen;
setPipeline(pipeline);
tokenRef.set(token);
- this.verifyChecksum = verifyChecksum;
+ this.verifyChecksum = config.isChecksumVerify();
this.xceiverClientFactory = xceiverClientFactory;
this.refreshFunction = refreshFunction;
+ this.retryPolicy =
+ HddsClientUtils.createRetryPolicy(config.getMaxReadRetryCount(),
+ TimeUnit.SECONDS.toMillis(config.getReadRetryInterval()));
}
public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
- boolean verifyChecksum,
- XceiverClientFactory xceiverClientFactory
+ XceiverClientFactory xceiverClientFactory,
+ OzoneClientConfig config
) {
- this(blockId, blockLen, pipeline, token, verifyChecksum,
- xceiverClientFactory, null);
+ this(blockId, blockLen, pipeline, token,
+ xceiverClientFactory, null, config);
}
/**
* Initialize the BlockInputStream. Get the BlockData (list of chunks) from
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
index bd100214ae..6f8a744f76 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.client.io;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
@@ -48,8 +49,9 @@ public interface BlockInputStreamFactory {
*/
BlockExtendedInputStream create(ReplicationConfig repConfig,
BlockLocationInfo blockInfo, Pipeline pipeline,
- Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+ Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverFactory,
- Function<BlockID, BlockLocationInfo> refreshFunction);
+ Function<BlockID, BlockLocationInfo> refreshFunction,
+ OzoneClientConfig config);
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
index 40063f9ce4..6bcdc3c481 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
@@ -76,16 +77,18 @@ public class BlockInputStreamFactoryImpl implements
BlockInputStreamFactory {
*/
public BlockExtendedInputStream create(ReplicationConfig repConfig,
BlockLocationInfo blockInfo, Pipeline pipeline,
- Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+ Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverFactory,
- Function<BlockID, BlockLocationInfo> refreshFunction) {
+ Function<BlockID, BlockLocationInfo> refreshFunction,
+ OzoneClientConfig config) {
if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) {
return new ECBlockInputStreamProxy((ECReplicationConfig)repConfig,
- blockInfo, verifyChecksum, xceiverFactory, refreshFunction,
- ecBlockStreamFactory);
+ blockInfo, xceiverFactory, refreshFunction,
+ ecBlockStreamFactory, config);
} else {
return new BlockInputStream(blockInfo.getBlockID(),
blockInfo.getLength(),
- pipeline, token, verifyChecksum, xceiverFactory, refreshFunction);
+ pipeline, token, xceiverFactory, refreshFunction,
+ config);
}
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index e85bf27d53..8dc07f129b 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -60,7 +61,6 @@ public class ECBlockInputStream extends
BlockExtendedInputStream {
private final int ecChunkSize;
private final long stripeSize;
private final BlockInputStreamFactory streamFactory;
- private final boolean verifyChecksum;
private final XceiverClientFactory xceiverClientFactory;
private final Function<BlockID, BlockLocationInfo> refreshFunction;
private final BlockLocationInfo blockInfo;
@@ -75,7 +75,7 @@ public class ECBlockInputStream extends
BlockExtendedInputStream {
private long position = 0;
private boolean closed = false;
private boolean seeked = false;
-
+ private OzoneClientConfig config;
protected ECReplicationConfig getRepConfig() {
return repConfig;
}
@@ -108,13 +108,13 @@ public class ECBlockInputStream extends
BlockExtendedInputStream {
}
public ECBlockInputStream(ECReplicationConfig repConfig,
- BlockLocationInfo blockInfo, boolean verifyChecksum,
+ BlockLocationInfo blockInfo,
XceiverClientFactory xceiverClientFactory,
Function<BlockID, BlockLocationInfo> refreshFunction,
- BlockInputStreamFactory streamFactory) {
+ BlockInputStreamFactory streamFactory,
+ OzoneClientConfig config) {
this.repConfig = repConfig;
this.ecChunkSize = repConfig.getEcChunkSize();
- this.verifyChecksum = verifyChecksum;
this.blockInfo = blockInfo;
this.streamFactory = streamFactory;
this.xceiverClientFactory = xceiverClientFactory;
@@ -123,6 +123,7 @@ public class ECBlockInputStream extends
BlockExtendedInputStream {
this.dataLocations = new DatanodeDetails[repConfig.getRequiredNodes()];
this.blockStreams =
new BlockExtendedInputStream[repConfig.getRequiredNodes()];
+ this.config = config;
this.stripeSize = (long)ecChunkSize * repConfig.getData();
setBlockLocations(this.blockInfo.getPipeline());
@@ -191,8 +192,9 @@ public class ECBlockInputStream extends
BlockExtendedInputStream {
StandaloneReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.ONE),
blkInfo, pipeline,
- blockInfo.getToken(), verifyChecksum, xceiverClientFactory,
- ecPipelineRefreshFunction(locationIndex + 1, refreshFunction));
+ blockInfo.getToken(), xceiverClientFactory,
+ ecPipelineRefreshFunction(locationIndex + 1, refreshFunction),
+ config);
blockStreams[locationIndex] = stream;
LOG.debug("{}: created stream [{}]: {}", this, locationIndex, stream);
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
index 0e2ef22c1e..66e7a31337 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.client.io;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
@@ -51,7 +52,8 @@ public interface ECBlockInputStreamFactory {
*/
BlockExtendedInputStream create(boolean missingLocations,
List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
- BlockLocationInfo blockInfo, boolean verifyChecksum,
+ BlockLocationInfo blockInfo,
XceiverClientFactory xceiverFactory,
- Function<BlockID, BlockLocationInfo> refreshFunction);
+ Function<BlockID, BlockLocationInfo> refreshFunction,
+ OzoneClientConfig config);
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
index 36b6539ea8..01d0b0a7b7 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
@@ -74,16 +75,17 @@ public final class ECBlockInputStreamFactoryImpl implements
*/
public BlockExtendedInputStream create(boolean missingLocations,
List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
- BlockLocationInfo blockInfo, boolean verifyChecksum,
+ BlockLocationInfo blockInfo,
XceiverClientFactory xceiverFactory,
- Function<BlockID, BlockLocationInfo> refreshFunction) {
+ Function<BlockID, BlockLocationInfo> refreshFunction,
+ OzoneClientConfig config) {
if (missingLocations) {
// We create the reconstruction reader
ECBlockReconstructedStripeInputStream sis =
new ECBlockReconstructedStripeInputStream(
- (ECReplicationConfig)repConfig, blockInfo, verifyChecksum,
+ (ECReplicationConfig)repConfig, blockInfo,
xceiverFactory, refreshFunction, inputStreamFactory,
- byteBufferPool, ecReconstructExecutorSupplier.get());
+ byteBufferPool, ecReconstructExecutorSupplier.get(), config);
if (failedLocations != null) {
sis.addFailedDatanodes(failedLocations);
}
@@ -92,7 +94,8 @@ public final class ECBlockInputStreamFactoryImpl implements
} else {
// Otherwise create the more efficient non-reconstruction reader
return new ECBlockInputStream((ECReplicationConfig)repConfig, blockInfo,
- verifyChecksum, xceiverFactory, refreshFunction, inputStreamFactory);
+ xceiverFactory, refreshFunction, inputStreamFactory,
+ config);
}
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
index 973561616f..68a0337cef 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.client.io;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -49,7 +50,6 @@ public class ECBlockInputStreamProxy extends
BlockExtendedInputStream {
LoggerFactory.getLogger(ECBlockInputStreamProxy.class);
private final ECReplicationConfig repConfig;
- private final boolean verifyChecksum;
private final XceiverClientFactory xceiverClientFactory;
private final Function<BlockID, BlockLocationInfo> refreshFunction;
private final BlockLocationInfo blockInfo;
@@ -59,6 +59,7 @@ public class ECBlockInputStreamProxy extends
BlockExtendedInputStream {
private boolean reconstructionReader = false;
private List<DatanodeDetails> failedLocations = new ArrayList<>();
private boolean closed = false;
+ private OzoneClientConfig config;
/**
* Given the ECReplicationConfig and the block length, calculate how many
@@ -97,16 +98,17 @@ public class ECBlockInputStreamProxy extends
BlockExtendedInputStream {
}
public ECBlockInputStreamProxy(ECReplicationConfig repConfig,
- BlockLocationInfo blockInfo, boolean verifyChecksum,
+ BlockLocationInfo blockInfo,
XceiverClientFactory xceiverClientFactory, Function<BlockID,
BlockLocationInfo> refreshFunction,
- ECBlockInputStreamFactory streamFactory) {
+ ECBlockInputStreamFactory streamFactory,
+ OzoneClientConfig config) {
this.repConfig = repConfig;
- this.verifyChecksum = verifyChecksum;
this.blockInfo = blockInfo;
this.ecBlockInputStreamFactory = streamFactory;
this.xceiverClientFactory = xceiverClientFactory;
this.refreshFunction = refreshFunction;
+ this.config = config;
setReaderType();
createBlockReader();
@@ -124,8 +126,8 @@ public class ECBlockInputStreamProxy extends
BlockExtendedInputStream {
.incECReconstructionTotal();
}
blockReader = ecBlockInputStreamFactory.create(reconstructionReader,
- failedLocations, repConfig, blockInfo, verifyChecksum,
- xceiverClientFactory, refreshFunction);
+ failedLocations, repConfig, blockInfo,
+ xceiverClientFactory, refreshFunction, config);
}
@Override
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 142825cb12..31f94e0aca 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
@@ -152,14 +153,15 @@ public class ECBlockReconstructedStripeInputStream
extends ECBlockInputStream {
@SuppressWarnings("checkstyle:ParameterNumber")
public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
- BlockLocationInfo blockInfo, boolean verifyChecksum,
+ BlockLocationInfo blockInfo,
XceiverClientFactory xceiverClientFactory,
Function<BlockID, BlockLocationInfo> refreshFunction,
BlockInputStreamFactory streamFactory,
ByteBufferPool byteBufferPool,
- ExecutorService ecReconstructExecutor) {
- super(repConfig, blockInfo, verifyChecksum, xceiverClientFactory,
- refreshFunction, streamFactory);
+ ExecutorService ecReconstructExecutor,
+ OzoneClientConfig config) {
+ super(repConfig, blockInfo, xceiverClientFactory,
+ refreshFunction, streamFactory, config);
this.byteBufferPool = byteBufferPool;
this.executor = ecReconstructExecutor;
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
index 3e7779f0d1..a89097533d 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
@@ -24,6 +24,7 @@ import java.util.function.Function;
import org.apache.hadoop.hdds.client.BlockID;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
@@ -44,13 +45,13 @@ class DummyBlockInputStream extends BlockInputStream {
long blockLen,
Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
- boolean verifyChecksum,
XceiverClientFactory xceiverClientManager,
Function<BlockID, BlockLocationInfo> refreshFunction,
List<ChunkInfo> chunkList,
- Map<String, byte[]> chunks) {
- super(blockId, blockLen, pipeline, token, verifyChecksum,
- xceiverClientManager, refreshFunction);
+ Map<String, byte[]> chunks,
+ OzoneClientConfig config) {
+ super(blockId, blockLen, pipeline, token,
+ xceiverClientManager, refreshFunction, config);
this.chunkDataMap = chunks;
this.chunks = chunkList;
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
index 24a3574514..6d12614228 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.client.BlockID;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
@@ -51,12 +52,12 @@ final class DummyBlockInputStreamWithRetry
long blockLen,
Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
- boolean verifyChecksum,
XceiverClientFactory xceiverClientManager,
List<ChunkInfo> chunkList,
Map<String, byte[]> chunkMap,
- AtomicBoolean isRerfreshed, IOException ioException) {
- super(blockId, blockLen, pipeline, token, verifyChecksum,
+ AtomicBoolean isRerfreshed, IOException ioException,
+ OzoneClientConfig config) {
+ super(blockId, blockLen, pipeline, token,
xceiverClientManager, blockID -> {
isRerfreshed.set(true);
try {
@@ -68,7 +69,7 @@ final class DummyBlockInputStreamWithRetry
throw new RuntimeException(e);
}
- }, chunkList, chunkMap);
+ }, chunkList, chunkMap, config);
this.ioException = ioException;
}
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index 7755adc5f3..21b088ce85 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -22,8 +22,10 @@ import com.google.common.primitives.Bytes;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -85,6 +87,8 @@ public class TestBlockInputStream {
private Function<BlockID, BlockLocationInfo> refreshFunction;
+ private OzoneConfiguration conf = new OzoneConfiguration();
+
@BeforeEach
@SuppressWarnings("unchecked")
public void setup() throws Exception {
@@ -92,10 +96,12 @@ public class TestBlockInputStream {
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
checksum = new Checksum(ChecksumType.NONE, CHUNK_SIZE);
createChunkList(5);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(false);
Pipeline pipeline = MockPipeline.createSingleNodePipeline();
blockStream = new DummyBlockInputStream(blockID, blockSize, pipeline, null,
- false, null, refreshFunction, chunks, chunkDataMap);
+ null, refreshFunction, chunks, chunkDataMap, clientConfig);
}
/**
@@ -259,11 +265,14 @@ public class TestBlockInputStream {
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
AtomicBoolean isRefreshed = new AtomicBoolean();
createChunkList(5);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(false);
try (BlockInputStream blockInputStreamWithRetry =
new DummyBlockInputStreamWithRetry(blockID, blockSize,
MockPipeline.createSingleNodePipeline(), null,
- false, null, chunks, chunkDataMap, isRefreshed, null)) {
+ null, chunks, chunkDataMap, isRefreshed, null,
+ clientConfig)) {
assertFalse(isRefreshed.get());
seekAndVerify(50);
byte[] b = new byte[200];
@@ -347,8 +356,10 @@ public class TestBlockInputStream {
private BlockInputStream createSubject(BlockID blockID, Pipeline pipeline,
ChunkInputStream stream) {
- return new DummyBlockInputStream(blockID, blockSize, pipeline, null, false,
- null, refreshFunction, chunks, null) {
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(false);
+ return new DummyBlockInputStream(blockID, blockSize, pipeline, null,
+ null, refreshFunction, chunks, null, clientConfig) {
@Override
protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
return stream;
@@ -400,8 +411,11 @@ public class TestBlockInputStream {
.thenReturn(blockLocationInfo);
when(blockLocationInfo.getPipeline()).thenReturn(newPipeline);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(false);
BlockInputStream subject = new BlockInputStream(blockID, blockSize,
- pipeline, null, false, clientFactory, refreshFunction) {
+ pipeline, null, clientFactory, refreshFunction,
+ clientConfig) {
@Override
protected List<ChunkInfo> getChunkInfoListUsingClient() {
return chunks;
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
index 41bf46a8ea..049037bc4d 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -258,9 +259,10 @@ public final class ECStreamTestUtil {
public synchronized BlockExtendedInputStream create(
ReplicationConfig repConfig,
BlockLocationInfo blockInfo, Pipeline pipeline,
- Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+ Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverFactory,
- Function<BlockID, BlockLocationInfo> refreshFunction) {
+ Function<BlockID, BlockLocationInfo> refreshFunction,
+ OzoneClientConfig config) {
int repInd = currentPipeline.getReplicaIndex(pipeline.getNodes().get(0));
TestBlockInputStream stream = new TestBlockInputStream(
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java
index cf3f4f13ef..623f7a4f86 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java
@@ -21,9 +21,11 @@ import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
@@ -43,6 +45,8 @@ import static
org.junit.jupiter.api.Assertions.assertInstanceOf;
*/
public class TestBlockInputStreamFactoryImpl {
+ private OzoneConfiguration conf = new OzoneConfiguration();
+
@Test
public void testNonECGivesBlockInputStream() {
BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl();
@@ -52,9 +56,12 @@ public class TestBlockInputStreamFactoryImpl {
BlockLocationInfo blockInfo = createKeyLocationInfo(repConfig, 3,
1024 * 1024 * 10);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
BlockExtendedInputStream stream =
factory.create(repConfig, blockInfo, blockInfo.getPipeline(),
- blockInfo.getToken(), true, null, null);
+ blockInfo.getToken(), null, null,
+ clientConfig);
assertInstanceOf(BlockInputStream.class, stream);
assertEquals(stream.getBlockID(), blockInfo.getBlockID());
assertEquals(stream.getLength(), blockInfo.getLength());
@@ -69,9 +76,12 @@ public class TestBlockInputStreamFactoryImpl {
BlockLocationInfo blockInfo =
createKeyLocationInfo(repConfig, 5, 1024 * 1024 * 10);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
BlockExtendedInputStream stream =
factory.create(repConfig, blockInfo, blockInfo.getPipeline(),
- blockInfo.getToken(), true, null, null);
+ blockInfo.getToken(), null, null,
+ clientConfig);
assertInstanceOf(ECBlockInputStreamProxy.class, stream);
assertEquals(stream.getBlockID(), blockInfo.getBlockID());
assertEquals(stream.getLength(), blockInfo.getLength());
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
index bd34e7546c..60974b35a9 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
@@ -20,9 +20,11 @@ package org.apache.hadoop.ozone.client.io;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -59,6 +61,7 @@ public class TestECBlockInputStream {
private ECReplicationConfig repConfig;
private TestBlockInputStreamFactory streamFactory;
+ private OzoneConfiguration conf = new OzoneConfiguration();
@BeforeEach
public void setup() {
@@ -72,15 +75,19 @@ public class TestECBlockInputStream {
// EC-3-2, 5MB block, so all 3 data locations are needed
BlockLocationInfo keyInfo = ECStreamTestUtil
.createKeyInfo(repConfig, 5, 5 * ONEMB);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+ keyInfo, null, null, new TestBlockInputStreamFactory(),
+ clientConfig)) {
assertTrue(ecb.hasSufficientLocations());
}
// EC-3-2, very large block, so all 3 data locations are needed
keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5, 5000 * ONEMB);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+ keyInfo, null, null, new TestBlockInputStreamFactory(),
+ clientConfig)) {
assertTrue(ecb.hasSufficientLocations());
}
@@ -90,7 +97,8 @@ public class TestECBlockInputStream {
dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB - 1, dnMap);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+ keyInfo, null, null, new TestBlockInputStreamFactory(),
+ clientConfig)) {
assertTrue(ecb.hasSufficientLocations());
}
@@ -100,7 +108,8 @@ public class TestECBlockInputStream {
dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 1);
keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5 * ONEMB, dnMap);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+ keyInfo, null, null, new TestBlockInputStreamFactory(),
+ clientConfig)) {
assertFalse(ecb.hasSufficientLocations());
}
@@ -112,7 +121,8 @@ public class TestECBlockInputStream {
dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), 5);
keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, 5 * ONEMB, dnMap);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+ keyInfo, null, null, new TestBlockInputStreamFactory(),
+ clientConfig)) {
assertFalse(ecb.hasSufficientLocations());
}
}
@@ -124,8 +134,11 @@ public class TestECBlockInputStream {
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, ONEMB - 100);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, streamFactory)) {
+ keyInfo, null, null, streamFactory,
+ clientConfig)) {
ecb.read(buf);
// We expect only 1 block stream and it should have a length passed of
// ONEMB - 100.
@@ -141,8 +154,11 @@ public class TestECBlockInputStream {
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, ONEMB + 100);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, streamFactory)) {
+ keyInfo, null, null, streamFactory,
+ clientConfig)) {
ecb.read(buf);
List<TestBlockInputStream> streams = streamFactory.getBlockStreams();
assertEquals(ONEMB, streams.get(0).getLength());
@@ -157,8 +173,11 @@ public class TestECBlockInputStream {
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 2 * ONEMB + 100);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, streamFactory)) {
+ keyInfo, null, null, streamFactory,
+ clientConfig)) {
ecb.read(buf);
List<TestBlockInputStream> streams = streamFactory.getBlockStreams();
assertEquals(ONEMB, streams.get(0).getLength());
@@ -174,8 +193,11 @@ public class TestECBlockInputStream {
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 10 * ONEMB + 100);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, streamFactory)) {
+ keyInfo, null, null, streamFactory,
+ clientConfig)) {
ecb.read(buf);
List<TestBlockInputStream> streams = streamFactory.getBlockStreams();
assertEquals(4 * ONEMB, streams.get(0).getLength());
@@ -191,8 +213,11 @@ public class TestECBlockInputStream {
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, ONEMB);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, streamFactory)) {
+ keyInfo, null, null, streamFactory,
+ clientConfig)) {
ecb.read(buf);
List<TestBlockInputStream> streams = streamFactory.getBlockStreams();
assertEquals(ONEMB, streams.get(0).getLength());
@@ -206,8 +231,11 @@ public class TestECBlockInputStream {
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 9 * ONEMB);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, streamFactory)) {
+ keyInfo, null, null, streamFactory,
+ clientConfig)) {
ecb.read(buf);
List<TestBlockInputStream> streams = streamFactory.getBlockStreams();
assertEquals(3 * ONEMB, streams.get(0).getLength());
@@ -220,8 +248,11 @@ public class TestECBlockInputStream {
public void testSimpleRead() throws IOException {
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, streamFactory)) {
+ keyInfo, null, null, streamFactory,
+ clientConfig)) {
ByteBuffer buf = ByteBuffer.allocate(100);
@@ -243,8 +274,11 @@ public class TestECBlockInputStream {
public void testSimpleReadUnderOneChunk() throws IOException {
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 1, ONEMB);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, streamFactory)) {
+ keyInfo, null, null, streamFactory,
+ clientConfig)) {
ByteBuffer buf = ByteBuffer.allocate(100);
@@ -262,8 +296,11 @@ public class TestECBlockInputStream {
public void testReadPastEOF() throws IOException {
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 50);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, streamFactory)) {
+ keyInfo, null, null, streamFactory,
+ clientConfig)) {
ByteBuffer buf = ByteBuffer.allocate(100);
@@ -281,8 +318,11 @@ public class TestECBlockInputStream {
100);
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, streamFactory)) {
+ keyInfo, null, null, streamFactory,
+ clientConfig)) {
// EC Chunk size is 100 and 3-2. Create a byte buffer to read 3.5 chunks,
// so 350
@@ -316,8 +356,11 @@ public class TestECBlockInputStream {
ONEMB);
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 100);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, streamFactory)) {
+ keyInfo, null, null, streamFactory,
+ clientConfig)) {
assertThrows(EOFException.class, () -> ecb.seek(1000));
}
}
@@ -328,8 +371,11 @@ public class TestECBlockInputStream {
ONEMB);
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 100);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, streamFactory)) {
+ keyInfo, null, null, streamFactory,
+ clientConfig)) {
// When seek more than the length, should throw EOFException.
assertThrows(EOFException.class, () -> ecb.seek(101));
}
@@ -341,8 +387,11 @@ public class TestECBlockInputStream {
ONEMB);
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 0);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, streamFactory)) {
+ keyInfo, null, null, streamFactory,
+ clientConfig)) {
ecb.seek(0);
assertEquals(0, ecb.getPos());
assertEquals(0, ecb.getRemaining());
@@ -355,8 +404,11 @@ public class TestECBlockInputStream {
ONEMB);
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, streamFactory)) {
+ keyInfo, null, null, streamFactory,
+ clientConfig)) {
ecb.seek(ONEMB - 1);
assertEquals(ONEMB - 1, ecb.getPos());
assertEquals(ONEMB * 4 + 1, ecb.getRemaining());
@@ -384,8 +436,11 @@ public class TestECBlockInputStream {
ONEMB);
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, streamFactory)) {
+ keyInfo, null, null, streamFactory,
+ clientConfig)) {
// Read a full stripe to ensure all streams are created in the stream
// factory
ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
@@ -415,8 +470,11 @@ public class TestECBlockInputStream {
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, 8 * ONEMB, datanodes);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, streamFactory)) {
+ keyInfo, null, null, streamFactory,
+ clientConfig)) {
// Read a full stripe to ensure all streams are created in the stream
// factory
ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
@@ -479,8 +537,11 @@ public class TestECBlockInputStream {
return blockLocation;
};
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
- keyInfo, true, null, null, streamFactory)) {
+ keyInfo, null, null, streamFactory,
+ clientConfig)) {
Pipeline pipeline =
ecb.ecPipelineRefreshFunction(3, refreshFunction)
.apply(blockID)
@@ -513,8 +574,9 @@ public class TestECBlockInputStream {
public synchronized BlockExtendedInputStream create(
ReplicationConfig repConfig, BlockLocationInfo blockInfo,
Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
- boolean verifyChecksum, XceiverClientFactory xceiverFactory,
- Function<BlockID, BlockLocationInfo> refreshFunction) {
+ XceiverClientFactory xceiverFactory,
+ Function<BlockID, BlockLocationInfo> refreshFunction,
+ OzoneClientConfig config) {
TestBlockInputStream stream = new TestBlockInputStream(
blockInfo.getBlockID(), blockInfo.getLength(),
(byte)blockStreams.size());
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java
index 97bf71c204..ca0b9710a9 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.ozone.client.io;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
@@ -52,6 +54,7 @@ public class TestECBlockInputStreamProxy {
private long randomSeed;
private ThreadLocalRandom random = ThreadLocalRandom.current();
private SplittableRandom dataGenerator;
+ private OzoneConfiguration conf = new OzoneConfiguration();
@BeforeEach
public void setup() {
@@ -342,8 +345,11 @@ public class TestECBlockInputStreamProxy {
private ECBlockInputStreamProxy createBISProxy(ECReplicationConfig rConfig,
BlockLocationInfo blockInfo) {
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
return new ECBlockInputStreamProxy(
- rConfig, blockInfo, true, null, null, streamFactory);
+ rConfig, blockInfo, null, null, streamFactory,
+ clientConfig);
}
private static class TestECBlockInputStreamFactory
@@ -372,8 +378,9 @@ public class TestECBlockInputStreamProxy {
public BlockExtendedInputStream create(boolean missingLocations,
List<DatanodeDetails> failedDatanodes,
ReplicationConfig repConfig, BlockLocationInfo blockInfo,
- boolean verifyChecksum, XceiverClientFactory xceiverFactory,
- Function<BlockID, BlockLocationInfo> refreshFunction) {
+ XceiverClientFactory xceiverFactory,
+ Function<BlockID, BlockLocationInfo> refreshFunction,
+ OzoneClientConfig config) {
this.failedLocations = failedDatanodes;
ByteBuffer wrappedBuffer =
ByteBuffer.wrap(data.array(), 0, data.capacity());
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedInputStream.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedInputStream.java
index 0425f6943a..6b60bef66a 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedInputStream.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedInputStream.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.ozone.client.io;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
@@ -54,6 +56,7 @@ public class TestECBlockReconstructedInputStream {
private ByteBufferPool bufferPool = new ElasticByteBufferPool();
private ExecutorService ecReconstructExecutor =
Executors.newFixedThreadPool(3);
+ private OzoneConfiguration conf = new OzoneConfiguration();
@BeforeEach
public void setup() throws IOException {
@@ -74,8 +77,11 @@ public class TestECBlockReconstructedInputStream {
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
- return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
- null, null, streamFactory, bufferPool, ecReconstructExecutor);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
+ return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo,
+ null, null, streamFactory, bufferPool, ecReconstructExecutor,
+ clientConfig);
}
@Test
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java
index f7a4bb0643..e526b12a51 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.ozone.client.io;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
@@ -73,7 +75,8 @@ public class TestECBlockReconstructedStripeInputStream {
private ByteBufferPool bufferPool = new ElasticByteBufferPool();
private ExecutorService ecReconstructExecutor =
Executors.newFixedThreadPool(3);
-
+ private OzoneConfiguration conf = new OzoneConfiguration();
+
static List<Set<Integer>> recoveryCases() { // TODO better name
List<Set<Integer>> params = new ArrayList<>();
params.add(emptySet()); // non-recovery
@@ -808,8 +811,11 @@ public class TestECBlockReconstructedStripeInputStream {
private ECBlockReconstructedStripeInputStream createInputStream(
BlockLocationInfo keyInfo) {
- return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
- null, null, streamFactory, bufferPool, ecReconstructExecutor);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
+ return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo,
+ null, null, streamFactory, bufferPool, ecReconstructExecutor,
+ clientConfig);
}
private void addDataStreamsToFactory(ByteBuffer[] data, ByteBuffer[] parity)
{
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index 90756bbc88..dccc271f6d 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -267,12 +267,15 @@ public class ECReconstructionCoordinator implements
Closeable {
return;
}
+ OzoneClientConfig clientConfig = this.ozoneClientConfig;
+ clientConfig.setChecksumVerify(true);
try (ECBlockReconstructedStripeInputStream sis
= new ECBlockReconstructedStripeInputStream(
- repConfig, blockLocationInfo, true,
+ repConfig, blockLocationInfo,
this.containerOperationClient.getXceiverClientManager(), null,
this.blockInputStreamFactory, byteBufferPool,
- this.ecReconstructReadExecutor)) {
+ this.ecReconstructReadExecutor,
+ clientConfig)) {
ECBlockOutputStream[] targetBlockStreams =
new ECBlockOutputStream[toReconstructIndexes.size()];
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index 4843c1c45e..2d40841ee4 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -25,6 +25,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
@@ -58,9 +59,9 @@ public class KeyInputStream extends MultipartInputStream {
OmKeyInfo keyInfo,
List<OmKeyLocationInfo> blockInfos,
XceiverClientFactory xceiverClientFactory,
- boolean verifyChecksum,
Function<OmKeyInfo, OmKeyInfo> retryFunction,
- BlockInputStreamFactory blockStreamFactory) {
+ BlockInputStreamFactory blockStreamFactory,
+ OzoneClientConfig config) {
List<BlockExtendedInputStream> partStreams = new ArrayList<>();
for (OmKeyLocationInfo omKeyLocationInfo : blockInfos) {
if (LOG.isDebugEnabled()) {
@@ -91,9 +92,9 @@ public class KeyInputStream extends MultipartInputStream {
omKeyLocationInfo,
omKeyLocationInfo.getPipeline(),
omKeyLocationInfo.getToken(),
- verifyChecksum,
xceiverClientFactory,
- retry);
+ retry,
+ config);
partStreams.add(stream);
}
return partStreams;
@@ -117,13 +118,13 @@ public class KeyInputStream extends MultipartInputStream {
private static LengthInputStream getFromOmKeyInfo(
OmKeyInfo keyInfo,
XceiverClientFactory xceiverClientFactory,
- boolean verifyChecksum,
Function<OmKeyInfo, OmKeyInfo> retryFunction,
BlockInputStreamFactory blockStreamFactory,
- List<OmKeyLocationInfo> locationInfos) {
+ List<OmKeyLocationInfo> locationInfos,
+ OzoneClientConfig config) {
List<BlockExtendedInputStream> streams = createStreams(keyInfo,
- locationInfos, xceiverClientFactory, verifyChecksum, retryFunction,
- blockStreamFactory);
+ locationInfos, xceiverClientFactory, retryFunction,
+ blockStreamFactory, config);
KeyInputStream keyInputStream =
new KeyInputStream(keyInfo.getKeyName(), streams);
return new LengthInputStream(keyInputStream, keyInputStream.getLength());
@@ -134,20 +135,22 @@ public class KeyInputStream extends MultipartInputStream {
*/
public static LengthInputStream getFromOmKeyInfo(OmKeyInfo keyInfo,
XceiverClientFactory xceiverClientFactory,
- boolean verifyChecksum, Function<OmKeyInfo, OmKeyInfo> retryFunction,
- BlockInputStreamFactory blockStreamFactory) {
+ Function<OmKeyInfo, OmKeyInfo> retryFunction,
+ BlockInputStreamFactory blockStreamFactory,
+ OzoneClientConfig config) {
List<OmKeyLocationInfo> keyLocationInfos = keyInfo
.getLatestVersionLocations().getBlocksLatestVersionOnly();
- return getFromOmKeyInfo(keyInfo, xceiverClientFactory, verifyChecksum,
- retryFunction, blockStreamFactory, keyLocationInfos);
+ return getFromOmKeyInfo(keyInfo, xceiverClientFactory,
+ retryFunction, blockStreamFactory, keyLocationInfos, config);
}
public static List<LengthInputStream> getStreamsFromKeyInfo(OmKeyInfo
keyInfo,
- XceiverClientFactory xceiverClientFactory, boolean verifyChecksum,
+ XceiverClientFactory xceiverClientFactory,
Function<OmKeyInfo, OmKeyInfo> retryFunction,
- BlockInputStreamFactory blockStreamFactory) {
+ BlockInputStreamFactory blockStreamFactory,
+ OzoneClientConfig config) {
List<OmKeyLocationInfo> keyLocationInfos = keyInfo
.getLatestVersionLocations().getBlocksLatestVersionOnly();
@@ -162,7 +165,8 @@ public class KeyInputStream extends MultipartInputStream {
// Create a KeyInputStream for each part.
for (List<OmKeyLocationInfo> locationInfo : partsToBlocksMap.values()) {
lengthInputStreams.add(getFromOmKeyInfo(keyInfo, xceiverClientFactory,
- verifyChecksum, retryFunction, blockStreamFactory, locationInfo));
+ retryFunction, blockStreamFactory, locationInfo,
+ config));
}
return lengthInputStreams;
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 7cd9ed440b..3a4d391b00 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -2257,9 +2257,8 @@ public class RpcClient implements ClientProtocol {
if (feInfo == null) {
LengthInputStream lengthInputStream = KeyInputStream
- .getFromOmKeyInfo(keyInfo, xceiverClientManager,
- clientConfig.isChecksumVerify(), retryFunction,
- blockInputStreamFactory);
+ .getFromOmKeyInfo(keyInfo, xceiverClientManager, retryFunction,
+ blockInputStreamFactory, clientConfig);
try {
final GDPRSymmetricKey gk = getGDPRSymmetricKey(
keyInfo.getMetadata(), Cipher.DECRYPT_MODE);
@@ -2274,9 +2273,8 @@ public class RpcClient implements ClientProtocol {
} else if (!keyInfo.getLatestVersionLocations().isMultipartKey()) {
// Regular Key with FileEncryptionInfo
LengthInputStream lengthInputStream = KeyInputStream
- .getFromOmKeyInfo(keyInfo, xceiverClientManager,
- clientConfig.isChecksumVerify(), retryFunction,
- blockInputStreamFactory);
+ .getFromOmKeyInfo(keyInfo, xceiverClientManager, retryFunction,
+ blockInputStreamFactory, clientConfig);
final KeyProvider.KeyVersion decrypted = getDEK(feInfo);
final CryptoInputStream cryptoIn =
new CryptoInputStream(lengthInputStream.getWrappedStream(),
@@ -2286,9 +2284,8 @@ public class RpcClient implements ClientProtocol {
} else {
// Multipart Key with FileEncryptionInfo
List<LengthInputStream> lengthInputStreams = KeyInputStream
- .getStreamsFromKeyInfo(keyInfo, xceiverClientManager,
- clientConfig.isChecksumVerify(), retryFunction,
- blockInputStreamFactory);
+ .getStreamsFromKeyInfo(keyInfo, xceiverClientManager, retryFunction,
+ blockInputStreamFactory, clientConfig);
final KeyProvider.KeyVersion decrypted = getDEK(feInfo);
List<OzoneCryptoInputStream> cryptoInputStreams = new ArrayList<>();
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyInputStreamEC.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyInputStreamEC.java
index 6af5c4b4e0..4d4a1ab4cb 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyInputStreamEC.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyInputStreamEC.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.ozone.client.io;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
@@ -39,7 +41,6 @@ import java.util.Map;
import static org.apache.hadoop.ozone.OzoneConsts.MB;
import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -49,6 +50,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
*/
public class TestKeyInputStreamEC {
+ private OzoneConfiguration conf = new OzoneConfiguration();
+
@Test
public void testReadAgainstLargeBlockGroup() throws IOException {
int dataBlocks = 10;
@@ -68,10 +71,13 @@ public class TestKeyInputStreamEC {
BlockInputStreamFactory mockStreamFactory =
mock(BlockInputStreamFactory.class);
when(mockStreamFactory.create(any(), any(), any(), any(),
- anyBoolean(), any(), any())).thenReturn(blockInputStream);
+ any(), any(), any())).thenReturn(blockInputStream);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
try (LengthInputStream kis = KeyInputStream.getFromOmKeyInfo(keyInfo,
- null, true, null, mockStreamFactory)) {
+ null, null, mockStreamFactory,
+ clientConfig)) {
byte[] buf = new byte[100];
int readBytes = kis.read(buf, 0, 100);
assertEquals(100, readBytes);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestBlockTokens.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestBlockTokens.java
index 0b5dab36d3..26c1868084 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestBlockTokens.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestBlockTokens.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.conf.DefaultConfigManager;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -295,9 +296,11 @@ public final class TestBlockTokens {
Function<OmKeyInfo, OmKeyInfo> retryFunc) throws IOException {
XceiverClientFactory xceiverClientManager =
((RpcClient) client.getProxy()).getXceiverClientManager();
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(false);
try (InputStream is = KeyInputStream.getFromOmKeyInfo(keyInfo,
- xceiverClientManager,
- false, retryFunc, blockInputStreamFactory)) {
+ xceiverClientManager, retryFunc, blockInputStreamFactory,
+ clientConfig)) {
byte[] buf = new byte[100];
int readBytes = is.read(buf, 0, 100);
assertEquals(100, readBytes);
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
index dc425926ac..e0d5ef4084 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java
@@ -17,6 +17,8 @@
package org.apache.hadoop.ozone.om;
import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import org.apache.hadoop.ozone.client.io.KeyInputStream;
import jakarta.annotation.Nonnull;
@@ -34,6 +36,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
*/
public class TestChunkStreams {
+ private OzoneConfiguration conf = new OzoneConfiguration();
+
@Test
public void testReadGroupInputStream() throws Exception {
String dataString = RandomStringUtils.randomAscii(500);
@@ -90,7 +94,10 @@ public class TestChunkStreams {
}
private BlockInputStream createStream(byte[] buf, int offset) {
- return new BlockInputStream(null, 100, null, null, true, null) {
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(true);
+ return new BlockInputStream(null, 100, null, null, null,
+ clientConfig) {
private long pos;
private final ByteArrayInputStream in =
new ByteArrayInputStream(buf, offset, 100);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]