[ 
https://issues.apache.org/jira/browse/HADOOP-19348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17904960#comment-17904960
 ] 

ASF GitHub Bot commented on HADOOP-19348:
-----------------------------------------

mukund-thakur commented on code in PR #7192:
URL: https://github.com/apache/hadoop/pull/7192#discussion_r1881107834


##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static 
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static 
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+
+import 
software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+public class ITestS3AS3SeekableStream extends AbstractS3ATestBase {

Review Comment:
   How about a contract test to use the new stream for all types of read. 



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSInputStream;
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.util.S3URI;
+
+public class S3ASeekableStream extends FSInputStream {
+
+    private S3SeekableInputStream inputStream;
+    private long lastReadCurrentPos = 0;
+    private final String key;
+
+    public static final Logger LOG = 
LoggerFactory.getLogger(S3ASeekableStream.class);
+
+    public S3ASeekableStream(String bucket, String key, 
S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
+        this.inputStream = 
s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key));
+        this.key = key;
+    }
+
+    @Override
+    public int read() throws IOException {
+        throwIfClosed();
+        return inputStream.read();

Review Comment:
   We need to deal with all the Exception handling in read() and seek() calls 
as done in S3AInputStream. 



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java:
##########
@@ -1760,4 +1760,22 @@ private Constants() {
    * Value: {@value}.
    */
   public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit";
+
+  /**
+   * Config to enable Analytics Accelerator Library for Amazon S3
+   * https://github.com/awslabs/analytics-accelerator-s3
+   */
+  public static final String ANALYTICS_ACCELERATOR_ENABLED_KEY = 
"fs.s3a.analytics.accelerator.enabled";

Review Comment:
   use the prefix ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX here. 



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static 
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static 
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+
+import 
software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+public class ITestS3AS3SeekableStream extends AbstractS3ATestBase {
+
+    final String PHYSICAL_IO_PREFIX = "physicalio";
+    final String LOGICAL_IO_PREFIX = "logicalio";
+
+    @Test
+    public void testConnectorFrameWorkIntegration() throws IOException {
+        describe("Verify S3 connector framework integration");
+
+        Configuration conf = getConfiguration();
+        removeBaseAndBucketOverrides(conf, ANALYTICS_ACCELERATOR_ENABLED_KEY);
+        conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true);
+
+        String testFile =  
"s3a://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz";
+        S3AFileSystem s3AFileSystem  =  (S3AFileSystem) 
FileSystem.newInstance(new Path(testFile).toUri(), conf);
+        byte[] buffer = new byte[500];
+
+        try (FSDataInputStream inputStream = s3AFileSystem.open(new 
Path(testFile))) {
+            inputStream.seek(5);
+            inputStream.read(buffer, 0, 500);
+        }
+
+    }
+
+
+    @Test
+    public void testConnectorFrameworkConfigurable() {
+        describe("Verify S3 connector framework reads configuration");
+
+        Configuration conf = getConfiguration();
+        removeBaseAndBucketOverrides(conf);
+
+        //Disable Predictive Prefetching
+        conf.set(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + 
LOGICAL_IO_PREFIX + ".prefetching.mode", "all");
+
+        //Set Blobstore Capacity
+        conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + 
PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1);
+
+        ConnectorConfiguration connectorConfiguration = new 
ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+
+        S3SeekableInputStreamConfiguration configuration = 
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
+        
assertEquals(configuration.getLogicalIOConfiguration().getPrefetchingMode(), 
PrefetchMode.ALL);

Review Comment:
   use assertj



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static 
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static 
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+
+import 
software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+public class ITestS3AS3SeekableStream extends AbstractS3ATestBase {
+
+    final String PHYSICAL_IO_PREFIX = "physicalio";
+    final String LOGICAL_IO_PREFIX = "logicalio";
+
+    @Test
+    public void testConnectorFrameWorkIntegration() throws IOException {
+        describe("Verify S3 connector framework integration");
+
+        Configuration conf = getConfiguration();
+        removeBaseAndBucketOverrides(conf, ANALYTICS_ACCELERATOR_ENABLED_KEY);
+        conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true);
+
+        String testFile =  
"s3a://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz";
+        S3AFileSystem s3AFileSystem  =  (S3AFileSystem) 
FileSystem.newInstance(new Path(testFile).toUri(), conf);
+        byte[] buffer = new byte[500];
+
+        try (FSDataInputStream inputStream = s3AFileSystem.open(new 
Path(testFile))) {
+            inputStream.seek(5);
+            inputStream.read(buffer, 0, 500);
+        }
+
+    }
+
+
+    @Test
+    public void testConnectorFrameworkConfigurable() {
+        describe("Verify S3 connector framework reads configuration");
+
+        Configuration conf = getConfiguration();
+        removeBaseAndBucketOverrides(conf);
+
+        //Disable Predictive Prefetching
+        conf.set(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + 
LOGICAL_IO_PREFIX + ".prefetching.mode", "all");
+
+        //Set Blobstore Capacity
+        conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + 
PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1);
+
+        ConnectorConfiguration connectorConfiguration = new 
ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+
+        S3SeekableInputStreamConfiguration configuration = 
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
+        
assertEquals(configuration.getLogicalIOConfiguration().getPrefetchingMode(), 
PrefetchMode.ALL);
+        assert 
configuration.getPhysicalIOConfiguration().getBlobStoreCapacity() == 1;
+    }
+
+    @Test
+    public void testInvalidConfigurationThrows() {
+        describe("Verify S3 connector framework throws with invalid 
configuration");
+
+        Configuration conf = getConfiguration();
+        removeBaseAndBucketOverrides(conf);
+        //Disable Sequential Prefetching
+        conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + 
PHYSICAL_IO_PREFIX + ".blobstore.capacity", -1);
+
+        ConnectorConfiguration connectorConfiguration = new 
ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+        assertThrows(IllegalArgumentException.class, () ->

Review Comment:
   use assertJ with a message, 



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static 
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static 
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+
+import 
software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+public class ITestS3AS3SeekableStream extends AbstractS3ATestBase {
+
+    final String PHYSICAL_IO_PREFIX = "physicalio";
+    final String LOGICAL_IO_PREFIX = "logicalio";
+
+    @Test
+    public void testConnectorFrameWorkIntegration() throws IOException {
+        describe("Verify S3 connector framework integration");
+
+        Configuration conf = getConfiguration();
+        removeBaseAndBucketOverrides(conf, ANALYTICS_ACCELERATOR_ENABLED_KEY);
+        conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true);
+
+        String testFile =  
"s3a://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz";

Review Comment:
   there is a config fs.s3a.scale.test.csvfile for this file. use that. 





> S3A: Add support for analytics-accelerator-s3
> ---------------------------------------------
>
>                 Key: HADOOP-19348
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19348
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.4.2
>            Reporter: Ahmar Suhail
>            Priority: Major
>              Labels: pull-request-available
>
> S3 recently released [Analytics Accelerator Library for Amazon 
> S3|https://github.com/awslabs/analytics-accelerator-s3] as an Alpha release, 
> which is an input stream, with an initial goal of improving performance for 
> Apache Spark workloads on Parquet datasets. 
> For example, it implements optimisations such as footer prefetching, and so 
> avoids the multiple GETS S3AInputStream currently makes for the footer bytes 
> and PageIndex structures.
> The library also tracks columns currently being read by a query using the 
> parquet metadata, and then prefetches these bytes when parquet files with the 
> same schema are opened. 
> This ticket tracks the work required for the basic initial integration. There 
> is still more work to be done, such as VectoredIO support etc, which we will 
> identify and follow up with. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to