bharatviswa504 commented on a change in pull request #804: HDDS-1496. Support 
partial chunk reads and checksum verification
URL: https://github.com/apache/hadoop/pull/804#discussion_r289974749
 
 

 ##########
 File path: 
hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
 ##########
 @@ -0,0 +1,198 @@
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.EOFException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+public class TestChunkInputStream {
+
+  private static final int CHUNK_SIZE = 100;
+  private static final int BYTES_PER_CHECKSUM = 20;
+  private static final String CHUNK_NAME = "dummyChunk";
+  private static final Random RANDOM = new Random();
+  private static Checksum checksum;
+
+  private DummyChunkInputStream chunkStream;
+  private ChunkInfo chunkInfo;
+  private byte[] chunkData;
+
+  @Before
+  public void setup() throws Exception {
+    checksum = new Checksum(ChecksumType.valueOf(
+        OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT),
+        BYTES_PER_CHECKSUM);
+
+    chunkData = generateRandomData(CHUNK_SIZE);
+
+    chunkInfo = ChunkInfo.newBuilder()
+        .setChunkName(CHUNK_NAME)
+        .setOffset(0)
+        .setLen(CHUNK_SIZE)
+        .setChecksumData(checksum.computeChecksum(
+            chunkData, 0, CHUNK_SIZE).getProtoBufMessage())
+        .build();
+
+    chunkStream = new DummyChunkInputStream(chunkInfo, null, null, null, true);
+  }
+
+  static byte[] generateRandomData(int length) {
+    byte[] bytes = new byte[length];
+    RANDOM.nextBytes(bytes);
+    return bytes;
+  }
+
+  /**
+   * A dummy ChunkInputStream to mock read chunk calls to DN.
+   */
+  public class DummyChunkInputStream extends ChunkInputStream {
+
+    // Stores the read chunk data in each readChunk call
+    List<ByteString> readByteBuffers = new ArrayList<>();
+
+    DummyChunkInputStream(ChunkInfo chunkInfo,
+        BlockID blockId,
+        String traceId,
+        XceiverClientSpi xceiverClient,
+        boolean verifyChecksum) {
+      super(chunkInfo, blockId, traceId, xceiverClient, verifyChecksum);
+    }
+
+    public DummyChunkInputStream(ChunkInfo chunkInfo,
+        BlockID blockId,
+        String traceId,
+        XceiverClientSpi xceiverClient,
+        boolean verifyChecksum,
+        byte[] data) {
+      super(chunkInfo, blockId, traceId, xceiverClient, verifyChecksum);
+      chunkData = data;
+    }
+
+    @Override
+    protected ByteString readChunk(ChunkInfo readChunkInfo,
+        List<DatanodeDetails> excludeDns,
+        List<DatanodeDetails> dnListFromReply) {
+      ByteString byteString = ByteString.copyFrom(chunkData,
+          (int) readChunkInfo.getOffset(),
+          (int) readChunkInfo.getLen());
+      readByteBuffers.add(byteString);
+      return byteString;
+    }
+
+    @Override
+    protected List<DatanodeDetails> getDatanodeList() {
+      // return an empty dummy list of size 10
+      return new ArrayList<>(5);
+    }
+
+    @Override
+    protected void checkOpen() {
+      // No action needed
+    }
+  }
+
+  /**
+   * Match readData with the chunkData byte-wise.
+   * @param readData Data read through ChunkInputStream
+   * @param inputDataStartIndex first index (inclusive) in chunkData to compare
+   *                            with read data
+   * @param inputDataEndIndex last index (exclusive) in chunkData to compare
+   *                          with read data
+   */
+  private void matchWithInputData(byte[] readData, int inputDataStartIndex,
+      int inputDataEndIndex) {
+    for (int i = inputDataStartIndex; i < inputDataEndIndex; i++) {
+      Assert.assertEquals(chunkData[i], readData[i - inputDataStartIndex]);
+    }
+  }
+
+  /**
+   * Seek to a position and verify through getPos().
+   */
+  private void seekAndVerify(int pos) throws Exception {
+    chunkStream.seek(pos);
+    Assert.assertEquals("Current position of buffer does not match with the " +
+        "seeked position", pos, chunkStream.getPos());
+  }
+
+  @Test
+  public void testFullChunkRead() throws Exception {
+    byte[] b = new byte[CHUNK_SIZE];
+    chunkStream.read(b, 0 , CHUNK_SIZE);
+
+    matchWithInputData(b, 0, CHUNK_SIZE);
+  }
+
+  @Test
+  public void testPartialChunkRead() throws Exception {
+    int len = CHUNK_SIZE / 2;
+    byte[] b = new byte[len];
+
+    chunkStream.read(b, 0 , len);
+
+    matchWithInputData(b, 0, len);
+
+    // To read chunk data from index 0 to 49 (len = 50), we need to read
+    // chunk from offset 0 to 60 as the checksum boundary is at every 20
+    // bytes. Verify that 60 bytes of chunk data are read and stored in the
+    // buffers.
+    matchWithInputData(chunkStream.readByteBuffers.get(0).toByteArray(),
+        0, 60);
+
+  }
+
+  @Test
+  public void testSeek() throws Exception {
+    seekAndVerify(0);
+
+    try {
+      seekAndVerify(CHUNK_SIZE);
+      Assert.fail("Seeking to Chunk Length should fail.");
+    } catch (EOFException e) {
+      GenericTestUtils.assertExceptionContains("EOF encountered at pos: "
+          + CHUNK_SIZE + " for chunk: " + CHUNK_NAME, e);
+    }
+
+    // Seek before read should update the ChunkInputStream#chunkPosition
+    seekAndVerify(25);
+    Assert.assertEquals(25, chunkStream.getChunkPosition());
+
+    // Read from the seeked position.
+    // Reading from index 25 to 54 should result in the ChunkInputStream
+    // copying chunk data from index 20 to 59 into the buffers (checksum
+    // boundaries).
+    byte[] b = new byte[30];
+    chunkStream.read(b, 0, 30);
+    matchWithInputData(b, 25, 30);
+    matchWithInputData(chunkStream.readByteBuffers.get(0).toByteArray(),
+        20, 60);
+
+    // After read, the position of the chunkStream is evaluated from the
+    // buffers and the chunkPosition should be reset to -1.
+    Assert.assertEquals(-1, chunkStream.getChunkPosition());
+
+    // Seek to a position within the current buffers. Current buffers contain
+    // data from index 20 to 59. ChunkPosition should still not be used to
+    // set the position.
 
 Review comment:
   Can we also have a test case like seek to a position, and then call read 
which we have, and then continue with next read and see we are reading the 
continued data or not?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to