This is an automated email from the ASF dual-hosted git repository.

ahmar pushed a commit to branch feature-HADOOP-19363-analytics-accelerator-s3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit b92a66176ed0aa3ff399965a2eaf88b1307ad0ef
Author: Ahmar Suhail <ahma...@amazon.co.uk>
AuthorDate: Wed Feb 12 13:59:14 2025 +0000

    pass down file open options such as read policy, file status to AAL
---
 .../fs/s3a/impl/streams/AnalyticsStream.java       | 38 +++++++++++++++++++++-
 .../fs/s3a/ITestS3AAnalyticsAcceleratorStream.java | 27 +++++++--------
 2 files changed, 51 insertions(+), 14 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
index 9e6f98c5553..e6f085d54f0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
@@ -25,12 +25,16 @@
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
+import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
+import software.amazon.s3.analyticsaccelerator.util.OpenFileInformation;
 import software.amazon.s3.analyticsaccelerator.util.S3URI;
 
 /**
@@ -49,7 +53,7 @@ public class AnalyticsStream extends ObjectInputStream implements StreamCapabili
   public AnalyticsStream(final ObjectReadParameters parameters, final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
     super(InputStreamType.Analytics, parameters);
     S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
-    this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()));
+    this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()), buildOpenFileInformation(parameters));
     getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
   }
 
@@ -197,6 +201,38 @@ private void onReadFailure(IOException ioe) throws IOException {
     this.close();
   }
 
+  private OpenFileInformation buildOpenFileInformation(ObjectReadParameters parameters) {
+    OpenFileInformation openFileInformation = OpenFileInformation.builder().inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
+        .getInputPolicy()))
+        .objectMetadata(ObjectMetadata.builder()
+            .contentLength(parameters.getObjectAttributes().getLen())
+            .etag(parameters.getObjectAttributes().getETag()).build())
+        .build();
+
+    return openFileInformation;
+  }
+
+  /**
+   * If S3A's input policy is Sequential, that is, if the file format to be read is sequential
+   * (CSV, JSON), or the file policy passed down is WHOLE_FILE, then AAL's parquet-specific
+   * optimisations will be turned off, regardless of the file extension. This is to allow for
+   * applications like DistCp that read parquet files, but read them whole, and so do not
+   * follow the typical parquet read pattern of reading the footer first etc., and will not benefit
+   * from parquet optimisations.
+   * Otherwise, AAL decides which optimisations to apply based on the file extension:
+   * if the file ends in .par or .parquet, then parquet-specific optimisations are used.
+   *
+   * @param inputPolicy S3A's input file policy passed down when opening the file
+   * @return the AAL read policy
+   */
+  private InputPolicy mapS3AInputPolicyToAAL(S3AInputPolicy inputPolicy) {
+    switch (inputPolicy) {
+     case Sequential:
+       return InputPolicy.Sequential;
+     default:
+       return InputPolicy.None;
+    }
+  }
 
   protected void throwIfClosed() throws IOException {
     if (closed) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java
index 040a141586e..02c766b9941 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java
@@ -21,42 +21,34 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.UUID;
+import java.io.InputStream;
 
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathHandle;
-import org.apache.hadoop.fs.s3a.impl.AwsSdkWorkarounds;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
 import org.apache.hadoop.fs.statistics.IOStatistics;
-import org.apache.hadoop.test.GenericTestUtils;
 
-import static org.apache.commons.io.FileUtils.ONE_KB;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetching;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
 import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
-import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED;
-import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS;
-import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs;
+
 
 import org.assertj.core.api.Assertions;
 
-import org.slf4j.LoggerFactory;
+
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
 import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
-import software.amazon.s3.analyticsaccelerator.io.logical.parquet.ParquetMetadataParsingTask;
 import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
 
 public class ITestS3AAnalyticsAcceleratorStream extends AbstractS3ATestBase {
@@ -90,6 +82,9 @@ public Configuration createConfiguration() {
   public void testConnectorFrameWorkIntegration() throws IOException {
     describe("Verify S3 connector framework integration");
 
+    removeBaseAndBucketOverrides(conf, INPUT_FADVISE);
+    conf.set(INPUT_FADVISE, "whole-file");
+
     S3AFileSystem fs =
         (S3AFileSystem) FileSystem.get(testFile.toUri(), conf);
     byte[] buffer = new byte[500];
@@ -99,7 +94,13 @@ public void testConnectorFrameWorkIntegration() throws IOException {
       ioStats = inputStream.getIOStatistics();
       inputStream.seek(5);
       inputStream.read(buffer, 0, 500);
+
+      final InputStream wrappedStream = inputStream.getWrappedStream();
+      ObjectInputStream objectInputStream = (ObjectInputStream) wrappedStream;
+      assertEquals(objectInputStream.streamType(), InputStreamType.Analytics);
+      assertEquals(objectInputStream.getInputPolicy(), S3AInputPolicy.Sequential);
     }
+
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
   }
 


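For context, a minimal usage sketch (not part of this patch) of how an application can declare a whole-file read policy through the standard openFile() builder; with this change that policy reaches AAL, which maps it to InputPolicy.Sequential and skips its parquet-specific optimisations even for .parquet objects. The class and method names below are illustrative only.

// Illustrative sketch; class and method names are hypothetical.
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;

public class WholeFileReadSketch {
  public static void readWhole(FileSystem fs, Path source) throws Exception {
    // Declare the read policy at open time; S3A passes it down to the
    // AnalyticsStream, which forwards it to AAL via OpenFileInformation.
    try (FSDataInputStream in = fs.openFile(source)
        .opt(FS_OPTION_OPENFILE_READ_POLICY, "whole-file")
        .build()
        .get()) {
      byte[] buffer = new byte[8192];
      // Drain the stream sequentially, as DistCp-style whole-file copies do.
      while (in.read(buffer) > 0) {
        // process or forward the bytes; omitted here
      }
    }
  }
}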
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
