ayushtkn commented on code in PR #1747:
URL: https://github.com/apache/hadoop/pull/1747#discussion_r940946727


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java:
##########
@@ -54,12 +69,11 @@ public interface ByteBufferPositionedReadable {
    * stream supports this interface, otherwise they might get a
    * {@link UnsupportedOperationException}.
    * <p>
-   * Implementations should treat 0-length requests as legitimate, and must not
+   * Implementations MUST treat 0-length requests as legitimate, and MUST NOT
    * signal an error upon their receipt.
-   * <p>
-   * This does not change the current offset of a file, and is thread-safe.
-   *
-   * @param position position within file
+   * The {@code position} offset MUST BE zero or positive; if negative
+   * an EOFException SHALL BE raised.

Review Comment:
   Just to be clear: we can't change the behaviour of the stream here; we only
need to correct the JavaDoc. Changing the behaviour to throw an EOFException
now would be incompatible.
   The Javadoc below also marks -1 as a legitimate return value:
   ```
      * @return the number of bytes read, possibly zero, or -1 if reached
      *         end-of-stream
   ```
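
   To make the compatibility concern concrete, here is a minimal,
   self-contained sketch. `PositionedRead`, `lenient` and `strict` are
   hypothetical stand-ins, not Hadoop classes, and the "returns -1 for a
   negative position" behaviour of the lenient variant is an assumption made
   purely for illustration. It shows how a caller that only checks the return
   value sees a different outcome once an implementation starts throwing.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

public class PreadCompatSketch {

  /** Hypothetical positioned-read contract, modelled on the Javadoc quoted above. */
  interface PositionedRead {
    int read(long position, ByteBuffer buf) throws IOException;
  }

  /**
   * Assumed legacy behaviour (illustration only): returns -1 whenever
   * nothing can be read at the given position, including a negative one.
   */
  static PositionedRead lenient(byte[] data) {
    return (position, buf) -> {
      if (position < 0 || position >= data.length) {
        return -1;
      }
      int n = Math.min(buf.remaining(), data.length - (int) position);
      buf.put(data, (int) position, n);
      return n;
    };
  }

  /** Same as lenient, except a negative position raises EOFException. */
  static PositionedRead strict(byte[] data) {
    PositionedRead delegate = lenient(data);
    return (position, buf) -> {
      if (position < 0) {
        throw new EOFException("negative position (placeholder message)");
      }
      return delegate.read(position, buf);
    };
  }

  /** A caller that only inspects the return value; reports what it observed. */
  static String probe(PositionedRead in, long position) {
    try {
      return "returned " + in.read(position, ByteBuffer.allocate(4));
    } catch (IOException e) {
      return "threw " + e.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    byte[] data = {1, 2, 3, 4};
    System.out.println(probe(lenient(data), -1)); // prints "returned -1"
    System.out.println(probe(strict(data), -1));  // prints "threw EOFException"
  }
}
```

   Code written against the first variant has no catch block for the second,
   which is why a Javadoc-only correction is the safer route.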



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java:
##########
@@ -161,130 +229,264 @@ private void testPreadWithFullByteBuffer(ByteBuffer buffer)
    * {@link ByteBuffer#limit()} on the buffer. Validates that only half of the
    * testFile is loaded into the buffer.
    */
-  private void testPreadWithLimitedByteBuffer(
-          ByteBuffer buffer) throws IOException {
+  @Test
+  public void testPreadWithLimitedByteBuffer() throws IOException {
     int bytesRead;
     int totalBytesRead = 0;
     // Set the buffer limit to half the size of the file
-    buffer.limit(FILE_SIZE / 2);
+    buffer.limit(HALF_SIZE);
 
     try (FSDataInputStream in = fs.open(testFile)) {
+      in.seek(EOF_POS);
       while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
         totalBytesRead += bytesRead;
         // Check that each call to read changes the position of the ByteBuffer
         // correctly
-        assertEquals(totalBytesRead, buffer.position());
+        assertBufferPosition(totalBytesRead);
       }
 
       // Since we set the buffer limit to half the size of the file, we should
       // have only read half of the file into the buffer
-      assertEquals(totalBytesRead, FILE_SIZE / 2);
+      assertEquals(HALF_SIZE, totalBytesRead);
       // Check that the buffer is full and the contents equal the first half of
       // the file
-      assertFalse(buffer.hasRemaining());
-      buffer.position(0);
-      byte[] bufferContents = new byte[FILE_SIZE / 2];
-      buffer.get(bufferContents);
-      assertArrayEquals(bufferContents,
-              Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2));
+      assertBufferIsFull();
+      assertBufferEqualsFileContents(0, HALF_SIZE, 0);
+
+      // position hasn't changed
+      assertStreamPosition(in, EOF_POS);
     }
   }
 
   /**
    * Reads half of the testFile into the {@link ByteBuffer} by setting the
    * {@link ByteBuffer#position()} the half the size of the file. Validates that
    * only half of the testFile is loaded into the buffer.
+   * <p>
+   * This test interleaves reading from the stream by the classic input
+   * stream API, verifying those bytes are also as expected.
+   * This lets us validate the requirement that these positions reads must
+   * not interfere with the conventional read sequence.
    */
-  private void testPreadWithPositionedByteBuffer(
-          ByteBuffer buffer) throws IOException {
+  @Test
+  public void testPreadWithPositionedByteBuffer() throws IOException {
     int bytesRead;
     int totalBytesRead = 0;
     // Set the buffer position to half the size of the file
-    buffer.position(FILE_SIZE / 2);
+    buffer.position(HALF_SIZE);
+    int counter = 0;
 
     try (FSDataInputStream in = fs.open(testFile)) {
+      assertEquals("Byte read from stream",
+          fileContents[counter++], in.read());
       while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
         totalBytesRead += bytesRead;
         // Check that each call to read changes the position of the ByteBuffer
         // correctly
-        assertEquals(totalBytesRead + FILE_SIZE / 2, buffer.position());
+        assertBufferPosition(totalBytesRead + HALF_SIZE);
+        // read the next byte.
+        assertEquals("Byte read from stream",
+            fileContents[counter++], in.read());
       }
 
       // Since we set the buffer position to half the size of the file, we
       // should have only read half of the file into the buffer
-      assertEquals(totalBytesRead, FILE_SIZE / 2);
+      assertEquals("bytes read",
+          HALF_SIZE, totalBytesRead);
       // Check that the buffer is full and the contents equal the first half of
       // the file
-      assertFalse(buffer.hasRemaining());
-      buffer.position(FILE_SIZE / 2);
-      byte[] bufferContents = new byte[FILE_SIZE / 2];
-      buffer.get(bufferContents);
-      assertArrayEquals(bufferContents,
-              Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2));
+      assertBufferIsFull();
+      assertBufferEqualsFileContents(HALF_SIZE, HALF_SIZE, 0);
     }
   }
 
+  /**
+   * Assert the buffer ranges matches that in the file.
+   * @param bufferPosition buffer position
+   * @param length length of data to check
+   * @param fileOffset offset in file.
+   */
+  private void assertBufferEqualsFileContents(int bufferPosition,
+      int length,
+      int fileOffset) {
+    buffer.position(bufferPosition);
+    byte[] bufferContents = new byte[length];
+    buffer.get(bufferContents);
+    assertArrayEquals(
+        "Buffer data from [" + bufferPosition + "-" + length + "]",
+        bufferContents,
+        Arrays.copyOfRange(fileContents, fileOffset, fileOffset + length));
+  }
+
   /**
    * Reads half of the testFile into the {@link ByteBuffer} by specifying a
    * position for the pread API that is half of the file size. Validates that
    * only half of the testFile is loaded into the buffer.
    */
-  private void testPositionedPreadWithByteBuffer(
-          ByteBuffer buffer) throws IOException {
+  @Test
+  public void testPositionedPreadWithByteBuffer() throws IOException {
     int bytesRead;
     int totalBytesRead = 0;
 
     try (FSDataInputStream in = fs.open(testFile)) {
       // Start reading from halfway through the file
-      while ((bytesRead = in.read(totalBytesRead + FILE_SIZE / 2,
+      while ((bytesRead = in.read(totalBytesRead + HALF_SIZE,
               buffer)) > 0) {
         totalBytesRead += bytesRead;
         // Check that each call to read changes the position of the ByteBuffer
         // correctly
-        assertEquals(totalBytesRead, buffer.position());
+        assertBufferPosition(totalBytesRead);
       }
 
       // Since we starting reading halfway through the file, the buffer should
       // only be half full
-      assertEquals(totalBytesRead, FILE_SIZE / 2);
-      assertEquals(buffer.position(), FILE_SIZE / 2);
-      assertTrue(buffer.hasRemaining());
+      assertEquals("bytes read", HALF_SIZE, totalBytesRead);
+      assertBufferPosition(HALF_SIZE);
+      assertBufferIsNotFull();
       // Check that the buffer contents equal the second half of the file
-      buffer.position(0);
-      byte[] bufferContents = new byte[FILE_SIZE / 2];
-      buffer.get(bufferContents);
-      assertArrayEquals(bufferContents,
-              Arrays.copyOfRange(fileContents, FILE_SIZE / 2, FILE_SIZE));
+      assertBufferEqualsFileContents(0, HALF_SIZE, HALF_SIZE);
     }
   }
 
   /**
    * Reads the entire testFile using the preadFully API and validates that its
-   * contents are properly loaded into the supplied {@link ByteBuffer}.
+   * contents are properly loaded into the {@link ByteBuffer}.
    */
-  private void testPreadFullyWithByteBuffer(ByteBuffer buffer)
-          throws IOException {
+  @Test
+  public void testPreadFullyWithByteBuffer() throws IOException {
     int totalBytesRead = 0;
     try (FSDataInputStream in = fs.open(testFile)) {
       in.readFully(totalBytesRead, buffer);
       // Make sure the buffer is full
-      assertFalse(buffer.hasRemaining());
+      assertBufferIsFull();
       // Make sure the contents of the read buffer equal the contents of the
       // file
-      buffer.position(0);
-      byte[] bufferContents = new byte[FILE_SIZE];
-      buffer.get(bufferContents);
-      assertArrayEquals(bufferContents, fileContents);
+      assertBufferEqualsFileContents(0, FILE_SIZE, 0);
+    }
+  }
+
+  /**
+   * readFully past the end of the file into an empty buffer; expect this
+   * to be a no-op.
+   */
+  @Test
+  public void testPreadFullyPastEOFEmptyByteBuffer() throws IOException {
+    try (FSDataInputStream in = fs.open(testFile)) {
+      in.readFully(FILE_SIZE + 10, emptyBuffer);
+    }
+  }
+
+  /**
+   * Reads from a negative position -expects a failure.
+   * Also uses the new openFile() API to improve its coverage.
+   */
+  @Test
+  public void testPreadFullyNegativeOffset() throws Exception {
+    try (FSDataInputStream in = fs.openFile(testFile).build().get()) {
+      in.seek(QUARTER_SIZE);
+      intercept(EOFException.class, NEGATIVE_POSITION_READ,
+          () -> in.readFully(-1, buffer));
+      // the stream position has not changed.
+      assertStreamPosition(in, QUARTER_SIZE);
+    }
+  }
+
+  /**
+   * Read fully with a start position past the EOF -expects a failure.
+   */
+  @Test
+  public void testPreadFullyPositionPastEOF() throws Exception {
+    try (FSDataInputStream in = fs.openFile(testFile).build().get()) {
+      in.seek(QUARTER_SIZE);
+      intercept(EOFException.class, EOF_IN_READ_FULLY,
+          () -> in.readFully(FILE_SIZE * 2, buffer));
+      // the stream position has not changed.
+      assertStreamPosition(in, QUARTER_SIZE);
+    }
+  }
+
+  /**
+   * Read which goes past the EOF; expects a failure.
+   * The final state of the buffer is undefined; it may fail fast or fail late.
+   * Also uses the new openFile() API to improve its coverage.
+   */
+  @Test
+  public void testPreadFullySpansEOF() throws Exception {
+    try (FSDataInputStream in = fs.openFile(testFile).build().get()) {
+      intercept(EOFException.class, EOF_IN_READ_FULLY,
+          () -> in.readFully(FILE_SIZE - 10, buffer));
+      if (buffer.position() > 0) {
+        // this implementation does not do a range check before the read;
+        // it got partway through before failing.
+        // this is not an error -just inefficient.
+        LOG.warn("Buffer reads began before range checks with {}", in);

Review Comment:
   What is the benefit of having this? Is this log going to help with
debugging at some point?



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java:
##########
@@ -73,73 +119,95 @@ public static void setup() throws IOException {
     try (FSDataOutputStream out = fs.create(testFile, (short) 3)) {
       out.write(fileContents);
     }
+    emptyFile = new Path("/byte-buffer-pread-emptyfile.dat");
+    ContractTestUtils.touch(fs, emptyFile);
   }
 
-  /**
-   * Test preads with {@link java.nio.HeapByteBuffer}s.
-   */
-  @Test
-  public void testPreadWithHeapByteBuffer() throws IOException {
-    testPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
-    testPreadWithFullByteBuffer(ByteBuffer.allocate(FILE_SIZE));
-    testPreadWithPositionedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
-    testPreadWithLimitedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
-    testPositionedPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
-    testPreadFullyWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
-  }
-
-  /**
-   * Test preads with {@link java.nio.DirectByteBuffer}s.
-   */
-  @Test
-  public void testPreadWithDirectByteBuffer() throws IOException {
-    testPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
-    testPreadWithFullByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
-    testPreadWithPositionedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
-    testPreadWithLimitedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
-    testPositionedPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
-    testPreadFullyWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
+  @Before
+  public void setup() throws IOException {
+    if (useHeap) {
+      buffer = ByteBuffer.allocate(FILE_SIZE);
+      emptyBuffer = ByteBuffer.allocate(0);
+    } else {
+      buffer = ByteBuffer.allocateDirect(FILE_SIZE);
+      emptyBuffer = ByteBuffer.allocateDirect(0);
+    }
   }
 
   /**
    * Reads the entire testFile using the pread API and validates that its
-   * contents are properly loaded into the supplied {@link ByteBuffer}.
+   * contents are properly loaded into the {@link ByteBuffer}.
    */
-  private void testPreadWithByteBuffer(ByteBuffer buffer) throws IOException {
+  @Test
+  public void testPreadWithByteBuffer() throws IOException {
     int bytesRead;
     int totalBytesRead = 0;
     try (FSDataInputStream in = fs.open(testFile)) {
       while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
         totalBytesRead += bytesRead;
         // Check that each call to read changes the position of the ByteBuffer
         // correctly
-        assertEquals(totalBytesRead, buffer.position());
+        assertBufferPosition(totalBytesRead);
       }
 
       // Make sure the buffer is full
-      assertFalse(buffer.hasRemaining());
+      assertBufferIsFull();
       // Make sure the contents of the read buffer equal the contents of the
       // file
-      buffer.position(0);
-      byte[] bufferContents = new byte[FILE_SIZE];
-      buffer.get(bufferContents);
-      assertArrayEquals(bufferContents, fileContents);
+      assertBufferEqualsFileContents(0, FILE_SIZE, 0);
     }
   }
 
+  /**
+   * Assert that the value of {@code buffer.position()} equals
+   * the expected value.
+   * @param pos required position.
+   */
+  private void assertBufferPosition(final int pos) {
+    assertEquals("Buffer position",
+        pos, buffer.position());
+  }
+
+  /**
+   * Assert the stream is at the given position.
+   * @param in stream
+   * @param pos required position
+   * @throws IOException seek() failure.
+   */
+  private void assertStreamPosition(final Seekable in, long pos)
+      throws IOException {
+    assertEquals("Buffer position",

Review Comment:
   This message should be ``Stream Position``, not ``Buffer position``.



##########
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java:
##########
@@ -1684,6 +1685,9 @@ public int read(long position, final ByteBuffer buf) throws IOException {
   @Override
   public void readFully(long position, final ByteBuffer buf)
       throws IOException {
+    if (position < 0) {
+      throw new EOFException(NEGATIVE_POSITION_READ);
+    }

Review Comment:
   The exception message will change now; earlier it would have come from the
code below:
   ```
         if (nbytes < 0) {
           throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
         }
   ```
   Anyone asserting on the exception message will hit failures. I'm not sure
this is something to do within the scope of adding new tests.
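
   As a rough illustration of the kind of breakage meant here, the sketch
   below models the two code paths. The class and method names are
   hypothetical, the message strings are placeholders rather than the real
   `FSExceptionMessages` values, and the "before" path is an assumption based
   on the snippet above.

```java
import java.io.EOFException;

public class ExceptionMessageSketch {

  // Placeholder texts only; the real constants live in FSExceptionMessages.
  static final String EOF_IN_READ_FULLY = "placeholder: EOF in readFully";
  static final String NEGATIVE_POSITION_READ = "placeholder: negative position";

  /** Before (assumed): a negative position ends up on the generic EOF path. */
  static void readFullyBefore(long position) throws EOFException {
    int nbytes = position < 0 ? -1 : 0;
    if (nbytes < 0) {
      throw new EOFException(EOF_IN_READ_FULLY);
    }
  }

  /** After: an explicit up-front check raises a different message. */
  static void readFullyAfter(long position) throws EOFException {
    if (position < 0) {
      throw new EOFException(NEGATIVE_POSITION_READ);
    }
    readFullyBefore(position);
  }

  /** Downstream-style check that couples itself to the message text. */
  static boolean failsWithMessage(boolean patched, String expected) {
    try {
      if (patched) {
        readFullyAfter(-1);
      } else {
        readFullyBefore(-1);
      }
      return false;
    } catch (EOFException e) {
      return e.getMessage().contains(expected);
    }
  }

  public static void main(String[] args) {
    // The same assertion passes before the change and fails after it.
    System.out.println(failsWithMessage(false, EOF_IN_READ_FULLY)); // prints "true"
    System.out.println(failsWithMessage(true, EOF_IN_READ_FULLY));  // prints "false"
  }
}
```

   The exception type is unchanged in both paths, so only callers matching on
   the message text would notice.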



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

