This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new a1d7292 HDDS-6010. EC: Create ECBlockInputStreamProxy to choose
between reconstruction and normal reads (#2889)
a1d7292 is described below
commit a1d7292bcf7b698b82308baf1acc50f7f52fa0d4
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Wed Dec 8 09:43:39 2021 +0000
HDDS-6010. EC: Create ECBlockInputStreamProxy to choose between
reconstruction and normal reads (#2889)
---
.../ozone/client/io/BadDataLocationException.java | 56 +++
.../client/io/BlockInputStreamFactoryImpl.java | 12 +-
.../hadoop/ozone/client/io/ECBlockInputStream.java | 19 +-
...oryImpl.java => ECBlockInputStreamFactory.java} | 46 +--
...mpl.java => ECBlockInputStreamFactoryImpl.java} | 58 +--
.../ozone/client/io/ECBlockInputStreamProxy.java | 220 ++++++++++++
.../io/ECBlockReconstructedStripeInputStream.java | 27 ++
.../ozone/client/rpc/read/ECStreamTestUtil.java | 54 ++-
.../rpc/read/TestBlockInputStreamFactoryImpl.java | 4 +-
.../client/rpc/read/TestECBlockInputStream.java | 40 ++-
.../rpc/read/TestECBlockInputStreamProxy.java | 395 +++++++++++++++++++++
.../TestECBlockReconstructedStripeInputStream.java | 75 +++-
12 files changed, 939 insertions(+), 67 deletions(-)
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
new file mode 100644
index 0000000..dc8f7b0
--- /dev/null
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BadDataLocationException.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import java.io.IOException;
+
+/**
+ * Exception used to indicate a problem with a specific block location, allowing
+ * the failed location to be communicated back to the caller.
+ */
+public class BadDataLocationException extends IOException {
+
+ private DatanodeDetails failedLocation;
+
+ public BadDataLocationException(DatanodeDetails dn) {
+ super();
+ failedLocation = dn;
+ }
+
+ public BadDataLocationException(DatanodeDetails dn, String message) {
+ super(message);
+ failedLocation = dn;
+ }
+
+ public BadDataLocationException(DatanodeDetails dn, String message,
+ Throwable ex) {
+ super(message, ex);
+ failedLocation = dn;
+ }
+
+ public BadDataLocationException(DatanodeDetails dn, Throwable ex) {
+ super(ex);
+ failedLocation = dn;
+ }
+
+ public DatanodeDetails getFailedLocation() {
+ return failedLocation;
+ }
+}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
index 1d372a7..6cdb991 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
@@ -36,10 +36,17 @@ import java.util.function.Function;
*/
public class BlockInputStreamFactoryImpl implements BlockInputStreamFactory {
+ private ECBlockInputStreamFactory ecBlockStreamFactory;
+
public static BlockInputStreamFactory getInstance() {
return new BlockInputStreamFactoryImpl();
}
+ public BlockInputStreamFactoryImpl() {
+ this.ecBlockStreamFactory =
+ ECBlockInputStreamFactoryImpl.getInstance(this);
+ }
+
/**
* Create a new BlockInputStream based on the replication Config. If the
* replication Config indicates the block is EC, then it will create an
@@ -59,8 +66,9 @@ public class BlockInputStreamFactoryImpl implements
BlockInputStreamFactory {
XceiverClientFactory xceiverFactory,
Function<BlockID, Pipeline> refreshFunction) {
if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) {
- return new ECBlockInputStream((ECReplicationConfig)repConfig, blockInfo,
- verifyChecksum, xceiverFactory, refreshFunction, this);
+ return new ECBlockInputStreamProxy((ECReplicationConfig)repConfig,
+ blockInfo, verifyChecksum, xceiverFactory, refreshFunction,
+ ecBlockStreamFactory);
} else {
return new BlockInputStream(blockInfo.getBlockID(),
blockInfo.getLength(),
pipeline, token, verifyChecksum, xceiverFactory, refreshFunction);
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index 07333d2..5212eea 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -131,9 +131,7 @@ public class ECBlockInputStream extends
BlockExtendedInputStream {
}
protected int calculateExpectedDataBlocks(ECReplicationConfig rConfig) {
- return (int)Math.min(Math.ceil(
- (double)getBlockInfo().getLength() / rConfig.getEcChunkSize()),
- rConfig.getData());
+ return ECBlockInputStreamProxy.expectedDataLocations(rConfig, getLength());
}
/**
@@ -256,11 +254,16 @@ public class ECBlockInputStream extends
BlockExtendedInputStream {
int totalRead = 0;
while(strategy.getTargetLength() > 0 && remaining() > 0) {
- int currentIndex = currentStreamIndex();
- BlockExtendedInputStream stream = getOrOpenStream(currentIndex);
- int read = readFromStream(stream, strategy);
- totalRead += read;
- position += read;
+ try {
+ int currentIndex = currentStreamIndex();
+ BlockExtendedInputStream stream = getOrOpenStream(currentIndex);
+ int read = readFromStream(stream, strategy);
+ totalRead += read;
+ position += read;
+ } catch (IOException ioe) {
+ throw new BadDataLocationException(
+ dataLocations[currentStreamIndex()], ioe);
+ }
}
return totalRead;
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
similarity index 52%
copy from
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
copy to
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
index 1d372a7..6c39e93 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
@@ -18,53 +18,41 @@
package org.apache.hadoop.ozone.client.io;
import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
-import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
-import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.security.token.Token;
+import java.util.List;
import java.util.function.Function;
/**
- * Factory class to create various BlockStream instances.
+ * Interface used by factories which create ECBlockInput streams for
+ * reconstruction or non-reconstruction reads.
*/
-public class BlockInputStreamFactoryImpl implements BlockInputStreamFactory {
-
- public static BlockInputStreamFactory getInstance() {
- return new BlockInputStreamFactoryImpl();
- }
+public interface ECBlockInputStreamFactory {
/**
- * Create a new BlockInputStream based on the replication Config. If the
- * replication Config indicates the block is EC, then it will create an
- * ECBlockInputStream, otherwise a BlockInputStream will be returned.
+ * Create a new EC InputStream based on the missingLocations boolean. If it is
+ * set to false, it indicates all locations are available and an
+ * ECBlockInputStream will be created. Otherwise an
+ * ECBlockReconstructedInputStream will be created.
+ * @param missingLocations Indicates if all the data locations are available
+ * or not, controlling the type of stream created
+ * @param failedLocations List of DatanodeDetails indicating locations we
+ * know are bad and should not be used.
* @param repConfig The replication Config
* @param blockInfo The blockInfo representing the block.
- * @param pipeline The pipeline to be used for reading the block
- * @param token The block Access Token
* @param verifyChecksum Whether to verify checksums or not.
* @param xceiverFactory Factory to create the xceiver in the client
* @param refreshFunction Function to refresh the pipeline if needed
* @return BlockExtendedInputStream of the correct type.
*/
- public BlockExtendedInputStream create(ReplicationConfig repConfig,
- OmKeyLocationInfo blockInfo, Pipeline pipeline,
- Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+ BlockExtendedInputStream create(boolean missingLocations,
+ List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
+ OmKeyLocationInfo blockInfo, boolean verifyChecksum,
XceiverClientFactory xceiverFactory,
- Function<BlockID, Pipeline> refreshFunction) {
- if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) {
- return new ECBlockInputStream((ECReplicationConfig)repConfig, blockInfo,
- verifyChecksum, xceiverFactory, refreshFunction, this);
- } else {
- return new BlockInputStream(blockInfo.getBlockID(),
blockInfo.getLength(),
- pipeline, token, verifyChecksum, xceiverFactory, refreshFunction);
- }
- }
-
+ Function<BlockID, Pipeline> refreshFunction);
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
similarity index 50%
copy from
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
copy to
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
index 1d372a7..75956c4 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
@@ -20,50 +20,68 @@ package org.apache.hadoop.ozone.client.io;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
-import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
-import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.apache.hadoop.security.token.Token;
+import java.util.List;
import java.util.function.Function;
/**
* Factory class to create various BlockStream instances.
*/
-public class BlockInputStreamFactoryImpl implements BlockInputStreamFactory {
+public final class ECBlockInputStreamFactoryImpl implements
+ ECBlockInputStreamFactory {
- public static BlockInputStreamFactory getInstance() {
- return new BlockInputStreamFactoryImpl();
+ private final BlockInputStreamFactory inputStreamFactory;
+
+ public static ECBlockInputStreamFactory getInstance(
+ BlockInputStreamFactory streamFactory) {
+ return new ECBlockInputStreamFactoryImpl(streamFactory);
+ }
+
+ private ECBlockInputStreamFactoryImpl(BlockInputStreamFactory streamFactory)
{
+ this.inputStreamFactory = streamFactory;
}
/**
- * Create a new BlockInputStream based on the replication Config. If the
- * replication Config indicates the block is EC, then it will create an
- * ECBlockInputStream, otherwise a BlockInputStream will be returned.
+ * Create a new EC InputStream based on the missingLocations boolean. If it is
+ * set to false, it indicates all locations are available and an
+ * ECBlockInputStream will be created. Otherwise an
+ * ECBlockReconstructedInputStream will be created.
+ * @param missingLocations Indicates if all the data locations are available
+ * or not, controlling the type of stream created
+ * @param failedLocations List of DatanodeDetails indicating locations we
+ * know are bad and should not be used.
* @param repConfig The replication Config
* @param blockInfo The blockInfo representing the block.
- * @param pipeline The pipeline to be used for reading the block
- * @param token The block Access Token
* @param verifyChecksum Whether to verify checksums or not.
* @param xceiverFactory Factory to create the xceiver in the client
* @param refreshFunction Function to refresh the pipeline if needed
* @return BlockExtendedInputStream of the correct type.
*/
- public BlockExtendedInputStream create(ReplicationConfig repConfig,
- OmKeyLocationInfo blockInfo, Pipeline pipeline,
- Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+ public BlockExtendedInputStream create(boolean missingLocations,
+ List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
+ OmKeyLocationInfo blockInfo, boolean verifyChecksum,
XceiverClientFactory xceiverFactory,
Function<BlockID, Pipeline> refreshFunction) {
- if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) {
- return new ECBlockInputStream((ECReplicationConfig)repConfig, blockInfo,
- verifyChecksum, xceiverFactory, refreshFunction, this);
+ if (missingLocations) {
+ // We create the reconstruction reader
+ ECBlockReconstructedStripeInputStream sis =
+ new ECBlockReconstructedStripeInputStream(
+ (ECReplicationConfig)repConfig, blockInfo, verifyChecksum,
+ xceiverFactory, refreshFunction, inputStreamFactory);
+ if (failedLocations != null) {
+ sis.addFailedDatanodes(failedLocations);
+ }
+ return new ECBlockReconstructedInputStream(
+ (ECReplicationConfig) repConfig, sis);
} else {
- return new BlockInputStream(blockInfo.getBlockID(),
blockInfo.getLength(),
- pipeline, token, verifyChecksum, xceiverFactory, refreshFunction);
+ // Otherwise create the more efficient non-reconstruction reader
+ return new ECBlockInputStream((ECReplicationConfig)repConfig, blockInfo,
+ verifyChecksum, xceiverFactory, refreshFunction, inputStreamFactory);
}
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
new file mode 100644
index 0000000..2ed173c
--- /dev/null
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Top level class used to read data from EC Encoded blocks. This class decides,
+ * based on the block availability, whether to use a reconstruction or
+ * non-reconstruction read. It also handles errors from non-reconstruction
+ * reads by failing over to a reconstruction read when they happen.
+ */
+public class ECBlockInputStreamProxy extends BlockExtendedInputStream {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ECBlockInputStreamProxy.class);
+
+ private final ECReplicationConfig repConfig;
+ private final boolean verifyChecksum;
+ private final XceiverClientFactory xceiverClientFactory;
+ private final Function<BlockID, Pipeline> refreshFunction;
+ private final OmKeyLocationInfo blockInfo;
+ private final ECBlockInputStreamFactory ecBlockInputStreamFactory;
+
+ private BlockExtendedInputStream blockReader;
+ private boolean reconstructionReader = false;
+ private List<DatanodeDetails> failedLocations = new ArrayList<>();
+
+ /**
+ * Given the ECReplicationConfig and the block length, calculate how many
+ * data locations the block should have.
+ * @param repConfig The EC Replication Config
+ * @param blockLength The length of the data block in bytes
+ * @return The number of expected data locations
+ */
+ public static int expectedDataLocations(ECReplicationConfig repConfig,
+ long blockLength) {
+ return (int)Math.min(
+ Math.ceil((double)blockLength / repConfig.getEcChunkSize()),
+ repConfig.getData());
+ }
+
+ /**
+ * From ECReplicationConfig and Pipeline with the block locations and location
+ * indexes, determine the number of data locations available.
+ * @param repConfig The EC Replication Config
+ * @param pipeline The pipeline for the data block, giving its locations and
+ * the index of each location.
+ * @return The number of locations available
+ */
+ public static int availableDataLocations(ECReplicationConfig repConfig,
+ Pipeline pipeline) {
+ Set<Integer> locations = new HashSet<>();
+ for (DatanodeDetails dn : pipeline.getNodes()) {
+ int index = pipeline.getReplicaIndex(dn);
+ if (index > 0 && index <= repConfig.getData()) {
+ locations.add(index);
+ }
+ }
+ return locations.size();
+ }
+
+ public ECBlockInputStreamProxy(ECReplicationConfig repConfig,
+ OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+ XceiverClientFactory xceiverClientFactory, Function<BlockID,
+ Pipeline> refreshFunction, ECBlockInputStreamFactory streamFactory) {
+ this.repConfig = repConfig;
+ this.verifyChecksum = verifyChecksum;
+ this.blockInfo = blockInfo;
+ this.ecBlockInputStreamFactory = streamFactory;
+ this.xceiverClientFactory = xceiverClientFactory;
+ this.refreshFunction = refreshFunction;
+
+ setReaderType();
+ createBlockReader();
+ }
+
+ private synchronized void setReaderType() {
+ int expected = expectedDataLocations(repConfig, getLength());
+ int available = availableDataLocations(repConfig, blockInfo.getPipeline());
+ reconstructionReader = available < expected;
+ }
+
+ private void createBlockReader() {
+ blockReader = ecBlockInputStreamFactory.create(reconstructionReader,
+ failedLocations, repConfig, blockInfo, verifyChecksum,
+ xceiverClientFactory, refreshFunction);
+ }
+
+ @Override
+ public synchronized BlockID getBlockID() {
+ return blockInfo.getBlockID();
+ }
+
+ @Override
+ public synchronized long getRemaining() {
+ return blockReader.getRemaining();
+ }
+
+ @Override
+ public synchronized long getLength() {
+ return blockInfo.getLength();
+ }
+
+ @Override
+ public synchronized int read(byte[] b, int off, int len)
+ throws IOException {
+ return read(ByteBuffer.wrap(b, off, len));
+ }
+
+ @Override
+ public synchronized int read(ByteBuffer buf) throws IOException {
+ if (blockReader.getRemaining() == 0) {
+ return EOF;
+ }
+ int totalRead = 0;
+ long lastPosition = 0;
+ try {
+ while (buf.hasRemaining() && getRemaining() > 0) {
+ buf.mark();
+ lastPosition = blockReader.getPos();
+ totalRead += blockReader.read(buf);
+ }
+ } catch (IOException e) {
+ if (reconstructionReader) {
+ // If we get an error from the reconstruction reader, there
+ // is nothing left to try. It will re-try until it has insufficient
+ // locations internally, so if an error comes here, just re-throw it.
+ throw e;
+ }
+ if (e instanceof BadDataLocationException) {
+ LOG.warn("Failing over to reconstruction read due to an error in " +
+ "ECBlockReader", e);
+ failoverToReconstructionRead(
+ ((BadDataLocationException) e).getFailedLocation(), lastPosition);
+ buf.reset();
+ totalRead += read(buf);
+ } else {
+ throw e;
+ }
+ }
+ return totalRead;
+ }
+
+ private synchronized void failoverToReconstructionRead(
+ DatanodeDetails badLocation, long lastPosition) throws IOException {
+ if (badLocation != null) {
+ failedLocations.add(badLocation);
+ }
+ blockReader.close();
+ reconstructionReader = true;
+ createBlockReader();
+ if (lastPosition != 0) {
+ blockReader.seek(lastPosition);
+ }
+ }
+
+ /**
+ * Should never be called in this class.
+ */
+ @Override
+ protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
+ throws IOException {
+ throw new IOException("Not Implemented");
+ }
+
+ @Override
+ public synchronized void unbuffer() {
+ blockReader.unbuffer();
+ }
+
+ @Override
+ public synchronized long getPos() throws IOException {
+ return blockReader != null ? blockReader.getPos() : 0;
+ }
+
+ @Override
+ public synchronized void seek(long pos) throws IOException {
+ try {
+ blockReader.seek(pos);
+ } catch (IOException e) {
+ if (reconstructionReader) {
+ throw e;
+ }
+ failoverToReconstructionRead(null, pos);
+ }
+ }
+}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index b6478c5..a918eb4 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -120,6 +120,33 @@ public class ECBlockReconstructedStripeInputStream extends
ECBlockInputStream {
decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
}
+ /**
+ * Provide a list of datanodes that are known to be bad, and no attempt will
+ * be made to read from them. If too many failed nodes are passed, then the
+ * reader may not have sufficient locations available to reconstruct the data.
+ *
+ * Note this call must be made before any attempt is made to read data,
+ * as that is when the reader is initialized. Attempting to call this method
+ * after a read will result in a runtime exception.
+ *
+ * @param dns A list of DatanodeDetails that are known to be bad.
+ */
+ public void addFailedDatanodes(List<DatanodeDetails> dns) {
+ if (initialized) {
+ throw new RuntimeException("Cannot add failed datanodes after the " +
+ "reader has been initialized");
+ }
+ DatanodeDetails[] locations = getDataLocations();
+ for (DatanodeDetails dn : dns) {
+ for (int i = 0; i < locations.length; i++) {
+ if (locations[i] != null && locations[i].equals(dn)) {
+ failedDataIndexes.add(i);
+ break;
+ }
+ }
+ }
+ }
+
protected void init() throws InsufficientLocationsException {
if (!hasSufficientLocations()) {
throw new InsufficientLocationsException("There are insufficient " +
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
index aabec31..c2f51c2 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
@@ -125,6 +125,12 @@ public final class ECStreamTestUtil {
}
}
+ public static void randomFill(ByteBuffer buf, SplittableRandom rand) {
+ while (buf.remaining() > 0) {
+ buf.put((byte) rand.nextInt(255));
+ }
+ }
+
private static int totalSpaceAvailable(ByteBuffer[] bufs) {
int space = 0;
for (ByteBuffer b : bufs) {
@@ -239,7 +245,7 @@ public final class ECStreamTestUtil {
int repInd = currentPipeline.getReplicaIndex(pipeline.getNodes().get(0));
TestBlockInputStream stream = new TestBlockInputStream(
blockInfo.getBlockID(), blockInfo.getLength(),
- blockStreamData.get(repInd - 1));
+ blockStreamData.get(repInd - 1), repInd);
blockStreams.add(stream);
return stream;
}
@@ -256,12 +262,22 @@ public final class ECStreamTestUtil {
private BlockID blockID;
private long length;
private boolean shouldError = false;
+ private int shouldErrorPosition = 0;
+ private boolean shouldErrorOnSeek = false;
+ private IOException errorToThrow = null;
+ private int ecReplicaIndex = 0;
private static final byte EOF = -1;
TestBlockInputStream(BlockID blockId, long blockLen, ByteBuffer data) {
+ this(blockId, blockLen, data, 0);
+ }
+
+ TestBlockInputStream(BlockID blockId, long blockLen, ByteBuffer data,
+ int replicaIndex) {
this.blockID = blockId;
this.length = blockLen;
this.data = data;
+ this.ecReplicaIndex = replicaIndex;
data.position(0);
}
@@ -269,8 +285,24 @@ public final class ECStreamTestUtil {
return closed;
}
+ public void setShouldErrorOnSeek(boolean val) {
+ this.shouldErrorOnSeek = val;
+ }
+
public void setShouldError(boolean val) {
shouldError = val;
+ shouldErrorPosition = 0;
+ }
+
+ public void setShouldError(boolean val, int position,
+ IOException errorThrowable) {
+ this.shouldError = val;
+ this.shouldErrorPosition = position;
+ this.errorToThrow = errorThrowable;
+ }
+
+ public int getEcReplicaIndex() {
+ return ecReplicaIndex;
}
@Override
@@ -296,19 +328,30 @@ public final class ECStreamTestUtil {
@Override
public int read(ByteBuffer buf) throws IOException {
- if (shouldError) {
- throw new IOException("Simulated error reading block");
+ if (shouldError && data.position() >= shouldErrorPosition) {
+ throwError();
}
if (getRemaining() == 0) {
return EOF;
}
int toRead = Math.min(buf.remaining(), (int)getRemaining());
for (int i = 0; i < toRead; i++) {
+ if (shouldError && data.position() >= shouldErrorPosition) {
+ throwError();
+ }
buf.put(data.get());
}
return toRead;
};
+ private void throwError() throws IOException {
+ if (errorToThrow != null) {
+ throw errorToThrow;
+ } else {
+ throw new IOException("Simulated error reading block");
+ }
+ }
+
@Override
protected int readWithStrategy(ByteReaderStrategy strategy) throws
IOException {
@@ -330,7 +373,10 @@ public final class ECStreamTestUtil {
}
@Override
- public void seek(long pos) {
+ public void seek(long pos) throws IOException {
+ if (shouldErrorOnSeek) {
+ throw new IOException("Simulated exception");
+ }
data.position((int)pos);
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
index 84e1817..badb33b 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
@@ -30,7 +30,7 @@ import
org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
-import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.junit.Test;
import org.junit.Assert;
@@ -73,7 +73,7 @@ public class TestBlockInputStreamFactoryImpl {
BlockExtendedInputStream stream =
factory.create(repConfig, blockInfo, blockInfo.getPipeline(),
blockInfo.getToken(), true, null, null);
- Assert.assertTrue(stream instanceof ECBlockInputStream);
+ Assert.assertTrue(stream instanceof ECBlockInputStreamProxy);
Assert.assertEquals(stream.getBlockID(), blockInfo.getBlockID());
Assert.assertEquals(stream.getLength(), blockInfo.getLength());
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
index 5a29626..d681c61 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStream.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.ozone.client.io.BadDataLocationException;
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -348,6 +349,34 @@ public class TestECBlockInputStream {
}
}
+ @Test
+ public void testErrorReadingBlockReportsBadLocation() throws IOException {
+ repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+ ONEMB);
+ OmKeyLocationInfo keyInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
+ try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
+ keyInfo, true, null, null, streamFactory)) {
+ // Read a full stripe to ensure all streams are created in the stream
+ // factory
+ ByteBuffer buf = ByteBuffer.allocate(3 * ONEMB);
+ int read = ecb.read(buf);
+ Assert.assertEquals(3 * ONEMB, read);
+ // Now make replication index 2 error on the next read
+ streamFactory.getBlockStreams().get(1).setThrowException(true);
+ buf.clear();
+ try {
+ ecb.read(buf);
+ Assert.fail("Exception should be thrown");
+ } catch (IOException e) {
+ Assert.assertTrue(e instanceof BadDataLocationException);
+ Assert.assertEquals(2,
+ keyInfo.getPipeline().getReplicaIndex(
+ ((BadDataLocationException) e).getFailedLocation()));
+ }
+ }
+ }
+
private void validateBufferContents(ByteBuffer buf, int from, int to,
byte val) {
for (int i=from; i<to; i++){
@@ -384,6 +413,7 @@ public class TestECBlockInputStream {
private byte dataVal = 1;
private BlockID blockID;
private long length;
+ private boolean throwException = false;
private static final byte EOF = -1;
@SuppressWarnings("checkstyle:parameternumber")
@@ -397,6 +427,10 @@ public class TestECBlockInputStream {
return closed;
}
+ public void setThrowException(boolean shouldThrow) {
+ this.throwException = shouldThrow;
+ }
+
@Override
public BlockID getBlockID() {
return blockID;
@@ -419,11 +453,15 @@ public class TestECBlockInputStream {
}
@Override
- public int read(ByteBuffer buf) {
+ public int read(ByteBuffer buf) throws IOException {
if (getRemaining() == 0) {
return EOF;
}
+ if (throwException) {
+ throw new IOException("Simulated exception");
+ }
+
int toRead = Math.min(buf.remaining(), (int)getRemaining());
for (int i=0; i<toRead; i++) {
buf.put(dataVal);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStreamProxy.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStreamProxy.java
new file mode 100644
index 0000000..f9b5f38
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockInputStreamProxy.java
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.ozone.client.io.BadDataLocationException;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SplittableRandom;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+
+/**
+ * Unit tests for the ECBlockInputStreamProxy class.
+ */
+public class TestECBlockInputStreamProxy {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestECBlockInputStreamProxy.class);
+
+ private static final int ONEMB = 1024 * 1024;
+ private ECReplicationConfig repConfig;
+ private TestECBlockInputStreamFactory streamFactory;
+
+ private long randomSeed;
+ private ThreadLocalRandom random = ThreadLocalRandom.current();
+ private SplittableRandom dataGenerator;
+
+ @Before
+ public void setup() {
+ repConfig = new ECReplicationConfig(3, 2);
+ streamFactory = new TestECBlockInputStreamFactory();
+ randomSeed = random.nextLong();
+ dataGenerator = new SplittableRandom(randomSeed);
+ }
+
+ @Test
+ public void testExpectedDataLocations() {
+ Assert.assertEquals(1,
+ ECBlockInputStreamProxy.expectedDataLocations(repConfig, 1));
+ Assert.assertEquals(2,
+ ECBlockInputStreamProxy.expectedDataLocations(repConfig, ONEMB + 1));
+ Assert.assertEquals(3,
+ ECBlockInputStreamProxy.expectedDataLocations(repConfig, 3 * ONEMB));
+ Assert.assertEquals(3,
+ ECBlockInputStreamProxy.expectedDataLocations(repConfig, 10 * ONEMB));
+
+ repConfig = new ECReplicationConfig(6, 3);
+ Assert.assertEquals(1,
+ ECBlockInputStreamProxy.expectedDataLocations(repConfig, 1));
+ Assert.assertEquals(2,
+ ECBlockInputStreamProxy.expectedDataLocations(repConfig, ONEMB + 1));
+ Assert.assertEquals(3,
+ ECBlockInputStreamProxy.expectedDataLocations(repConfig, 3 * ONEMB));
+ Assert.assertEquals(6,
+ ECBlockInputStreamProxy.expectedDataLocations(repConfig, 10 * ONEMB));
+ }
+
+ @Test
+ public void testAvailableDataLocations() {
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
+ OmKeyLocationInfo blockInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, 1024, dnMap);
+ Assert.assertEquals(2, ECBlockInputStreamProxy.availableDataLocations(
+ repConfig, blockInfo.getPipeline()));
+
+ dnMap = ECStreamTestUtil.createIndexMap(1, 4, 5);
+ blockInfo = ECStreamTestUtil.createKeyInfo(repConfig, 1024, dnMap);
+ Assert.assertEquals(1, ECBlockInputStreamProxy.availableDataLocations(
+ repConfig, blockInfo.getPipeline()));
+
+ dnMap = ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+ blockInfo = ECStreamTestUtil.createKeyInfo(repConfig, 1024, dnMap);
+ Assert.assertEquals(3, ECBlockInputStreamProxy.availableDataLocations(
+ repConfig, blockInfo.getPipeline()));
+ }
+
+ @Test
+ public void testBlockIDCanBeRetrieved() throws IOException {
+ int blockLength = 1234;
+ generateData(blockLength);
+
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+ OmKeyLocationInfo blockInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+ try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+ Assert.assertEquals(blockInfo.getBlockID(), bis.getBlockID());
+ }
+ }
+
+ @Test
+ public void testBlockLengthCanBeRetrieved() throws IOException {
+ int blockLength = 1234;
+ generateData(blockLength);
+
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+ OmKeyLocationInfo blockInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+ try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+ Assert.assertEquals(1234, bis.getLength());
+ }
+ }
+
+ @Test
+ public void testBlockRemainingCanBeRetrieved() throws IOException {
+ int blockLength = 12345;
+ generateData(blockLength);
+
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+ OmKeyLocationInfo blockInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+ dataGenerator = new SplittableRandom(randomSeed);
+ ByteBuffer readBuffer = ByteBuffer.allocate(100);
+ try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+ Assert.assertEquals(12345, bis.getRemaining());
+ Assert.assertEquals(0, bis.getPos());
+ bis.read(readBuffer);
+ Assert.assertEquals(12345 - 100, bis.getRemaining());
+ Assert.assertEquals(100, bis.getPos());
+ }
+ }
+
+ @Test
+ public void testCorrectStreamCreatedDependingOnDataLocations()
+ throws IOException {
+ int blockLength = 5 * ONEMB;
+ ByteBuffer data = generateData(blockLength);
+
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+ OmKeyLocationInfo blockInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+ try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+ // All locations present, so we expect only the "missing=false" stream
+ // to be present.
+ Assert.assertTrue(streamFactory.getStreams().containsKey(false));
+ Assert.assertFalse(streamFactory.getStreams().containsKey(true));
+ }
+
+ streamFactory = new TestECBlockInputStreamFactory();
+ streamFactory.setData(data);
+ dnMap = ECStreamTestUtil.createIndexMap(2, 3, 4, 5);
+ blockInfo = ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+ try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+ // Not all locations present, so we expect only the "missing=true" stream
+ // to be present.
+ Assert.assertFalse(streamFactory.getStreams().containsKey(false));
+ Assert.assertTrue(streamFactory.getStreams().containsKey(true));
+ }
+ }
+
+ @Test
+ public void testCanReadNonReconstructionToEOF()
+ throws IOException {
+ int blockLength = 5 * ONEMB;
+ generateData(blockLength);
+
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+ OmKeyLocationInfo blockInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+ ByteBuffer readBuffer = ByteBuffer.allocate(100);
+ dataGenerator = new SplittableRandom(randomSeed);
+ try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+ while(true) {
+ int read = bis.read(readBuffer);
+ ECStreamTestUtil.assertBufferMatches(readBuffer, dataGenerator);
+ readBuffer.clear();
+ if (read < 100) {
+ break;
+ }
+ }
+ readBuffer.clear();
+ int read = bis.read(readBuffer);
+ Assert.assertEquals(-1, read);
+ }
+ }
+
+ @Test
+ public void testCanReadReconstructionToEOF()
+ throws IOException {
+ int blockLength = 5 * ONEMB;
+ generateData(blockLength);
+
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(2, 3, 4, 5);
+ OmKeyLocationInfo blockInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+ ByteBuffer readBuffer = ByteBuffer.allocate(100);
+ dataGenerator = new SplittableRandom(randomSeed);
+ try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+ while(true) {
+ int read = bis.read(readBuffer);
+ ECStreamTestUtil.assertBufferMatches(readBuffer, dataGenerator);
+ readBuffer.clear();
+ if (read < 100) {
+ break;
+ }
+ }
+ readBuffer.clear();
+ int read = bis.read(readBuffer);
+ Assert.assertEquals(-1, read);
+ }
+ }
+
+ @Test
+ public void testCanHandleErrorAndFailOverToReconstruction()
+ throws IOException {
+ int blockLength = 5 * ONEMB;
+ generateData(blockLength);
+
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+ OmKeyLocationInfo blockInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+ ByteBuffer readBuffer = ByteBuffer.allocate(100);
+ DatanodeDetails badDN = blockInfo.getPipeline().getFirstNode();
+
+ dataGenerator = new SplittableRandom(randomSeed);
+ try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+ // Perform one read to get the stream created
+ int read = bis.read(readBuffer);
+ Assert.assertEquals(100, read);
+ ECStreamTestUtil.assertBufferMatches(readBuffer, dataGenerator);
+ // Set up an error to be thrown partway through a read, so the dataBuffer
+ // will have been advanced by 50 bytes before the error. This tests it
+ // correctly rewinds and the same data is loaded again from the other
+ // stream.
+ streamFactory.getStreams().get(false).setShouldError(true, 151,
+ new BadDataLocationException(badDN, "Simulated Error"));
+ while(true) {
+ readBuffer.clear();
+ read = bis.read(readBuffer);
+ ECStreamTestUtil.assertBufferMatches(readBuffer, dataGenerator);
+ if (read < 100) {
+ break;
+ }
+ }
+ readBuffer.clear();
+ read = bis.read(readBuffer);
+ Assert.assertEquals(-1, read);
+ // Ensure the bad location was passed into the factory to create the
+ // reconstruction reader
+ Assert.assertEquals(badDN, streamFactory.getFailedLocations().get(0));
+ }
+ }
+
+ @Test
+ public void testCanSeekToNewPosition() throws IOException {
+ int blockLength = 5 * ONEMB;
+ generateData(blockLength);
+
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+ OmKeyLocationInfo blockInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+
+ ByteBuffer readBuffer = ByteBuffer.allocate(100);
+ dataGenerator = new SplittableRandom(randomSeed);
+ try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
+ // Perform one read to get the stream created
+ int read = bis.read(readBuffer);
+ Assert.assertEquals(100, read);
+
+ bis.seek(1024);
+ readBuffer.clear();
+ resetAndAdvanceDataGenerator(1024);
+ bis.read(readBuffer);
+ ECStreamTestUtil.assertBufferMatches(readBuffer, dataGenerator);
+ Assert.assertEquals(1124, bis.getPos());
+
+ // Set the non-reconstruction reader to throw an exception on seek
+ streamFactory.getStreams().get(false).setShouldErrorOnSeek(true);
+ bis.seek(2048);
+ readBuffer.clear();
+ resetAndAdvanceDataGenerator(2048);
+ bis.read(readBuffer);
+ ECStreamTestUtil.assertBufferMatches(readBuffer, dataGenerator);
+
+ // Finally, set the recon reader to fail on seek.
+ streamFactory.getStreams().get(true).setShouldErrorOnSeek(true);
+ try {
+ bis.seek(1024);
+ Assert.fail("Seek should have raised an exception");
+ } catch (IOException e) {
+ // expected
+ }
+ }
+ }
+
+ private ByteBuffer generateData(int length) {
+ ByteBuffer data = ByteBuffer.allocate(length);
+ ECStreamTestUtil.randomFill(data, dataGenerator);
+ streamFactory.setData(data);
+ return data;
+ }
+
+ private void resetAndAdvanceDataGenerator(long position) {
+ dataGenerator = new SplittableRandom(randomSeed);
+ for (long i = 0; i < position; i++) {
+ dataGenerator.nextInt(255);
+ }
+ }
+
+ private ECBlockInputStreamProxy createBISProxy(ECReplicationConfig rConfig,
+ OmKeyLocationInfo blockInfo) {
+ return new ECBlockInputStreamProxy(
+ rConfig, blockInfo, true, null, null, streamFactory);
+ }
+
+ private static class TestECBlockInputStreamFactory
+ implements ECBlockInputStreamFactory {
+
+ private ByteBuffer data;
+
+ private Map<Boolean, ECStreamTestUtil.TestBlockInputStream> streams
+ = new HashMap<>();
+
+ private List<DatanodeDetails> failedLocations;
+
+ public void setData(ByteBuffer data) {
+ this.data = data;
+ }
+
+ public Map<Boolean, ECStreamTestUtil.TestBlockInputStream> getStreams() {
+ return streams;
+ }
+
+ public List<DatanodeDetails> getFailedLocations() {
+ return failedLocations;
+ }
+
+ @Override
+ public BlockExtendedInputStream create(boolean missingLocations,
+ List<DatanodeDetails> failedDatanodes,
+ ReplicationConfig repConfig, OmKeyLocationInfo blockInfo,
+ boolean verifyChecksum, XceiverClientFactory xceiverFactory,
+ Function<BlockID, Pipeline> refreshFunction) {
+ this.failedLocations = failedDatanodes;
+ ByteBuffer wrappedBuffer =
+ ByteBuffer.wrap(data.array(), 0, data.capacity());
+ ECStreamTestUtil.TestBlockInputStream is =
+ new ECStreamTestUtil.TestBlockInputStream(blockInfo.getBlockID(),
+ blockInfo.getLength(), wrappedBuffer);
+ streams.put(missingLocations, is);
+ return is;
+ }
+ }
+
+}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
index 875d2a7..0dda350 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
@@ -65,7 +65,7 @@ public class TestECBlockReconstructedStripeInputStream {
}
@Test
- public void testSufficientLocations() {
+ public void testSufficientLocations() throws IOException {
// One chunk, only 1 location.
OmKeyLocationInfo keyInfo = ECStreamTestUtil
.createKeyInfo(repConfig, 1, ONEMB);
@@ -94,6 +94,11 @@ public class TestECBlockReconstructedStripeInputStream {
new ECBlockReconstructedStripeInputStream(repConfig,
keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
Assert.assertTrue(ecb.hasSufficientLocations());
+ // Set a failed location
+ List<DatanodeDetails> failed = new ArrayList<>();
+ failed.add(keyInfo.getPipeline().getFirstNode());
+ ((ECBlockReconstructedStripeInputStream)ecb).addFailedDatanodes(failed);
+ Assert.assertFalse(ecb.hasSufficientLocations());
}
// Three Chunks, but missing data block 2 and 3 and parity 1.
@@ -104,6 +109,25 @@ public class TestECBlockReconstructedStripeInputStream {
keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
Assert.assertFalse(ecb.hasSufficientLocations());
}
+
+ // Three Chunks, all available but fail 3
+ dnMap = ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+ keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
+ try (ECBlockInputStream ecb =
+ new ECBlockReconstructedStripeInputStream(repConfig,
+ keyInfo, true, null, null, new TestBlockInputStreamFactory())) {
+ Assert.assertTrue(ecb.hasSufficientLocations());
+ // Set three failed locations
+ List<DatanodeDetails> failed = new ArrayList<>();
+ for (DatanodeDetails dn : dnMap.keySet()) {
+ failed.add(dn);
+ if (failed.size() == 3) {
+ break;
+ }
+ }
+ ((ECBlockReconstructedStripeInputStream)ecb).addFailedDatanodes(failed);
+ Assert.assertFalse(ecb.hasSufficientLocations());
+ }
}
@Test
@@ -522,6 +546,55 @@ public class TestECBlockReconstructedStripeInputStream {
}
}
+ @Test
+ public void testFailedLocationsAreNotRead() throws IOException {
+ // Generate the input data for 3 full stripes and generate the parity.
+ int chunkSize = repConfig.getEcChunkSize();
+ int partialStripeSize = chunkSize * 2 - 1;
+ int blockLength = chunkSize * repConfig.getData() * 3 + partialStripeSize;
+ ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 4 *
chunkSize);
+ ECStreamTestUtil.randomFill(dataBufs, chunkSize, dataGen, blockLength);
+ ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+
+ streamFactory = new TestBlockInputStreamFactory();
+ addDataStreamsToFactory(dataBufs, parity);
+
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
+ OmKeyLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
+ stripeSize() * 3 + partialStripeSize, dnMap);
+ streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+
+ ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+ dataGen = new SplittableRandom(randomSeed);
+ try (ECBlockReconstructedStripeInputStream ecb =
+ new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
+ null, null, streamFactory)) {
+ List<DatanodeDetails> failed = new ArrayList<>();
+ // Set the DNs with replica indexes 1 and 2 as failed
+ for (Map.Entry<DatanodeDetails, Integer> e : dnMap.entrySet()) {
+ if (e.getValue() <= 2) {
+ failed.add(e.getKey());
+ }
+ }
+ ecb.addFailedDatanodes(failed);
+
+ // Read full stripe
+ int read = ecb.readStripe(bufs);
+ for (int j = 0; j < bufs.length; j++) {
+ ECStreamTestUtil.assertBufferMatches(bufs[j], dataGen);
+ }
+ Assert.assertEquals(stripeSize(), read);
+
+ // Now ensure that streams with repIndexes 1 and 2 have not been
+ // created in the stream factory, indicating we did not read them.
+ List<TestBlockInputStream> streams = streamFactory.getBlockStreams();
+ for (TestBlockInputStream stream : streams) {
+ Assert.assertTrue(stream.getEcReplicaIndex() > 2);
+ }
+ }
+ }
+
private List<Integer> indexesToList(int... indexes) {
List<Integer> list = new ArrayList<>();
for (int i : indexes) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]