wujinhu commented on code in PR #6904:
URL: https://github.com/apache/hadoop/pull/6904#discussion_r1664012962


##########
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java:
##########
@@ -316,4 +349,248 @@ public boolean seekToNewSource(long targetPos) throws 
IOException {
   public long getExpectNextPos() {
     return this.expectNextPos;
   }
+
+  @Override
+  public boolean hasCapability(String capability) {
+    switch (toLowerCase(capability)) {
+      case StreamCapabilities.VECTOREDIO:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Validates range parameters.
+   * In case of OSS we already have contentLength from the first GET request
+   * during an open file operation so failing fast here.
+   * @param range requested range.
+   * @throws EOFException end of file exception.
+   */
+  private void validateRangeRequest(FileRange range) throws EOFException {
+    VectoredReadUtils.validateRangeRequest(range);
+    if (range.getOffset() + range.getLength() > contentLength) {
+      final String errMsg = String.format(
+          "Requested range [%d, %d) is beyond EOF for path %s",
+          range.getOffset(), range.getLength(), key);
+      LOG.warn(errMsg);
+      throw new EOFException(errMsg);
+    }
+  }
+
+  /**
+   * {@inheritDoc}.
+   */
+  @Override
+  public int minSeekForVectorReads() {
+    return vectoredIOContext.getMinSeekForVectorReads();
+  }
+
+  /**
+   * {@inheritDoc}.
+   */
+  @Override
+  public int maxReadSizeForVectorReads() {
+    return vectoredIOContext.getMaxReadSizeForVectorReads();
+  }
+
+  /**
+   * Read data into destination buffer from OSS object content.
+   * @param objectContent result from OSS.
+   * @param dest destination buffer.
+   * @param offset start offset of dest buffer.
+   * @param length number of bytes to fill in dest.
+   * @throws IOException any IOE.
+   */
+  private void readByteArray(InputStream objectContent,
+                             byte[] dest,
+                             int offset,
+                             int length) throws IOException {
+    int readBytes = 0;
+    while (readBytes < length) {
+      int readBytesCurr = objectContent.read(dest,
+          offset + readBytes,
+          length - readBytes);
+      readBytes +=readBytesCurr;
+      if (readBytesCurr < 0) {
+        throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
+      }
+    }
+  }
+
+  /**
+   * Populates the buffer with data from objectContent
+   * till length. Handles both direct and heap byte buffers.
+   * @param range vector range to populate.
+   * @param buffer buffer to fill.
+   * @param objectContent result retrieved from OSS store.
+   * @throws IOException any IOE.
+   */
+  private void populateBuffer(FileRange range,
+                              ByteBuffer buffer,
+                              InputStream objectContent) throws IOException {
+
+    int length = range.getLength();
+    if (buffer.isDirect()) {
+      VectoredReadUtils.readInDirectBuffer(range, buffer,
+          (position, tmp, offset, currentLength) -> {
+            readByteArray(objectContent, tmp, offset, currentLength);
+            return null;
+          });
+      buffer.flip();
+    } else {
+      readByteArray(objectContent, buffer.array(), 0, length);
+    }
+  }
+
+  /**
+   * Drain unnecessary data in between ranges.
+   * @param objectContent oss data stream.
+   * @param drainQuantity how many bytes to drain.
+   * @throws IOException any IOE.
+   */
+  private void drainUnnecessaryData(InputStream objectContent, long 
drainQuantity)
+      throws IOException {
+    int drainBytes = 0;
+    int readCount;
+    while (drainBytes < drainQuantity) {
+      if (drainBytes + Constants.DRAIN_BUFFER_SIZE <= drainQuantity) {
+        byte[] drainBuffer = new byte[Constants.DRAIN_BUFFER_SIZE];
+        readCount = objectContent.read(drainBuffer);

Review Comment:
   Good catch! I think we need consider this as what s3a does.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to