[ 
https://issues.apache.org/jira/browse/HADOOP-19394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18004116#comment-18004116
 ] 

ASF GitHub Bot commented on HADOOP-19394:
-----------------------------------------

steveloughran commented on code in PR #7720:
URL: https://github.com/apache/hadoop/pull/7720#discussion_r2194800946


##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java:
##########
@@ -63,12 +75,35 @@ protected Configuration createConfiguration() {
     // This issue is tracked in:
     // https://github.com/awslabs/analytics-accelerator-s3/issues/218
     skipForAnyEncryptionExceptSSES3(conf);
-    conf.set("fs.contract.vector-io-early-eof-check", "false");
     return conf;
   }
 
   @Override
   protected AbstractFSContract createContract(Configuration conf) {
     return new S3AContract(conf);
   }
+
+  @Override

Review Comment:
   add a javadoc explaining why the override



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java:
##########
@@ -128,6 +141,48 @@ public int read(byte[] buf, int off, int len) throws 
IOException {
     return bytesRead;
   }
 
+  /**
+   * {@inheritDoc}
+   * Pass to {@link #readVectored(List, IntFunction, Consumer)}
+   * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser.
+   * @param ranges the byte ranges to read.
+   * @param allocate the function to allocate ByteBuffer.
+   * @throws IOException IOE if any.
+   */
+  @Override
+  public void readVectored(List<? extends FileRange> ranges,
+                                        IntFunction<ByteBuffer> allocate) 
throws IOException {
+    readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
+  }
+
+  /**
+   * {@inheritDoc}
+   * Pass to {@link #readVectored(List, IntFunction, Consumer)}
+   * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser.
+   * @param ranges the byte ranges to read.
+   * @param allocate the function to allocate ByteBuffer.
+   * @throws IOException IOE if any.
+   */
+  @Override
+  public void readVectored(final List<? extends FileRange> ranges,
+                           final IntFunction<ByteBuffer> allocate,
+                           final Consumer<ByteBuffer> release) throws 
IOException {
+    LOG.debug("AAL: Starting vectored read on path {} for ranges {} ", 
getPathStr(), ranges);
+    throwIfClosed();
+
+    List<ObjectRange> objectRanges = new ArrayList<>();
+
+    for (FileRange range : ranges) {
+      CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
+      ObjectRange objectRange = new ObjectRange(result, range.getOffset(), 
range.getLength());
+      objectRanges.add(objectRange);
+      range.setData(result);
+    }
+
+    // AAL does not do any range coalescing, so input and combined ranges are 
the same.
+    this.getS3AStreamStatistics().readVectoredOperationStarted(ranges.size(), 
ranges.size());
+    inputStream.readVectored(objectRanges, allocate, release);

Review Comment:
   does this call release on errors? curious -and hopeful



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java:
##########
@@ -63,12 +75,35 @@ protected Configuration createConfiguration() {
     // This issue is tracked in:
     // https://github.com/awslabs/analytics-accelerator-s3/issues/218
     skipForAnyEncryptionExceptSSES3(conf);
-    conf.set("fs.contract.vector-io-early-eof-check", "false");
     return conf;
   }
 
   @Override
   protected AbstractFSContract createContract(Configuration conf) {
     return new S3AContract(conf);
   }
+
+  @Override
+  public void testNegativeOffsetRange(String pBufferType)  throws Exception {
+    verifyExceptionalVectoredRead(ContractTestUtils.range(-1, 50), 
IllegalArgumentException.class);
+  }
+
+  @Test
+  public void testReadVectoredWithAALStatsCollection() throws Exception {
+
+    List<FileRange> fileRanges = createSampleNonOverlappingRanges();
+    try (FSDataInputStream in = openVectorFile()){

Review Comment:
   nit, space before {



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java:
##########
@@ -21,9 +21,19 @@
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.IntFunction;
 
+import org.apache.hadoop.fs.FileRange;

Review Comment:
   nit: import block



##########
hadoop-project/pom.xml:
##########
@@ -207,7 +207,7 @@
     <aws-java-sdk.version>1.12.720</aws-java-sdk.version>
     <aws-java-sdk-v2.version>2.29.52</aws-java-sdk-v2.version>
     
<amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
-    
<amazon-s3-analyticsaccelerator-s3.version>1.0.0</amazon-s3-analyticsaccelerator-s3.version>
+    
<amazon-s3-analyticsaccelerator-s3.version>1.2.0</amazon-s3-analyticsaccelerator-s3.version>

Review Comment:
   for the final merge, this should be pulled out to its own.
   
   should we try to get into 3.4.2 now?



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java:
##########
@@ -128,6 +141,48 @@ public int read(byte[] buf, int off, int len) throws 
IOException {
     return bytesRead;
   }
 
+  /**
+   * {@inheritDoc}

Review Comment:
   I'd put that at L148 and cut the params/IOE as superfluous



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java:
##########
@@ -128,6 +141,48 @@ public int read(byte[] buf, int off, int len) throws 
IOException {
     return bytesRead;
   }
 
+  /**
+   * {@inheritDoc}
+   * Pass to {@link #readVectored(List, IntFunction, Consumer)}
+   * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser.
+   * @param ranges the byte ranges to read.
+   * @param allocate the function to allocate ByteBuffer.
+   * @throws IOException IOE if any.
+   */
+  @Override
+  public void readVectored(List<? extends FileRange> ranges,
+                                        IntFunction<ByteBuffer> allocate) 
throws IOException {
+    readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
+  }
+
+  /**
+   * {@inheritDoc}

Review Comment:
   same





> S3A Analytics Accelerator: vector IO support
> --------------------------------------------
>
>                 Key: HADOOP-19394
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19394
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.4.1
>            Reporter: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> Add vector IO support for analytics accelerator stream
> Three stages
> # pull up s3a input stream to work with all ObjectInputStreams; do its own 
> fetching independent of the analytics stream
> # provide info to stream of fetches having taken place (remove from cache, 
> cancel prefetch)
> full integration
> * return a range from cache if present
> * append to the block retrieval callback if a prefetch is in progress
> * only do merge + new request if the range cannot be satisifed entirely from 
> cached data
> Out of scope: handling case where part of a range is in cache/retrieval. Too 
> complicated and so prone to problems. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to