[ 
https://issues.apache.org/jira/browse/HADOOP-11867?focusedWorklogId=487025&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-487025
 ]

ASF GitHub Bot logged work on HADOOP-11867:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Sep/20 16:00
            Start Date: 21/Sep/20 16:00
    Worklog Time Spent: 10m 
      Work Description: omalley commented on a change in pull request #1830:
URL: https://github.com/apache/hadoop/pull/1830#discussion_r492174484



##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AsyncReaderUtils.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.IntFunction;
+
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.PositionedReadable;
+
+public class AsyncReaderUtils {
+  /**
+   * Read fully a list of file ranges asynchronously from this file.
+   * The default iterates through the ranges to read each synchronously, but
+   * the intent is that subclasses can make more efficient readers.
+   * The data or exceptions are pushed into {@link FileRange#getData()}.
+   * @param stream the stream to read the data from
+   * @param ranges the byte ranges to read
+   * @param allocate the byte buffer allocation
+   * @param minimumSeek the minimum number of bytes to seek over
+   * @param maximumRead the largest number of bytes to combine into a single 
read
+   */
+  public static void readAsync(PositionedReadable stream,
+                               List<? extends FileRange> ranges,
+                               IntFunction<ByteBuffer> allocate,
+                               int minimumSeek,
+                               int maximumRead) {
+    if (isOrderedDisjoint(ranges, 1, minimumSeek)) {
+      for(FileRange range: ranges) {
+        range.setData(readRangeFrom(stream, range, allocate));
+      }
+    } else {
+      for(CombinedFileRange range: sortAndMergeRanges(ranges, 1, minimumSeek,
+          maximumRead)) {
+        CompletableFuture<ByteBuffer> read =
+            readRangeFrom(stream, range, allocate);
+        for(FileRange child: range.getUnderlying()) {
+          child.setData(read.thenApply(
+              (b) -> sliceTo(b, range.getOffset(), child)));
+        }
+      }
+    }
+  }
+
+  /**
+   * Synchronously reads a range from the stream dealing with the combinations
+   * of ByteBuffers buffers and PositionedReadable streams.
+   * @param stream the stream to read from
+   * @param range the range to read
+   * @param allocate the function to allocate ByteBuffers
+   * @return the CompletableFuture that contains the read data
+   */
+  public static CompletableFuture<ByteBuffer> readRangeFrom(PositionedReadable 
stream,
+                                                            FileRange range,
+                                                            
IntFunction<ByteBuffer> allocate) {
+    CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
+    try {
+      ByteBuffer buffer = allocate.apply(range.getLength());
+      if (stream instanceof ByteBufferPositionedReadable) {
+        ((ByteBufferPositionedReadable) stream).readFully(range.getOffset(),
+            buffer);
+        buffer.flip();
+      } else {
+        if (buffer.isDirect()) {
+          // if we need to read data from a direct buffer and the stream 
doesn't
+          // support it, we allocate a byte array to use.
+          byte[] tmp = new byte[range.getLength()];
+          stream.readFully(range.getOffset(), tmp, 0, tmp.length);
+          buffer.put(tmp);
+          buffer.flip();
+        } else {
+          stream.readFully(range.getOffset(), buffer.array(),
+              buffer.arrayOffset(), range.getLength());
+        }
+      }
+      result.complete(buffer);
+    } catch (IOException ioe) {
+      result.completeExceptionally(ioe);
+    }
+    return result;
+  }
+
+  /**
+   * Is the given input list:
+   * <ul>
+   *   <li>already sorted by offset</li>
+   *   <li>each range is more than minimumSeek apart</li>
+   *   <li>the start and end of each range is a multiple of chunkSize</li>
+   * </ul>
+   *
+   * @param input the list of input ranges
+   * @param chunkSize the size of the chunks that the offset & end must align 
to
+   * @param minimumSeek the minimum distance between ranges
+   * @return true if we can use the input list as is
+   */
+  public static boolean isOrderedDisjoint(List<? extends FileRange> input,
+                                          int chunkSize,
+                                          int minimumSeek) {
+    long previous = -minimumSeek;
+    for(FileRange range: input) {
+      long offset = range.getOffset();
+      long end = range.getOffset() + range.getLength();
+      if (offset % chunkSize != 0 ||
+              end % chunkSize != 0 ||
+              (offset - previous < minimumSeek)) {
+        return false;
+      }
+      previous = end;
+    }
+    return true;
+  }
+
+  public static long roundDown(long offset, int chunkSize) {
+    if (chunkSize > 1) {
+      return offset - (offset % chunkSize);
+    } else {
+      return offset;
+    }
+  }
+
+  public static long roundUp(long offset, int chunkSize) {
+    if (chunkSize > 1) {
+      long next = offset + chunkSize - 1;
+      return next - (next % chunkSize);
+    } else {
+      return offset;
+    }
+  }
+
+  /**
+   * Sort and merge ranges to optimize the access from the underlying file
+   * system.
+   * The motivations are that:
+   * <ul>
+   *   <li>Upper layers want to pass down logical file ranges.</li>
+   *   <li>Fewer reads have better performance.</li>
+   *   <li>Applications want callbacks as ranges are read.</li>
+   *   <li>Some file systems want to round ranges to be at checksum 
boundaries.</li>
+   * </ul>
+   *
+   * @param input the list of input ranges
+   * @param chunkSize round the start and end points to multiples of chunkSize
+   * @param minimumSeek the smallest gap that we should seek over in bytes
+   * @param maxSize the largest combined file range in bytes
+   * @return the list of sorted CombinedFileRanges that cover the input
+   */
+  public static List<CombinedFileRange> sortAndMergeRanges(List<? extends 
FileRange> input,
+                                                           int chunkSize,
+                                                           int minimumSeek,
+                                                           int maxSize) {
+    // sort the ranges by offset
+    FileRange[] ranges = input.toArray(new FileRange[0]);
+    Arrays.sort(ranges, Comparator.comparingLong(FileRange::getOffset));
+    CombinedFileRange current = null;
+    List<CombinedFileRange> result = new ArrayList<>(ranges.length);
+
+    // now merge together the ones that merge
+    for(FileRange range: ranges) {
+      long start = roundDown(range.getOffset(), chunkSize);
+      long end = roundUp(range.getOffset() + range.getLength(), chunkSize);
+      if (current == null || !current.merge(start, end, range, minimumSeek, 
maxSize)) {
+        current = new CombinedFileRange(start, end, range);
+        result.add(current);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Slice the data that was read to the user's request.
+   * This function assumes that the user's request is completely subsumed by 
the
+   * read data.
+   * @param readData the buffer with the readData
+   * @param readOffset the offset in the file for the readData
+   * @param request the user's request
+   * @return the readData buffer that is sliced to the user's request
+   */
+  public static ByteBuffer sliceTo(ByteBuffer readData, long readOffset,
+                                   FileRange request) {
+    int offsetChange = (int) (request.getOffset() - readOffset);
+    int requestLength = request.getLength();
+    // If we need to change the offset or length, make a copy and do it
+    if (offsetChange != 0 || readData.remaining() != requestLength) {
+      readData = readData.slice();

Review comment:
       But it isn't copying the data. It is much closer to ByteBuffer's slice, 
which gives a second view on to the same data buffer. So you get a new 
ByteBuffer object that shares the same underlying memory.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 487025)
    Time Spent: 2h 10m  (was: 2h)

> FS API: Add a high-performance vectored Read to FSDataInputStream API
> ---------------------------------------------------------------------
>
>                 Key: HADOOP-11867
>                 URL: https://issues.apache.org/jira/browse/HADOOP-11867
>             Project: Hadoop Common
>          Issue Type: New Feature
>          Components: fs, fs/azure, fs/s3, hdfs-client
>    Affects Versions: 3.0.0
>            Reporter: Gopal Vijayaraghavan
>            Assignee: Owen O'Malley
>            Priority: Major
>              Labels: performance, pull-request-available
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> The most significant way to read from a filesystem in an efficient way is to 
> let the FileSystem implementation handle the seek behaviour underneath the 
> API to be the most efficient as possible.
> A better approach to the seek problem is to provide a sequence of read 
> locations as part of a single call, while letting the system schedule/plan 
> the reads ahead of time.
> This is exceedingly useful for seek-heavy readers on HDFS, since this allows 
> for potentially optimizing away the seek-gaps within the FSDataInputStream 
> implementation.
> For seek+read systems with even more latency than locally-attached disks, 
> something like a {{readFully(long[] offsets, ByteBuffer[] chunks)}} would 
> take of the seeks internally while reading chunk.remaining() bytes into each 
> chunk (which may be {{slice()}}ed off a bigger buffer).
> The base implementation can stub in this as a sequence of seeks + read() into 
> ByteBuffers, without forcing each FS implementation to override this in any 
> way.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to