    [ https://issues.apache.org/jira/browse/HADOOP-19348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17906235#comment-17906235 ]

ASF GitHub Bot commented on HADOOP-19348:
-----------------------------------------

ahmarsuhail commented on code in PR #7192:
URL: https://github.com/apache/hadoop/pull/7192#discussion_r1887788758


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -680,8 +704,18 @@ public void initialize(URI name, Configuration originalConf)
       this.prefetchBlockSize = (int) prefetchBlockSizeLong;
       this.prefetchBlockCount =
          intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
+
+      this.analyticsAcceleratorEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
+      this.analyticsAcceleratorCRTEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);
+
      this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
-          DEFAULT_MULTIPART_UPLOAD_ENABLED);
+              DEFAULT_MULTIPART_UPLOAD_ENABLED);
+
+      if(this.analyticsAcceleratorEnabled && !analyticsAcceleratorCRTEnabled) {
+        // Temp change: Analytics Accelerator with S3AsyncClient do not support Multi-part upload.

Review Comment:
   Removal of this is also tracked in: https://issues.apache.org/jira/browse/HADOOP-19368



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -317,6 +324,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private S3Client s3Client;
 
+  /**
+   * CRT-Based S3Client created of analytics accelerator library is enabled
+   * and managed by the ClientManager. Analytics accelerator library can be

Review Comment:
   nit: Not true as of now; S3AFileSystem is managing the CRT client, since we are not creating it in the ClientManager/S3AStoreImpl.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -809,6 +843,27 @@ public void initialize(URI name, Configuration originalConf)
       // directly through the client manager.
       // this is to aid mocking.
       s3Client = store.getOrCreateS3Client();
+
+      if (this.analyticsAcceleratorEnabled) {
+        LOG.info("Using S3SeekableInputStream");
+        if(this.analyticsAcceleratorCRTEnabled) {
+          LOG.info("Using S3CrtClient");
+          this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();

Review Comment:
   We will need to move this to S3AStoreImpl and configure it properly as a follow-up. Ticket to track this work: https://issues.apache.org/jira/browse/HADOOP-19368



##########
hadoop-tools/hadoop-aws/pom.xml:
##########
@@ -525,6 +525,17 @@
       <artifactId>amazon-s3-encryption-client-java</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>software.amazon.s3.analyticsaccelerator</groupId>
+      <artifactId>analyticsaccelerator-s3</artifactId>
+      <version>0.0.1</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>

Review Comment:
   I think we discussed that the CRT should be provided scope? (not sure though)
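   
   For reference, a provided-scope declaration would look like the sketch below; the second dependency in this diff is truncated, so the CRT coordinates here are an assumption, not taken from the patch:
   
   ```xml
   <!-- assumption: the truncated dependency above is the AWS CRT artifact -->
   <dependency>
     <groupId>software.amazon.awssdk.crt</groupId>
     <artifactId>aws-crt</artifactId>
     <scope>provided</scope>
   </dependency>
   ```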



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java:
##########
@@ -88,6 +87,12 @@ protected Configuration createConfiguration() {
     return conf;
   }
 
+  @Override
+  public void testOverwriteExistingFile() throws Throwable {
+    skipIfAnalyticsAcceleratorEnabled(this.createConfiguration());

Review Comment:
   We intend to remove this, right? I'll work with you offline to create a list of the tests we're skipping right now but shouldn't skip before the trunk merge, and we can create a JIRA for it.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -1892,6 +1949,14 @@ private FSDataInputStream executeOpen(
     fileInformation.applyOptions(readContext);
     LOG.debug("Opening '{}'", readContext);
 
+    if (this.analyticsAcceleratorEnabled) {
+      return new FSDataInputStream(

Review Comment:
   Input stream creation is to be moved to S3AStore; work tracked here: https://issues.apache.org/jira/browse/HADOOP-19369



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSInputStream;
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.util.S3URI;
+
+public class S3ASeekableStream extends FSInputStream implements StreamCapabilities {
+
+  private S3SeekableInputStream inputStream;
+  private long lastReadCurrentPos = 0;
+  private final String key;
+
+  public static final Logger LOG = LoggerFactory.getLogger(S3ASeekableStream.class);
+
+  public S3ASeekableStream(String bucket, String key,
+                           S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
+    this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key));
+    this.key = key;
+  }
+
+  /**
+   * Indicates whether the given {@code capability} is supported by this stream.
+   *
+   * @param capability the capability to check.
+   * @return true if the given {@code capability} is supported by this stream, false otherwise.
+   */
+  @Override
+  public boolean hasCapability(String capability) {
+    return false;
+  }
+
+  @Override
+  public int read() throws IOException {
+    throwIfClosed();
+    int bytesRead;
+    try {
+      bytesRead = inputStream.read();
+    } catch (IOException ioe) {
+      onReadFailure(ioe);
+      throw ioe;
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    throwIfClosed();
+    if (pos < 0) {
+      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
+          + " " + pos);
+    }
+    inputStream.seek(pos);
+  }
+
+
+  @Override
+  public synchronized long getPos() {
+    if (!isClosed()) {
+      lastReadCurrentPos = inputStream.getPos();
+    }
+    return lastReadCurrentPos;
+  }
+
+
+  /**
+   * Reads the last n bytes from the stream into a byte buffer. Blocks until end of stream is
+   * reached. Leaves the position of the stream unaltered.
+   *
+   * @param buf buffer to read data into
+   * @param off start position in buffer at which data is written
+   * @param len the number of bytes to read; the n-th byte should be the last byte of the stream.
+   * @return the total number of bytes read into the buffer
+   * @throws IOException if an I/O error occurs
+   */
+  public int readTail(byte[] buf, int off, int len) throws IOException {
+    throwIfClosed();
+    int bytesRead;
+    try {
+      bytesRead = inputStream.readTail(buf, off, len);
+    } catch (IOException ioe) {
+      onReadFailure(ioe);
+      throw ioe;
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public int read(byte[] buf, int off, int len) throws IOException {
+    throwIfClosed();
+    int bytesRead;
+    try {
+      bytesRead = inputStream.read(buf, off, len);
+    } catch (IOException ioe) {
+      onReadFailure(ioe);
+      throw ioe;
+    }
+    return bytesRead;
+  }
+
+
+  @Override
+  public boolean seekToNewSource(long l) throws IOException {
+    return false;
+  }
+
+  @Override
+  public int available() throws IOException {
+    throwIfClosed();
+    return super.available();
+  }
+
+  @Override
+  public void close() throws IOException {

Review Comment:
   Looking at existing implementations, they maintain a volatile variable
   
   `private volatile boolean closed;` and then update it in a synchronised block in `close()` and check it in `throwIfClosed()`. Should we do the same?
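   
   A minimal sketch of that pattern, modelled on existing S3A streams (the class name here is illustrative, not from the patch):
   
   ```java
   import java.io.Closeable;
   import java.io.IOException;
   
   import org.apache.hadoop.fs.FSExceptionMessages;
   
   /** Sketch of the volatile-closed pattern used by existing S3A streams. */
   class CloseTrackingStream implements Closeable {
   
     /** Volatile so concurrent readers see the latest value without locking. */
     private volatile boolean closed;
   
     @Override
     public void close() throws IOException {
       // Synchronise so only one caller performs the actual cleanup.
       synchronized (this) {
         if (closed) {
           return;
         }
         closed = true;
       }
       // release the underlying seekable stream here, outside the lock
     }
   
     private void throwIfClosed() throws IOException {
       // A plain read of the volatile flag needs no lock.
       if (closed) {
         throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
       }
     }
   }
   ```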



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -809,6 +843,27 @@ public void initialize(URI name, Configuration originalConf)
       // directly through the client manager.
       // this is to aid mocking.
       s3Client = store.getOrCreateS3Client();
+
+      if (this.analyticsAcceleratorEnabled) {
+        LOG.info("Using S3SeekableInputStream");
+        if(this.analyticsAcceleratorCRTEnabled) {
+          LOG.info("Using S3CrtClient");
+          this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
+        } else {
+          LOG.info("Using S3Client");

Review Comment:
   Same here, "Using S3 async client for of analytics accelerator S3"
   
   We will eventually either get rid of these/move them to debug or have them 
at log only once. Can decide when we update client creation code.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -809,6 +843,27 @@ public void initialize(URI name, Configuration originalConf)
       // directly through the client manager.
       // this is to aid mocking.
       s3Client = store.getOrCreateS3Client();
+
+      if (this.analyticsAcceleratorEnabled) {
+        LOG.info("Using S3SeekableInputStream");
+        if(this.analyticsAcceleratorCRTEnabled) {
+          LOG.info("Using S3CrtClient");

Review Comment:
   We can improve the logging messages, e.g.:
   
   "Using S3 CRT client for analytics accelerator S3"



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java:
##########
@@ -88,6 +87,12 @@ protected Configuration createConfiguration() {
     return conf;
   }
 
+  @Override
+  public void testOverwriteExistingFile() throws Throwable {
+    skipIfAnalyticsAcceleratorEnabled(this.createConfiguration());

Review Comment:
   It would be great if we could add some temporary comments wherever we skip: explain the reason for skipping and whether the skip will be removed in the future.
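   
   For example, something along these lines (the wording is illustrative, and it assumes the override delegates to the parent test):
   
   ```java
   @Override
   public void testOverwriteExistingFile() throws Throwable {
     // Temporary skip: analytics accelerator streams do not yet pass this
     // contract test; expected to be removed before the trunk merge.
     skipIfAnalyticsAcceleratorEnabled(createConfiguration());
     super.testOverwriteExistingFile();
   }
   ```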



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java:
##########
@@ -203,6 +204,9 @@ private void abortActiveStream() throws IOException {
   @Test
   public void testCostOfCreatingMagicFile() throws Throwable {
     describe("Files created under magic paths skip existence checks and marker 
deletes");
+
+    // Assertions will fail as {@link S3ASeekableInputStream} do not support InputStreamStatistics yes

Review Comment:
   nit: typo yet*



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java:
##########
@@ -574,6 +574,20 @@ public static boolean isS3ExpressTestBucket(final Configuration conf) {
     return S3ExpressStorage.isS3ExpressStore(getTestBucketName(conf), "");
   }
 
+  /**
+   * Skip a test if the Analytics Accelerator Library for Amazon S3 is enabled.
+   * @param configuration configuration to probe
+   */
+  public static void skipIfAnalyticsAcceleratorEnabled(

Review Comment:
   Actually, you could add a reason parameter here, and then wherever you skip, pass in the reason.
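   
   A sketch of what that might look like in S3ATestUtils (the signature and message text are suggestions, not from the patch):
   
   ```java
   /**
    * Skip a test if the Analytics Accelerator Library for Amazon S3 is enabled.
    * @param configuration configuration to probe
    * @param reason why the test cannot run with the accelerator yet
    */
   public static void skipIfAnalyticsAcceleratorEnabled(
       Configuration configuration, String reason) {
     // org.junit.Assume skips (rather than fails) the test when the condition is false.
     Assume.assumeFalse("Analytics Accelerator enabled: " + reason,
         configuration.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY,
             ANALYTICS_ACCELERATOR_ENABLED_DEFAULT));
   }
   ```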





> S3A: Add initial support for analytics-accelerator-s3
> -----------------------------------------------------
>
>                 Key: HADOOP-19348
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19348
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.4.2
>            Reporter: Ahmar Suhail
>            Priority: Major
>              Labels: pull-request-available
>
> S3 recently released [Analytics Accelerator Library for Amazon 
> S3|https://github.com/awslabs/analytics-accelerator-s3] as an Alpha release, 
> which is an input stream, with an initial goal of improving performance for 
> Apache Spark workloads on Parquet datasets. 
> For example, it implements optimisations such as footer prefetching, and so 
> avoids the multiple GETs S3AInputStream currently makes for the footer bytes 
> and PageIndex structures.
> The library also tracks columns currently being read by a query using the 
> parquet metadata, and then prefetches these bytes when parquet files with the 
> same schema are opened. 
> This ticket tracks the work required for the basic initial integration. There 
> is still more work to be done, such as VectoredIO support etc, which we will 
> identify and follow up with. 
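
A minimal sketch of the read path described above, using only the library calls that appear in this patch (createStream, S3URI.of, readTail); the wrapper method and buffer size are illustrative:

```java
import java.io.IOException;

import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

final class FooterReadSketch {
  /** Read the last buf.length bytes of an object, e.g. a parquet footer. */
  static int readFooter(S3SeekableInputStreamFactory factory,
      String bucket, String key, byte[] buf) throws IOException {
    S3SeekableInputStream stream = factory.createStream(S3URI.of(bucket, key));
    try {
      // readTail reads the final len bytes without disturbing the position.
      return stream.readTail(buf, 0, buf.length);
    } finally {
      stream.close();
    }
  }
}
```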



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
