steveloughran commented on code in PR #4427:
URL: https://github.com/apache/hadoop/pull/4427#discussion_r896806033


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java:
##########
@@ -273,6 +273,7 @@ public boolean hasCapability(String capability) {
       // new capabilities.
       switch (capability.toLowerCase(Locale.ENGLISH)) {
       case StreamCapabilities.IOSTATISTICS:
+      case StreamCapabilities.VECTOREDIO:

Review Comment:
   nice



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java:
##########
@@ -52,4 +54,8 @@ public interface FileRange {
    * @param data the future of the ByteBuffer that will have the data
    */
   void setData(CompletableFuture<ByteBuffer> data);
+
+  static FileRange createFileRange(long offset, int length) {

Review Comment:
   1. javadoc
   2. where tests use the constructor themselves, they should switch to this method
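   
   for 1., something along these lines would do (just a sketch; it assumes the factory simply wraps the impl class, as the rest of the PR suggests):
   ```java
   /**
    * Factory method to create a FileRange object.
    * @param offset starting offset of the range.
    * @param length length of the range.
    * @return a new instance of FileRangeImpl.
    */
   static FileRange createFileRange(long offset, int length) {
     return new FileRangeImpl(offset, length);
   }
   ```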



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java:
##########
@@ -209,7 +194,38 @@ public static long roundUp(long offset, int chunkSize) {
   }
 
   /**
-   * Sort and merge ranges to optimize the access from the underlying file
+   * Check if the input ranges are overlapping in nature.
+   * We call two ranges overlapping when the start offset
+   * of the second is less than the end offset of the first.
+   * End offset is calculated as start offset + length.
+   * @param input list of input ranges.
+   * @return the input ranges sorted by offset, if none of them overlap.
+   */
+  public static List<? extends FileRange> validateNonOverlappingAndReturnSortedRanges(
+          List<? extends FileRange> input) {
+
+    if (input.size() <= 1) {
+      return input;
+    }
+    FileRange[] sortedRanges = sortRanges(input);
+    FileRange prev = sortedRanges[0];
+    for(int i=1; i<sortedRanges.length; i++) {

Review Comment:
   nit: add a space



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java:
##########
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.fs;
 
+import java.util.List;

Review Comment:
   these are just for the javadocs, aren't they? bit of a pain.



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java:
##########
@@ -68,35 +69,19 @@ public static void validateVectoredReadRanges(List<? 
extends FileRange> ranges)
 
 
   /**
-   * Read fully a list of file ranges asynchronously from this file.
-   * The default iterates through the ranges to read each synchronously, but
-   * the intent is that subclasses can make more efficient readers.
+   * This is the default implementation which iterates through the ranges
+   * to read each synchronously, but the intent is that subclasses
+   * can make more efficient readers.
    * The data or exceptions are pushed into {@link FileRange#getData()}.
    * @param stream the stream to read the data from
    * @param ranges the byte ranges to read
    * @param allocate the byte buffer allocation
-   * @param minimumSeek the minimum number of bytes to seek over
-   * @param maximumRead the largest number of bytes to combine into a single read
    */
   public static void readVectored(PositionedReadable stream,
                                   List<? extends FileRange> ranges,
-                                  IntFunction<ByteBuffer> allocate,
-                                  int minimumSeek,
-                                  int maximumRead) {
-    if (isOrderedDisjoint(ranges, 1, minimumSeek)) {
-      for(FileRange range: ranges) {
-        range.setData(readRangeFrom(stream, range, allocate));
-      }
-    } else {
-      for(CombinedFileRange range: sortAndMergeRanges(ranges, 1, minimumSeek,
-          maximumRead)) {
-        CompletableFuture<ByteBuffer> read =
-            readRangeFrom(stream, range, allocate);
-        for(FileRange child: range.getUnderlying()) {
-          child.setData(read.thenApply(
-              (b) -> sliceTo(b, range.getOffset(), child)));
-        }
-      }
+                                  IntFunction<ByteBuffer> allocate) {
+    for(FileRange range: ranges) {

Review Comment:
   nit: add a space



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java:
##########
@@ -219,24 +235,22 @@ public static long roundUp(long offset, int chunkSize) {
    *   <li>Some file systems want to round ranges to be at checksum 
boundaries.</li>
    * </ul>
    *
-   * @param input the list of input ranges
+   * @param sortedRanges already sorted list of ranges based on offset.
    * @param chunkSize round the start and end points to multiples of chunkSize
    * @param minimumSeek the smallest gap that we should seek over in bytes
    * @param maxSize the largest combined file range in bytes
    * @return the list of sorted CombinedFileRanges that cover the input
    */
-  public static List<CombinedFileRange> sortAndMergeRanges(List<? extends FileRange> input,
-                                                           int chunkSize,
-                                                           int minimumSeek,
-                                                           int maxSize) {
-    // sort the ranges by offset
-    FileRange[] ranges = input.toArray(new FileRange[0]);
-    Arrays.sort(ranges, Comparator.comparingLong(FileRange::getOffset));
+  public static List<CombinedFileRange> mergeSortedRanges(List<? extends FileRange> sortedRanges,
+                                                          int chunkSize,
+                                                          int minimumSeek,
+                                                          int maxSize) {
+
     CombinedFileRange current = null;
-    List<CombinedFileRange> result = new ArrayList<>(ranges.length);
+    List<CombinedFileRange> result = new ArrayList<>(sortedRanges.size());
 
     // now merge together the ones that merge
-    for(FileRange range: ranges) {
+    for(FileRange range: sortedRanges) {

Review Comment:
   nit: add a space



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java:
##########
@@ -209,7 +194,38 @@ public static long roundUp(long offset, int chunkSize) {
   }
 
   /**
-   * Sort and merge ranges to optimize the access from the underlying file
+   * Check if the input ranges are overlapping in nature.
+   * We call two ranges overlapping when the start offset
+   * of the second is less than the end offset of the first.
+   * End offset is calculated as start offset + length.
+   * @param input list of input ranges.
+   * @return the input ranges sorted by offset, if none of them overlap.
+   */
+  public static List<? extends FileRange> validateNonOverlappingAndReturnSortedRanges(
+          List<? extends FileRange> input) {
+
+    if (input.size() <= 1) {
+      return input;
+    }
+    FileRange[] sortedRanges = sortRanges(input);
+    FileRange prev = sortedRanges[0];
+    for(int i=1; i<sortedRanges.length; i++) {
+      if (sortedRanges[i].getOffset() < prev.getOffset() + prev.getLength()) {
+        throw new UnsupportedOperationException("Overlapping ranges are not supported");
+      }
+    }
+    return Arrays.asList(sortedRanges);
+  }
+
+  public static FileRange[] sortRanges(List<? extends FileRange> input) {

Review Comment:
   nit: javadoc
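   
   e.g. (sketch only, reusing the sort the old sortAndMergeRanges code did):
   ```java
   /**
    * Sort the input ranges by offset.
    * @param input list of input ranges.
    * @return a new array containing the ranges sorted by offset.
    */
   public static FileRange[] sortRanges(List<? extends FileRange> input) {
     FileRange[] sortedRanges = input.toArray(new FileRange[0]);
     Arrays.sort(sortedRanges, Comparator.comparingLong(FileRange::getOffset));
     return sortedRanges;
   }
   ```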



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -167,36 +195,85 @@ public void testSomeRangesMergedSomeUnmerged() throws 
Exception {
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
       validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  @Test
+  public void testOverlappingRanges() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = getSampleOverlappingRanges();
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
     }
   }
 
   @Test
   public void testSameRanges() throws Exception {
+    // Same ranges are special case of overlapping only.
     FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = getSampleSameRanges();
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  protected List<FileRange> getSampleSameRanges() {
     List<FileRange> fileRanges = new ArrayList<>();
     fileRanges.add(new FileRangeImpl(8*1024, 1000));
     fileRanges.add(new FileRangeImpl(8*1024, 1000));
     fileRanges.add(new FileRangeImpl(8*1024, 1000));
+    return fileRanges;
+  }
+
+  protected List<FileRange> getSampleOverlappingRanges() {
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(100, 500));
+    fileRanges.add(FileRange.createFileRange(400, 500));
+    return fileRanges;
+  }
+  protected void validateUnsupportedOperation(FileSystem fs,
+                                            List<? extends FileRange> fileRanges)
+          throws Exception {
     CompletableFuture<FSDataInputStream> builder =
             fs.openFile(path(VECTORED_READ_FILE_NAME))
                     .build();
     try (FSDataInputStream in = builder.get()) {
+      LambdaTestUtils.intercept(UnsupportedOperationException.class,
+              () -> in.readVectored(fileRanges, allocate));
+    }
+  }
+
+  @Test
+  public void testSomeRandomNonOverlappingRanges() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(new FileRangeImpl(500, 100));
+    fileRanges.add(new FileRangeImpl(1000, 200));
+    fileRanges.add(new FileRangeImpl(50, 10));
+    fileRanges.add(new FileRangeImpl(10, 5));
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {

Review Comment:
   try using openFile() in some of the tests too, including with a file read policy of vectored
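   
   e.g. a sketch of what one variant could look like. FS_OPTION_OPENFILE_READ_POLICY ("fs.option.openfile.read.policy") already exists; the "vectored" policy name is an assumption here, and as unknown policies are meant to be ignored it should be harmless to pass:
   ```java
   // hypothetical variant of one of the tests above, going through openFile()
   // with a read policy hint instead of fs.open()
   FileSystem fs = getFileSystem();
   List<FileRange> fileRanges = createSomeRandomRanges();
   try (FSDataInputStream in = fs.openFile(path(VECTORED_READ_FILE_NAME))
           .opt(FS_OPTION_OPENFILE_READ_POLICY, "vectored")
           .build().get()) {
     in.readVectored(fileRanges, allocate);
     validateVectoredReadResult(fileRanges, DATASET);
     returnBuffersToPoolPostRead(fileRanges, pool);
   }
   ```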



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -53,10 +58,12 @@ public abstract class AbstractContractVectoredReadTest 
extends AbstractFSContrac
   private static final Logger LOG = LoggerFactory.getLogger(AbstractContractVectoredReadTest.class);
 
   public static final int DATASET_LEN = 64 * 1024;
-  private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
+  protected static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
   protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
 
-  private final IntFunction<ByteBuffer> allocate;
+  protected final IntFunction<ByteBuffer> allocate;
+
+  private WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();

Review Comment:
   final?



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java:
##########
@@ -338,14 +337,19 @@ public void testReadVectored() throws Exception {
     }).when(stream).readFully(ArgumentMatchers.anyLong(),
         ArgumentMatchers.any(ByteBuffer.class));
     // should not merge the ranges
-    VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 100, 100);
+    VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate);
     Mockito.verify(stream, Mockito.times(3))
        .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class));
     for(int b=0; b < input.size(); ++b) {
       validateBuffer("buffer " + b, input.get(b).getData().get(), 0);
     }
   }
 
+  /**
+   * TODO: Honestly this test doesn't make much sense now as it is similar to the one above.
+   * Took time to fix this though. If you guys approve, I will remove it.
+   * @throws Exception
+   */

Review Comment:
   cut it. Keeps maintenance costs down, and mockito is expensive to maintain



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -167,36 +195,85 @@ public void testSomeRangesMergedSomeUnmerged() throws 
Exception {
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
       validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  @Test
+  public void testOverlappingRanges() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = getSampleOverlappingRanges();
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
     }
   }
 
   @Test
   public void testSameRanges() throws Exception {
+    // Same ranges are special case of overlapping only.
     FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = getSampleSameRanges();
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  protected List<FileRange> getSampleSameRanges() {
     List<FileRange> fileRanges = new ArrayList<>();
     fileRanges.add(new FileRangeImpl(8*1024, 1000));
     fileRanges.add(new FileRangeImpl(8*1024, 1000));
     fileRanges.add(new FileRangeImpl(8*1024, 1000));
+    return fileRanges;
+  }
+
+  protected List<FileRange> getSampleOverlappingRanges() {
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(100, 500));
+    fileRanges.add(FileRange.createFileRange(400, 500));
+    return fileRanges;
+  }
+  protected void validateUnsupportedOperation(FileSystem fs,

Review Comment:
   nit, add a gap, maybe even javadocs given it is protected
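   
   for the javadoc, something like this sketch would do:
   ```java
   /**
    * Open the test file and expect {@code readVectored()} on the given
    * ranges to fail with an UnsupportedOperationException.
    * @param fs filesystem to open the test file with.
    * @param fileRanges ranges which are expected to be rejected.
    * @throws Exception any failure other than the expected one.
    */
   ```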



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -940,90 +949,133 @@ public void readVectored(List<? extends FileRange> 
ranges,
 
     LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, 
ranges);
     checkNotClosed();
+    if (stopVectoredIOOperations.getAndSet(false)) {
+      LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
+    }
+    List<? extends FileRange> sortedRanges = 
validateNonOverlappingAndReturnSortedRanges(ranges);
     for (FileRange range : ranges) {
       validateRangeRequest(range);
       CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
       range.setData(result);
     }
 
-    if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) {
+    if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
       LOG.debug("Not merging the ranges as they are disjoint");
-      for(FileRange range: ranges) {
+      for(FileRange range: sortedRanges) {

Review Comment:
   nit: add a space



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -1120,14 +1176,14 @@ public void readByteArray(S3ObjectInputStream 
objectContent,
   }
 
   /**
-   * Read data from S3 using a http request.
-   * This also handles if file has been changed while http call
-   * is getting executed. If file has been changed RemoteFileChangedException
-   * is thrown.
+   * Read data from S3 using a http request with retries.
+   * This also handles if file has been changed while http
+   * call is getting executed. If file has been changed

Review Comment:
   nit: "if the file"



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java:
##########
@@ -26,14 +27,16 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileRange;
-import org.apache.hadoop.fs.FileRangeImpl;
+import org.apache.hadoop.fs.impl.FileRangeImpl;

Review Comment:
   switch to the FileRange.createFileRange method



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java:
##########
@@ -15,11 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.fs;
+package org.apache.hadoop.fs.impl;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 
+import org.apache.hadoop.fs.FileRange;
+

Review Comment:
   1. declare as @Private just to make clear it should not be directly created...and say this in the javadoc
   2. should offset and length be final? or are the setters needed?
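   
   for 1., something like this (sketch, assuming the usual hadoop classification annotations):
   ```java
   import org.apache.hadoop.classification.InterfaceAudience;
   import org.apache.hadoop.classification.InterfaceStability;

   /**
    * A byte range of a file.
    * This class is not meant to be instantiated directly;
    * use {@link FileRange#createFileRange(long, int)} instead.
    */
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
   public class FileRangeImpl implements FileRange {
   ```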



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java:
##########
@@ -459,10 +459,10 @@ public void test_040_PositionedReadHugeFile() throws 
Throwable {
   public void test_045_vectoredIOHugeFile() throws Throwable {
     assumeHugeFileExists();
     List<FileRange> rangeList = new ArrayList<>();
-    rangeList.add(new FileRangeImpl(5856368, 1167716));
-    rangeList.add(new FileRangeImpl(3520861, 1167700));
-    rangeList.add(new FileRangeImpl(8191913, 1167775));
-    rangeList.add(new FileRangeImpl(1520861, 1167700));
+    rangeList.add(new FileRangeImpl(5856368, 116770));

Review Comment:
   use FileRange.createFileRange



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -1120,14 +1176,14 @@ public void readByteArray(S3ObjectInputStream 
objectContent,
   }
 
   /**
-   * Read data from S3 using a http request.
-   * This also handles if file has been changed while http call
-   * is getting executed. If file has been changed RemoteFileChangedException
-   * is thrown.
+   * Read data from S3 using a http request with retries.
+   * This also handles if file has been changed while http

Review Comment:
   nit "while the http"



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java:
##########
@@ -141,8 +140,8 @@ public void testSortAndMerge() {
         new FileRangeImpl(1000, 100)
         );
     assertFalse("Ranges are non disjoint", 
VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
-    List<CombinedFileRange> outputList = VectoredReadUtils.sortAndMergeRanges(
-        input, 100, 1001, 2500);
+    List<CombinedFileRange> outputList = VectoredReadUtils.mergeSortedRanges(
+            Arrays.asList(sortRanges(input)), 100, 1001, 2500);
     assertEquals("merged range size", 1, outputList.size());

Review Comment:
   asserts on list size are better done in AssertJ, as it will include the list in the exception
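   
   i.e. something like (sketch, with the usual org.assertj.core.api.Assertions import):
   ```java
   Assertions.assertThat(outputList)
       .describedAs("merged ranges")
       .hasSize(1);
   ```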



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -272,26 +350,29 @@ public void testVectoredReadAfterNormalRead() throws 
Exception {
               .isEqualTo(200);
       in.readVectored(fileRanges, allocate);
       validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
     }
   }
 
   @Test
   public void testMultipleVectoredReads() throws Exception {
     FileSystem fs = getFileSystem();
-    List<FileRange> fileRanges1 = createSomeOverlappingRanges();
-    List<FileRange> fileRanges2 = createSomeOverlappingRanges();
+    List<FileRange> fileRanges1 = createSomeRandomRanges();
+    List<FileRange> fileRanges2 = createSomeRandomRanges();
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges1, allocate);
       in.readVectored(fileRanges2, allocate);
       validateVectoredReadResult(fileRanges2, DATASET);
       validateVectoredReadResult(fileRanges1, DATASET);
+      returnBuffersToPoolPostRead(fileRanges1, pool);
+      returnBuffersToPoolPostRead(fileRanges2, pool);
     }
   }
 
-  protected List<FileRange> createSomeOverlappingRanges() {
+  protected List<FileRange> createSomeRandomRanges() {

Review Comment:
   not really random, "non overlapping" better



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -940,90 +949,133 @@ public void readVectored(List<? extends FileRange> 
ranges,
 
     LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, 
ranges);
     checkNotClosed();
+    if (stopVectoredIOOperations.getAndSet(false)) {
+      LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
+    }
+    List<? extends FileRange> sortedRanges = 
validateNonOverlappingAndReturnSortedRanges(ranges);
     for (FileRange range : ranges) {
       validateRangeRequest(range);
       CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
       range.setData(result);
     }
 
-    if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) {
+    if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
       LOG.debug("Not merging the ranges as they are disjoint");
-      for(FileRange range: ranges) {
+      for(FileRange range: sortedRanges) {
         ByteBuffer buffer = allocate.apply(range.getLength());
         unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
       }
     } else {
       LOG.debug("Trying to merge the ranges as they are not disjoint");
-      List<CombinedFileRange> combinedFileRanges = sortAndMergeRanges(ranges,
+      List<CombinedFileRange> combinedFileRanges = 
mergeSortedRanges(sortedRanges,
               1, minSeekForVectorReads(),
               maxReadSizeForVectorReads());
       LOG.debug("Number of original ranges size {} , Number of combined ranges 
{} ",
               ranges.size(), combinedFileRanges.size());
       for(CombinedFileRange combinedFileRange: combinedFileRanges) {
-        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
-        ByteBuffer buffer = allocate.apply(combinedFileRange.getLength());
-        combinedFileRange.setData(result);
         unboundedThreadPool.submit(
-            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, 
buffer));
+            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, 
allocate));
       }
     }
     LOG.debug("Finished submitting vectored read to threadpool" +
             " on path {} for ranges {} ", pathStr, ranges);
   }
 
   /**
-   * Read data in the combinedFileRange and update data in buffers
-   * of all underlying ranges.
-   * @param combinedFileRange combined range.
-   * @param buffer combined buffer.
+   * Read the data from S3 for the bigger combined file range and update all 
the
+   * underlying ranges.
+   * @param combinedFileRange big combined file range.
+   * @param allocate method to create byte buffers to hold result data.
    */
   private void readCombinedRangeAndUpdateChildren(CombinedFileRange 
combinedFileRange,
-                                                  ByteBuffer buffer) {
-    // Not putting read single range call inside try block as
-    // exception if any occurred during this call will be raised
-    // during awaitFuture call while getting the combined buffer.
-    readSingleRange(combinedFileRange, buffer);
+                                                  IntFunction<ByteBuffer> 
allocate) {
+    LOG.debug("Start reading combined range {} from path {} ", 
combinedFileRange, pathStr);
+    S3Object objectRange = null;
+    S3ObjectInputStream objectContent = null;
     try {
-      // In case of single range we return the original byte buffer else
-      // we return slice byte buffers for each child ranges.
-      ByteBuffer combinedBuffer = 
FutureIOSupport.awaitFuture(combinedFileRange.getData());
-      if (combinedFileRange.getUnderlying().size() == 1) {
-        
combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer);
-      } else {
-        for (FileRange child : combinedFileRange.getUnderlying()) {
-          updateOriginalRange(child, combinedBuffer, combinedFileRange);
-        }
+      checkIfVectoredIOStopped();
+      final String operationName = "readCombinedFileRange";
+      objectRange = getS3Object(operationName,

Review Comment:
   this reference needs to be kept until all the reads have finished, as if its finalizer is called, it closes the inner streams. Are you doing this?
   
   It does look like it...just add a comment above the variable explaining why it MUST be kept
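   
   something like this above the declaration would cover it:
   ```java
   // Keep a strong reference to the S3Object for the whole duration of the
   // read: if it became unreachable and its finalizer ran, the wrapped
   // object content stream would be closed while child ranges were still
   // being populated.
   S3Object objectRange = null;
   S3ObjectInputStream objectContent = null;
   ```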



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -167,36 +195,85 @@ public void testSomeRangesMergedSomeUnmerged() throws 
Exception {
     try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
       in.readVectored(fileRanges, allocate);
       validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  @Test
+  public void testOverlappingRanges() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = getSampleOverlappingRanges();
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
     }
   }
 
   @Test
   public void testSameRanges() throws Exception {
+    // Same ranges are special case of overlapping only.
     FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = getSampleSameRanges();
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, pool);
+    }
+  }
+
+  protected List<FileRange> getSampleSameRanges() {
     List<FileRange> fileRanges = new ArrayList<>();
     fileRanges.add(new FileRangeImpl(8*1024, 1000));

Review Comment:
   add some spaces there, or at least an int variable to something like 8_000
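   
   e.g. (sketch, also picking up the createFileRange factory):
   ```java
   final int offset = 8 * 1024;
   fileRanges.add(FileRange.createFileRange(offset, 1000));
   fileRanges.add(FileRange.createFileRange(offset, 1000));
   fileRanges.add(FileRange.createFileRange(offset, 1000));
   ```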



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestChecksumFileSystem.java:
##########
@@ -19,9 +19,8 @@
 package org.apache.hadoop.fs;
 
 import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.permission.FsPermission;
 import static org.apache.hadoop.fs.FileSystemTestHelper.*;

Review Comment:
   there's a mixup of static and non-static imports. This is probably worth fixing while you are cleaning up the unused imports, as it's more unusual



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java:
##########
@@ -1136,6 +1138,17 @@ public static void 
validateVectoredReadResult(List<FileRange> fileRanges,
     }
   }
 
+  public static void returnBuffersToPoolPostRead(List<FileRange> fileRanges,

Review Comment:
   javadoc please
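   
   e.g. (sketch; whatever the pool parameter type above actually is):
   ```java
   /**
    * Return the buffers of all the ranges back to the pool once a
    * vectored read and its validation are finished with them.
    * @param fileRanges ranges whose buffers are to be released.
    * @param pool the pool the buffers were allocated from.
    */
   ```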



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java:
##########
@@ -32,4 +50,38 @@ public TestLocalFSContractVectoredRead(String bufferType) {
   protected AbstractFSContract createContract(Configuration conf) {
     return new LocalFSContract(conf);
   }
+
+  @Test
+  public void testChecksumValidationDuringVectoredRead() throws Exception {
+    Path testPath = path("big_range_checksum");
+    LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
+    byte[] DATASET_CORRECT = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
+    try (FSDataOutputStream out = localFs.create(testPath, true)){
+      out.write(DATASET_CORRECT);
+    }
+    Path checksumPath = localFs.getChecksumFile(testPath);

Review Comment:
   do think this would be safer through the java file API, or do other tests do the same trick here?
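   
   if it helps, a rough sketch of doing the corruption through java.nio.file rather than through the filesystem object (hypothetical, since the rest of the test isn't shown here; assumes checksumPath resolves to a local file: URI):
   ```java
   // flip a bit in the checksum file via the java file API, so the
   // checksum layer itself plays no part in the corruption
   java.nio.file.Path crcFile = java.nio.file.Paths.get(checksumPath.toUri());
   byte[] crcBytes = java.nio.file.Files.readAllBytes(crcFile);
   crcBytes[crcBytes.length - 1] ^= 0x01;
   java.nio.file.Files.write(crcFile, crcBytes);
   ```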



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -940,90 +949,133 @@ public void readVectored(List<? extends FileRange> 
ranges,
 
     LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, 
ranges);
     checkNotClosed();
+    if (stopVectoredIOOperations.getAndSet(false)) {
+      LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
+    }
+    List<? extends FileRange> sortedRanges = 
validateNonOverlappingAndReturnSortedRanges(ranges);
     for (FileRange range : ranges) {
       validateRangeRequest(range);
       CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
       range.setData(result);
     }
 
-    if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) {
+    if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
       LOG.debug("Not merging the ranges as they are disjoint");
-      for(FileRange range: ranges) {
+      for(FileRange range: sortedRanges) {
         ByteBuffer buffer = allocate.apply(range.getLength());
         unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
       }
     } else {
       LOG.debug("Trying to merge the ranges as they are not disjoint");
-      List<CombinedFileRange> combinedFileRanges = sortAndMergeRanges(ranges,
+      List<CombinedFileRange> combinedFileRanges = 
mergeSortedRanges(sortedRanges,
               1, minSeekForVectorReads(),
               maxReadSizeForVectorReads());
       LOG.debug("Number of original ranges size {} , Number of combined ranges 
{} ",
               ranges.size(), combinedFileRanges.size());
       for(CombinedFileRange combinedFileRange: combinedFileRanges) {
-        CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
-        ByteBuffer buffer = allocate.apply(combinedFileRange.getLength());
-        combinedFileRange.setData(result);
         unboundedThreadPool.submit(
-            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, 
buffer));
+            () -> readCombinedRangeAndUpdateChildren(combinedFileRange, 
allocate));
       }
     }
     LOG.debug("Finished submitting vectored read to threadpool" +
             " on path {} for ranges {} ", pathStr, ranges);
   }
 
   /**
-   * Read data in the combinedFileRange and update data in buffers
-   * of all underlying ranges.
-   * @param combinedFileRange combined range.
-   * @param buffer combined buffer.
+   * Read the data from S3 for the bigger combined file range and update all 
the
+   * underlying ranges.
+   * @param combinedFileRange big combined file range.
+   * @param allocate method to create byte buffers to hold result data.
    */
   private void readCombinedRangeAndUpdateChildren(CombinedFileRange 
combinedFileRange,
-                                                  ByteBuffer buffer) {
-    // Not putting read single range call inside try block as
-    // exception if any occurred during this call will be raised
-    // during awaitFuture call while getting the combined buffer.
-    readSingleRange(combinedFileRange, buffer);
+                                                  IntFunction<ByteBuffer> 
allocate) {
+    LOG.debug("Start reading combined range {} from path {} ", 
combinedFileRange, pathStr);
+    S3Object objectRange = null;
+    S3ObjectInputStream objectContent = null;
     try {
-      // In case of single range we return the original byte buffer else
-      // we return slice byte buffers for each child ranges.
-      ByteBuffer combinedBuffer = 
FutureIOSupport.awaitFuture(combinedFileRange.getData());
-      if (combinedFileRange.getUnderlying().size() == 1) {
-        
combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer);
-      } else {
-        for (FileRange child : combinedFileRange.getUnderlying()) {
-          updateOriginalRange(child, combinedBuffer, combinedFileRange);
-        }
+      checkIfVectoredIOStopped();
+      final String operationName = "readCombinedFileRange";
+      objectRange = getS3Object(operationName,
+              combinedFileRange.getOffset(),
+              combinedFileRange.getLength());
+      objectContent = objectRange.getObjectContent();
+      if (objectContent == null) {
+        throw new PathIOException(uri,
+                "Null IO stream received during " + operationName);
       }
+      populateChildBuffers(combinedFileRange, objectContent, allocate);
     } catch (Exception ex) {
-      LOG.warn("Exception occurred while reading combined range from file {}", 
pathStr, ex);
+      LOG.warn("Exception while reading a range {} from path {} ", 
combinedFileRange, pathStr, ex);
       for(FileRange child : combinedFileRange.getUnderlying()) {
         child.getData().completeExceptionally(ex);
       }
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, objectRange, objectContent);
+    }
+    LOG.debug("Finished reading range {} from path {} ", combinedFileRange, 
pathStr);
+  }
+
+  /**
+   * Populate underlying buffers of the child ranges.
+   * @param combinedFileRange big combined file range.
+   * @param objectContent data from s3.
+   * @param allocate method to allocate child byte buffers.
+   * @throws IOException any IOE.
+   */
+  private void populateChildBuffers(CombinedFileRange combinedFileRange,
+                                    S3ObjectInputStream objectContent,
+                                    IntFunction<ByteBuffer> allocate) throws 
IOException {
+    // If the combined file range just contains a single child
+    // range, we only have to fill that one child buffer else
+    // we drain the intermediate data between consecutive ranges
+    // and fill the buffers one by one.
+    if (combinedFileRange.getUnderlying().size() == 1) {
+      FileRange child = combinedFileRange.getUnderlying().get(0);
+      ByteBuffer buffer = allocate.apply(child.getLength());
+      populateBuffer(child.getLength(), buffer, objectContent);
+      child.getData().complete(buffer);
+    } else {
+      FileRange prev = null;
+      for (FileRange child : combinedFileRange.getUnderlying()) {
+        if (prev != null && prev.getOffset() + prev.getLength() < 
child.getOffset()) {
+          long drainQuantity = child.getOffset() - prev.getOffset() - 
prev.getLength();
+          drainUnnecessaryData(objectContent, drainQuantity);
+        }
+        ByteBuffer buffer = allocate.apply(child.getLength());
+        populateBuffer(child.getLength(), buffer, objectContent);
+        child.getData().complete(buffer);
+        prev = child;
+      }
     }
   }
 
   /**
-   * Update data in child range from combined range.
-   * @param child child range.
-   * @param combinedBuffer combined buffer.
-   * @param combinedFileRange combined range.
+   * Drain unnecessary data in between ranges.
+   * @param objectContent s3 data stream.
+   * @param drainQuantity how many bytes to drain.
+   * @throws IOException any IOE.
    */
-  private void updateOriginalRange(FileRange child,
-                                   ByteBuffer combinedBuffer,
-                                   CombinedFileRange combinedFileRange) {
-    LOG.trace("Start Filling original range [{}, {}) from combined range [{}, 
{}) ",
-            child.getOffset(), child.getLength(),
-            combinedFileRange.getOffset(), combinedFileRange.getLength());
-    ByteBuffer childBuffer = sliceTo(combinedBuffer, 
combinedFileRange.getOffset(), child);
-    child.getData().complete(childBuffer);
-    LOG.trace("End Filling original range [{}, {}) from combined range [{}, 
{}) ",
-            child.getOffset(), child.getLength(),
-            combinedFileRange.getOffset(), combinedFileRange.getLength());
+  private void drainUnnecessaryData(S3ObjectInputStream objectContent, long 
drainQuantity)

Review Comment:
   are you collecting any stats on this or other vector io reads?



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -1152,6 +1212,12 @@ private S3Object getS3Object(String operationName, long 
position,
     return objectRange;
   }
 
+  private void checkIfVectoredIOStopped() throws InterruptedIOException {

Review Comment:
   add a javadoc
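   
   e.g. (sketch, based on how the flag is used above):
   ```java
   /**
    * Check whether vectored IO operations have been asked to stop
    * (stream closed or unbuffer() called) and, if so, fail fast
    * before issuing any further requests.
    * @throws InterruptedIOException if the operations were stopped.
    */
   ```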



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java:
##########
@@ -99,4 +102,58 @@ public void testMinSeekAndMaxSizeDefaultValues() throws 
Exception {
       }
     }
   }
+
+  @Test
+  public void testStopVectoredIoOperationsCloseStream() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = createSomeRandomRanges();
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){
+      in.readVectored(fileRanges, allocate);
+      in.close();
+      LambdaTestUtils.intercept(InterruptedIOException.class,
+        () -> validateVectoredReadResult(fileRanges, DATASET));
+    }
+    // reopening the stream should succeed.
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){
+      in.readVectored(fileRanges, allocate);
+      validateVectoredReadResult(fileRanges, DATASET);
+    }
+  }
+
+  @Test
+  public void testStopVectoredIoOperationsUnbuffer() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = createSomeRandomRanges();
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){

Review Comment:
   try using openFile(). might be good to even call getFileStatus and pass that in too
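   
   e.g. something like this (sketch; withFileStatus() should let S3A skip its own HEAD request):
   ```java
   FileStatus status = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
   List<FileRange> fileRanges = createSomeRandomRanges();
   try (FSDataInputStream in = fs.openFile(path(VECTORED_READ_FILE_NAME))
           .withFileStatus(status)
           .build().get()) {
     in.readVectored(fileRanges, allocate);
     validateVectoredReadResult(fileRanges, DATASET);
   }
   ```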



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

