Repository: hadoop
Updated Branches:
  refs/heads/ozone-0.2 1b3f1b89a -> 435c3eacf
HDDS-419. ChunkInputStream bulk read api does not read from all the chunks. Contributed by Lokesh Jain and Mukul Kumar. (cherry picked from commit 6f037468bce7bbda6b9fc01166f2c61ae40b690b) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/435c3eac Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/435c3eac Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/435c3eac Branch: refs/heads/ozone-0.2 Commit: 435c3eacff429ae068fdb20f901d92d7277b54d5 Parents: 1b3f1b8 Author: Xiaoyu Yao <[email protected]> Authored: Fri Sep 14 13:34:29 2018 -0700 Committer: Xiaoyu Yao <[email protected]> Committed: Fri Sep 14 13:46:16 2018 -0700 ---------------------------------------------------------------------- .../hdds/scm/storage/ChunkInputStream.java | 30 +++++++++++++++----- .../ozone/client/io/ChunkGroupInputStream.java | 25 ++++++++-------- .../hadoop/ozone/freon/TestDataValidate.java | 4 +-- .../hadoop/ozone/freon/RandomKeyGenerator.java | 7 +++-- 4 files changed, 42 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/435c3eac/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java index a969b68..a483197 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java @@ -121,12 +121,17 @@ public class ChunkInputStream extends InputStream implements Seekable { return 0; } checkOpen(); - int available = prepareRead(len); - if (available == EOF) { - return EOF; + int total = 
0; + while (len > 0) { + int available = prepareRead(len); + if (available == EOF) { + return total != 0 ? total : EOF; + } + buffers.get(bufferIndex).get(b, off + total, available); + len -= available; + total += available; } - buffers.get(bufferIndex).get(b, off, available); - return available; + return total; } @Override @@ -196,13 +201,20 @@ public class ChunkInputStream extends InputStream implements Seekable { // next chunk chunkIndex += 1; final ReadChunkResponseProto readChunkResponse; + final ChunkInfo chunkInfo = chunks.get(chunkIndex); try { - readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient, - chunks.get(chunkIndex), blockID, traceID); + readChunkResponse = ContainerProtocolCalls + .readChunk(xceiverClient, chunkInfo, blockID, traceID); } catch (IOException e) { throw new IOException("Unexpected OzoneException: " + e.toString(), e); } ByteString byteString = readChunkResponse.getData(); + if (byteString.size() != chunkInfo.getLen()) { + // Bytes read from chunk should be equal to chunk size. 
+ throw new IOException(String + .format("Inconsistent read for chunk=%s len=%d bytesRead=%d", + chunkInfo.getChunkName(), chunkInfo.getLen(), byteString.size())); + } buffers = byteString.asReadOnlyByteBufferList(); bufferIndex = 0; } @@ -260,4 +272,8 @@ public class ChunkInputStream extends InputStream implements Seekable { public boolean seekToNewSource(long targetPos) throws IOException { return false; } + + public BlockID getBlockID() { + return blockID; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/435c3eac/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java index 742cfcc..94966f6 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java @@ -115,19 +115,20 @@ public class ChunkGroupInputStream extends InputStream implements Seekable { return totalReadLen == 0 ? EOF : totalReadLen; } ChunkInputStreamEntry current = streamEntries.get(currentStreamIndex); - int readLen = Math.min(len, (int)current.getRemaining()); - int actualLen = current.read(b, off, readLen); - // this means the underlying stream has nothing at all, return - if (actualLen == EOF) { - return totalReadLen > 0 ? totalReadLen : EOF; + int numBytesToRead = Math.min(len, (int)current.getRemaining()); + int numBytesRead = current.read(b, off, numBytesToRead); + if (numBytesRead != numBytesToRead) { + // This implies that there is either data loss or corruption in the + // chunk entries. Even EOF in the current stream would be covered in + // this case. 
+ throw new IOException(String.format( + "Inconsistent read for blockID=%s length=%d numBytesRead=%d", + current.chunkInputStream.getBlockID(), current.length, + numBytesRead)); } - totalReadLen += actualLen; - // this means there is no more data to read beyond this point, return - if (actualLen != readLen) { - return totalReadLen; - } - off += readLen; - len -= readLen; + totalReadLen += numBytesRead; + off += numBytesRead; + len -= numBytesRead; if (current.getRemaining() <= 0) { currentStreamIndex += 1; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/435c3eac/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java index fdce736..a2df50d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java @@ -68,7 +68,7 @@ public class TestDataValidate { randomKeyGenerator.setNumOfKeys(1); randomKeyGenerator.setType(ReplicationType.RATIS); randomKeyGenerator.setFactor(ReplicationFactor.THREE); - randomKeyGenerator.setKeySize(104857600); + randomKeyGenerator.setKeySize(20971520); randomKeyGenerator.setValidateWrites(true); randomKeyGenerator.call(); Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated()); @@ -84,7 +84,7 @@ public class TestDataValidate { randomKeyGenerator.setNumOfVolumes(1); randomKeyGenerator.setNumOfBuckets(1); randomKeyGenerator.setNumOfKeys(1); - randomKeyGenerator.setKeySize(104857600); + randomKeyGenerator.setKeySize(20971520); randomKeyGenerator.setValidateWrites(true); randomKeyGenerator.call(); Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated()); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/435c3eac/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java index ee4cc87..d73e37e 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.hdds.cli.HddsVersionProvider; import org.apache.hadoop.hdds.client.OzoneQuota; import org.apache.hadoop.hdds.client.ReplicationFactor; @@ -984,9 +985,9 @@ public final class RandomKeyGenerator implements Callable<Void> { writeValidationFailureCount++; LOG.warn("Data validation error for key {}/{}/{}", kv.bucket.getVolumeName(), kv.bucket, kv.key); - LOG.warn("Expected: {}, Actual: {}", - DFSUtil.bytes2String(kv.value), - DFSUtil.bytes2String(value)); + LOG.warn("Expected checksum: {}, Actual checksum: {}", + DigestUtils.md5Hex(kv.value), + DigestUtils.md5Hex(value)); } } } catch (IOException | InterruptedException ex) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
