fuatbasik commented on code in PR #7295:
URL: https://github.com/apache/hadoop/pull/7295#discussion_r1920296183
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java:
##########
@@ -230,7 +232,23 @@ public class S3AStoreImpl
@Override
protected void serviceInit(final Configuration conf) throws Exception {
- objectInputStreamFactory = createStreamFactory(conf);
+ if(conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT)) {
Review Comment:
Are we still doing this, or are we using the new StreamKind? See here:
`
Adds a new config, fs.s3a.input.stream.type. This can be set to classic,
prefetch, or analytics. I believe this is better than having
multiple prefetch.enabled and analytics.enabled flags.
`
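A minimal sketch of what a single `fs.s3a.input.stream.type` switch could look like, assuming only the three values quoted above. The enum `InputStreamTypeSketch` and its `fromConfigValue` method are hypothetical illustrations, not the PR's actual `StreamKind` type:

```java
import java.util.Locale;

/**
 * Hypothetical enum for the values of fs.s3a.input.stream.type quoted in the
 * PR description (classic, prefetch, analytics). Not the PR's StreamKind.
 */
enum InputStreamTypeSketch {
    CLASSIC, PREFETCH, ANALYTICS;

    /** Config key as quoted in the PR description. */
    static final String KEY = "fs.s3a.input.stream.type";

    /**
     * Parses a config value case-insensitively. A null or blank value falls
     * back to the supplied default; an unknown value is rejected.
     */
    static InputStreamTypeSketch fromConfigValue(String value, InputStreamTypeSketch defaultType) {
        if (value == null || value.trim().isEmpty()) {
            return defaultType;
        }
        try {
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unknown value for " + KEY + ": " + value, e);
        }
    }
}
```

In `serviceInit`, the value could be read with something like `conf.getTrimmed(InputStreamTypeSketch.KEY, "classic")` and switched on, so one key selects exactly one stream factory instead of two independent booleans that could both be set.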
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/S3SeekableInputStreamFactory.java:
##########
@@ -0,0 +1,54 @@
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.S3ASeekableStream;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+public class S3SeekableInputStreamFactory extends AbstractObjectInputStreamFactory {
Review Comment:
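What about renaming this to `S3ASeekableInputStreamFactory`? This is in line
with the S3ASeekableInputStream name, and it would also let us get rid of the
fully qualified references in the lines below (the current name clashes with
`software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory`).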
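The fully qualified references follow from a Java rule: a compilation unit that declares a class named `S3SeekableInputStreamFactory` cannot also import another type with that simple name, so the analytics-accelerator class has to be spelled out in full. The sketch below shows the same effect using JDK stand-ins (`java.util.Date` and `java.sql.Date`); `NameClashDemo` is a hypothetical name used only for illustration:

```java
import java.util.Date; // the simple name "Date" now means java.util.Date in this file

final class NameClashDemo {
    private NameClashDemo() { }

    // java.sql.Date shares the simple name "Date", so it cannot be imported as
    // well and must be written out in full, just as the analytics-accelerator
    // S3SeekableInputStreamFactory must be inside a Hadoop class of the same name.
    static java.sql.Date toSqlDate(Date utilDate) {
        return new java.sql.Date(utilDate.getTime());
    }
}
```

Renaming the Hadoop class to `S3ASeekableInputStreamFactory` would remove the clash, so the accelerator factory could be imported and referred to by its simple name.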
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java:
##########
@@ -230,7 +232,23 @@ public class S3AStoreImpl
@Override
protected void serviceInit(final Configuration conf) throws Exception {
- objectInputStreamFactory = createStreamFactory(conf);
+ if(conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT)) {
+ boolean analyticsAcceleratorCRTEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED,
+ ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);
+ final S3AsyncClient s3AsyncClient;
+ LOG.info("Using S3SeekableInputStream");
+ if(analyticsAcceleratorCRTEnabled) {
+ LOG.info("Using S3 CRT client for analytics accelerator S3");
+ s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
Review Comment:
Similar to the others, shall we move this into a method such as getOrCreateAsyncCRTClient?
Or maybe even change the existing method to decide whether to use CRT or not?
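One possible shape for the suggested `getOrCreateAsyncCRTClient` refactor is a lazily initialised holder that builds the async client once and picks the CRT or the default builder from a flag. `LazyAsyncClientHolder` is a hypothetical helper, not an existing Hadoop class; the client type is left generic so the sketch runs without the AWS SDK on the classpath:

```java
import java.util.Objects;
import java.util.function.Supplier;

/**
 * Hypothetical helper: lazily builds a single async client, choosing the
 * CRT-based factory or the default one from a configuration flag.
 */
final class LazyAsyncClientHolder<T> {
    private final boolean useCrt;
    private final Supplier<T> crtClientFactory;
    private final Supplier<T> defaultClientFactory;
    private T client; // guarded by "this"

    LazyAsyncClientHolder(boolean useCrt, Supplier<T> crtClientFactory, Supplier<T> defaultClientFactory) {
        this.useCrt = useCrt;
        this.crtClientFactory = Objects.requireNonNull(crtClientFactory);
        this.defaultClientFactory = Objects.requireNonNull(defaultClientFactory);
    }

    /** Returns the cached client, creating it on first use; synchronized so it is built at most once. */
    synchronized T getOrCreate() {
        if (client == null) {
            client = useCrt ? crtClientFactory.get() : defaultClientFactory.get();
        }
        return client;
    }
}
```

In `S3AStoreImpl`, the CRT supplier might wrap the builder call from the diff (`S3CrtAsyncClient.builder().maxConcurrency(600).build()`), with the flag read from `ANALYTICS_ACCELERATOR_CRT_ENABLED`; whether to fold this decision into the existing async-client method is the open question in the comment.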
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/S3SeekableInputStreamFactory.java:
##########
@@ -0,0 +1,54 @@
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.S3ASeekableStream;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+public class S3SeekableInputStreamFactory extends AbstractObjectInputStreamFactory {
+
+ private final S3AsyncClient s3AsyncClient;
+ private software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory s3SeekableInputStreamFactory;
+
+ public S3SeekableInputStreamFactory(S3AsyncClient s3AsyncClient) {
+ super("S3SeekableInputStreamFactory");
+ this.s3AsyncClient = s3AsyncClient;
+ }
+
+ @Override
+ protected void serviceInit(final Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ ConnectorConfiguration configuration = new ConnectorConfiguration(conf,
+ ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+ S3SeekableInputStreamConfiguration seekableInputStreamConfiguration =
+ S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
+ this.s3SeekableInputStreamFactory =
+ new software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory(
+ new S3SdkObjectClient(this.s3AsyncClient),
+ seekableInputStreamConfiguration);
+ }
+
+ @Override
+ public ObjectInputStream readObject(final ObjectReadParameters parameters) throws IOException {
+ return new S3ASeekableStream(
Review Comment:
I wonder if we should rename this class to S3ASeekableInputStream, since it
now implements ObjectInputStream.
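For context on the rename, here is a stripped-down sketch of the shape in this hunk: the factory builds its delegate during initialisation (as `serviceInit` does above), and `readObject` hands back a wrapper that is itself an input stream, which is why an `InputStream` suffix fits. All types here (`ObjectInputStreamSketch`, `SeekableInputStreamSketch`, `SeekableInputStreamFactorySketch`) are hypothetical stand-ins, not the Hadoop or analytics-accelerator classes:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Function;

// Hypothetical stand-in for the Hadoop-side stream base type.
abstract class ObjectInputStreamSketch extends InputStream { }

// The wrapper *is* an input stream, so a name ending in "InputStream" describes it.
final class SeekableInputStreamSketch extends ObjectInputStreamSketch {
    private final InputStream delegate;

    SeekableInputStreamSketch(InputStream delegate) {
        this.delegate = delegate;
    }

    @Override
    public int read() throws IOException {
        return delegate.read();
    }

    @Override
    public void close() throws IOException {
        delegate.close();
    }
}

// Factory: the delegate is configured in init(), streams are handed out by readObject().
final class SeekableInputStreamFactorySketch {
    private Function<String, InputStream> delegateFactory; // null until init()

    void init(Function<String, InputStream> delegateFactory) {
        this.delegateFactory = delegateFactory;
    }

    ObjectInputStreamSketch readObject(String key) throws IOException {
        if (delegateFactory == null) {
            throw new IllegalStateException("factory not initialised");
        }
        return new SeekableInputStreamSketch(delegateFactory.apply(key));
    }
}
```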
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.