[ https://issues.apache.org/jira/browse/HADOOP-19348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17906235#comment-17906235 ]
ASF GitHub Bot commented on HADOOP-19348:
-----------------------------------------
ahmarsuhail commented on code in PR #7192:
URL: https://github.com/apache/hadoop/pull/7192#discussion_r1887788758
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -680,8 +704,18 @@ public void initialize(URI name, Configuration originalConf)
     this.prefetchBlockSize = (int) prefetchBlockSizeLong;
     this.prefetchBlockCount =
         intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
+
+    this.analyticsAcceleratorEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY,
+        ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
+    this.analyticsAcceleratorCRTEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED,
+        ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);
+
     this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
-        DEFAULT_MULTIPART_UPLOAD_ENABLED);
+        DEFAULT_MULTIPART_UPLOAD_ENABLED);
+
+    if (this.analyticsAcceleratorEnabled && !analyticsAcceleratorCRTEnabled) {
+      // Temp change: Analytics Accelerator with S3AsyncClient does not support multipart upload.
Review Comment:
removal of this is also tracked in:
https://issues.apache.org/jira/browse/HADOOP-19368
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -317,6 +324,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private S3Client s3Client;
+  /**
+   * CRT-based S3Client, created if the analytics accelerator library is enabled
+   * and managed by the ClientManager. The analytics accelerator library can be
Review Comment:
nit: not true as of now; S3AFileSystem is managing the CRT client, as we are not
creating it in the ClientManager/S3AStoreImpl.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -809,6 +843,27 @@ public void initialize(URI name, Configuration originalConf)
     // directly through the client manager.
     // this is to aid mocking.
     s3Client = store.getOrCreateS3Client();
+
+    if (this.analyticsAcceleratorEnabled) {
+      LOG.info("Using S3SeekableInputStream");
+      if (this.analyticsAcceleratorCRTEnabled) {
+        LOG.info("Using S3CrtClient");
+        this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
Review Comment:
We will need to move this to S3AStoreImpl and configure it properly as a
follow-up. Ticket to track this work:
https://issues.apache.org/jira/browse/HADOOP-19368
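For reference, a minimal sketch of the shape that follow-up could take, assuming the client is built from the Hadoop Configuration. The configuration key below is purely illustrative and not part of this patch; 600 mirrors the value currently hardcoded; `S3AsyncClient.crtBuilder()` is the public SDK entry point corresponding to the internal `S3CrtAsyncClient.builder()` used in the patch:

```java
import org.apache.hadoop.conf.Configuration;

import software.amazon.awssdk.services.s3.S3AsyncClient;

final class CrtClientFactory {

  private CrtClientFactory() {
  }

  /**
   * Build the CRT-based async client with configurable concurrency.
   * Sketch only: the key name is hypothetical; 600 mirrors the current patch.
   */
  static S3AsyncClient createCrtClient(Configuration conf) {
    int maxConcurrency = conf.getInt(
        "fs.s3a.analytics.accelerator.crt.max.concurrency", 600);
    return S3AsyncClient.crtBuilder()
        .maxConcurrency(maxConcurrency)
        .build();
  }
}
```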
##########
hadoop-tools/hadoop-aws/pom.xml:
##########
@@ -525,6 +525,17 @@
<artifactId>amazon-s3-encryption-client-java</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>software.amazon.s3.analyticsaccelerator</groupId>
+ <artifactId>analyticsaccelerator-s3</artifactId>
+ <version>0.0.1</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
Review Comment:
I think we discussed that the CRT should be provided scope? (not sure though)
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java:
##########
@@ -88,6 +87,12 @@ protected Configuration createConfiguration() {
return conf;
}
+ @Override
+ public void testOverwriteExistingFile() throws Throwable {
+ skipIfAnalyticsAcceleratorEnabled(this.createConfiguration());
Review Comment:
We intend to remove this, right? Will work with you offline to create a list of
the tests we're skipping right now but won't skip before the trunk merge, and
create a JIRA for it.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -1892,6 +1949,14 @@ private FSDataInputStream executeOpen(
fileInformation.applyOptions(readContext);
LOG.debug("Opening '{}'", readContext);
+ if (this.analyticsAcceleratorEnabled) {
+ return new FSDataInputStream(
Review Comment:
Input stream creation is to be moved to S3AStore; work tracked here:
https://issues.apache.org/jira/browse/HADOOP-19369
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSInputStream;
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.util.S3URI;
+
+public class S3ASeekableStream extends FSInputStream implements StreamCapabilities {
+
+ private S3SeekableInputStream inputStream;
+ private long lastReadCurrentPos = 0;
+ private final String key;
+
+  public static final Logger LOG = LoggerFactory.getLogger(S3ASeekableStream.class);
+
+  public S3ASeekableStream(String bucket, String key,
+      S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
+    this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key));
+    this.key = key;
+  }
+
+  /**
+   * Indicates whether the given {@code capability} is supported by this stream.
+   *
+   * @param capability the capability to check.
+   * @return true if the given {@code capability} is supported by this stream, false otherwise.
+   */
+ @Override
+ public boolean hasCapability(String capability) {
+ return false;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throwIfClosed();
+ int bytesRead;
+ try {
+ bytesRead = inputStream.read();
+ } catch (IOException ioe) {
+ onReadFailure(ioe);
+ throw ioe;
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ throwIfClosed();
+ if (pos < 0) {
+ throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
+ + " " + pos);
+ }
+ inputStream.seek(pos);
+ }
+
+
+ @Override
+ public synchronized long getPos() {
+ if (!isClosed()) {
+ lastReadCurrentPos = inputStream.getPos();
+ }
+ return lastReadCurrentPos;
+ }
+
+
+  /**
+   * Reads the last n bytes from the stream into a byte buffer. Blocks until end of stream is
+   * reached. Leaves the position of the stream unaltered.
+   *
+   * @param buf buffer to read data into
+   * @param off start position in buffer at which data is written
+   * @param len the number of bytes to read; the n-th byte should be the last byte of the stream.
+   * @return the total number of bytes read into the buffer
+   * @throws IOException if an I/O error occurs
+   */
+ public int readTail(byte[] buf, int off, int len) throws IOException {
+ throwIfClosed();
+ int bytesRead;
+ try {
+ bytesRead = inputStream.readTail(buf, off, len);
+ } catch (IOException ioe) {
+ onReadFailure(ioe);
+ throw ioe;
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public int read(byte[] buf, int off, int len) throws IOException {
+ throwIfClosed();
+ int bytesRead;
+ try {
+ bytesRead = inputStream.read(buf, off, len);
+ } catch (IOException ioe) {
+ onReadFailure(ioe);
+ throw ioe;
+ }
+ return bytesRead;
+ }
+
+
+ @Override
+ public boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
+
+ @Override
+ public int available() throws IOException {
+ throwIfClosed();
+ return super.available();
+ }
+
+ @Override
+ public void close() throws IOException {
Review Comment:
Looking at existing implementations, they maintain a volatile variable
`private volatile boolean closed;`, update it in a synchronized block in
close(), and check it in throwIfClosed(). Should we do the same?
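A minimal sketch of that pattern, assuming the stream also has to release the underlying `S3SeekableInputStream` on close (the class and method names here are illustrative, not from the patch):

```java
import java.io.IOException;

import org.apache.hadoop.fs.FSExceptionMessages;

class CloseTrackingStream {

  // volatile, so reads of the flag from other threads observe the update without locking
  private volatile boolean closed;

  public synchronized void close() throws IOException {
    if (!closed) {
      closed = true;
      // release underlying resources here, e.g. inputStream.close()
    }
  }

  protected void throwIfClosed() throws IOException {
    if (closed) {
      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
    }
  }
}
```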
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -809,6 +843,27 @@ public void initialize(URI name, Configuration originalConf)
     // directly through the client manager.
     // this is to aid mocking.
     s3Client = store.getOrCreateS3Client();
+
+    if (this.analyticsAcceleratorEnabled) {
+      LOG.info("Using S3SeekableInputStream");
+      if (this.analyticsAcceleratorCRTEnabled) {
+        LOG.info("Using S3CrtClient");
+        this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
+ } else {
+ LOG.info("Using S3Client");
Review Comment:
Same here, "Using S3 async client for analytics accelerator S3".
We will eventually either get rid of these, move them to debug, or log them
only once. Can decide when we update the client creation code.
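If the log-once option is chosen, one possibility is hadoop-common's `LogExactlyOnce` helper, already used elsewhere in S3A; a sketch, assuming the reworded message above and that its info() overload is available (class and method names are illustrative):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.store.LogExactlyOnce;

final class ClientLogging {

  private static final Logger LOG = LoggerFactory.getLogger(ClientLogging.class);

  // emits its message on the first call only, however many filesystems initialize
  private static final LogExactlyOnce LOG_USING_ASYNC_CLIENT = new LogExactlyOnce(LOG);

  private ClientLogging() {
  }

  static void noteAsyncClientCreated() {
    LOG_USING_ASYNC_CLIENT.info("Using S3 async client for analytics accelerator S3");
  }
}
```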
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -809,6 +843,27 @@ public void initialize(URI name, Configuration originalConf)
     // directly through the client manager.
     // this is to aid mocking.
     s3Client = store.getOrCreateS3Client();
+
+    if (this.analyticsAcceleratorEnabled) {
+      LOG.info("Using S3SeekableInputStream");
+      if (this.analyticsAcceleratorCRTEnabled) {
+        LOG.info("Using S3CrtClient");
Review Comment:
Can improve logging messages, e.g.
"Using S3 CRT client for analytics accelerator S3"
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java:
##########
@@ -88,6 +87,12 @@ protected Configuration createConfiguration() {
return conf;
}
+ @Override
+ public void testOverwriteExistingFile() throws Throwable {
+ skipIfAnalyticsAcceleratorEnabled(this.createConfiguration());
Review Comment:
Would be great if we could add some temporary comments wherever we skip,
explaining the reason for skipping and whether the skip will be removed in the
future.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java:
##########
@@ -203,6 +204,9 @@ private void abortActiveStream() throws IOException {
@Test
public void testCostOfCreatingMagicFile() throws Throwable {
describe("Files created under magic paths skip existence checks and marker
deletes");
+
+ // Assertions will fail as {@link S3ASeekableInputStream} do not support
InputStreamStatistics yes
Review Comment:
nit: typo yet*
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java:
##########
@@ -574,6 +574,20 @@ public static boolean isS3ExpressTestBucket(final Configuration conf) {
return S3ExpressStorage.isS3ExpressStore(getTestBucketName(conf), "");
}
+ /**
+ * Skip a test if the Analytics Accelerator Library for Amazon S3 is enabled.
+ * @param configuration configuration to probe
+ */
+ public static void skipIfAnalyticsAcceleratorEnabled(
Review Comment:
Actually, you could add a reason parameter here, and then wherever you skip,
pass in the reason.
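A sketch of what that could look like; the key and default constants here stand in for the ones defined in the patch, and the reason string at the call site is illustrative:

```java
import org.apache.hadoop.conf.Configuration;
import org.junit.Assume;

final class SkipHelper {

  // stand-ins for the constants defined in the patch
  private static final String ANALYTICS_ACCELERATOR_ENABLED_KEY =
      "fs.s3a.analytics.accelerator.enabled";
  private static final boolean ANALYTICS_ACCELERATOR_ENABLED_DEFAULT = false;

  private SkipHelper() {
  }

  /**
   * Skip a test if the Analytics Accelerator Library for Amazon S3 is enabled.
   * @param configuration configuration to probe
   * @param reason why the test cannot run against the accelerator yet
   */
  static void skipIfAnalyticsAcceleratorEnabled(
      Configuration configuration, String reason) {
    Assume.assumeFalse(reason,
        configuration.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY,
            ANALYTICS_ACCELERATOR_ENABLED_DEFAULT));
  }
}
```

Call sites would then pass the explanation inline, e.g. `skipIfAnalyticsAcceleratorEnabled(createConfiguration(), "overwrite not yet supported by S3ASeekableStream")`.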
> S3A: Add initial support for analytics-accelerator-s3
> -----------------------------------------------------
>
> Key: HADOOP-19348
> URL: https://issues.apache.org/jira/browse/HADOOP-19348
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 3.4.2
> Reporter: Ahmar Suhail
> Priority: Major
> Labels: pull-request-available
>
> S3 recently released [Analytics Accelerator Library for Amazon
> S3|https://github.com/awslabs/analytics-accelerator-s3] as an Alpha release,
> which is an input stream, with an initial goal of improving performance for
> Apache Spark workloads on Parquet datasets.
> For example, it implements optimisations such as footer prefetching, and so
> avoids the multiple GETs S3AInputStream currently makes for the footer bytes
> and PageIndex structures.
> The library also tracks columns currently being read by a query using the
> parquet metadata, and then prefetches these bytes when parquet files with the
> same schema are opened.
> This ticket tracks the work required for the basic initial integration. There
> is still more work to be done, such as VectoredIO support etc., which we will
> identify and follow up on.