[
https://issues.apache.org/jira/browse/HADOOP-19348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17904960#comment-17904960
]
ASF GitHub Bot commented on HADOOP-19348:
-----------------------------------------
mukund-thakur commented on code in PR #7192:
URL: https://github.com/apache/hadoop/pull/7192#discussion_r1881107834
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY;
+import static
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+
+import
software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+public class ITestS3AS3SeekableStream extends AbstractS3ATestBase {
Review Comment:
How about adding a contract test that uses the new stream for all types of reads?
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSInputStream;
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.util.S3URI;
+
+public class S3ASeekableStream extends FSInputStream {
+
+ private S3SeekableInputStream inputStream;
+ private long lastReadCurrentPos = 0;
+ private final String key;
+
+ public static final Logger LOG =
LoggerFactory.getLogger(S3ASeekableStream.class);
+
+ public S3ASeekableStream(String bucket, String key,
S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
+ this.inputStream =
s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key));
+ this.key = key;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throwIfClosed();
+ return inputStream.read();
Review Comment:
We need to handle exceptions in the read() and seek() calls in the same way
as is done in S3AInputStream.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java:
##########
@@ -1760,4 +1760,22 @@ private Constants() {
* Value: {@value}.
*/
public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit";
+
+ /**
+ * Config to enable Analytics Accelerator Library for Amazon S3
+ * https://github.com/awslabs/analytics-accelerator-s3
+ */
+ public static final String ANALYTICS_ACCELERATOR_ENABLED_KEY =
"fs.s3a.analytics.accelerator.enabled";
Review Comment:
Use the prefix ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX here.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY;
+import static
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+
+import
software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+public class ITestS3AS3SeekableStream extends AbstractS3ATestBase {
+
+ final String PHYSICAL_IO_PREFIX = "physicalio";
+ final String LOGICAL_IO_PREFIX = "logicalio";
+
+ @Test
+ public void testConnectorFrameWorkIntegration() throws IOException {
+ describe("Verify S3 connector framework integration");
+
+ Configuration conf = getConfiguration();
+ removeBaseAndBucketOverrides(conf, ANALYTICS_ACCELERATOR_ENABLED_KEY);
+ conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true);
+
+ String testFile =
"s3a://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz";
+ S3AFileSystem s3AFileSystem = (S3AFileSystem)
FileSystem.newInstance(new Path(testFile).toUri(), conf);
+ byte[] buffer = new byte[500];
+
+ try (FSDataInputStream inputStream = s3AFileSystem.open(new
Path(testFile))) {
+ inputStream.seek(5);
+ inputStream.read(buffer, 0, 500);
+ }
+
+ }
+
+
+ @Test
+ public void testConnectorFrameworkConfigurable() {
+ describe("Verify S3 connector framework reads configuration");
+
+ Configuration conf = getConfiguration();
+ removeBaseAndBucketOverrides(conf);
+
+ //Disable Predictive Prefetching
+ conf.set(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." +
LOGICAL_IO_PREFIX + ".prefetching.mode", "all");
+
+ //Set Blobstore Capacity
+ conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." +
PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1);
+
+ ConnectorConfiguration connectorConfiguration = new
ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+
+ S3SeekableInputStreamConfiguration configuration =
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
+
assertEquals(configuration.getLogicalIOConfiguration().getPrefetchingMode(),
PrefetchMode.ALL);
Review Comment:
Use AssertJ.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY;
+import static
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+
+import
software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+public class ITestS3AS3SeekableStream extends AbstractS3ATestBase {
+
+ final String PHYSICAL_IO_PREFIX = "physicalio";
+ final String LOGICAL_IO_PREFIX = "logicalio";
+
+ @Test
+ public void testConnectorFrameWorkIntegration() throws IOException {
+ describe("Verify S3 connector framework integration");
+
+ Configuration conf = getConfiguration();
+ removeBaseAndBucketOverrides(conf, ANALYTICS_ACCELERATOR_ENABLED_KEY);
+ conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true);
+
+ String testFile =
"s3a://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz";
+ S3AFileSystem s3AFileSystem = (S3AFileSystem)
FileSystem.newInstance(new Path(testFile).toUri(), conf);
+ byte[] buffer = new byte[500];
+
+ try (FSDataInputStream inputStream = s3AFileSystem.open(new
Path(testFile))) {
+ inputStream.seek(5);
+ inputStream.read(buffer, 0, 500);
+ }
+
+ }
+
+
+ @Test
+ public void testConnectorFrameworkConfigurable() {
+ describe("Verify S3 connector framework reads configuration");
+
+ Configuration conf = getConfiguration();
+ removeBaseAndBucketOverrides(conf);
+
+ //Disable Predictive Prefetching
+ conf.set(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." +
LOGICAL_IO_PREFIX + ".prefetching.mode", "all");
+
+ //Set Blobstore Capacity
+ conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." +
PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1);
+
+ ConnectorConfiguration connectorConfiguration = new
ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+
+ S3SeekableInputStreamConfiguration configuration =
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
+
assertEquals(configuration.getLogicalIOConfiguration().getPrefetchingMode(),
PrefetchMode.ALL);
+ assert
configuration.getPhysicalIOConfiguration().getBlobStoreCapacity() == 1;
+ }
+
+ @Test
+ public void testInvalidConfigurationThrows() {
+ describe("Verify S3 connector framework throws with invalid
configuration");
+
+ Configuration conf = getConfiguration();
+ removeBaseAndBucketOverrides(conf);
+ //Disable Sequential Prefetching
+ conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." +
PHYSICAL_IO_PREFIX + ".blobstore.capacity", -1);
+
+ ConnectorConfiguration connectorConfiguration = new
ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+ assertThrows(IllegalArgumentException.class, () ->
Review Comment:
Use AssertJ with a message.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY;
+import static
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+
+import
software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+public class ITestS3AS3SeekableStream extends AbstractS3ATestBase {
+
+ final String PHYSICAL_IO_PREFIX = "physicalio";
+ final String LOGICAL_IO_PREFIX = "logicalio";
+
+ @Test
+ public void testConnectorFrameWorkIntegration() throws IOException {
+ describe("Verify S3 connector framework integration");
+
+ Configuration conf = getConfiguration();
+ removeBaseAndBucketOverrides(conf, ANALYTICS_ACCELERATOR_ENABLED_KEY);
+ conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true);
+
+ String testFile =
"s3a://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz";
Review Comment:
There is a config option, fs.s3a.scale.test.csvfile, for this file; use that.
> S3A: Add support for analytics-accelerator-s3
> ---------------------------------------------
>
> Key: HADOOP-19348
> URL: https://issues.apache.org/jira/browse/HADOOP-19348
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 3.4.2
> Reporter: Ahmar Suhail
> Priority: Major
> Labels: pull-request-available
>
> S3 recently released [Analytics Accelerator Library for Amazon
> S3|https://github.com/awslabs/analytics-accelerator-s3] as an Alpha release,
> which is an input stream, with an initial goal of improving performance for
> Apache Spark workloads on Parquet datasets.
> For example, it implements optimisations such as footer prefetching, and so
> avoids the multiple GETs that S3AInputStream currently makes for the footer
> bytes and PageIndex structures.
> The library also tracks columns currently being read by a query using the
> parquet metadata, and then prefetches these bytes when parquet files with the
> same schema are opened.
> This ticket tracks the work required for the basic initial integration. There
> is still more work to be done, such as VectoredIO support etc, which we will
> identify and follow up with.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]