This is an automated email from the ASF dual-hosted git repository.

msingh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 289debf903e250a0e0d39e7482051d40aa2f1e09
Author: Hanisha Koneru <[email protected]>
AuthorDate: Thu Mar 18 16:30:40 2021 -0700

    HDDS-4552. Read data from chunk into ByteBuffer[] instead of single ByteBuffer. (#1685)
---
 .../apache/hadoop/hdds/scm/OzoneClientConfig.java  |   2 +-
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |   5 +
 .../hadoop/hdds/scm/storage/ChunkInputStream.java  | 205 +++++++++----
 .../hdds/scm/storage/DummyChunkInputStream.java    |  31 +-
 .../hdds/scm/storage/TestChunkInputStream.java     |  29 +-
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |   5 +
 .../ContainerCommandResponseBuilders.java          |  82 +++--
 .../hdds/scm/storage/ContainerProtocolCalls.java   |   4 +-
 .../hadoop/hdds/scm/utils/ClientCommandsUtils.java |  47 +++
 .../apache/hadoop/hdds/scm/utils/package-info.java |  23 ++
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   2 +-
 .../org/apache/hadoop/ozone/common/Checksum.java   |  48 +++
 .../apache/hadoop/ozone/common/ChunkBuffer.java    |  14 +
 .../common/ChunkBufferImplWithByteBuffer.java      |   7 +
 .../common/ChunkBufferImplWithByteBufferList.java  |  38 ++-
 .../ozone/common/IncrementalChunkBuffer.java       |   7 +
 .../hadoop/ozone/common/utils/BufferUtils.java     |  99 ++++++
 .../hadoop/ozone/common/utils/package-info.java    |  23 ++
 .../ozone/container/common/helpers/ChunkInfo.java  |  12 +
 .../common/src/main/resources/ozone-default.xml    |  13 +
 .../container/common/helpers/ContainerUtils.java   |  16 +-
 .../server/ratis/ContainerStateMachine.java        |  16 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  |  48 ++-
 .../container/keyvalue/helpers/ChunkUtils.java     |  11 +-
 .../container/keyvalue/impl/BlockManagerImpl.java  |  14 +
 .../keyvalue/impl/ChunkManagerDispatcher.java      |   2 +-
 .../keyvalue/impl/FilePerBlockStrategy.java        |  43 ++-
 .../keyvalue/impl/FilePerChunkStrategy.java        |  40 ++-
 .../keyvalue/interfaces/BlockManager.java          |   2 +
 .../ozone/container/ContainerTestHelper.java       |   1 +
 .../container/common/TestBlockDeletingService.java |   2 +-
 .../container/common/impl/TestHddsDispatcher.java  |  10 +-
 .../container/keyvalue/ChunkLayoutTestInfo.java    |   2 +-
 .../TestKeyValueHandlerWithUnhealthyContainer.java |   3 +-
 .../container/keyvalue/helpers/TestChunkUtils.java |  24 +-
 .../keyvalue/impl/TestFilePerBlockStrategy.java    |   9 +-
 .../src/main/proto/DatanodeClientProtocol.proto    |  17 +-
 .../hadoop/ozone/client/io/KeyInputStream.java     |   5 +
 .../client/rpc/read/TestChunkInputStream.java      | 120 ++++++++
 .../ozone/client/rpc/read/TestInputStreamBase.java | 246 +++++++++++++++
 .../client/rpc/{ => read}/TestKeyInputStream.java  | 336 ++++++---------------
 .../hadoop/ozone/scm/TestContainerSmallFile.java   |   6 +-
 .../hadoop/ozone/freon/DatanodeChunkValidator.java |  22 +-
 .../ozone/genesis/BenchMarkDatanodeDispatcher.java |   3 +-
 .../ozone/genesis/BenchmarkChunkManager.java       |   2 +-
 45 files changed, 1294 insertions(+), 402 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index c2e0148..f39ec86 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -113,7 +113,7 @@ public class OzoneClientConfig {
       type = ConfigType.SIZE,
       description = "Checksum will be computed for every bytes per checksum "
           + "number of bytes and stored sequentially. The minimum value for "
-          + "this config is 256KB.",
+          + "this config is 16KB.",
       tags = ConfigTag.CLIENT)
   private int bytesPerChecksum = 1024 * 1024;
 
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index d088bb4..7e62202 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -502,4 +502,9 @@ public class BlockInputStream extends InputStream
 
     refreshPipeline(cause);
   }
+
+  @VisibleForTesting
+  public synchronized List<ChunkInputStream> getChunkStreams() {
+    return chunkStreams;
+  }
 }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index 657269d..dc9123d 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.hdds.scm.storage;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -34,6 +37,7 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -42,8 +46,11 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.List;
 import java.util.function.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An {@link InputStream} called from BlockInputStream to read a chunk from the
@@ -53,6 +60,9 @@ import java.util.function.Supplier;
 public class ChunkInputStream extends InputStream
     implements Seekable, CanUnbuffer {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ChunkInputStream.class);
+
   private ChunkInfo chunkInfo;
   private final long length;
   private final BlockID blockID;
@@ -66,13 +76,21 @@ public class ChunkInputStream extends InputStream
 
   // Index of the buffers corresponding to the current position of the buffers
   private int bufferIndex;
+  // bufferOffsets[i] stores the index of the first data byte in buffer i
+  // (buffers.get(i)) w.r.t first byte in the buffers.
+  // Let's say each buffer has a capacity of 40 bytes. The bufferOffset for
+  // the first buffer would always be 0 as this would be the first data byte
+  // in buffers. BufferOffset for the 2nd buffer would be 40 as bytes 0-39
+  // would be stored in buffer 0. Hence, bufferOffsets[0] = 0,
+  // bufferOffsets[1] = 40, bufferOffsets[2] = 80, etc.
+  private long[] bufferOffsets = null;
 
   // The offset of the current data residing in the buffers w.r.t the start
   // of chunk data
-  private long bufferOffset;
+  private long bufferOffsetWrtChunkData;
 
   // The number of bytes of chunk data residing in the buffers currently
-  private long bufferLength;
+  private long buffersSize;
 
   // Position of the ChunkInputStream is maintained by this variable (if a
   // seek is performed. This position is w.r.t to the chunk only and not the
@@ -197,7 +215,7 @@ public class ChunkInputStream extends InputStream
     if (buffersHavePosition(pos)) {
       // The bufferPosition is w.r.t the current chunk.
       // Adjust the bufferIndex and position to the seeked position.
-      adjustBufferPosition(pos - bufferOffset);
+      adjustBufferPosition(pos - bufferOffsetWrtChunkData);
     } else {
       chunkPosition = pos;
     }
@@ -212,10 +230,13 @@ public class ChunkInputStream extends InputStream
       return length;
     }
     if (buffersHaveData()) {
-      return bufferOffset + buffers.get(bufferIndex).position();
+      // BufferOffset w.r.t to ChunkData + BufferOffset w.r.t buffers +
+      // Position of current Buffer
+      return bufferOffsetWrtChunkData + bufferOffsets[bufferIndex] +
+          buffers.get(bufferIndex).position();
     }
     if (buffersAllocated()) {
-      return bufferOffset + bufferLength;
+      return bufferOffsetWrtChunkData + buffersSize;
     }
     return 0;
   }
@@ -259,7 +280,7 @@ public class ChunkInputStream extends InputStream
         if (buffersHavePosition(chunkPosition)) {
           // The current buffers have the seeked position. Adjust the buffer
           // index and position to point to the chunkPosition.
-          adjustBufferPosition(chunkPosition - bufferOffset);
+          adjustBufferPosition(chunkPosition - bufferOffsetWrtChunkData);
         } else {
           // Read a required chunk data to fill the buffers with seeked
           // position data
@@ -270,7 +291,7 @@ public class ChunkInputStream extends InputStream
         // Data is available from buffers
         ByteBuffer bb = buffers.get(bufferIndex);
         return len > bb.remaining() ? bb.remaining() : len;
-      } else  if (dataRemainingInChunk()) {
+      } else if (dataRemainingInChunk()) {
         // There is more data in the chunk stream which has not
         // been read into the buffers yet.
         readChunkFromContainer(len);
@@ -301,36 +322,38 @@ public class ChunkInputStream extends InputStream
       startByteIndex = chunkPosition;
     } else {
       // Start reading the chunk from the last chunkPosition onwards.
-      startByteIndex = bufferOffset + bufferLength;
+      startByteIndex = bufferOffsetWrtChunkData + buffersSize;
     }
 
-    // bufferOffset and bufferLength are updated below, but if read fails
+    // bufferOffsetWrtChunkData and buffersSize are updated after the data
+    // is read from Container and put into the buffers, but if read fails
     // and is retried, we need the previous position.  Position is reset after
     // successful read in adjustBufferPosition()
     storePosition();
 
+    long adjustedBuffersOffset, adjustedBuffersLen;
     if (verifyChecksum) {
-      // Update the bufferOffset and bufferLength as per the checksum
-      // boundary requirement.
-      computeChecksumBoundaries(startByteIndex, len);
+      // Adjust the chunk offset and length to include required checksum
+      // boundaries
+      Pair<Long, Long> adjustedOffsetAndLength =
+          computeChecksumBoundaries(startByteIndex, len);
+      adjustedBuffersOffset = adjustedOffsetAndLength.getLeft();
+      adjustedBuffersLen = adjustedOffsetAndLength.getRight();
     } else {
       // Read from the startByteIndex
-      bufferOffset = startByteIndex;
-      bufferLength = len;
+      adjustedBuffersOffset = startByteIndex;
+      adjustedBuffersLen = len;
     }
 
     // Adjust the chunkInfo so that only the required bytes are read from
     // the chunk.
     final ChunkInfo adjustedChunkInfo = ChunkInfo.newBuilder(chunkInfo)
-        .setOffset(bufferOffset + chunkInfo.getOffset())
-        .setLen(bufferLength)
+        .setOffset(chunkInfo.getOffset() + adjustedBuffersOffset)
+        .setLen(adjustedBuffersLen)
         .build();
 
-    ByteString byteString = readChunk(adjustedChunkInfo);
-
-    buffers = byteString.asReadOnlyByteBufferList();
-    bufferIndex = 0;
-    allocated = true;
+    readChunkDataIntoBuffers(adjustedChunkInfo);
+    bufferOffsetWrtChunkData = adjustedBuffersOffset;
 
     // If the stream was seeked to position before, then the buffer
     // position should be adjusted as the reads happen at checksum boundaries.
@@ -339,14 +362,31 @@ public class ChunkInputStream extends InputStream
     //    1. Stream was seeked to a position before the chunk was read
     //    2. Chunk was read from index < the current position to account for
     //    checksum boundaries.
-    adjustBufferPosition(startByteIndex - bufferOffset);
+    adjustBufferPosition(startByteIndex - bufferOffsetWrtChunkData);
+  }
+
+  private void readChunkDataIntoBuffers(ChunkInfo readChunkInfo)
+      throws IOException {
+    buffers = readChunk(readChunkInfo);
+    buffersSize = readChunkInfo.getLen();
+
+    bufferOffsets = new long[buffers.size()];
+    int tempOffset = 0;
+    for (int i = 0; i < buffers.size(); i++) {
+      bufferOffsets[i] = tempOffset;
+      tempOffset += buffers.get(i).limit();
+    }
+
+    bufferIndex = 0;
+    allocated = true;
   }
 
   /**
    * Send RPC call to get the chunk from the container.
    */
   @VisibleForTesting
-  protected ByteString readChunk(ChunkInfo readChunkInfo) throws IOException {
+  protected List<ByteBuffer> readChunk(ChunkInfo readChunkInfo)
+      throws IOException {
     ReadChunkResponseProto readChunkResponse;
 
     try {
@@ -364,7 +404,18 @@ public class ChunkInputStream extends InputStream
       throw new IOException("Unexpected OzoneException: " + e.toString(), e);
     }
 
-    return readChunkResponse.getData();
+    if (readChunkResponse.hasData()) {
+      return readChunkResponse.getData().asReadOnlyByteBufferList();
+    } else if (readChunkResponse.hasDataBuffers()) {
+      List<ByteString> buffersList = readChunkResponse.getDataBuffers()
+          .getBuffersList();
+      return buffersList.stream()
+          .map(ByteString::asReadOnlyByteBuffer)
+          .collect(Collectors.toList());
+    } else {
+      throw new IOException("Unexpected error while reading chunk data " +
+          "from container. No data returned.");
+    }
   }
 
   private CheckedBiFunction<ContainerCommandRequestProto,
@@ -374,14 +425,31 @@ public class ChunkInputStream extends InputStream
                 request.getReadChunk().getChunkData();
 
             ReadChunkResponseProto readChunkResponse = response.getReadChunk();
-            ByteString byteString = readChunkResponse.getData();
-
-            if (byteString.size() != reqChunkInfo.getLen()) {
-              // Bytes read from chunk should be equal to chunk size.
-              throw new OzoneChecksumException(String
-                  .format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
-                      reqChunkInfo.getChunkName(), reqChunkInfo.getLen(),
-                      byteString.size()));
+            List<ByteString> byteStrings;
+            boolean isV0 = false;
+
+            if (readChunkResponse.hasData()) {
+              ByteString byteString = readChunkResponse.getData();
+              if (byteString.size() != reqChunkInfo.getLen()) {
+                // Bytes read from chunk should be equal to chunk size.
+                throw new OzoneChecksumException(String.format(
+                    "Inconsistent read for chunk=%s len=%d bytesRead=%d",
+                    reqChunkInfo.getChunkName(), reqChunkInfo.getLen(),
+                    byteString.size()));
+              }
+              byteStrings = new ArrayList<>();
+              byteStrings.add(byteString);
+              isV0 = true;
+            } else {
+              byteStrings = readChunkResponse.getDataBuffers().getBuffersList();
+              long buffersLen = BufferUtils.getBuffersLen(byteStrings);
+              if (buffersLen != reqChunkInfo.getLen()) {
+                // Bytes read from chunk should be equal to chunk size.
+                throw new OzoneChecksumException(String.format(
+                    "Inconsistent read for chunk=%s len=%d bytesRead=%d",
+                    reqChunkInfo.getChunkName(), reqChunkInfo.getLen(),
+                    buffersLen));
+              }
             }
 
             if (verifyChecksum) {
@@ -396,7 +464,8 @@ public class ChunkInputStream extends InputStream
                   chunkInfo.getOffset();
               int bytesPerChecksum = checksumData.getBytesPerChecksum();
               int startIndex = (int) (relativeOffset / bytesPerChecksum);
-              Checksum.verifyChecksum(byteString, checksumData, startIndex);
+              Checksum.verifyChecksum(byteStrings, checksumData, startIndex,
+                  isV0);
             }
           };
 
@@ -413,18 +482,22 @@ public class ChunkInputStream extends InputStream
    *
    * @param startByteIndex the first byte index to be read by client
    * @param dataLen number of bytes to be read from the chunk
+   * @return Adjusted (Chunk Offset, Chunk Length) which needs to be read
+   * from Container
    */
-  private void computeChecksumBoundaries(long startByteIndex, int dataLen) {
+  private Pair<Long, Long> computeChecksumBoundaries(long startByteIndex,
+      int dataLen) {
 
     int bytesPerChecksum = chunkInfo.getChecksumData().getBytesPerChecksum();
     // index of the last byte to be read from chunk, inclusively.
     final long endByteIndex = startByteIndex + dataLen - 1;
 
-    bufferOffset =  (startByteIndex / bytesPerChecksum)
+    long adjustedChunkOffset =  (startByteIndex / bytesPerChecksum)
         * bytesPerChecksum; // inclusive
     final long endIndex = ((endByteIndex / bytesPerChecksum) + 1)
         * bytesPerChecksum; // exclusive
-    bufferLength = Math.min(endIndex, length) - bufferOffset;
+    long adjustedChunkLen = Math.min(endIndex, length) - adjustedChunkOffset;
+    return Pair.of(adjustedChunkOffset, adjustedChunkLen);
   }
 
   /**
@@ -433,18 +506,34 @@ public class ChunkInputStream extends InputStream
    * @param bufferPosition the position to which the buffers must be advanced
    */
   private void adjustBufferPosition(long bufferPosition) {
-    // The bufferPosition is w.r.t the current chunk.
-    // Adjust the bufferIndex and position to the seeked chunkPosition.
-    long tempOffest = 0;
-    for (int i = 0; i < buffers.size(); i++) {
-      if (bufferPosition - tempOffest >= buffers.get(i).capacity()) {
-        tempOffest += buffers.get(i).capacity();
-      } else {
-        bufferIndex = i;
-        break;
-      }
+    // The bufferPosition is w.r.t the current buffers.
+    // Adjust the bufferIndex and position to the seeked bufferPosition.
+    if (bufferIndex >= buffers.size()) {
+      bufferIndex = Arrays.binarySearch(bufferOffsets, bufferPosition);
+    } else if (bufferPosition < bufferOffsets[bufferIndex]) {
+      bufferIndex = Arrays.binarySearch(bufferOffsets, 0, bufferIndex,
+          bufferPosition);
+    } else if (bufferPosition >= bufferOffsets[bufferIndex] +
+        buffers.get(bufferIndex).capacity()) {
+      bufferIndex = Arrays.binarySearch(bufferOffsets, bufferIndex + 1,
+          buffers.size(), bufferPosition);
+    }
+    if (bufferIndex < 0) {
+      bufferIndex = -bufferIndex - 2;
+    }
+
+    buffers.get(bufferIndex).position(
+        (int) (bufferPosition - bufferOffsets[bufferIndex]));
+
+    // Reset buffers > bufferIndex to position 0. We do this to reset any
+    // previous reads/ seeks which might have updated any buffer position.
+    // For buffers < bufferIndex, we do not need to reset the position as it
+    // not required for this read. If a seek was done to a position in the
+    // previous indices, the buffer position reset would be performed in the
+    // seek call.
+    for (int i = bufferIndex + 1; i < buffers.size(); i++) {
+      buffers.get(i).position(0);
     }
-    buffers.get(bufferIndex).position((int) (bufferPosition - tempOffest));
 
     // Reset the chunkPosition as chunk stream has been initialized i.e. the
     // buffers have been allocated.
@@ -499,8 +588,8 @@ public class ChunkInputStream extends InputStream
     // Check if buffers have been allocated
     if (buffersAllocated()) {
       // Check if the current buffers cover the input position
-      return pos >= bufferOffset &&
-          pos < bufferOffset + bufferLength;
+      return pos >= bufferOffsetWrtChunkData &&
+          pos < bufferOffsetWrtChunkData + buffersSize;
     }
     return false;
   }
@@ -514,7 +603,7 @@ public class ChunkInputStream extends InputStream
     if (chunkPosition >= 0) {
       bufferPos = chunkPosition;
     } else {
-      bufferPos = bufferOffset + bufferLength;
+      bufferPos = bufferOffsetWrtChunkData + buffersSize;
     }
 
     return bufferPos < length;
@@ -532,8 +621,9 @@ public class ChunkInputStream extends InputStream
     if (buffersHaveData() || dataRemainingInChunk()) {
       return false;
     } else {
-      Preconditions.checkState(bufferOffset + bufferLength == length,
-          "EOF detected, but not at the last byte of the chunk");
+      Preconditions.checkState(
+          bufferOffsetWrtChunkData + buffersSize == length,
+          "EOF detected but not at the last byte of the chunk");
       return true;
     }
   }
@@ -544,9 +634,9 @@ public class ChunkInputStream extends InputStream
   private void releaseBuffers() {
     buffers = null;
     bufferIndex = 0;
-    // We should not reset bufferOffset and bufferLength here because when
-    // getPos() is called in chunkStreamsEOF() we use these values and
-    // determine whether chunk is read completely or not.
+    // We should not reset bufferOffsetWrtChunkData and buffersSize here
+    // because when getPos() is called in chunkStreamEOF() we use these
+    // values and determine whether chunk is read completely or not.
   }
 
   /**
@@ -579,4 +669,9 @@ public class ChunkInputStream extends InputStream
     releaseBuffers();
     releaseClient();
   }
+
+  @VisibleForTesting
+  public List<ByteBuffer> getCachedBuffers() {
+    return buffers;
+  }
 }
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java
index 5f3210d..8453210 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.storage;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -25,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 /**
@@ -48,12 +50,29 @@ public class DummyChunkInputStream extends ChunkInputStream {
   }
 
   @Override
-  protected ByteString readChunk(ChunkInfo readChunkInfo) {
-    ByteString byteString = ByteString.copyFrom(chunkData,
-        (int) readChunkInfo.getOffset(),
-        (int) readChunkInfo.getLen());
-    getReadByteBuffers().add(byteString);
-    return byteString;
+  protected List<ByteBuffer> readChunk(ChunkInfo readChunkInfo) {
+    int offset = (int) readChunkInfo.getOffset();
+    int remainingToRead = (int) readChunkInfo.getLen();
+
+    int bufferCapacity = readChunkInfo.getChecksumData().getBytesPerChecksum();
+    int bufferLen;
+    readByteBuffers.clear();
+    while (remainingToRead > 0) {
+      if (remainingToRead < bufferCapacity) {
+        bufferLen = remainingToRead;
+      } else {
+        bufferLen = bufferCapacity;
+      }
+      ByteString byteString = ByteString.copyFrom(chunkData,
+          offset, bufferLen);
+
+      readByteBuffers.add(byteString);
+
+      offset += bufferLen;
+      remainingToRead -= bufferLen;
+    }
+
+    return BufferUtils.getReadOnlyByteBuffers(readByteBuffers);
   }
 
   @Override
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
index a9296bc..f6d9b8d 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hdds.scm.storage;
 
 import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -94,6 +96,19 @@ public class TestChunkInputStream {
     }
   }
 
+  private void matchWithInputData(List<ByteString> byteStrings,
+      int inputDataStartIndex, int length) {
+    int offset = inputDataStartIndex;
+    int totalBufferLen = 0;
+    for (ByteString byteString : byteStrings) {
+      int bufferLen = byteString.size();
+      matchWithInputData(byteString.toByteArray(), offset, bufferLen);
+      offset += bufferLen;
+      totalBufferLen += bufferLen;
+    }
+    Assert.assertEquals(length, totalBufferLen);
+  }
+
   /**
    * Seek to a position and verify through getPos().
    */
@@ -123,10 +138,9 @@ public class TestChunkInputStream {
     // To read chunk data from index 0 to 49 (len = 50), we need to read
     // chunk from offset 0 to 60 as the checksum boundary is at every 20
     // bytes. Verify that 60 bytes of chunk data are read and stored in the
-    // buffers.
-    matchWithInputData(chunkStream.getReadByteBuffers().get(0).toByteArray(),
-        0, 60);
-
+    // buffers. Since checksum boundary is at every 20 bytes, there should be
+    // 60/20 number of buffers.
+    matchWithInputData(chunkStream.getReadByteBuffers(), 0, 60);
   }
 
   @Test
@@ -152,8 +166,7 @@ public class TestChunkInputStream {
     byte[] b = new byte[30];
     chunkStream.read(b, 0, 30);
     matchWithInputData(b, 25, 30);
-    matchWithInputData(chunkStream.getReadByteBuffers().get(0).toByteArray(),
-        20, 40);
+    matchWithInputData(chunkStream.getReadByteBuffers(), 20, 40);
 
     // After read, the position of the chunkStream is evaluated from the
     // buffers and the chunkPosition should be reset to -1.
@@ -216,8 +229,8 @@ public class TestChunkInputStream {
     ChunkInputStream subject = new ChunkInputStream(chunkInfo, null,
         clientFactory, pipelineRef::get, false, null) {
       @Override
-      protected ByteString readChunk(ChunkInfo readChunkInfo) {
-        return ByteString.copyFrom(chunkData);
+      protected List<ByteBuffer> readChunk(ChunkInfo readChunkInfo) {
+        return ByteString.copyFrom(chunkData).asReadOnlyByteBufferList();
       }
     };
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index d2d1112..aecf45c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -133,6 +133,11 @@ public final class ScmConfigKeys {
   // 4 MB by default
   public static final String OZONE_SCM_CHUNK_SIZE_DEFAULT = "4MB";
 
+  public static final String OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY =
+      "ozone.chunk.read.buffer.default.size";
+  public static final String OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_DEFAULT =
+      "64KB";
+
   public static final String OZONE_SCM_CHUNK_LAYOUT_KEY =
       "ozone.scm.chunk.layout";
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
index 3d9ded9..3ff2c2e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
@@ -18,12 +18,17 @@
 package org.apache.hadoop.hdds.scm.protocolPB;
 
 import com.google.common.base.Preconditions;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto.Builder;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DataBuffers;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetBlockResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetCommittedBlockLengthResponseProto;
@@ -34,8 +39,11 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkR
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
+import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
+import static org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion;
+
 /**
  * A set of helper functions to create responses to container commands.
  */
@@ -204,26 +212,46 @@ public final class ContainerCommandResponseBuilders {
 
   /**
    * Gets a response to the read small file call.
-   * @param msg - Msg
-   * @param data  - Data
+   * @param request - Msg
+   * @param dataBuffers  - Data
    * @param info  - Info
    * @return    Response.
    */
   public static ContainerCommandResponseProto getGetSmallFileResponseSuccess(
-      ContainerCommandRequestProto msg, ByteString data, ChunkInfo info) {
-
-    Preconditions.checkNotNull(msg);
-
-    ReadChunkResponseProto.Builder readChunk =
-        ReadChunkResponseProto.newBuilder()
-            .setChunkData(info)
-            .setData((data))
-            .setBlockID(msg.getGetSmallFile().getBlock().getBlockID());
+      ContainerCommandRequestProto request, List<ByteString> dataBuffers,
+      ChunkInfo info) {
+
+    Preconditions.checkNotNull(request);
+
+    boolean isReadChunkV0 = getReadChunkVersion(request.getGetSmallFile())
+        .equals(ContainerProtos.ReadChunkVersion.V0);
+
+    ReadChunkResponseProto.Builder readChunk;
+
+    if (isReadChunkV0) {
+      // V0 has all response data in a single ByteBuffer
+      ByteString combinedData = ByteString.EMPTY;
+      for (ByteString buffer : dataBuffers) {
+        combinedData = combinedData.concat(buffer);
+      }
+      readChunk = ReadChunkResponseProto.newBuilder()
+          .setChunkData(info)
+          .setData(combinedData)
+          .setBlockID(request.getGetSmallFile().getBlock().getBlockID());
+    } else {
+      // V1 splits response data into a list of ByteBuffers
+      readChunk = ReadChunkResponseProto.newBuilder()
+          .setChunkData(info)
+          .setDataBuffers(DataBuffers.newBuilder()
+              .addAllBuffers(dataBuffers)
+              .build())
+          .setBlockID(request.getGetSmallFile().getBlock().getBlockID());
+    }
 
     GetSmallFileResponseProto.Builder getSmallFile =
         GetSmallFileResponseProto.newBuilder().setData(readChunk);
 
-    return getSuccessResponseBuilder(msg)
+    return getSuccessResponseBuilder(request)
         .setCmdType(Type.GetSmallFile)
         .setGetSmallFile(getSmallFile)
         .build();
@@ -250,13 +278,29 @@ public final class ContainerCommandResponseBuilders {
   }
 
   public static ContainerCommandResponseProto getReadChunkResponse(
-      ContainerCommandRequestProto request, ByteString data) {
-
-    ReadChunkResponseProto.Builder response =
-        ReadChunkResponseProto.newBuilder()
-            .setChunkData(request.getReadChunk().getChunkData())
-            .setData(data)
-            .setBlockID(request.getReadChunk().getBlockID());
+      ContainerCommandRequestProto request, ChunkBuffer data,
+      Function<ByteBuffer, ByteString> byteBufferToByteString) {
+
+    boolean isReadChunkV0 = getReadChunkVersion(request.getReadChunk())
+        .equals(ContainerProtos.ReadChunkVersion.V0);
+
+    ReadChunkResponseProto.Builder response;
+
+    if (isReadChunkV0) {
+      // V0 has all response data in a single ByteBuffer
+      response = ReadChunkResponseProto.newBuilder()
+          .setChunkData(request.getReadChunk().getChunkData())
+          .setData(data.toByteString(byteBufferToByteString))
+          .setBlockID(request.getReadChunk().getBlockID());
+    } else {
+      // V1 splits response data into a list of ByteBuffers
+      response = ReadChunkResponseProto.newBuilder()
+          .setChunkData(request.getReadChunk().getChunkData())
+          .setDataBuffers(DataBuffers.newBuilder()
+              .addAllBuffers(data.toByteStringList(byteBufferToByteString))
+              .build())
+          .setBlockID(request.getReadChunk().getBlockID());
+    }
 
     return getSuccessResponseBuilder(request)
         .setReadChunk(response)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index f681d0d..167ae82 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -226,7 +226,8 @@ public final class ContainerProtocolCalls  {
     ReadChunkRequestProto.Builder readChunkRequest =
         ReadChunkRequestProto.newBuilder()
             .setBlockID(blockID.getDatanodeBlockIDProtobuf())
-            .setChunkData(chunk);
+            .setChunkData(chunk)
+            .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1);
     String id = xceiverClient.getPipeline().getClosestNode().getUuidString();
     ContainerCommandRequestProto.Builder builder =
         ContainerCommandRequestProto.newBuilder().setCmdType(Type.ReadChunk)
@@ -489,6 +490,7 @@ public final class ContainerProtocolCalls  {
     ContainerProtos.GetSmallFileRequestProto getSmallFileRequest =
         GetSmallFileRequestProto
             .newBuilder().setBlock(getBlock)
+            .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1)
             .build();
     String id = client.getPipeline().getClosestNode().getUuidString();
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/utils/ClientCommandsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/utils/ClientCommandsUtils.java
new file mode 100644
index 0000000..d5d1539
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/utils/ClientCommandsUtils.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.utils;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+
+public final class ClientCommandsUtils {
+
+  /** Utility classes should not be constructed. **/
+  private ClientCommandsUtils() {
+
+  }
+
+  public static ContainerProtos.ReadChunkVersion getReadChunkVersion(
+      ContainerProtos.ReadChunkRequestProto readChunkRequest) {
+    if (readChunkRequest.hasReadChunkVersion()) {
+      return readChunkRequest.getReadChunkVersion();
+    } else {
+      return ContainerProtos.ReadChunkVersion.V0;
+    }
+  }
+
+  public static ContainerProtos.ReadChunkVersion getReadChunkVersion(
+      ContainerProtos.GetSmallFileRequestProto getSmallFileRequest) {
+    if (getSmallFileRequest.hasReadChunkVersion()) {
+      return getSmallFileRequest.getReadChunkVersion();
+    } else {
+      return ContainerProtos.ReadChunkVersion.V0;
+    }
+  }
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/utils/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/utils/package-info.java
new file mode 100644
index 0000000..edd6b91
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/utils/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.utils;
+
+/**
+ * This package contains utility classes for the SCM and client protocols.
+ */
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 873d288..e9843bb 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -333,7 +333,7 @@ public final class OzoneConfigKeys {
       "hdds.datanode.replication.work.dir";
 
 
-  public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE = 256 * 1024;
+  public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE = 16 * 1024;
 
   public static final String OZONE_CLIENT_READ_TIMEOUT
           = "ozone.client.read.timeout";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
index db7a31e..76f84c4 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumTy
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -144,6 +145,13 @@ public class Checksum {
     return computeChecksum(ChunkBuffer.wrap(data));
   }
 
+  public ChecksumData computeChecksum(List<ByteString> byteStrings)
+      throws OzoneChecksumException {
+    final List<ByteBuffer> buffers =
+        BufferUtils.getReadOnlyByteBuffers(byteStrings);
+    return computeChecksum(ChunkBuffer.wrap(buffers));
+  }
+
   public ChecksumData computeChecksum(ChunkBuffer data)
       throws OzoneChecksumException {
     if (checksumType == ChecksumType.NONE) {
@@ -242,6 +250,46 @@ public class Checksum {
   }
 
   /**
+   * Computes the ChecksumData for the input byteStrings and verifies that
+   * the checksums match with that of the input checksumData.
+   * @param byteStrings input data buffers list. Each byteString should
+   *                    correspond to one checksum.
+   * @param checksumData checksumData to match with
+   * @param startIndex index of first checksum in checksumData to match with
+   *                   data's computed checksum.
+   * @param isSingleByteString if true, there is only one byteString in the
+   *                           input list and it should be processed
+   *                           accordingly
+   * @throws OzoneChecksumException is thrown if checksums do not match
+   */
+  public static boolean verifyChecksum(List<ByteString> byteStrings,
+      ChecksumData checksumData, int startIndex, boolean isSingleByteString)
+      throws OzoneChecksumException {
+    ChecksumType checksumType = checksumData.getChecksumType();
+    if (checksumType == ChecksumType.NONE) {
+      // Checksum is set to NONE. No further verification is required.
+      return true;
+    }
+
+    if (isSingleByteString) {
+      // The data is a single ByteString (old format).
+      return verifyChecksum(byteStrings.get(0), checksumData, startIndex);
+    }
+
+    // The data is a list of ByteStrings. Each ByteString length should be
+    // the same as the number of bytes per checksum (except the last
+    // ByteString which could be smaller).
+    final List<ByteBuffer> buffers =
+        BufferUtils.getReadOnlyByteBuffers(byteStrings);
+
+    int bytesPerChecksum = checksumData.getBytesPerChecksum();
+    Checksum checksum = new Checksum(checksumType, bytesPerChecksum);
+    final ChecksumData computed = checksum.computeChecksum(
+        ChunkBuffer.wrap(buffers));
+    return checksumData.verifyChecksumDataMatches(computed, startIndex);
+  }
+
+  /**
    * Returns a ChecksumData with type NONE for testing.
    */
   @VisibleForTesting
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
index 65f8a89..7d069cd 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
@@ -144,6 +144,17 @@ public interface ChunkBuffer {
     return toByteStringImpl(b -> applyAndAssertFunction(b, function, this));
   }
 
+  /**
+   * Convert this buffer(s) to a list of {@link ByteString}.
+   * The position and limit of this {@link ChunkBuffer} remains unchanged.
+   * The given function must preserve the position and limit
+   * of the input {@link ByteBuffer}.
+   */
+  default List<ByteString> toByteStringList(
+      Function<ByteBuffer, ByteString> function) {
+    return toByteStringListImpl(b -> applyAndAssertFunction(b, function, this));
+  }
+
   // for testing
   default ByteString toByteString() {
     return toByteString(ByteStringConversion::safeWrap);
@@ -151,6 +162,9 @@ public interface ChunkBuffer {
 
   ByteString toByteStringImpl(Function<ByteBuffer, ByteString> function);
 
+  List<ByteString> toByteStringListImpl(
+      Function<ByteBuffer, ByteString> function);
+
   static void assertInt(int expected, int computed, Supplier<String> prefix) {
     if (expected != computed) {
       throw new IllegalStateException(prefix.get()
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
index dd81635..afa39f8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.common;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -122,6 +123,12 @@ final class ChunkBufferImplWithByteBuffer implements ChunkBuffer {
   }
 
   @Override
+  public List<ByteString> toByteStringListImpl(
+      Function<ByteBuffer, ByteString> f) {
+    return Arrays.asList(f.apply(buffer));
+  }
+
+  @Override
   public boolean equals(Object obj) {
     if (this == obj) {
       return true;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
index e9949cc..94ad552 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
@@ -19,6 +19,9 @@ package org.apache.hadoop.ozone.common;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 import java.io.IOException;
@@ -110,6 +113,11 @@ public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {
   }
 
   @Override
+  public boolean hasRemaining() {
+    return position() < limit;
+  }
+
+  @Override
   public ChunkBuffer rewind() {
     buffers.forEach(ByteBuffer::rewind);
     rewindCurrent();
@@ -171,9 +179,29 @@ public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {
   }
 
   @Override
+  /**
+   * Returns the next buffer in the list irrespective of the bufferSize.
+   */
   public Iterable<ByteBuffer> iterate(int bufferSize) {
-    // currently not necessary; implement if needed
-    throw new UnsupportedOperationException();
+    return () -> new Iterator<ByteBuffer>() {
+      @Override
+      public boolean hasNext() {
+        return hasRemaining();
+      }
+
+      @Override
+      public ByteBuffer next() {
+        if (!hasRemaining()) {
+          throw new NoSuchElementException();
+        }
+        findCurrent();
+        ByteBuffer current = buffers.get(currentIndex);
+        final ByteBuffer duplicated = current.duplicate();
+        duplicated.limit(current.limit());
+        current.position(current.limit());
+        return duplicated;
+      }
+    };
   }
 
   @Override
@@ -194,6 +222,12 @@ public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {
   }
 
   @Override
+  public List<ByteString> toByteStringListImpl(
+      Function<ByteBuffer, ByteString> f) {
+    return buffers.stream().map(f).collect(Collectors.toList());
+  }
+
+  @Override
   public String toString() {
     return getClass().getSimpleName()
         + ":n=" + buffers.size()
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
index e63ab97..ce7b1eb 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.common;
 
 import com.google.common.base.Preconditions;
+import java.util.stream.Collectors;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 import java.io.IOException;
@@ -274,6 +275,12 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
   }
 
   @Override
+  public List<ByteString> toByteStringListImpl(
+      Function<ByteBuffer, ByteString> f) {
+    return buffers.stream().map(f).collect(Collectors.toList());
+  }
+
+  @Override
   public boolean equals(Object obj) {
     if (this == obj) {
       return true;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
new file mode 100644
index 0000000..a8be69d
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/BufferUtils.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.common.utils;
+
+import com.google.common.base.Preconditions;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+public final class BufferUtils {
+
+  /** Utility classes should not be constructed. **/
+  private BufferUtils() {
+
+  }
+
+  /**
+   * Assign an array of ByteBuffers.
+   * @param totalLen total length of all ByteBuffers
+   * @param bufferCapacity max capacity of each ByteBuffer
+   */
+  public static ByteBuffer[] assignByteBuffers(long totalLen,
+      long bufferCapacity) {
+    Preconditions.checkArgument(totalLen > 0, "Buffer Length should be a " +
+        "positive integer.");
+    Preconditions.checkArgument(bufferCapacity > 0, "Buffer Capacity should " +
+        "be a positive integer.");
+
+    int numBuffers = getNumberOfBins(totalLen, bufferCapacity);
+
+    ByteBuffer[] dataBuffers = new ByteBuffer[numBuffers];
+    int buffersAllocated = 0;
+    // For each ByteBuffer (except the last) allocate bufferLen of capacity
+    for (int i = 0; i < numBuffers - 1; i++) {
+      dataBuffers[i] = ByteBuffer.allocate((int) bufferCapacity);
+      buffersAllocated += bufferCapacity;
+    }
+    // For the last ByteBuffer, allocate as much space as is needed to fit
+    // remaining bytes
+    dataBuffers[numBuffers - 1] = ByteBuffer.allocate(
+        (int) (totalLen - buffersAllocated));
+    return dataBuffers;
+  }
+
+  /**
+   * Return a read only ByteBuffer list for the input ByteStrings list.
+   */
+  public static List<ByteBuffer> getReadOnlyByteBuffers(
+      List<ByteString> byteStrings) {
+    List<ByteBuffer> buffers = new ArrayList<>();
+    for (ByteString byteString : byteStrings) {
+      buffers.add(byteString.asReadOnlyByteBuffer());
+    }
+    return buffers;
+  }
+
+  public static ByteString concatByteStrings(List<ByteString> byteStrings) {
+    return byteStrings.stream().reduce(ByteString::concat).orElse(null);
+  }
+
+  /**
+   * Return the summation of the length of all ByteStrings.
+   */
+  public static long getBuffersLen(List<ByteString> buffers) {
+    long length = 0;
+    for (ByteString buffer : buffers) {
+      length += buffer.size();
+    }
+    return length;
+  }
+
+  /**
+   * Return the number of bins required to hold all the elements given a max
+   * capacity for each bin.
+   * @param numElements total number of elements to put in bin
+   * @param maxElementsPerBin max number of elements per bin
+   * @return number of bins
+   */
+  public static int getNumberOfBins(long numElements, long maxElementsPerBin) {
+    return (int) Math.ceil((double) numElements / (double) maxElementsPerBin);
+  }
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/package-info.java
new file mode 100644
index 0000000..11741fa
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/utils/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.common.utils;
+
+/**
+ * This package contains common utility classes for HDDS.
+ */
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java
index 1c73a31..bc97e8e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java
@@ -38,6 +38,10 @@ public class ChunkInfo {
   private ChecksumData checksumData;
   private final Map<String, String> metadata;
 
+  // For older clients reading chunks in V0 version (all read data should
+  // reside in one buffer). This variable should be set to true for older
+  // clients to maintain backward wire compatibility.
+  private boolean readDataIntoSingleBuffer = false;
 
   /**
    * Constructs a ChunkInfo.
@@ -182,4 +186,12 @@ public class ChunkInfo {
         ", len=" + len +
         '}';
   }
+
+  public void setReadDataIntoSingleBuffer(boolean readDataIntoSingleBuffer) {
+    this.readDataIntoSingleBuffer = readDataIntoSingleBuffer;
+  }
+
+  public boolean isReadDataIntoSingleBuffer() {
+    return readDataIntoSingleBuffer;
+  }
 }
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 051868e..ae81c84 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -701,6 +701,19 @@
     </description>
   </property>
   <property>
+    <name>ozone.chunk.read.buffer.default.size</name>
+    <value>64KB</value>
+    <tag>OZONE, SCM, CONTAINER, PERFORMANCE</tag>
+    <description>
+      The default read buffer size during read chunk operations when checksum
+      is disabled. Chunk data will be cached in buffers of this capacity.
+
+      For chunk data with checksum, the read buffer size will be the
+      same as the number of bytes per checksum
+      (ozone.client.bytes.per.checksum) corresponding to the chunk.
+    </description>
+  </property>
+  <property>
     <name>ozone.scm.chunk.layout</name>
     <value>FILE_PER_BLOCK</value>
     <tag>OZONE, SCM, CONTAINER, PERFORMANCE</tag>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
index b53fe7e..9f2faff 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@ -281,10 +281,22 @@ public final class ContainerUtils {
     if (msg.hasReadChunk() || msg.hasGetSmallFile()) {
       ContainerCommandResponseProto.Builder builder = msg.toBuilder();
       if (msg.hasReadChunk()) {
-        builder.getReadChunkBuilder().setData(REDACTED);
+        if (msg.getReadChunk().hasData()) {
+          builder.getReadChunkBuilder().setData(REDACTED);
+        }
+        if (msg.getReadChunk().hasDataBuffers()) {
+          builder.getReadChunkBuilder().getDataBuffersBuilder()
+              .addBuffers(REDACTED);
+        }
       }
       if (msg.hasGetSmallFile()) {
-        builder.getGetSmallFileBuilder().getDataBuilder().setData(REDACTED);
+        if (msg.getGetSmallFile().getData().hasData()) {
+          builder.getGetSmallFileBuilder().getDataBuilder().setData(REDACTED);
+        }
+        if (msg.getGetSmallFile().getData().hasDataBuffers()) {
+          builder.getGetSmallFileBuilder().getDataBuilder()
+              .getDataBuffersBuilder().addBuffers(REDACTED);
+        }
       }
       return builder.build();
     }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index de2a058..424c08a 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -54,6 +54,7 @@ import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerExcep
 import org.apache.hadoop.hdds.utils.Cache;
 import org.apache.hadoop.hdds.utils.ResourceLimitCache;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.util.Time;
@@ -572,7 +573,8 @@ public class ContainerStateMachine extends BaseStateMachine 
{
     ReadChunkRequestProto.Builder readChunkRequestProto =
         ReadChunkRequestProto.newBuilder()
             .setBlockID(writeChunkRequestProto.getBlockID())
-            .setChunkData(chunkInfo);
+            .setChunkData(chunkInfo)
+            .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1);
     ContainerCommandRequestProto dataContainerCommandProto =
         ContainerCommandRequestProto.newBuilder(requestProto)
             .setCmdType(Type.ReadChunk).setReadChunk(readChunkRequestProto)
@@ -595,14 +597,22 @@ public class ContainerStateMachine extends 
BaseStateMachine {
     }
 
     ReadChunkResponseProto responseProto = response.getReadChunk();
+    ByteString data;
+    if (responseProto.hasData()) {
+      data = responseProto.getData();
+    } else {
+      data = BufferUtils.concatByteStrings(
+          responseProto.getDataBuffers().getBuffersList());
+    }
 
-    ByteString data = responseProto.getData();
     // assert that the response has data in it.
     Preconditions
-        .checkNotNull(data, "read chunk data is null for chunk: %s", 
chunkInfo);
+        .checkNotNull(data, "read chunk data is null for chunk: %s",
+            chunkInfo);
     Preconditions.checkState(data.size() == chunkInfo.getLen(),
         "read chunk len=%s does not match chunk expected len=%s for chunk:%s",
         data.size(), chunkInfo.getLen(), chunkInfo);
+
     return data;
   }
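The readStateMachineData path above reassembles the V1 buffer list into one ByteString via BufferUtils.concatByteStrings. A minimal sketch of such a helper, assuming only the protobuf ByteString API (the actual BufferUtils implementation elsewhere in this change may differ), is:

    import java.util.List;
    import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

    public final class ConcatSketch {
      // Concatenate a list of ByteStrings into a single ByteString.
      public static ByteString concatByteStrings(List<ByteString> buffers) {
        ByteString result = ByteString.EMPTY;
        for (ByteString buffer : buffers) {
          result = result.concat(buffer);
        }
        return result;
      }
    }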
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 117e973..3b672bf 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -49,6 +50,7 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
@@ -92,6 +94,8 @@ import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuil
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.putBlockResponseSuccess;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
+import static 
org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion;
+
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -601,8 +605,18 @@ public class KeyValueHandler extends Handler {
         dispatcherContext = new DispatcherContext.Builder().build();
       }
 
-      data = chunkManager
-          .readChunk(kvContainer, blockID, chunkInfo, dispatcherContext);
+      boolean isReadChunkV0 = getReadChunkVersion(request.getReadChunk())
+          .equals(ContainerProtos.ReadChunkVersion.V0);
+      if (isReadChunkV0) {
+        // For older clients, set ReadDataIntoSingleBuffer to true so that
+        // all the data read from the chunk file is returned as a single
+        // ByteString. Older clients cannot process data returned as a list
+        // of ByteStrings.
+        chunkInfo.setReadDataIntoSingleBuffer(true);
+      }
+
+      data = chunkManager.readChunk(kvContainer, blockID, chunkInfo,
+          dispatcherContext);
       metrics.incContainerBytesStats(Type.ReadChunk, chunkInfo.getLen());
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);
@@ -614,8 +628,7 @@ public class KeyValueHandler extends Handler {
 
     Preconditions.checkNotNull(data, "Chunk data is null");
 
-    ByteString byteString = data.toByteString(byteBufferToByteString);
-    return getReadChunkResponse(request, byteString);
+    return getReadChunkResponse(request, data, byteBufferToByteString);
   }
 
   /**
@@ -833,21 +846,32 @@ public class KeyValueHandler extends Handler {
           .getBlockID());
       BlockData responseData = blockManager.getBlock(kvContainer, blockID);
 
-      ContainerProtos.ChunkInfo chunkInfo = null;
-      ByteString dataBuf = ByteString.EMPTY;
+      ContainerProtos.ChunkInfo chunkInfoProto = null;
+      List<ByteString> dataBuffers = new ArrayList<>();
       DispatcherContext dispatcherContext =
           new DispatcherContext.Builder().build();
       for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) {
         // if the block is committed, all chunks must have been committed.
         // Tmp chunk files won't exist here.
+        ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunk);
+        boolean isReadChunkV0 = getReadChunkVersion(request.getGetSmallFile())
+            .equals(ContainerProtos.ReadChunkVersion.V0);
+        if (isReadChunkV0) {
+          // For older clients, set ReadDataIntoSingleBuffer to true so that
+          // all the data read from the chunk file is returned as a single
+          // ByteString. Older clients cannot process data returned as a list
+          // of ByteStrings.
+          chunkInfo.setReadDataIntoSingleBuffer(true);
+        }
         ChunkBuffer data = chunkManager.readChunk(kvContainer, blockID,
-            ChunkInfo.getFromProtoBuf(chunk), dispatcherContext);
-        ByteString current = data.toByteString(byteBufferToByteString);
-        dataBuf = dataBuf.concat(current);
-        chunkInfo = chunk;
+            chunkInfo, dispatcherContext);
+        dataBuffers.addAll(data.toByteStringList(byteBufferToByteString));
+        chunkInfoProto = chunk;
       }
-      metrics.incContainerBytesStats(Type.GetSmallFile, dataBuf.size());
-      return getGetSmallFileResponseSuccess(request, dataBuf, chunkInfo);
+      metrics.incContainerBytesStats(Type.GetSmallFile,
+          BufferUtils.getBuffersLen(dataBuffers));
+      return getGetSmallFileResponseSuccess(request, dataBuffers,
+          chunkInfoProto);
     } catch (StorageContainerException e) {
       return ContainerUtils.logAndReturnError(LOG, e, request);
     } catch (IOException ex) {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
index e1fac6f..f0fbee5 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
@@ -167,11 +167,11 @@ public final class ChunkUtils {
   }
 
   /**
-   * Reads data from an existing chunk file.
+   * Reads data from an existing chunk file into a list of ByteBuffers.
    *
    * @param file file where data lives
    */
-  public static void readData(File file, ByteBuffer buf,
+  public static void readData(File file, ByteBuffer[] buffers,
       long offset, long len, VolumeIOStats volumeIOStats)
       throws StorageContainerException {
 
@@ -184,7 +184,7 @@ public final class ChunkUtils {
         try (FileChannel channel = open(path, READ_OPTIONS, NO_ATTRIBUTES);
              FileLock ignored = channel.lock(offset, len, true)) {
 
-          return channel.read(buf, offset);
+          return channel.position(offset).read(buffers);
         } catch (IOException e) {
           throw new UncheckedIOException(e);
         }
@@ -204,7 +204,9 @@ public final class ChunkUtils {
 
     validateReadSize(len, bytesRead);
 
-    buf.flip();
+    for (ByteBuffer buf : buffers) {
+      buf.flip();
+    }
   }
 
   /**
@@ -347,5 +349,4 @@ public final class ChunkUtils {
 
     return CONTAINER_INTERNAL_ERROR;
   }
-
 }
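For readers unfamiliar with scattering reads, the position(offset).read(buffers) call above fills the supplied ByteBuffers in array order. A self-contained sketch of the same pattern, with hypothetical path and size arguments and without the locking and retry logic of ChunkUtils, is:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public final class ScatterReadSketch {
      // Read len bytes starting at offset into buffers of bufferCapacity each.
      public static ByteBuffer[] read(String path, long offset, long len,
          int bufferCapacity) throws IOException {
        int count = (int) ((len + bufferCapacity - 1) / bufferCapacity);
        ByteBuffer[] buffers = new ByteBuffer[count];
        long remaining = len;
        for (int i = 0; i < count; i++) {
          buffers[i] = ByteBuffer.allocate(
              (int) Math.min(bufferCapacity, remaining));
          remaining -= buffers[i].capacity();
        }
        try (FileChannel channel = FileChannel.open(Paths.get(path),
            StandardOpenOption.READ)) {
          // A scattering read fills the buffers in array order.
          long bytesRead = channel.position(offset).read(buffers);
          if (bytesRead != len) {
            throw new IOException(
                "Expected " + len + " bytes, read " + bytesRead);
          }
        }
        for (ByteBuffer buf : buffers) {
          buf.flip();   // make each buffer readable by the caller
        }
        return buffers;
      }
    }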
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index 17c37c9..fadd359 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -24,6 +24,8 @@ import java.util.List;
 
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
@@ -59,6 +61,9 @@ public class BlockManagerImpl implements BlockManager {
   private static final String NO_SUCH_BLOCK_ERR_MSG =
       "Unable to find the block.";
 
+  // Default Read Buffer capacity when Checksum is not present
+  private final long defaultReadBufferCapacity;
+
   /**
    * Constructs a Block Manager.
    *
@@ -67,6 +72,10 @@ public class BlockManagerImpl implements BlockManager {
   public BlockManagerImpl(ConfigurationSource conf) {
     Preconditions.checkNotNull(conf, "Config cannot be null");
     this.config = conf;
+    this.defaultReadBufferCapacity = (long) config.getStorageSize(
+        ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY,
+        ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_DEFAULT,
+        StorageUnit.BYTES);
   }
 
   /**
@@ -244,6 +253,11 @@ public class BlockManagerImpl implements BlockManager {
     }
   }
 
+  @Override
+  public long getDefaultReadBufferCapacity() {
+    return defaultReadBufferCapacity;
+  }
+
   /**
    * Deletes an existing block.
    *
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
index 1c4a917..26113a6 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
@@ -58,7 +58,7 @@ public class ChunkManagerDispatcher implements ChunkManager {
 
   ChunkManagerDispatcher(boolean sync, BlockManager manager) {
     handlers.put(FILE_PER_CHUNK, new FilePerChunkStrategy(sync, manager));
-    handlers.put(FILE_PER_BLOCK, new FilePerBlockStrategy(sync));
+    handlers.put(FILE_PER_BLOCK, new FilePerBlockStrategy(sync, manager));
   }
 
   @Override
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
index 7e7e5f4..925ef71 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
@@ -22,11 +22,15 @@ import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
+import com.google.common.collect.Lists;
+
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
@@ -35,6 +39,7 @@ import 
org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 
@@ -67,9 +72,12 @@ public class FilePerBlockStrategy implements ChunkManager {
 
   private final boolean doSyncWrite;
   private final OpenFiles files = new OpenFiles();
+  private final long defaultReadBufferCapacity;
 
-  public FilePerBlockStrategy(boolean sync) {
+  public FilePerBlockStrategy(boolean sync, BlockManager manager) {
     doSyncWrite = sync;
+    this.defaultReadBufferCapacity = manager == null ? 0 :
+        manager.getDefaultReadBufferCapacity();
   }
 
   private static void checkLayoutVersion(Container container) {
@@ -145,10 +153,37 @@ public class FilePerBlockStrategy implements ChunkManager 
{
 
     long len = info.getLen();
     long offset = info.getOffset();
-    ByteBuffer data = ByteBuffer.allocate((int) len);
-    ChunkUtils.readData(chunkFile, data, offset, len, volumeIOStats);
 
-    return ChunkBuffer.wrap(data);
+    long bufferCapacity = 0;
+    if (info.isReadDataIntoSingleBuffer()) {
+      // Older client - read all chunk data into one single buffer.
+      bufferCapacity = len;
+    } else {
+      // Set buffer capacity to checksum boundary size so that each buffer
+      // corresponds to one checksum. If checksum is NONE, then set buffer
+      // capacity to default (OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY = 64KB).
+      ChecksumData checksumData = info.getChecksumData();
+
+      if (checksumData != null) {
+        if (checksumData.getChecksumType() ==
+            ContainerProtos.ChecksumType.NONE) {
+          bufferCapacity = defaultReadBufferCapacity;
+        } else {
+          bufferCapacity = checksumData.getBytesPerChecksum();
+        }
+      }
+    }
+    // If the buffer capacity is 0, set all the data into one ByteBuffer
+    if (bufferCapacity == 0) {
+      bufferCapacity = len;
+    }
+
+    ByteBuffer[] dataBuffers = BufferUtils.assignByteBuffers(len,
+        bufferCapacity);
+
+    ChunkUtils.readData(chunkFile, dataBuffers, offset, len, volumeIOStats);
+
+    return ChunkBuffer.wrap(Lists.newArrayList(dataBuffers));
   }
 
   @Override
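The capacity selection in readChunk above reduces to a small rule: one buffer per checksum boundary, the configured default when the checksum type is NONE, and a single buffer for V0 clients. A compact sketch of that rule (method and parameter names are illustrative, not the ones used in this class) is:

    // Illustrative helper: pick the read buffer capacity for a chunk.
    static long chooseBufferCapacity(boolean readIntoSingleBuffer,
        boolean checksumIsNone, long bytesPerChecksum,
        long defaultCapacity, long chunkLen) {
      if (readIntoSingleBuffer) {
        return chunkLen;               // old client: whole chunk in one buffer
      }
      long capacity = checksumIsNone ? defaultCapacity : bytesPerChecksum;
      return capacity > 0 ? capacity : chunkLen;   // guard against unset values
    }

With a 4 MB chunk and 256 KB bytes-per-checksum, for example, this yields 16 buffers of 256 KB each.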
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
index 159b0b2..d7f994e 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
@@ -20,12 +20,15 @@ package org.apache.hadoop.ozone.container.keyvalue.impl;
 
 import com.google.common.base.Preconditions;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
@@ -65,10 +68,13 @@ public class FilePerChunkStrategy implements ChunkManager {
 
   private final boolean doSyncWrite;
   private final BlockManager blockManager;
+  private final long defaultReadBufferCapacity;
 
   public FilePerChunkStrategy(boolean sync, BlockManager manager) {
     doSyncWrite = sync;
     blockManager = manager;
+    this.defaultReadBufferCapacity = manager == null ? 0 :
+        manager.getDefaultReadBufferCapacity();
   }
 
   private static void checkLayoutVersion(Container container) {
@@ -222,7 +228,33 @@ public class FilePerChunkStrategy implements ChunkManager {
     }
 
     long len = info.getLen();
-    ByteBuffer data = ByteBuffer.allocate((int) len);
+
+    long bufferCapacity = 0;
+    if (info.isReadDataIntoSingleBuffer()) {
+      // Older client - read all chunk data into one single buffer.
+      bufferCapacity = len;
+    } else {
+      // Set buffer capacity to checksum boundary size so that each buffer
+      // corresponds to one checksum. If checksum is NONE, then set buffer
+      // capacity to default (OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY = 64KB).
+      ChecksumData checksumData = info.getChecksumData();
+
+      if (checksumData != null) {
+        if (checksumData.getChecksumType() ==
+            ContainerProtos.ChecksumType.NONE) {
+          bufferCapacity = defaultReadBufferCapacity;
+        } else {
+          bufferCapacity = checksumData.getBytesPerChecksum();
+        }
+      }
+    }
+    // If the buffer capacity is 0, set all the data into one ByteBuffer
+    if (bufferCapacity == 0) {
+      bufferCapacity = len;
+    }
+
+    ByteBuffer[] dataBuffers = BufferUtils.assignByteBuffers(len,
+        bufferCapacity);
 
     long chunkFileOffset = 0;
     if (info.getOffset() != 0) {
@@ -255,8 +287,8 @@ public class FilePerChunkStrategy implements ChunkManager {
         if (file.exists()) {
           long offset = info.getOffset() - chunkFileOffset;
           Preconditions.checkState(offset >= 0);
-          ChunkUtils.readData(file, data, offset, len, volumeIOStats);
-          return ChunkBuffer.wrap(data);
+          ChunkUtils.readData(file, dataBuffers, offset, len, volumeIOStats);
+          return ChunkBuffer.wrap(Lists.newArrayList(dataBuffers));
         }
       } catch (StorageContainerException ex) {
         //UNABLE TO FIND chunk is not a problem as we will try with the
@@ -264,7 +296,7 @@ public class FilePerChunkStrategy implements ChunkManager {
         if (ex.getResult() != UNABLE_TO_FIND_CHUNK) {
           throw ex;
         }
-        data.clear();
+        dataBuffers = null;
       }
     }
     throw new StorageContainerException(
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
index 72b1040..cab1a3a 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
@@ -90,6 +90,8 @@ public interface BlockManager {
   long getCommittedBlockLength(Container container, BlockID blockID)
       throws IOException;
 
+  long getDefaultReadBufferCapacity();
+
   /**
    * Shutdown ContainerManager.
    */
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index e430a30..a035379 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -245,6 +245,7 @@ public final class ContainerTestHelper {
         ContainerProtos.ReadChunkRequestProto.newBuilder();
     readRequest.setBlockID(request.getBlockID());
     readRequest.setChunkData(request.getChunkData());
+    readRequest.setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1);
 
     Builder newRequest =
         ContainerCommandRequestProto.newBuilder();
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
index 96d4228..d0b3113 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java
@@ -185,7 +185,7 @@ public class TestBlockDeletingService {
       int numOfChunksPerBlock) throws IOException {
     ChunkManager chunkManager;
     if (layout == FILE_PER_BLOCK) {
-      chunkManager = new FilePerBlockStrategy(true);
+      chunkManager = new FilePerBlockStrategy(true, null);
     } else {
       chunkManager = new FilePerChunkStrategy(true, null);
     }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index e6998fe..bdc5a42 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerAction;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
@@ -178,8 +179,10 @@ public class TestHddsDispatcher {
       response =
           hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest), 
null);
       Assert.assertEquals(ContainerProtos.Result.SUCCESS, 
response.getResult());
-      Assert.assertEquals(response.getReadChunk().getData(),
-          writeChunkRequest.getWriteChunk().getData());
+      ByteString responseData = BufferUtils.concatByteStrings(
+          response.getReadChunk().getDataBuffers().getBuffersList());
+      Assert.assertEquals(writeChunkRequest.getWriteChunk().getData(),
+          responseData);
     } finally {
       ContainerMetrics.remove();
       FileUtils.deleteDirectory(new File(testDir));
@@ -358,7 +361,8 @@ public class TestHddsDispatcher {
     ContainerProtos.ReadChunkRequestProto.Builder readChunkRequest =
         ContainerProtos.ReadChunkRequestProto.newBuilder()
             .setBlockID(writeChunk.getBlockID())
-            .setChunkData(writeChunk.getChunkData());
+            .setChunkData(writeChunk.getChunkData())
+            .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1);
     return ContainerCommandRequestProto.newBuilder()
         .setCmdType(ContainerProtos.Type.ReadChunk)
         .setContainerID(writeChunk.getBlockID().getContainerID())
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/ChunkLayoutTestInfo.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/ChunkLayoutTestInfo.java
index 55f1f60..ce9c2aa 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/ChunkLayoutTestInfo.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/ChunkLayoutTestInfo.java
@@ -81,7 +81,7 @@ public enum ChunkLayoutTestInfo {
   FILE_PER_BLOCK {
     @Override
     public ChunkManager createChunkManager(boolean sync, BlockManager manager) 
{
-      return new FilePerBlockStrategy(sync);
+      return new FilePerBlockStrategy(sync, null);
     }
 
     @Override
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
index 7267314..3d9f03c 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
@@ -202,7 +202,8 @@ public class TestKeyValueHandlerWithUnhealthyContainer {
       break;
     case ReadChunk:
       builder.setReadChunk(ContainerProtos.ReadChunkRequestProto.newBuilder()
-          .setBlockID(fakeBlockId).setChunkData(fakeChunkInfo).build());
+          .setBlockID(fakeBlockId).setChunkData(fakeChunkInfo)
+          .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1).build());
       break;
     case DeleteChunk:
       builder
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java
index 5b64a37..0509463 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -83,8 +84,13 @@ public class TestChunkUtils {
         final int threadNumber = i;
         executor.execute(() -> {
           try {
-            ByteBuffer readBuffer = ByteBuffer.allocate((int) len);
-            ChunkUtils.readData(file, readBuffer, offset, len, stats);
+            ByteBuffer[] readBuffers = BufferUtils.assignByteBuffers(len, len);
+            ChunkUtils.readData(file, readBuffers, offset, len, stats);
+
+            // There should be only one element in readBuffers
+            Assert.assertEquals(1, readBuffers.length);
+            ByteBuffer readBuffer = readBuffers[0];
+
             LOG.info("Read data ({}): {}", threadNumber,
                 new String(readBuffer.array(), UTF_8));
             if (!Arrays.equals(array, readBuffer.array())) {
@@ -161,8 +167,14 @@ public class TestChunkUtils {
       long len = data.limit();
       long offset = 0;
       ChunkUtils.writeData(file, data, offset, len, stats, true);
-      ByteBuffer readBuffer = ByteBuffer.allocate((int) len);
-      ChunkUtils.readData(file, readBuffer, offset, len, stats);
+
+      ByteBuffer[] readBuffers = BufferUtils.assignByteBuffers(len, len);
+      ChunkUtils.readData(file, readBuffers, offset, len, stats);
+
+      // There should be only one element in readBuffers
+      Assert.assertEquals(1, readBuffers.length);
+      ByteBuffer readBuffer = readBuffers[0];
+
       assertArrayEquals(array, readBuffer.array());
       assertEquals(len, readBuffer.remaining());
     } catch (Exception e) {
@@ -193,13 +205,13 @@ public class TestChunkUtils {
     int len = 123;
     int offset = 0;
     File nonExistentFile = new File("nosuchfile");
-    ByteBuffer buf = ByteBuffer.allocate(len);
+    ByteBuffer[] bufs = BufferUtils.assignByteBuffers(len, len);
     VolumeIOStats stats = new VolumeIOStats();
 
     // when
     StorageContainerException e = LambdaTestUtils.intercept(
         StorageContainerException.class,
-        () -> ChunkUtils.readData(nonExistentFile, buf, offset, len, stats));
+        () -> ChunkUtils.readData(nonExistentFile, bufs, offset, len, stats));
 
     // then
     Assert.assertEquals(UNABLE_TO_FIND_CHUNK, e.getResult());
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
index 8ccf9ee..f3be6e2 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestFilePerBlockStrategy.java
@@ -125,12 +125,17 @@ public class TestFilePerBlockStrategy extends 
CommonChunkManagerTestCases {
     subject.writeChunk(container, blockID, info, data, ctx);
 
     ChunkBuffer readData = subject.readChunk(container, blockID, info, ctx);
-    assertEquals(data.rewind(), readData.rewind());
+    // data will be ChunkBufferImplWithByteBuffer and readData will return
+    // ChunkBufferImplWithByteBufferList. Hence, convert both to ByteStrings
+    // before comparing.
+    assertEquals(data.rewind().toByteString(),
+        readData.rewind().toByteString());
 
     ChunkInfo info2 = getChunk(blockID.getLocalID(), 0, start, length);
     ChunkBuffer readData2 = subject.readChunk(container, blockID, info2, ctx);
     assertEquals(length, info2.getLen());
-    assertEquals(data.duplicate(start, start + length), readData2.rewind());
+    assertEquals(data.rewind().toByteString().substring(start, start + length),
+        readData2.rewind().toByteString());
   }
 
   @Override
diff --git 
a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto 
b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 01cc4ff..31947db 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -398,15 +398,29 @@ message  WriteChunkRequestProto  {
 message  WriteChunkResponseProto {
 }
 
+enum ReadChunkVersion {
+  V0 = 0; // Response data is sent in a single ByteBuffer
+  V1 = 1; // Response data is split into multiple buffers
+}
+
 message  ReadChunkRequestProto  {
   required DatanodeBlockID blockID = 1;
   required ChunkInfo chunkData = 2;
+  optional ReadChunkVersion readChunkVersion = 3;
 }
 
 message  ReadChunkResponseProto {
   required DatanodeBlockID blockID = 1;
   required ChunkInfo chunkData = 2;
-  required bytes data = 3;
+  // Chunk data should be returned in one of the two formats below
+  oneof responseData {
+    bytes data = 3; // Chunk data is returned as single buffer for V0
+    DataBuffers dataBuffers = 4; // Chunk data is returned as a list of buffers
+  }
+}
+
+message DataBuffers {
+  repeated bytes buffers = 1;
 }
 
 message  DeleteChunkRequestProto {
@@ -443,6 +457,7 @@ message PutSmallFileResponseProto {
 
 message GetSmallFileRequestProto {
   required GetBlockRequestProto block = 1;
+  optional ReadChunkVersion readChunkVersion = 2;
 }
 
 message GetSmallFileResponseProto {
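Because responseData is a oneof, standard protobuf-java code generation exposes a ResponseDataCase accessor, so a reader can branch on whichever format the datanode returned. A hedged sketch of client-side handling (the response variable and enclosing method are hypothetical; accessors assume the usual generated code for the messages above) is:

    // Illustrative only: extract chunk data from either response format.
    static ByteString extractChunkData(
        ContainerProtos.ReadChunkResponseProto response) {
      switch (response.getResponseDataCase()) {
      case DATA:          // V0: chunk data arrives as a single buffer
        return response.getData();
      case DATABUFFERS:   // V1: chunk data arrives as a list of buffers
        return BufferUtils.concatByteStrings(
            response.getDataBuffers().getBuffersList());
      default:
        throw new IllegalStateException("Response carries no chunk data");
      }
    }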
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index c8bf7db..10d63d0 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -385,4 +385,9 @@ public class KeyInputStream extends InputStream
       is.unbuffer();
     }
   }
+
+  @VisibleForTesting
+  public List<BlockInputStream> getBlockStreams() {
+    return blockStreams;
+  }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java
new file mode 100644
index 0000000..2ad91a7
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;
+import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests {@link ChunkInputStream}.
+ */
+public class TestChunkInputStream extends TestInputStreamBase {
+
+  public TestChunkInputStream(ChunkLayOutVersion layout) {
+    super(layout);
+  }
+
+  /**
+   * Test to verify that data read from chunks is stored in a list of buffers
+   * with max capacity equal to the bytes per checksum.
+   */
+  @Test
+  public void testChunkReadBuffers() throws Exception {
+    String keyName = getNewKeyName();
+    int dataLength = (2 * BLOCK_SIZE) + (CHUNK_SIZE);
+    byte[] inputData = writeRandomBytes(keyName, dataLength);
+
+    KeyInputStream keyInputStream = getKeyInputStream(keyName);
+
+    BlockInputStream block0Stream = keyInputStream.getBlockStreams().get(0);
+    block0Stream.initialize();
+
+    ChunkInputStream chunk0Stream = block0Stream.getChunkStreams().get(0);
+
+    // To read 1 byte of chunk data, ChunkInputStream should get one full
+    // checksum boundary worth of data from Container and store it in buffers.
+    chunk0Stream.read(new byte[1]);
+    checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1,
+        BYTES_PER_CHECKSUM);
+
+    // Read > checksum boundary of data from chunk0
+    int readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
+    byte[] readData = readDataFromChunk(chunk0Stream, readDataLen, 0);
+    validateData(inputData, 0, readData);
+
+    // The first checksum boundary size of data was already present in the
+    // ChunkStream buffers. Once that data is read, the next checksum
+    // boundary size of data will be fetched to read the remaining data.
+    // Hence there should be 1 checksum boundary size of data stored in the
+    // ChunkStream's buffers at the end of the read.
+    checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1,
+        BYTES_PER_CHECKSUM);
+
+    // Seek to a position in the third checksum boundary (so that current
+    // buffers do not have the seeked position) and read > BYTES_PER_CHECKSUM
+    // bytes of data. This should result in 2 * BYTES_PER_CHECKSUM amount of
+    // data being read into the buffers. There should be 2 buffers each with
+    // BYTES_PER_CHECKSUM capacity.
+    readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
+    int offset = 2 * BYTES_PER_CHECKSUM + 1;
+    readData = readDataFromChunk(chunk0Stream, readDataLen, offset);
+    validateData(inputData, offset, readData);
+    checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 2,
+        BYTES_PER_CHECKSUM);
+
+
+    // Read CHUNK_SIZE - 1 bytes of the chunk and verify that all the chunk
+    // data is read into the buffers. We stop 1 byte short of the full chunk
+    // as otherwise the buffers would be released once all chunk data is read.
+    readData = readDataFromChunk(chunk0Stream, CHUNK_SIZE - 1, 0);
+    validateData(inputData, 0, readData);
+    checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(),
+        CHUNK_SIZE / BYTES_PER_CHECKSUM, BYTES_PER_CHECKSUM);
+
+    // Read the last byte of chunk and verify that the buffers are released.
+    chunk0Stream.read(new byte[1]);
+    Assert.assertNull("ChunkInputStream did not release buffers after " +
+        "reaching EOF.", chunk0Stream.getCachedBuffers());
+
+  }
+
+  private byte[] readDataFromChunk(ChunkInputStream chunkInputStream,
+      int readDataLength, int offset) throws IOException {
+    byte[] readData = new byte[readDataLength];
+    chunkInputStream.seek(offset);
+    chunkInputStream.read(readData, 0, readDataLength);
+    return readData;
+  }
+
+  private void checkBufferSizeAndCapacity(List<ByteBuffer> buffers,
+      int expectedNumBuffers, long expectedBufferCapacity) {
+    Assert.assertEquals("ChunkInputStream does not have expected number of " +
+        "ByteBuffers", expectedNumBuffers, buffers.size());
+    for (ByteBuffer buffer : buffers) {
+      Assert.assertEquals("ChunkInputStream ByteBuffer capacity is wrong",
+          expectedBufferCapacity, buffer.capacity());
+    }
+  }
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
new file mode 100644
index 0000000..3735e8a
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import 
org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.KeyInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.common.utils.BufferUtils;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
+import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+@RunWith(Parameterized.class)
+public class TestInputStreamBase {
+
+  private MiniOzoneCluster cluster;
+  private OzoneConfiguration conf = new OzoneConfiguration();
+  private OzoneClient client;
+  private ObjectStore objectStore;
+
+  private String volumeName;
+  private String bucketName;
+  private String keyString;
+
+  private ChunkLayOutVersion chunkLayout;
+  private static final Random RAND = new Random();
+
+  protected static final int CHUNK_SIZE = 1024 * 1024;          // 1MB
+  protected static final int FLUSH_SIZE = 2 * CHUNK_SIZE;       // 2MB
+  protected static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE;   // 4MB
+  protected static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE;   // 8MB
+  protected static final int BYTES_PER_CHECKSUM = 256 * 1024;   // 256KB
+
+  @Rule
+  public Timeout timeout = Timeout.seconds(300);
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> parameters() {
+    return ChunkLayoutTestInfo.chunkLayoutParameters();
+  }
+
+  public TestInputStreamBase(ChunkLayOutVersion layout) {
+    this.chunkLayout = layout;
+  }
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   * @throws IOException
+   */
+  @Before
+  public void init() throws Exception {
+    OzoneClientConfig config = new OzoneClientConfig();
+    config.setBytesPerChecksum(BYTES_PER_CHECKSUM);
+    conf.setFromObject(config);
+
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, 
TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, TimeUnit.SECONDS);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1);
+    conf.setQuietMode(false);
+    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 64,
+        StorageUnit.MB);
+    conf.set(ScmConfigKeys.OZONE_SCM_CHUNK_LAYOUT_KEY, chunkLayout.toString());
+
+    ReplicationManagerConfiguration repConf =
+        conf.getObject(ReplicationManagerConfiguration.class);
+    repConf.setInterval(Duration.ofSeconds(1));
+    conf.setFromObject(repConf);
+
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(4)
+        .setTotalPipelineNumLimit(5)
+        .setBlockSize(BLOCK_SIZE)
+        .setChunkSize(CHUNK_SIZE)
+        .setStreamBufferFlushSize(FLUSH_SIZE)
+        .setStreamBufferMaxSize(MAX_FLUSH_SIZE)
+        .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .build();
+    cluster.waitForClusterToBeReady();
+    // The easiest way to create an open container is to create a key.
+    client = OzoneClientFactory.getRpcClient(conf);
+    objectStore = client.getObjectStore();
+
+    volumeName = UUID.randomUUID().toString();
+    bucketName = UUID.randomUUID().toString();
+    keyString = UUID.randomUUID().toString();
+
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  /**
+   * Shutdown the MiniOzoneCluster.
+   */
+  @After
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  MiniOzoneCluster getCluster() {
+    return cluster;
+  }
+
+  String getVolumeName() {
+    return volumeName;
+  }
+
+  String getBucketName() {
+    return bucketName;
+  }
+
+  KeyInputStream getKeyInputStream(String keyName) throws IOException {
+    return (KeyInputStream) objectStore
+        .getVolume(volumeName)
+        .getBucket(bucketName)
+        .readKey(keyName).getInputStream();
+  }
+
+  String getNewKeyName() {
+    return UUID.randomUUID().toString();
+  }
+
+  byte[] writeKey(String keyName, int dataLength) throws Exception {
+    OzoneOutputStream key = TestHelper.createKey(keyName,
+        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+
+    byte[] inputData = ContainerTestHelper.getFixedLengthString(
+        keyString, dataLength).getBytes(UTF_8);
+    key.write(inputData);
+    key.close();
+
+    return inputData;
+  }
+
+  byte[] writeRandomBytes(String keyName, int dataLength)
+      throws Exception {
+    OzoneOutputStream key = TestHelper.createKey(keyName,
+        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+
+    byte[] inputData = new byte[dataLength];
+    RAND.nextBytes(inputData);
+    key.write(inputData);
+    key.close();
+
+    return inputData;
+  }
+
+  void validateData(byte[] inputData, int offset, byte[] readData) {
+    int readDataLen = readData.length;
+    byte[] expectedData = new byte[readDataLen];
+    System.arraycopy(inputData, (int) offset, expectedData, 0, readDataLen);
+
+    for (int i=0; i < readDataLen; i++) {
+      Assert.assertEquals("Read data does not match the input data at " +
+              "position " + (offset + i), expectedData[i], readData[i]);
+    }
+  }
+
+  @Test
+  public void testInputStreams() throws Exception {
+    String keyName = getNewKeyName();
+    int dataLength = (2 * BLOCK_SIZE) + (CHUNK_SIZE) + 1;
+    writeRandomBytes(keyName, dataLength);
+
+    KeyInputStream keyInputStream = getKeyInputStream(keyName);
+
+    // Verify BlockStreams and ChunkStreams
+    int expectedNumBlockStreams = BufferUtils.getNumberOfBins(
+        dataLength, BLOCK_SIZE);
+    List<BlockInputStream> blockStreams = keyInputStream.getBlockStreams();
+    Assert.assertEquals(expectedNumBlockStreams, blockStreams.size());
+
+    int readBlockLength = 0;
+    for (BlockInputStream blockStream : blockStreams) {
+      int blockStreamLength = Math.min(BLOCK_SIZE,
+          dataLength - readBlockLength);
+      Assert.assertEquals(blockStreamLength, blockStream.getLength());
+
+      int expectedNumChunkStreams =
+          BufferUtils.getNumberOfBins(blockStreamLength, CHUNK_SIZE);
+      blockStream.initialize();
+      List<ChunkInputStream> chunkStreams = blockStream.getChunkStreams();
+      Assert.assertEquals(expectedNumChunkStreams, chunkStreams.size());
+
+      int readChunkLength = 0;
+      for (ChunkInputStream chunkStream : chunkStreams) {
+        int chunkStreamLength = Math.min(CHUNK_SIZE,
+            blockStreamLength - readChunkLength);
+        Assert.assertEquals(chunkStreamLength, chunkStream.getRemaining());
+
+        readChunkLength += chunkStreamLength;
+      }
+
+      readBlockLength += blockStreamLength;
+    }
+  }
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java
similarity index 51%
rename from 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
rename to 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java
index 8a81de0..a451375 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestKeyInputStream.java
@@ -15,208 +15,48 @@
  * the License.
  */
 
-package org.apache.hadoop.ozone.client.rpc;
+package org.apache.hadoop.ozone.client.rpc.read;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
-import java.time.Duration;
 import java.util.Arrays;
-import java.util.Collection;
+
 import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-
-import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.hdds.client.ReplicationType;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
-import 
org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.client.ObjectStore;
-import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.io.KeyInputStream;
-import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.TestHelper;
-import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
-import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
-import static org.apache.hadoop.ozone.container.TestHelper.countReplicas;
-import static org.junit.Assert.fail;
-
+import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.Timeout;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ozone.container.TestHelper.countReplicas;
+import static org.junit.Assert.fail;
+
 /**
  * Tests {@link KeyInputStream}.
  */
-@RunWith(Parameterized.class)
-public class TestKeyInputStream {
+public class TestKeyInputStream extends TestInputStreamBase {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(TestKeyInputStream.class);
 
-  private MiniOzoneCluster cluster;
-  private OzoneConfiguration conf = new OzoneConfiguration();
-  private OzoneClient client;
-  private ObjectStore objectStore;
-  private int chunkSize;
-  private int flushSize;
-  private int maxFlushSize;
-  private int blockSize;
-  private String volumeName;
-  private String bucketName;
-  private String keyString;
-  private ChunkLayoutTestInfo chunkLayout;
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> layouts() {
-    return Arrays.asList(new Object[][] {
-        {ChunkLayoutTestInfo.FILE_PER_CHUNK},
-        {ChunkLayoutTestInfo.FILE_PER_BLOCK}
-    });
-  }
-
-  public TestKeyInputStream(ChunkLayoutTestInfo layout) {
-    this.chunkLayout = layout;
-  }
-  /**
-   * Create a MiniDFSCluster for testing.
-   * <p>
-   * Ozone is made active by setting OZONE_ENABLED = true
-   *
-   * @throws IOException
-   */
-  @Before
-  public void init() throws Exception {
-    chunkSize = 256 * 1024 * 2;
-    flushSize = 2 * chunkSize;
-    maxFlushSize = 2 * flushSize;
-    blockSize = 2 * maxFlushSize;
-
-    OzoneClientConfig config = new OzoneClientConfig();
-    config.setBytesPerChecksum(256 * 1024 * 1024);
-    conf.setFromObject(config);
-
-
-    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, 
TimeUnit.MILLISECONDS);
-    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
-    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, TimeUnit.SECONDS);
-    conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT, 1);
-    conf.setQuietMode(false);
-    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 64,
-        StorageUnit.MB);
-    conf.set(ScmConfigKeys.OZONE_SCM_CHUNK_LAYOUT_KEY, chunkLayout.name());
-
-    ReplicationManagerConfiguration repConf =
-        conf.getObject(ReplicationManagerConfiguration.class);
-    repConf.setInterval(Duration.ofSeconds(1));
-    conf.setFromObject(repConf);
-
-    cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(4)
-        .setTotalPipelineNumLimit(5)
-        .setBlockSize(blockSize)
-        .setChunkSize(chunkSize)
-        .setStreamBufferFlushSize(flushSize)
-        .setStreamBufferMaxSize(maxFlushSize)
-        .setStreamBufferSizeUnit(StorageUnit.BYTES)
-        .build();
-    cluster.waitForClusterToBeReady();
-    //the easiest way to create an open container is creating a key
-    client = OzoneClientFactory.getRpcClient(conf);
-    objectStore = client.getObjectStore();
-    keyString = UUID.randomUUID().toString();
-    volumeName = "test-key-input-stream-volume";
-    bucketName = "test-key-input-stream-bucket";
-    objectStore.createVolume(volumeName);
-    objectStore.getVolume(volumeName).createBucket(bucketName);
-  }
-
-  @Rule
-  public Timeout timeout = Timeout.seconds(300);
-
-  /**
-   * Shutdown MiniDFSCluster.
-   */
-  @After
-  public void shutdown() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  private String getKeyName() {
-    return UUID.randomUUID().toString();
-  }
-
-
-  @Test
-  public void testSeekRandomly() throws Exception {
-    String keyName = getKeyName();
-    OzoneOutputStream key = TestHelper.createKey(keyName,
-        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
-
-    // write data of more than 2 blocks.
-    int dataLength = (2 * blockSize) + (chunkSize);
-
-    byte[] inputData = writeRandomBytes(key, dataLength);
-
-    KeyInputStream keyInputStream = (KeyInputStream) objectStore
-        .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
-        .getInputStream();
-
-    // Seek to some where end.
-    validate(keyInputStream, inputData, dataLength-200, 100);
-
-    // Now seek to start.
-    validate(keyInputStream, inputData, 0, 140);
-
-    validate(keyInputStream, inputData, 200, 300);
-
-    validate(keyInputStream, inputData, 30, 500);
-
-    randomSeek(dataLength, keyInputStream, inputData);
-
-    // Read entire key.
-    validate(keyInputStream, inputData, 0, dataLength);
-
-    // Repeat again and check.
-    randomSeek(dataLength, keyInputStream, inputData);
-
-    validate(keyInputStream, inputData, 0, dataLength);
-
-    keyInputStream.close();
+  public TestKeyInputStream(ChunkLayOutVersion layout) {
+    super(layout);
   }
 
   /**
@@ -260,18 +100,42 @@ public class TestKeyInputStream {
       long seek, int readLength) throws Exception {
     keyInputStream.seek(seek);
 
-    byte[] expectedData = new byte[readLength];
-    keyInputStream.read(expectedData, 0, readLength);
+    byte[] readData = new byte[readLength];
+    keyInputStream.read(readData, 0, readLength);
+
+    validateData(inputData, (int) seek, readData);
+  }
+
+  @Test
+  public void testSeekRandomly() throws Exception {
+    String keyName = getNewKeyName();
+    int dataLength = (2 * BLOCK_SIZE) + (CHUNK_SIZE);
+    byte[] inputData = writeRandomBytes(keyName, dataLength);
 
-    byte[] dest = new byte[readLength];
+    KeyInputStream keyInputStream = getKeyInputStream(keyName);
 
-    System.arraycopy(inputData, (int)seek, dest, 0, readLength);
+    // Seek to somewhere near the end.
+    validate(keyInputStream, inputData, dataLength-200, 100);
 
-    for (int i=0; i < readLength; i++) {
-      Assert.assertEquals(expectedData[i], dest[i]);
-    }
-  }
+    // Now seek to start.
+    validate(keyInputStream, inputData, 0, 140);
 
+    validate(keyInputStream, inputData, 200, 300);
+
+    validate(keyInputStream, inputData, 30, 500);
+
+    randomSeek(dataLength, keyInputStream, inputData);
+
+    // Read entire key.
+    validate(keyInputStream, inputData, 0, dataLength);
+
+    // Repeat again and check.
+    randomSeek(dataLength, keyInputStream, inputData);
+
+    validate(keyInputStream, inputData, 0, dataLength);
+
+    keyInputStream.close();
+  }
 
   @Test
   public void testSeek() throws Exception {
@@ -283,23 +147,15 @@ public class TestKeyInputStream {
     long readChunkCount = metrics.getContainerOpCountMetrics(
         ContainerProtos.Type.ReadChunk);
 
-    String keyName = getKeyName();
-    OzoneOutputStream key = TestHelper.createKey(keyName,
-        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
-
+    String keyName = getNewKeyName();
     // write data spanning 3 chunks
-    int dataLength = (2 * chunkSize) + (chunkSize / 2);
-    byte[] inputData = ContainerTestHelper.getFixedLengthString(
-        keyString, dataLength).getBytes(UTF_8);
-    key.write(inputData);
-    key.close();
+    int dataLength = (2 * CHUNK_SIZE) + (CHUNK_SIZE / 2);
+    byte[] inputData = writeKey(keyName, dataLength);
 
     Assert.assertEquals(writeChunkCount + 3,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
 
-    KeyInputStream keyInputStream = (KeyInputStream) objectStore
-        .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
-        .getInputStream();
+    KeyInputStream keyInputStream = getKeyInputStream(keyName);
 
     // Seek to position 150
     keyInputStream.seek(150);
@@ -310,8 +166,8 @@ public class TestKeyInputStream {
     Assert.assertEquals(readChunkCount, metrics
         .getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));
 
-    byte[] readData = new byte[chunkSize];
-    keyInputStream.read(readData, 0, chunkSize);
+    byte[] readData = new byte[CHUNK_SIZE];
+    keyInputStream.read(readData, 0, CHUNK_SIZE);
 
     // Since we are reading data from index 150 to 250 and the chunk boundary is
     // 100 bytes, we need to read 2 chunks.
@@ -322,29 +178,25 @@ public class TestKeyInputStream {
 
     // Verify that the data read matches with the input data at corresponding
     // indices.
-    for (int i = 0; i < chunkSize; i++) {
-      Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]);
+    for (int i = 0; i < CHUNK_SIZE; i++) {
+      Assert.assertEquals(inputData[CHUNK_SIZE + 50 + i], readData[i]);
     }
   }
 
   @Test
   public void testReadChunk() throws Exception {
-    String keyName = getKeyName();
-    OzoneOutputStream key = TestHelper.createKey(keyName,
-        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+    String keyName = getNewKeyName();
 
     // write data spanning multiple blocks/chunks
-    int dataLength = 2 * blockSize + (blockSize / 2);
-    byte[] data = writeRandomBytes(key, dataLength);
+    int dataLength = 2 * BLOCK_SIZE + (BLOCK_SIZE / 2);
+    byte[] data = writeRandomBytes(keyName, dataLength);
 
     // read chunk data
-    try (KeyInputStream keyInputStream = (KeyInputStream) objectStore
-        .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
-        .getInputStream()) {
+    try (KeyInputStream keyInputStream = getKeyInputStream(keyName)) {
 
-      int[] bufferSizeList = {chunkSize / 4, chunkSize / 2, chunkSize - 1,
-          chunkSize, chunkSize + 1, blockSize - 1, blockSize, blockSize + 1,
-          blockSize * 2};
+      int[] bufferSizeList = {BYTES_PER_CHECKSUM + 1, CHUNK_SIZE / 4,
+          CHUNK_SIZE / 2, CHUNK_SIZE - 1, CHUNK_SIZE, CHUNK_SIZE + 1,
+          BLOCK_SIZE - 1, BLOCK_SIZE, BLOCK_SIZE + 1, BLOCK_SIZE * 2};
       for (int bufferSize : bufferSizeList) {
         assertReadFully(data, keyInputStream, bufferSize, 0);
         keyInputStream.seek(0);
@@ -362,39 +214,35 @@ public class TestKeyInputStream {
     long readChunkCount = metrics.getContainerOpCountMetrics(
         ContainerProtos.Type.ReadChunk);
 
-    String keyName = getKeyName();
-    OzoneOutputStream key = TestHelper.createKey(keyName,
-        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
-
+    String keyName = getNewKeyName();
     // write data spanning 3 chunks
-    int dataLength = (2 * chunkSize) + (chunkSize / 2);
-    byte[] inputData = ContainerTestHelper.getFixedLengthString(
-        keyString, dataLength).getBytes(UTF_8);
-    key.write(inputData);
-    key.close();
+    int dataLength = (2 * CHUNK_SIZE) + (CHUNK_SIZE / 2);
+    byte[] inputData = writeKey(keyName, dataLength);
 
     Assert.assertEquals(writeChunkCount + 3,
         metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
 
-    KeyInputStream keyInputStream = (KeyInputStream) objectStore
-        .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
-        .getInputStream();
+    KeyInputStream keyInputStream = getKeyInputStream(keyName);
 
     // skip 150
-    keyInputStream.skip(70);
+    long skipped = keyInputStream.skip(70);
+    Assert.assertEquals(70, skipped);
     Assert.assertEquals(70, keyInputStream.getPos());
-    keyInputStream.skip(0);
+
+    skipped = keyInputStream.skip(0);
+    Assert.assertEquals(0, skipped);
     Assert.assertEquals(70, keyInputStream.getPos());
-    keyInputStream.skip(80);
 
+    skipped = keyInputStream.skip(80);
+    Assert.assertEquals(80, skipped);
     Assert.assertEquals(150, keyInputStream.getPos());
 
     // Skip operation should not result in any readChunk operation.
     Assert.assertEquals(readChunkCount, metrics
         .getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));
 
-    byte[] readData = new byte[chunkSize];
-    keyInputStream.read(readData, 0, chunkSize);
+    byte[] readData = new byte[CHUNK_SIZE];
+    keyInputStream.read(readData, 0, CHUNK_SIZE);
 
     // Since we are reading data from index 150 to 250 and the chunk boundary is
     // 100 bytes, we need to read 2 chunks.
@@ -405,8 +253,8 @@ public class TestKeyInputStream {
 
     // Verify that the data read matches with the input data at corresponding
     // indices.
-    for (int i = 0; i < chunkSize; i++) {
-      Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]);
+    for (int i = 0; i < CHUNK_SIZE; i++) {
+      Assert.assertEquals(inputData[CHUNK_SIZE + 50 + i], readData[i]);
     }
   }
 
@@ -421,22 +269,19 @@ public class TestKeyInputStream {
   }
 
   private void testReadAfterReplication(boolean doUnbuffer) throws Exception {
-    Assume.assumeTrue(cluster.getHddsDatanodes().size() > 3);
-
-    int dataLength = 2 * chunkSize;
-    String keyName = getKeyName();
-    OzoneOutputStream key = TestHelper.createKey(keyName,
-        ReplicationType.RATIS, dataLength, objectStore, volumeName, bucketName);
+    Assume.assumeTrue(getCluster().getHddsDatanodes().size() > 3);
 
-    byte[] data = writeRandomBytes(key, dataLength);
+    int dataLength = 2 * CHUNK_SIZE;
+    String keyName = getNewKeyName();
+    byte[] data = writeRandomBytes(keyName, dataLength);
 
-    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
-        .setBucketName(bucketName)
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(getVolumeName())
+        .setBucketName(getBucketName())
         .setKeyName(keyName)
         .setType(HddsProtos.ReplicationType.RATIS)
         .setFactor(HddsProtos.ReplicationFactor.THREE)
         .build();
-    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    OmKeyInfo keyInfo = getCluster().getOzoneManager().lookupKey(keyArgs);
 
     OmKeyLocationInfoGroup locations = keyInfo.getLatestVersionLocations();
     Assert.assertNotNull(locations);
@@ -444,16 +289,14 @@ public class TestKeyInputStream {
     Assert.assertEquals(1, locationInfoList.size());
     OmKeyLocationInfo loc = locationInfoList.get(0);
     long containerID = loc.getContainerID();
-    Assert.assertEquals(3, countReplicas(containerID, cluster));
+    Assert.assertEquals(3, countReplicas(containerID, getCluster()));
 
-    TestHelper.waitForContainerClose(cluster, containerID);
+    TestHelper.waitForContainerClose(getCluster(), containerID);
 
     List<DatanodeDetails> pipelineNodes = loc.getPipeline().getNodes();
 
     // read chunk data
-    try (KeyInputStream keyInputStream = (KeyInputStream) objectStore
-        .getVolume(volumeName).getBucket(bucketName)
-        .readKey(keyName).getInputStream()) {
+    try (KeyInputStream keyInputStream = getKeyInputStream(keyName)) {
 
       int b = keyInputStream.read();
       Assert.assertNotEquals(-1, b);
@@ -462,7 +305,7 @@ public class TestKeyInputStream {
         keyInputStream.unbuffer();
       }
 
-      cluster.shutdownHddsDatanode(pipelineNodes.get(0));
+      getCluster().shutdownHddsDatanode(pipelineNodes.get(0));
 
       // check that we can still read it
       assertReadFully(data, keyInputStream, dataLength - 1, 1);
@@ -482,7 +325,7 @@ public class TestKeyInputStream {
     HddsProtos.NodeState health = null;
     try {
       NodeManager nodeManager =
-          cluster.getStorageContainerManager().getScmNodeManager();
+          getCluster().getStorageContainerManager().getScmNodeManager();
       health = nodeManager.getNodeStatus(dn).getHealth();
     } catch (NodeNotFoundException e) {
       fail("Unexpected NodeNotFound exception");
@@ -490,17 +333,7 @@ public class TestKeyInputStream {
     return health;
   }
 
-  private byte[] writeRandomBytes(OutputStream key, int size)
-      throws IOException {
-    byte[] data = new byte[size];
-    Random r = new Random();
-    r.nextBytes(data);
-    key.write(data);
-    key.close();
-    return data;
-  }
-
-  private static void assertReadFully(byte[] data, InputStream in,
+  private void assertReadFully(byte[] data, InputStream in,
       int bufferSize, int totalRead) throws IOException {
 
     byte[] buffer = new byte[bufferSize];
@@ -518,5 +351,4 @@ public class TestKeyInputStream {
     }
     Assert.assertEquals(data.length, totalRead);
   }
-
 }
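
A note on the refactor above: the helpers now used by these tests (getNewKeyName(), writeKey(), writeRandomBytes(), getKeyInputStream(), validateData(), getCluster(), getVolumeName(), getBucketName(), plus the CHUNK_SIZE / BLOCK_SIZE / BYTES_PER_CHECKSUM constants) live in the new TestInputStreamBase parent class instead of this file. A minimal sketch of what a few of those helpers could look like, assuming the cluster/volume/bucket setup from the removed init() moves into the base class; names such as getObjectStore() are illustrative and not taken from the patch:

    // Hypothetical sketch of TestInputStreamBase helpers; details are assumed.
    protected String getNewKeyName() {
      return UUID.randomUUID().toString();
    }

    protected byte[] writeRandomBytes(String keyName, int size) throws Exception {
      byte[] data = new byte[size];
      new Random().nextBytes(data);
      // TestHelper.createKey is the same helper the old test called directly.
      try (OzoneOutputStream out = TestHelper.createKey(keyName,
          ReplicationType.RATIS, 0, getObjectStore(), getVolumeName(),
          getBucketName())) {
        out.write(data);
      }
      return data;
    }

    protected KeyInputStream getKeyInputStream(String keyName) throws Exception {
      return (KeyInputStream) getObjectStore().getVolume(getVolumeName())
          .getBucket(getBucketName()).readKey(keyName).getInputStream();
    }

    protected void validateData(byte[] inputData, int offset, byte[] readData) {
      for (int i = 0; i < readData.length; i++) {
        Assert.assertEquals(inputData[offset + i], readData[i]);
      }
    }
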
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
index 7c8ff90..5c467a0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
@@ -103,7 +103,8 @@ public class TestContainerSmallFile {
         "data123".getBytes(UTF_8), null);
     ContainerProtos.GetSmallFileResponseProto response =
         ContainerProtocolCalls.readSmallFile(client, blockID, null);
-    String readData = response.getData().getData().toStringUtf8();
+    String readData = response.getData().getDataBuffers().getBuffersList()
+        .get(0).toStringUtf8();
     Assert.assertEquals("data123", readData);
     xceiverClientManager.releaseClient(client, false);
   }
@@ -204,7 +205,8 @@ public class TestContainerSmallFile {
     blockID1.setBlockCommitSequenceId(bcsId);
     ContainerProtos.GetSmallFileResponseProto response =
         ContainerProtocolCalls.readSmallFile(client, blockID1, null);
-    String readData = response.getData().getData().toStringUtf8();
+    String readData = response.getData().getDataBuffers().getBuffersList()
+        .get(0).toStringUtf8();
     Assert.assertEquals("data123", readData);
     xceiverClientManager.releaseClient(client, false);
   }
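
The change above reflects the new wire format: GetSmallFile/ReadChunk responses can now return the payload as a list of buffers (dataBuffers) rather than a single data field. The test reads only buffer 0 because "data123" fits in one buffer; a sketch of the more general handling when the payload may span several buffers, assuming ByteString is the relocated Ratis-thirdparty protobuf class used by ContainerProtos:

    // Illustrative only: concatenate every buffer in the reply instead of
    // assuming the whole payload sits in the first one.
    ByteString assembled = ByteString.EMPTY;
    for (ByteString buffer :
        response.getData().getDataBuffers().getBuffersList()) {
      assembled = assembled.concat(buffer);
    }
    String readData = assembled.toStringUtf8();
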
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkValidator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkValidator.java
index 7300fa5..c06914a 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkValidator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkValidator.java
@@ -172,9 +172,13 @@ public class DatanodeChunkValidator extends BaseFreonGenerator
         xceiverClientSpi.sendCommand(request);
 
     checksum = new Checksum(ContainerProtos.ChecksumType.CRC32, chunkSize);
-    checksumReference = checksum.computeChecksum(
-        response.getReadChunk().getData().toByteArray()
-    );
+    if (response.getReadChunk().hasData()) {
+      checksumReference = checksum.computeChecksum(
+          response.getReadChunk().getData().toByteArray());
+    } else {
+      checksumReference = checksum.computeChecksum(
+          response.getReadChunk().getDataBuffers().getBuffersList());
+    }
 
   }
 
@@ -221,10 +225,14 @@ public class DatanodeChunkValidator extends BaseFreonGenerator
         ContainerProtos.ContainerCommandResponseProto response =
             xceiverClientSpi.sendCommand(request);
 
-        ChecksumData checksumOfChunk =
-            checksum.computeChecksum(
-                response.getReadChunk().getData().toByteArray()
-            );
+        ChecksumData checksumOfChunk;
+        if (response.getReadChunk().hasData()) {
+          checksumOfChunk = checksum.computeChecksum(
+              response.getReadChunk().getData().toByteArray());
+        } else {
+          checksumOfChunk = checksum.computeChecksum(
+              response.getReadChunk().getDataBuffers().getBuffersList());
+        }
 
         if (!checksumReference.equals(checksumOfChunk)) {
           throw new IllegalStateException(
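
The hasData()/getDataBuffers() branch above appears twice in this class. A small helper could centralize it; the following is only a sketch (not part of the patch) and assumes the existing checksum field and the two computeChecksum overloads used above:

    // Hypothetical helper: checksum a ReadChunk response regardless of
    // whether it carries the legacy single 'data' field or the new
    // 'dataBuffers' list.
    private ChecksumData checksumOf(
        ContainerProtos.ReadChunkResponseProto readChunk) throws IOException {
      if (readChunk.hasData()) {
        return checksum.computeChecksum(readChunk.getData().toByteArray());
      }
      return checksum.computeChecksum(
          readChunk.getDataBuffers().getBuffersList());
    }
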
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
index 7ffebec..31c8736 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
@@ -203,7 +203,8 @@ public class BenchMarkDatanodeDispatcher {
     ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
         .newBuilder()
         .setBlockID(blockID.getDatanodeBlockIDProtobuf())
-        .setChunkData(getChunkInfo(blockID, chunkName));
+        .setChunkData(getChunkInfo(blockID, chunkName))
+        .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1);
 
     ContainerCommandRequestProto.Builder request = ContainerCommandRequestProto
         .newBuilder();
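
For context, the ReadChunk request in this benchmark now pins the expected response format through setReadChunkVersion (V1 here, as shown in the diff; which version selects the new buffer-list reply is not visible in this excerpt). A hedged sketch of the surrounding command envelope, with the datanodeUuid variable and the reuse of the readChunkRequest builder assumed rather than taken from the unchanged context:

    // Sketch only: wrap the ReadChunk builder from the patch above in a
    // full ContainerCommandRequestProto.
    ContainerCommandRequestProto readRequest = ContainerCommandRequestProto
        .newBuilder()
        .setCmdType(ContainerProtos.Type.ReadChunk)
        .setContainerID(blockID.getContainerID())
        .setDatanodeUuid(datanodeUuid)          // assumed variable name
        .setReadChunk(readChunkRequest)
        .build();
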
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchmarkChunkManager.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchmarkChunkManager.java
index 76cd037..88b5613 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchmarkChunkManager.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchmarkChunkManager.java
@@ -139,7 +139,7 @@ public class BenchmarkChunkManager {
   public void writeSingleFile(BenchmarkState state, Blackhole sink)
       throws StorageContainerException {
 
-    ChunkManager chunkManager = new FilePerBlockStrategy(true);
+    ChunkManager chunkManager = new FilePerBlockStrategy(true, null);
     benchmark(chunkManager, FILE_PER_BLOCK, state, sink);
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
