sodonnel commented on a change in pull request #2797: URL: https://github.com/apache/ozone/pull/2797#discussion_r745537916
########## File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java ########## @@ -0,0 +1,421 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.client.io; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.XceiverClientFactory; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream; +import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.ozone.erasurecode.CodecRegistry; +import org.apache.ozone.erasurecode.rawcoder.RawErasureDecoder; +import org.apache.ratis.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.function.Function; + +/** + * Class 
to read EC encoded data from blocks a stripe at a time, when some of + * the data blocks are not available. The public API for this class is: + * + * readStripe(ByteBuffer[] bufs) + * + * The other inherited public APIs will throw a NotImplementedException. This is + * because this class is intended to only read full stripes into a reusable set + * of bytebuffers, and the traditional read APIs do not facilitate this. + * + * The caller should pass an array of ByteBuffers to readStripe() which: + * + * 1. Have EC DataNum buffers in the array. + * 2. Each buffer should have its position set to zero + * 3. Each buffer should have ecChunkSize remaining + * + * These buffers are either read into directly from the data blocks on the + * datanodes, or they will be reconstructed from parity data using the EC + * decoder. + * + * The EC Decoder expects to receive an array of elements matching EC Data + EC + * Parity elements long. Missing or not needed elements should be set to null + * in the array. The elements should be assigned to the array in EC index order. + * + * Assuming we have n missing data locations, where n <= parity locations, the + * ByteBuffers passed in from the client are either assigned to the decoder + * input array, or they are assigned to the decoder output array, where + * reconstructed data is written. The required number of parity buffers will be + * assigned and added to the decoder input so it has sufficient locations to + * reconstruct the data. After reconstruction the byte buffers received will + * have the data for a full stripe populated, either by reading directly from + * the block or by reconstructing the data. + * + * The buffers are returned "ready to read" with the position at zero and + * remaining() indicating how much data was read. If the remaining data is less + * than a full stripe, the client can simply read up to remaining from each + * buffer in turn. If there is a full stripe, each buffer should have ecChunk + * size remaining. 
+ */ +public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream { + + private static final Logger LOG = + LoggerFactory.getLogger(ECBlockReconstructedStripeInputStream.class); + + // List of buffers, data + parity long, needed by the EC decoder. Missing + // or not-needed locations will be null. + private ByteBuffer[] decoderInputBuffers; + // Missing chunks are recovered into these buffers. + private ByteBuffer[] decoderOutputBuffers; + // Missing indexes to be recovered into the recovered buffers. Required by the + // EC decoder + private int[] missingIndexes; + // The blockLocation indexes to use to read data into the dataBuffers. + private List<Integer> dataIndexes = new ArrayList<>(); + + private final RawErasureDecoder decoder; + + private boolean initialized = false; + + public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig, + OmKeyLocationInfo blockInfo, boolean verifyChecksum, + XceiverClientFactory xceiverClientFactory, Function<BlockID, + Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) { + super(repConfig, blockInfo, verifyChecksum, xceiverClientFactory, + refreshFunction, streamFactory); + + decoder = CodecRegistry.getInstance() + .getCodecFactory(repConfig.getCodec().toString()) + .createDecoder(repConfig); + } + + protected void init() throws InsufficientLocationsException { + if (!hasSufficientLocations()) { + throw new InsufficientLocationsException("There are not enough " + + "datanodes to read the EC block"); + } + + ECReplicationConfig repConfig = getRepConfig(); + // The EC decoder needs an array data+parity long, with missing or not + // needed indexes set to null. 
decoderInputBuffers = new ByteBuffer[ + getRepConfig().getData() + getRepConfig().getParity()]; + DatanodeDetails[] locations = getDataLocations(); + setMissingIndexesAndDataLocations(locations); + List<Integer> parityIndexes = + selectParityIndexes(locations, missingIndexes.length); + // We read from the selected parity blocks, so add them to the data indexes. + dataIndexes.addAll(parityIndexes); + // The decoder inputs originally start as all nulls. Then we populate the + // pieces we have data for. The parity buffers are reused for the block + // so we can allocate them now. + for (Integer i : parityIndexes) { + decoderInputBuffers[i] = allocateBuffer(repConfig); + } + decoderOutputBuffers = new ByteBuffer[missingIndexes.length]; + initialized = true; + } + + /** + * Determine which indexes are missing, taking into account the length of the + * block. For a block shorter than a full EC stripe, it is expected that + * some of the data locations will not be present. + * Populates the missingIndexes and dataIndexes instance variables. + * @param locations Available locations for the block group + */ + private void setMissingIndexesAndDataLocations(DatanodeDetails[] locations) { + ECReplicationConfig repConfig = getRepConfig(); + int expectedDataBlocks = calculateExpectedDataBlocks(repConfig); + List<Integer> missingInd = new ArrayList<>(); + for (int i = 0; i < repConfig.getData(); i++) { + if (locations[i] == null && i < expectedDataBlocks) { + missingInd.add(i); + } else if (locations[i] != null) { + dataIndexes.add(i); + } + } + missingIndexes = missingInd.stream().mapToInt(Integer::valueOf).toArray(); + } + + private void assignBuffers(ByteBuffer[] bufs) { + ECReplicationConfig repConfig = getRepConfig(); + Preconditions.assertTrue(bufs.length == repConfig.getData()); + int recoveryIndex = 0; + // Here bufs come from the caller and will be filled with data read from + // the blocks or recovered. 
Therefore, if the index is missing, we assign + // the buffer to the decoder outputs, where data is recovered via EC + // decoding. Otherwise the buffer is set to the input. Note, it may be a + // buffer which needs to be padded. + for (int i = 0; i < repConfig.getData(); i++) { + if (isMissingIndex(i)) { + decoderOutputBuffers[recoveryIndex++] = bufs[i]; + } else { + decoderInputBuffers[i] = bufs[i]; + } + } + } + + private boolean isMissingIndex(int ind) { + for (int i : missingIndexes) { + if (i == ind) { + return true; + } + } + return false; + } + + /** + * This method should be passed a list of byteBuffers which must contain EC + * Data Number entries. Each Bytebuffer should be at position 0 and have EC + * ChunkSize bytes remaining. After returning, the buffers will contain the + * data for the next stripe in the block. The buffers will be returned + * "ready to read" with their position set to zero and the limit set + * according to how much data they contain. + * + * @param bufs A list of byteBuffers which must contain EC Data Number + * entries. Each Bytebuffer should be at position 0 and have + * EC ChunkSize bytes remaining. + * + * @return The number of bytes read + * @throws IOException + */ + public synchronized int readStripe(ByteBuffer[] bufs) throws IOException { + if (!initialized) { + init(); + } + int toRead = (int)Math.min(getRemaining(), getStripeSize()); + if (toRead == 0) { + return EOF; + } + validateBuffers(bufs); + assignBuffers(bufs); + clearParityBuffers(); + loadDataBuffersFromStream(); Review comment: loadDataBuffersFromStream will have some better error handling when seek is implemented (HDDS-5950) to ensure enough bytes are read. HDDS-5951 is for handling failures as the reader is progressing. We could issue reads in parallel here as a further improvement - HDDS-5952 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
