[
https://issues.apache.org/jira/browse/HADOOP-19211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862727#comment-17862727
]
ASF GitHub Bot commented on HADOOP-19211:
-----------------------------------------
wujinhu commented on code in PR #6904:
URL: https://github.com/apache/hadoop/pull/6904#discussion_r1664008455
##########
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java:
##########
@@ -316,4 +349,248 @@ public boolean seekToNewSource(long targetPos) throws IOException {
public long getExpectNextPos() {
return this.expectNextPos;
}
+
+ @Override
+ public boolean hasCapability(String capability) {
+ switch (toLowerCase(capability)) {
+ case StreamCapabilities.VECTOREDIO:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * Validates range parameters.
+ * In case of OSS we already have contentLength from the first GET request
+ * during an open file operation so failing fast here.
+ * @param range requested range.
+ * @throws EOFException end of file exception.
+ */
+ private void validateRangeRequest(FileRange range) throws EOFException {
+ VectoredReadUtils.validateRangeRequest(range);
+ if (range.getOffset() + range.getLength() > contentLength) {
+ final String errMsg = String.format(
+ "Requested range [%d, %d) is beyond EOF for path %s",
+ range.getOffset(), range.getLength(), key);
+ LOG.warn(errMsg);
+ throw new EOFException(errMsg);
+ }
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public int minSeekForVectorReads() {
+ return vectoredIOContext.getMinSeekForVectorReads();
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public int maxReadSizeForVectorReads() {
+ return vectoredIOContext.getMaxReadSizeForVectorReads();
+ }
+
+ /**
+ * Read data into destination buffer from OSS object content.
+ * @param objectContent result from OSS.
+ * @param dest destination buffer.
+ * @param offset start offset of dest buffer.
+ * @param length number of bytes to fill in dest.
+ * @throws IOException any IOE.
+ */
+ private void readByteArray(InputStream objectContent,
+ byte[] dest,
+ int offset,
+ int length) throws IOException {
+ int readBytes = 0;
+ while (readBytes < length) {
+ int readBytesCurr = objectContent.read(dest,
+ offset + readBytes,
+ length - readBytes);
+ readBytes +=readBytesCurr;
+ if (readBytesCurr < 0) {
+ throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
+ }
+ }
+ }
+
+ /**
+ * Populates the buffer with data from objectContent
+ * till length. Handles both direct and heap byte buffers.
+ * @param range vector range to populate.
+ * @param buffer buffer to fill.
+ * @param objectContent result retrieved from OSS store.
+ * @throws IOException any IOE.
+ */
+ private void populateBuffer(FileRange range,
+ ByteBuffer buffer,
+ InputStream objectContent) throws IOException {
+
+ int length = range.getLength();
+ if (buffer.isDirect()) {
+ VectoredReadUtils.readInDirectBuffer(range, buffer,
+ (position, tmp, offset, currentLength) -> {
+ readByteArray(objectContent, tmp, offset, currentLength);
+ return null;
+ });
+ buffer.flip();
+ } else {
+ readByteArray(objectContent, buffer.array(), 0, length);
+ }
+ }
+
+ /**
+ * Drain unnecessary data in between ranges.
+ * @param objectContent oss data stream.
+ * @param drainQuantity how many bytes to drain.
+ * @throws IOException any IOE.
+ */
+ private void drainUnnecessaryData(InputStream objectContent, long
drainQuantity)
+ throws IOException {
+ int drainBytes = 0;
+ int readCount;
+ while (drainBytes < drainQuantity) {
+ if (drainBytes + Constants.DRAIN_BUFFER_SIZE <= drainQuantity) {
+ byte[] drainBuffer = new byte[Constants.DRAIN_BUFFER_SIZE];
+ readCount = objectContent.read(drainBuffer);
+ } else {
+ byte[] drainBuffer = new byte[(int) (drainQuantity - drainBytes)];
+ readCount = objectContent.read(drainBuffer);
+ }
+ drainBytes += readCount;
+ }
+ LOG.debug("{} bytes drained from stream ", drainBytes);
+ }
+
+ /**
+ * Populate underlying buffers of the child ranges.
+ * @param combinedFileRange big combined file range.
+ * @param objectContent data from oss.
+ * @param allocate method to allocate child byte buffers.
+ * @throws IOException any IOE.
+ */
+ private void populateChildBuffers(CombinedFileRange combinedFileRange,
+ InputStream objectContent,
+ IntFunction<ByteBuffer> allocate) throws
IOException {
+ // If the combined file range just contains a single child
+ // range, we only have to fill that one child buffer else
+ // we drain the intermediate data between consecutive ranges
+ // and fill the buffers one by one.
+ if (combinedFileRange.getUnderlying().size() == 1) {
+ FileRange child = combinedFileRange.getUnderlying().get(0);
+ ByteBuffer buffer = allocate.apply(child.getLength());
+ populateBuffer(child, buffer, objectContent);
+ child.getData().complete(buffer);
+ } else {
+ FileRange prev = null;
+ for (FileRange child : combinedFileRange.getUnderlying()) {
+ if (prev != null && prev.getOffset() + prev.getLength() <
child.getOffset()) {
+ long drainQuantity = child.getOffset() - prev.getOffset() -
prev.getLength();
+ drainUnnecessaryData(objectContent, drainQuantity);
+ }
+ ByteBuffer buffer = allocate.apply(child.getLength());
+ populateBuffer(child, buffer, objectContent);
+ child.getData().complete(buffer);
+ prev = child;
+ }
+ }
+ }
+
+ /**
+ * Read data from OSS for this range and populate the buffer.
+ * @param range range of data to read.
+ * @param buffer buffer to fill.
+ */
+ private void readSingleRange(FileRange range, ByteBuffer buffer) {
+ LOG.debug("Start reading range {} from path {} ", range, key);
+ InputStream objectRange = null;
+ try {
+ long position = range.getOffset();
+ int length = range.getLength();
+ objectRange = store.retrieve(key, position, position + length - 1);
+ populateBuffer(range, buffer, objectRange);
+ range.getData().complete(buffer);
+ } catch (Exception ex) {
+ LOG.warn("Exception while reading a range {} from path {} ", range, key,
ex);
+ range.getData().completeExceptionally(ex);
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, objectRange);
+ }
+ }
+
+ /**
+ * Read the data from OSS for the bigger combined file range and update all
the
+ * underlying ranges.
+ * @param combinedFileRange big combined file range.
+ * @param allocate method to create byte buffers to hold result data.
+ */
+ private void readCombinedRangeAndUpdateChildren(CombinedFileRange
combinedFileRange,
+ IntFunction<ByteBuffer>
allocate) {
+ LOG.debug("Start reading combined range {} from path {} ",
combinedFileRange, key);
+ InputStream objectRange = null;
+ try {
+ long position = combinedFileRange.getOffset();
+ int length = combinedFileRange.getLength();
+ objectRange = store.retrieve(key, position, position + length - 1);
+ populateChildBuffers(combinedFileRange, objectRange, allocate);
+ } catch (Exception ex) {
+ LOG.debug("Exception while reading a range {} from path {} ",
combinedFileRange, key, ex);
+ for(FileRange child : combinedFileRange.getUnderlying()) {
+ child.getData().completeExceptionally(ex);
+ }
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, objectRange);
+ }
+ LOG.debug("Finished reading range {} from path {} ", combinedFileRange,
key);
+ }
+
+ /**
+ * {@inheritDoc}
+ * Vectored read implementation for {@link AliyunOSSInputStream}.
+ * @param ranges the byte ranges to read.
+ * @param allocate the function to allocate ByteBuffer.
+ * @throws IOException IOE if any.
+ */
+ @Override
+ public void readVectored(List<? extends FileRange> ranges,
+ IntFunction<ByteBuffer> allocate) throws
IOException {
+ LOG.debug("Starting vectored read on path {} for ranges {} ", key, ranges);
+ checkNotClosed();
+ if (stopVectoredIOOperations.getAndSet(false)) {
+ LOG.debug("Reinstating vectored read operation for path {} ", key);
+ }
+ List<? extends FileRange> sortedRanges = validateAndSortRanges(ranges,
+ Optional.of(contentLength));
+ for (FileRange range : ranges) {
+ validateRangeRequest(range);
Review Comment:
OK
> AliyunOSS: Support vectored read API
> ------------------------------------
>
> Key: HADOOP-19211
> URL: https://issues.apache.org/jira/browse/HADOOP-19211
> Project: Hadoop Common
> Issue Type: Improvement
> Components: fs/oss
> Affects Versions: 3.2.4, 3.3.6
> Reporter: wujinhu
> Assignee: wujinhu
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]