[ 
https://issues.apache.org/jira/browse/HADOOP-19211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862728#comment-17862728
 ] 

ASF GitHub Bot commented on HADOOP-19211:
-----------------------------------------

wujinhu commented on code in PR #6904:
URL: https://github.com/apache/hadoop/pull/6904#discussion_r1664012962


##########
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java:
##########
@@ -316,4 +349,248 @@ public boolean seekToNewSource(long targetPos) throws IOException {
   /**
    * @return the current value of the expectNextPos field.
    */
   public long getExpectNextPos() {
     return expectNextPos;
   }
+
+  /**
+   * Probe for an optional capability of this stream.
+   * The requested name is passed through toLowerCase() before comparison;
+   * {@link StreamCapabilities#VECTOREDIO} is the only capability declared.
+   * @param capability capability name to probe for.
+   * @return true iff the normalised name is the vectored IO capability.
+   */
+  @Override
+  public boolean hasCapability(String capability) {
+    final String probe = toLowerCase(capability);
+    // Vectored IO is the only capability this method reports.
+    return probe.equals(StreamCapabilities.VECTOREDIO);
+  }
+
+  /**
+   * Validates range parameters.
+   * In case of OSS we already have contentLength from the first GET request
+   * during an open file operation so failing fast here.
+   * The range is first checked by
+   * {@code VectoredReadUtils.validateRangeRequest}, then its exclusive end
+   * (offset + length) is compared against contentLength.
+   * @param range requested range.
+   * @throws EOFException if the range ends beyond contentLength.
+   */
+  private void validateRangeRequest(FileRange range) throws EOFException {
+    VectoredReadUtils.validateRangeRequest(range);
+    // Exclusive end offset of the requested range.
+    final long rangeEnd = range.getOffset() + range.getLength();
+    if (rangeEnd > contentLength) {
+      // The message uses half-open interval notation, so report the end
+      // offset rather than the length as the second bound.
+      final String errMsg = String.format(
+          "Requested range [%d, %d) is beyond EOF for path %s",
+          range.getOffset(), rangeEnd, key);
+      LOG.warn(errMsg);
+      throw new EOFException(errMsg);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * The value is read from this stream's vectoredIOContext.
+   */
+  @Override
+  public int minSeekForVectorReads() {
+    final int minSeek = vectoredIOContext.getMinSeekForVectorReads();
+    return minSeek;
+  }
+
+  /**
+   * {@inheritDoc}
+   * The value is read from this stream's vectoredIOContext.
+   */
+  @Override
+  public int maxReadSizeForVectorReads() {
+    final int maxReadSize = vectoredIOContext.getMaxReadSizeForVectorReads();
+    return maxReadSize;
+  }
+
+  /**
+   * Read data into destination buffer from OSS object content.
+   * @param objectContent result from OSS.
+   * @param dest destination buffer.
+   * @param offset start offset of dest buffer.
+   * @param length number of bytes to fill in dest.
+   * @throws IOException any IOE.
+   */
+  private void readByteArray(InputStream objectContent,
+                             byte[] dest,
+                             int offset,
+                             int length) throws IOException {
+    int readBytes = 0;
+    while (readBytes < length) {
+      int readBytesCurr = objectContent.read(dest,
+          offset + readBytes,
+          length - readBytes);
+      readBytes +=readBytesCurr;
+      if (readBytesCurr < 0) {
+        throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
+      }
+    }
+  }
+
+  /**
+   * Populates the buffer with data from objectContent
+   * till length. Handles both direct and heap byte buffers.
+   * Direct buffers are filled through
+   * {@code VectoredReadUtils.readInDirectBuffer} and then flipped;
+   * heap buffers are filled by reading straight into the backing array.
+   * @param range vector range to populate.
+   * @param buffer buffer to fill.
+   * @param objectContent result retrieved from OSS store.
+   * @throws IOException any IOE.
+   */
+  private void populateBuffer(FileRange range,
+                              ByteBuffer buffer,
+                              InputStream objectContent) throws IOException {
+
+    int length = range.getLength();
+    if (buffer.isDirect()) {
+      VectoredReadUtils.readInDirectBuffer(range, buffer,
+          (position, tmp, offset, currentLength) -> {
+            readByteArray(objectContent, tmp, offset, currentLength);
+            return null;
+          });
+      buffer.flip();
+    } else {
+      // A heap buffer may be a slice of a larger array: element 0 of the
+      // buffer lives at arrayOffset() in the backing array, which is 0
+      // for a plainly allocated buffer.
+      readByteArray(objectContent, buffer.array(),
+          buffer.arrayOffset(), length);
+    }
+  }
+
+  /**
+   * Drain unnecessary data in between ranges.
+   * @param objectContent oss data stream.
+   * @param drainQuantity how many bytes to drain.
+   * @throws IOException any IOE.
+   */
+  private void drainUnnecessaryData(InputStream objectContent, long 
drainQuantity)
+      throws IOException {
+    int drainBytes = 0;
+    int readCount;
+    while (drainBytes < drainQuantity) {
+      if (drainBytes + Constants.DRAIN_BUFFER_SIZE <= drainQuantity) {
+        byte[] drainBuffer = new byte[Constants.DRAIN_BUFFER_SIZE];
+        readCount = objectContent.read(drainBuffer);

Review Comment:
   Good catch! I think we need to consider this, as S3A does.





> AliyunOSS: Support vectored read API
> ------------------------------------
>
>                 Key: HADOOP-19211
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19211
>             Project: Hadoop Common
>          Issue Type: Improvement
>          Components: fs/oss
>    Affects Versions: 3.2.4, 3.3.6
>            Reporter: wujinhu
>            Assignee: wujinhu
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to