wujinhu commented on code in PR #6904:
URL: https://github.com/apache/hadoop/pull/6904#discussion_r1664008455


##########
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java:
##########
@@ -316,4 +349,248 @@ public boolean seekToNewSource(long targetPos) throws 
IOException {
   public long getExpectNextPos() {
     return this.expectNextPos;
   }
+
+  /**
+   * Report whether this stream supports the named capability.
+   * The name is lower-cased before it is compared, and only
+   * {@link StreamCapabilities#VECTOREDIO} is recognised.
+   * @param capability capability name to probe for.
+   * @return true only when the capability is vectored IO.
+   */
+  @Override
+  public boolean hasCapability(String capability) {
+    final String probe = toLowerCase(capability);
+    return StreamCapabilities.VECTOREDIO.equals(probe);
+  }
+
+  /**
+   * Validates range parameters.
+   * In case of OSS we already have contentLength from the first GET request
+   * during an open file operation so failing fast here.
+   * @param range requested range.
+   * @throws EOFException if the range ends beyond {@code contentLength}.
+   */
+  private void validateRangeRequest(FileRange range) throws EOFException {
+    VectoredReadUtils.validateRangeRequest(range);
+    final long rangeStart = range.getOffset();
+    final long rangeEnd = rangeStart + range.getLength();
+    if (rangeEnd > contentLength) {
+      // The message uses half-open interval notation, so its second value
+      // must be the exclusive end (offset + length), not the bare length.
+      final String errMsg = String.format(
+          "Requested range [%d, %d) is beyond EOF for path %s",
+          rangeStart, rangeEnd, key);
+      LOG.warn(errMsg);
+      throw new EOFException(errMsg);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * The value is taken from this stream's vectored IO context.
+   */
+  @Override
+  public int minSeekForVectorReads() {
+    final int minSeek = vectoredIOContext.getMinSeekForVectorReads();
+    return minSeek;
+  }
+
+  /**
+   * {@inheritDoc}
+   * The value is taken from this stream's vectored IO context.
+   */
+  @Override
+  public int maxReadSizeForVectorReads() {
+    final int maxReadSize = vectoredIOContext.getMaxReadSizeForVectorReads();
+    return maxReadSize;
+  }
+
+  /**
+   * Read data into destination buffer from OSS object content.
+   * @param objectContent result from OSS.
+   * @param dest destination buffer.
+   * @param offset start offset of dest buffer.
+   * @param length number of bytes to fill in dest.
+   * @throws IOException any IOE.
+   */
+  private void readByteArray(InputStream objectContent,
+                             byte[] dest,
+                             int offset,
+                             int length) throws IOException {
+    int readBytes = 0;
+    while (readBytes < length) {
+      int readBytesCurr = objectContent.read(dest,
+          offset + readBytes,
+          length - readBytes);
+      readBytes +=readBytesCurr;
+      if (readBytesCurr < 0) {
+        throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
+      }
+    }
+  }
+
+  /**
+   * Populates the buffer with data from objectContent
+   * till length. Handles both direct and heap byte buffers.
+   * Direct buffers are filled through
+   * {@code VectoredReadUtils.readInDirectBuffer} and then flipped;
+   * heap buffers are filled straight into their backing array.
+   * @param range vector range to populate.
+   * @param buffer buffer to fill.
+   * @param objectContent result retrieved from OSS store.
+   * @throws IOException any IOE.
+   */
+  private void populateBuffer(FileRange range,
+                              ByteBuffer buffer,
+                              InputStream objectContent) throws IOException {
+
+    int length = range.getLength();
+    if (buffer.isDirect()) {
+      VectoredReadUtils.readInDirectBuffer(range, buffer,
+          (position, tmp, offset, currentLength) -> {
+            readByteArray(objectContent, tmp, offset, currentLength);
+            return null;
+          });
+      buffer.flip();
+    } else {
+      // Buffer index 0 lives at arrayOffset() in the backing array, which
+      // is non-zero for sliced or wrapped heap buffers; starting at a
+      // hard-coded 0 would fill the wrong region of the array.
+      readByteArray(objectContent, buffer.array(),
+          buffer.arrayOffset(), length);
+    }
+  }
+
+  /**
+   * Drain unnecessary data in between ranges.
+   * @param objectContent oss data stream.
+   * @param drainQuantity how many bytes to drain.
+   * @throws IOException any IOE.
+   */
+  private void drainUnnecessaryData(InputStream objectContent, long 
drainQuantity)
+      throws IOException {
+    int drainBytes = 0;
+    int readCount;
+    while (drainBytes < drainQuantity) {
+      if (drainBytes + Constants.DRAIN_BUFFER_SIZE <= drainQuantity) {
+        byte[] drainBuffer = new byte[Constants.DRAIN_BUFFER_SIZE];
+        readCount = objectContent.read(drainBuffer);
+      } else {
+        byte[] drainBuffer = new byte[(int) (drainQuantity - drainBytes)];
+        readCount = objectContent.read(drainBuffer);
+      }
+      drainBytes += readCount;
+    }
+    LOG.debug("{} bytes drained from stream ", drainBytes);
+  }
+
+  /**
+   * Populate underlying buffers of the child ranges.
+   * @param combinedFileRange big combined file range.
+   * @param objectContent data from oss.
+   * @param allocate method to allocate child byte buffers.
+   * @throws IOException any IOE.
+   */
+  private void populateChildBuffers(CombinedFileRange combinedFileRange,
+                                    InputStream objectContent,
+                                    IntFunction<ByteBuffer> allocate) throws 
IOException {
+    // If the combined file range just contains a single child
+    // range, we only have to fill that one child buffer else
+    // we drain the intermediate data between consecutive ranges
+    // and fill the buffers one by one.
+    if (combinedFileRange.getUnderlying().size() == 1) {
+      FileRange child = combinedFileRange.getUnderlying().get(0);
+      ByteBuffer buffer = allocate.apply(child.getLength());
+      populateBuffer(child, buffer, objectContent);
+      child.getData().complete(buffer);
+    } else {
+      FileRange prev = null;
+      for (FileRange child : combinedFileRange.getUnderlying()) {
+        if (prev != null && prev.getOffset() + prev.getLength() < 
child.getOffset()) {
+          long drainQuantity = child.getOffset() - prev.getOffset() - 
prev.getLength();
+          drainUnnecessaryData(objectContent, drainQuantity);
+        }
+        ByteBuffer buffer = allocate.apply(child.getLength());
+        populateBuffer(child, buffer, objectContent);
+        child.getData().complete(buffer);
+        prev = child;
+      }
+    }
+  }
+
+  /**
+   * Read data from OSS for this range and populate the buffer.
+   * @param range range of data to read.
+   * @param buffer buffer to fill.
+   */
+  private void readSingleRange(FileRange range, ByteBuffer buffer) {
+    LOG.debug("Start reading range {} from path {} ", range, key);
+    InputStream objectRange = null;
+    try {
+      long position = range.getOffset();
+      int length = range.getLength();
+      objectRange = store.retrieve(key, position, position + length - 1);
+      populateBuffer(range, buffer, objectRange);
+      range.getData().complete(buffer);
+    } catch (Exception ex) {
+      LOG.warn("Exception while reading a range {} from path {} ", range, key, 
ex);
+      range.getData().completeExceptionally(ex);
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, objectRange);
+    }
+  }
+
+  /**
+   * Read the data from OSS for the bigger combined file range and update all 
the
+   * underlying ranges.
+   * @param combinedFileRange big combined file range.
+   * @param allocate method to create byte buffers to hold result data.
+   */
+  private void readCombinedRangeAndUpdateChildren(CombinedFileRange 
combinedFileRange,
+                                                  IntFunction<ByteBuffer> 
allocate) {
+    LOG.debug("Start reading combined range {} from path {} ", 
combinedFileRange, key);
+    InputStream objectRange = null;
+    try {
+      long position = combinedFileRange.getOffset();
+      int length = combinedFileRange.getLength();
+      objectRange = store.retrieve(key, position, position + length - 1);
+      populateChildBuffers(combinedFileRange, objectRange, allocate);
+    } catch (Exception ex) {
+      LOG.debug("Exception while reading a range {} from path {} ", 
combinedFileRange, key, ex);
+      for(FileRange child : combinedFileRange.getUnderlying()) {
+        child.getData().completeExceptionally(ex);
+      }
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, objectRange);
+    }
+    LOG.debug("Finished reading range {} from path {} ", combinedFileRange, 
key);
+  }
+
+  /**
+   * {@inheritDoc}
+   * Vectored read implementation for {@link AliyunOSSInputStream}.
+   * @param ranges the byte ranges to read.
+   * @param allocate the function to allocate ByteBuffer.
+   * @throws IOException IOE if any.
+   */
+  @Override
+  public void readVectored(List<? extends FileRange> ranges,
+                           IntFunction<ByteBuffer> allocate) throws 
IOException {
+    LOG.debug("Starting vectored read on path {} for ranges {} ", key, ranges);
+    checkNotClosed();
+    if (stopVectoredIOOperations.getAndSet(false)) {
+      LOG.debug("Reinstating vectored read operation for path {} ", key);
+    }
+    List<? extends FileRange> sortedRanges = validateAndSortRanges(ranges,
+        Optional.of(contentLength));
+    for (FileRange range : ranges) {
+      validateRangeRequest(range);

Review Comment:
   OK



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to