[ https://issues.apache.org/jira/browse/HADOOP-19211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862727#comment-17862727 ]

ASF GitHub Bot commented on HADOOP-19211:
-----------------------------------------

wujinhu commented on code in PR #6904:
URL: https://github.com/apache/hadoop/pull/6904#discussion_r1664008455


##########
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java:
##########
@@ -316,4 +349,248 @@ public boolean seekToNewSource(long targetPos) throws IOException {
   public long getExpectNextPos() {
     return this.expectNextPos;
   }
+
+  @Override
+  public boolean hasCapability(String capability) {
+    switch (toLowerCase(capability)) {
+      case StreamCapabilities.VECTOREDIO:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Validates range parameters.
+   * In the case of OSS we already have the contentLength from the first GET
+   * request issued when the file is opened, so we fail fast here.
+   * @param range requested range.
+   * @throws EOFException end of file exception.
+   */
+  private void validateRangeRequest(FileRange range) throws EOFException {
+    VectoredReadUtils.validateRangeRequest(range);
+    if (range.getOffset() + range.getLength() > contentLength) {
+      final String errMsg = String.format(
+          "Requested range [%d, %d) is beyond EOF for path %s",
+          range.getOffset(), range.getOffset() + range.getLength(), key);
+      LOG.warn(errMsg);
+      throw new EOFException(errMsg);
+    }
+  }
+
+  /**
+   * {@inheritDoc}.
+   */
+  @Override
+  public int minSeekForVectorReads() {
+    return vectoredIOContext.getMinSeekForVectorReads();
+  }
+
+  /**
+   * {@inheritDoc}.
+   */
+  @Override
+  public int maxReadSizeForVectorReads() {
+    return vectoredIOContext.getMaxReadSizeForVectorReads();
+  }
+
+  /**
+   * Read data into destination buffer from OSS object content.
+   * @param objectContent result from OSS.
+   * @param dest destination buffer.
+   * @param offset start offset of dest buffer.
+   * @param length number of bytes to fill in dest.
+   * @throws IOException any IOE.
+   */
+  private void readByteArray(InputStream objectContent,
+                             byte[] dest,
+                             int offset,
+                             int length) throws IOException {
+    int readBytes = 0;
+    while (readBytes < length) {
+      int readBytesCurr = objectContent.read(dest,
+          offset + readBytes,
+          length - readBytes);
+      if (readBytesCurr < 0) {
+        throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
+      }
+      readBytes += readBytesCurr;
+    }
+  }
+
+  /**
+   * Populates the buffer with data from objectContent,
+   * up to the requested length. Handles both direct and heap byte buffers.
+   * @param range vector range to populate.
+   * @param buffer buffer to fill.
+   * @param objectContent result retrieved from OSS store.
+   * @throws IOException any IOE.
+   */
+  private void populateBuffer(FileRange range,
+                              ByteBuffer buffer,
+                              InputStream objectContent) throws IOException {
+
+    int length = range.getLength();
+    if (buffer.isDirect()) {
+      VectoredReadUtils.readInDirectBuffer(range, buffer,
+          (position, tmp, offset, currentLength) -> {
+            readByteArray(objectContent, tmp, offset, currentLength);
+            return null;
+          });
+      buffer.flip();
+    } else {
+      readByteArray(objectContent, buffer.array(), 0, length);
+    }
+  }
+
+  /**
+   * Drain unnecessary data in between ranges.
+   * @param objectContent oss data stream.
+   * @param drainQuantity how many bytes to drain.
+   * @throws IOException any IOE.
+   */
+  private void drainUnnecessaryData(InputStream objectContent, long drainQuantity)
+      throws IOException {
+    long drainBytes = 0;
+    byte[] drainBuffer = new byte[Constants.DRAIN_BUFFER_SIZE];
+    while (drainBytes < drainQuantity) {
+      int readLength = (int) Math.min(Constants.DRAIN_BUFFER_SIZE,
+          drainQuantity - drainBytes);
+      int readCount = objectContent.read(drainBuffer, 0, readLength);
+      if (readCount < 0) {
+        // Stream ended before the gap was fully drained; treat as EOF.
+        throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
+      }
+      drainBytes += readCount;
+    }
+    LOG.debug("{} bytes drained from stream ", drainBytes);
+  }
+
+  /**
+   * Populate underlying buffers of the child ranges.
+   * @param combinedFileRange big combined file range.
+   * @param objectContent data from oss.
+   * @param allocate method to allocate child byte buffers.
+   * @throws IOException any IOE.
+   */
+  private void populateChildBuffers(CombinedFileRange combinedFileRange,
+                                    InputStream objectContent,
+                                    IntFunction<ByteBuffer> allocate) throws IOException {
+    // If the combined file range just contains a single child
+    // range, we only have to fill that one child buffer else
+    // we drain the intermediate data between consecutive ranges
+    // and fill the buffers one by one.
+    if (combinedFileRange.getUnderlying().size() == 1) {
+      FileRange child = combinedFileRange.getUnderlying().get(0);
+      ByteBuffer buffer = allocate.apply(child.getLength());
+      populateBuffer(child, buffer, objectContent);
+      child.getData().complete(buffer);
+    } else {
+      FileRange prev = null;
+      for (FileRange child : combinedFileRange.getUnderlying()) {
+        if (prev != null && prev.getOffset() + prev.getLength() < child.getOffset()) {
+          long drainQuantity = child.getOffset() - prev.getOffset() - prev.getLength();
+          drainUnnecessaryData(objectContent, drainQuantity);
+        }
+        ByteBuffer buffer = allocate.apply(child.getLength());
+        populateBuffer(child, buffer, objectContent);
+        child.getData().complete(buffer);
+        prev = child;
+      }
+    }
+  }
+
+  /**
+   * Read data from OSS for this range and populate the buffer.
+   * @param range range of data to read.
+   * @param buffer buffer to fill.
+   */
+  private void readSingleRange(FileRange range, ByteBuffer buffer) {
+    LOG.debug("Start reading range {} from path {} ", range, key);
+    InputStream objectRange = null;
+    try {
+      long position = range.getOffset();
+      int length = range.getLength();
+      objectRange = store.retrieve(key, position, position + length - 1);
+      populateBuffer(range, buffer, objectRange);
+      range.getData().complete(buffer);
+    } catch (Exception ex) {
+      LOG.warn("Exception while reading a range {} from path {} ", range, key, ex);
+      range.getData().completeExceptionally(ex);
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, objectRange);
+    }
+  }
+
+  /**
+   * Read the data from OSS for the bigger combined file range and update
+   * all the underlying ranges.
+   * @param combinedFileRange big combined file range.
+   * @param allocate method to create byte buffers to hold result data.
+   */
+  private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
+                                                  IntFunction<ByteBuffer> allocate) {
+    LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, key);
+    InputStream objectRange = null;
+    try {
+      long position = combinedFileRange.getOffset();
+      int length = combinedFileRange.getLength();
+      objectRange = store.retrieve(key, position, position + length - 1);
+      populateChildBuffers(combinedFileRange, objectRange, allocate);
+    } catch (Exception ex) {
+      LOG.debug("Exception while reading a range {} from path {} ", 
combinedFileRange, key, ex);
+      for(FileRange child : combinedFileRange.getUnderlying()) {
+        child.getData().completeExceptionally(ex);
+      }
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, objectRange);
+    }
+    LOG.debug("Finished reading range {} from path {} ", combinedFileRange, 
key);
+  }
+
+  /**
+   * {@inheritDoc}
+   * Vectored read implementation for {@link AliyunOSSInputStream}.
+   * @param ranges the byte ranges to read.
+   * @param allocate the function to allocate ByteBuffer.
+   * @throws IOException IOE if any.
+   */
+  @Override
+  public void readVectored(List<? extends FileRange> ranges,
+                           IntFunction<ByteBuffer> allocate) throws IOException {
+    LOG.debug("Starting vectored read on path {} for ranges {} ", key, ranges);
+    checkNotClosed();
+    if (stopVectoredIOOperations.getAndSet(false)) {
+      LOG.debug("Reinstating vectored read operation for path {} ", key);
+    }
+    List<? extends FileRange> sortedRanges = validateAndSortRanges(ranges,
+        Optional.of(contentLength));
+    for (FileRange range : ranges) {
+      validateRangeRequest(range);

Review Comment:
   OK
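
For context, here is a minimal caller-side sketch of the vectored read API this
patch implements. It is illustrative only, not part of the PR: the bucket and
object names are made up, and it goes through the public FSDataInputStream API
rather than AliyunOSSInputStream directly.

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class VectoredReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical bucket and object.
    Path path = new Path("oss://example-bucket/data.bin");
    FileSystem fs = path.getFileSystem(conf);
    try (FSDataInputStream in = fs.open(path)) {
      List<FileRange> ranges = Arrays.asList(
          FileRange.createFileRange(0, 4096),
          FileRange.createFileRange(1 << 20, 4096));
      // Nearby ranges may be coalesced into a single GET (the
      // CombinedFileRange path above), with the gaps between children
      // drained and discarded.
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        ByteBuffer data = range.getData().get();  // blocks until the range is filled
        System.out.println("read " + data.remaining() + " bytes at offset "
            + range.getOffset());
      }
    }
  }
}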

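A related usage note, again illustrative rather than from the patch:
FSDataInputStream delegates hasCapability() to the wrapped stream, so callers
can probe for the native path that the first hunk advertises:

import org.apache.hadoop.fs.StreamCapabilities;

// true once this patch is in place for OSS streams; false means
// readVectored() falls back to the default PositionedReadable
// implementation, which services each range with an ordinary
// positioned read instead of coalesced GETs.
boolean nativeVectored = in.hasCapability(StreamCapabilities.VECTOREDIO);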




> AliyunOSS: Support vectored read API
> ------------------------------------
>
>                 Key: HADOOP-19211
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19211
>             Project: Hadoop Common
>          Issue Type: Improvement
>          Components: fs/oss
>    Affects Versions: 3.2.4, 3.3.6
>            Reporter: wujinhu
>            Assignee: wujinhu
>            Priority: Major
>              Labels: pull-request-available
>



