[ https://issues.apache.org/jira/browse/HADOOP-19394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18004116#comment-18004116 ]
ASF GitHub Bot commented on HADOOP-19394: ----------------------------------------- steveloughran commented on code in PR #7720: URL: https://github.com/apache/hadoop/pull/7720#discussion_r2194800946 ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java: ########## @@ -63,12 +75,35 @@ protected Configuration createConfiguration() { // This issue is tracked in: // https://github.com/awslabs/analytics-accelerator-s3/issues/218 skipForAnyEncryptionExceptSSES3(conf); - conf.set("fs.contract.vector-io-early-eof-check", "false"); return conf; } @Override protected AbstractFSContract createContract(Configuration conf) { return new S3AContract(conf); } + + @Override Review Comment: add a javadoc explaining why the override ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java: ########## @@ -128,6 +141,48 @@ public int read(byte[] buf, int off, int len) throws IOException { return bytesRead; } + /** + * {@inheritDoc} + * Pass to {@link #readVectored(List, IntFunction, Consumer)} + * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser. + * @param ranges the byte ranges to read. + * @param allocate the function to allocate ByteBuffer. + * @throws IOException IOE if any. + */ + @Override + public void readVectored(List<? extends FileRange> ranges, + IntFunction<ByteBuffer> allocate) throws IOException { + readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED); + } + + /** + * {@inheritDoc} + * Pass to {@link #readVectored(List, IntFunction, Consumer)} + * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser. + * @param ranges the byte ranges to read. + * @param allocate the function to allocate ByteBuffer. + * @throws IOException IOE if any. + */ + @Override + public void readVectored(final List<? 
extends FileRange> ranges, + final IntFunction<ByteBuffer> allocate, + final Consumer<ByteBuffer> release) throws IOException { + LOG.debug("AAL: Starting vectored read on path {} for ranges {} ", getPathStr(), ranges); + throwIfClosed(); + + List<ObjectRange> objectRanges = new ArrayList<>(); + + for (FileRange range : ranges) { + CompletableFuture<ByteBuffer> result = new CompletableFuture<>(); + ObjectRange objectRange = new ObjectRange(result, range.getOffset(), range.getLength()); + objectRanges.add(objectRange); + range.setData(result); + } + + // AAL does not do any range coalescing, so input and combined ranges are the same. + this.getS3AStreamStatistics().readVectoredOperationStarted(ranges.size(), ranges.size()); + inputStream.readVectored(objectRanges, allocate, release); Review Comment: does this call release on errors? curious -and hopeful ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java: ########## @@ -63,12 +75,35 @@ protected Configuration createConfiguration() { // This issue is tracked in: // https://github.com/awslabs/analytics-accelerator-s3/issues/218 skipForAnyEncryptionExceptSSES3(conf); - conf.set("fs.contract.vector-io-early-eof-check", "false"); return conf; } @Override protected AbstractFSContract createContract(Configuration conf) { return new S3AContract(conf); } + + @Override + public void testNegativeOffsetRange(String pBufferType) throws Exception { + verifyExceptionalVectoredRead(ContractTestUtils.range(-1, 50), IllegalArgumentException.class); + } + + @Test + public void testReadVectoredWithAALStatsCollection() throws Exception { + + List<FileRange> fileRanges = createSampleNonOverlappingRanges(); + try (FSDataInputStream in = openVectorFile()){ Review Comment: nit, space before { ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java: ########## @@ -21,9 +21,19 @@ import java.io.EOFException; 
import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.IntFunction; +import org.apache.hadoop.fs.FileRange; Review Comment: nit: import block ########## hadoop-project/pom.xml: ########## @@ -207,7 +207,7 @@ <aws-java-sdk.version>1.12.720</aws-java-sdk.version> <aws-java-sdk-v2.version>2.29.52</aws-java-sdk-v2.version> <amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version> - <amazon-s3-analyticsaccelerator-s3.version>1.0.0</amazon-s3-analyticsaccelerator-s3.version> + <amazon-s3-analyticsaccelerator-s3.version>1.2.0</amazon-s3-analyticsaccelerator-s3.version> Review Comment: for the final merge, this should be pulled out to its own. should we try to get into 3.4.2 now? ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java: ########## @@ -128,6 +141,48 @@ public int read(byte[] buf, int off, int len) throws IOException { return bytesRead; } + /** + * {@inheritDoc} Review Comment: I'd put that at L148 and cut the params/IOE as superfluous ########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java: ########## @@ -128,6 +141,48 @@ public int read(byte[] buf, int off, int len) throws IOException { return bytesRead; } + /** + * {@inheritDoc} + * Pass to {@link #readVectored(List, IntFunction, Consumer)} + * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser. + * @param ranges the byte ranges to read. + * @param allocate the function to allocate ByteBuffer. + * @throws IOException IOE if any. + */ + @Override + public void readVectored(List<? 
extends FileRange> ranges, + IntFunction<ByteBuffer> allocate) throws IOException { + readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED); + } + + /** + * {@inheritDoc} Review Comment: same > S3A Analytics Accelerator: vector IO support > -------------------------------------------- > > Key: HADOOP-19394 > URL: https://issues.apache.org/jira/browse/HADOOP-19394 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs/s3 > Affects Versions: 3.4.1 > Reporter: Steve Loughran > Priority: Major > Labels: pull-request-available > > Add vector IO support for analytics accelerator stream > Three stages > # pull up s3a input stream to work with all ObjectInputStreams; do its own > fetching independent of the analytics stream > # provide info to stream of fetches having taken place (remove from cache, > cancel prefetch) > full integration > * return a range from cache if present > * append to the block retrieval callback if a prefetch is in progress > * only do merge + new request if the range cannot be satisfied entirely from > cached data > Out of scope: handling case where part of a range is in cache/retrieval. Too > complicated and so prone to problems. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org