This is an automated email from the ASF dual-hosted git repository.

ahmar pushed a commit to branch feature-HADOOP-19363-analytics-accelerator-s3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit b92a66176ed0aa3ff399965a2eaf88b1307ad0ef
Author: Ahmar Suhail <ahma...@amazon.co.uk>
AuthorDate: Wed Feb 12 13:59:14 2025 +0000

    pass down file open options such as read policy, file status to AAL
---
 .../fs/s3a/impl/streams/AnalyticsStream.java        | 38 +++++++++++++++++++++-
 .../fs/s3a/ITestS3AAnalyticsAcceleratorStream.java  | 27 +++++++--------
 2 files changed, 51 insertions(+), 14 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
index 9e6f98c5553..e6f085d54f0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
@@ -25,12 +25,16 @@
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
+import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
+import software.amazon.s3.analyticsaccelerator.util.OpenFileInformation;
 import software.amazon.s3.analyticsaccelerator.util.S3URI;

 /**
@@ -49,7 +53,7 @@ public class AnalyticsStream extends ObjectInputStream implements StreamCapabili
   public AnalyticsStream(final ObjectReadParameters parameters,
       final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
     super(InputStreamType.Analytics, parameters);
     S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
-    this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()));
+    this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()), buildOpenFileInformation(parameters));
     getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
   }
@@ -197,6 +201,38 @@ private void onReadFailure(IOException ioe) throws IOException {
     this.close();
   }

+  private OpenFileInformation buildOpenFileInformation(ObjectReadParameters parameters) {
+    OpenFileInformation openFileInformation = OpenFileInformation.builder().inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
+        .getInputPolicy()))
+        .objectMetadata(ObjectMetadata.builder()
+            .contentLength(parameters.getObjectAttributes().getLen())
+            .etag(parameters.getObjectAttributes().getETag()).build())
+        .build();
+
+    return openFileInformation;
+  }
+
+  /**
+   * If S3A's input policy is Sequential, that is, if the file format to be read is sequential
+   * (CSV, JSON), or the file policy passed down is WHOLE_FILE, then AAL's parquet-specific
+   * optimisations will be turned off, regardless of the file extension. This allows
+   * applications like DistCp, which read parquet files whole and so do not follow the
+   * typical parquet read pattern of reading the footer first, to avoid optimisations
+   * they will not benefit from.
+   * Otherwise, AAL decides which optimisations to apply based on the file extension:
+   * if the file ends in .par or .parquet, the parquet-specific optimisations are used.
+   *
+   * @param inputPolicy S3A's input file policy passed down when opening the file
+   * @return the AAL read policy
+   */
+  private InputPolicy mapS3AInputPolicyToAAL(S3AInputPolicy inputPolicy) {
+    switch (inputPolicy) {
+    case Sequential:
+      return InputPolicy.Sequential;
+    default:
+      return InputPolicy.None;
+    }
+  }

   protected void throwIfClosed() throws IOException {
     if (closed) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java
index 040a141586e..02c766b9941 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java
@@ -21,42 +21,34 @@
 import java.io.File;
 import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.UUID;
+import java.io.InputStream;

 import org.junit.Before;
 import org.junit.Test;

-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathHandle;
-import org.apache.hadoop.fs.s3a.impl.AwsSdkWorkarounds;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
 import org.apache.hadoop.fs.statistics.IOStatistics;
-import org.apache.hadoop.test.GenericTestUtils;

-import static org.apache.commons.io.FileUtils.ONE_KB;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetching;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
 import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
-import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED;
-import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS;
-import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs;
+
 import org.assertj.core.api.Assertions;
-import org.slf4j.LoggerFactory;
+
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
 import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
-import software.amazon.s3.analyticsaccelerator.io.logical.parquet.ParquetMetadataParsingTask;
 import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;

 public class ITestS3AAnalyticsAcceleratorStream extends AbstractS3ATestBase {
@@ -90,6 +82,9 @@ public Configuration createConfiguration() {
   public void testConnectorFrameWorkIntegration() throws IOException {
     describe("Verify S3 connector framework integration");

+    removeBaseAndBucketOverrides(conf, INPUT_FADVISE);
+    conf.set(INPUT_FADVISE, "whole-file");
+
     S3AFileSystem fs = (S3AFileSystem) FileSystem.get(testFile.toUri(), conf);
     byte[] buffer = new byte[500];

@@ -99,7 +94,13 @@ public void testConnectorFrameWorkIntegration() throws IOException {
       ioStats = inputStream.getIOStatistics();
       inputStream.seek(5);
       inputStream.read(buffer, 0, 500);
+
+      final InputStream wrappedStream = inputStream.getWrappedStream();
+      ObjectInputStream objectInputStream = (ObjectInputStream) wrappedStream;
+      assertEquals(objectInputStream.streamType(), InputStreamType.Analytics);
+      assertEquals(objectInputStream.getInputPolicy(), S3AInputPolicy.Sequential);
     }
+
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
   }
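
For context, a minimal sketch (not part of the commit; the class name, bucket and key below are made up) of how an application can supply the two open options this change forwards to AAL, the read policy and the file status, through the standard openFile() builder. With a "whole-file" read policy S3A resolves the stream's input policy to Sequential, which the new mapS3AInputPolicyToAAL() translates to AAL's InputPolicy.Sequential and so switches off the parquet-specific optimisations.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;

public class OpenFileOptionsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical object; any s3a:// path behaves the same way.
    Path path = new Path("s3a://example-bucket/data/records.csv");
    FileSystem fs = path.getFileSystem(conf);

    // Status from an earlier listing or probe: its length and etag are what
    // buildOpenFileInformation() forwards to AAL as ObjectMetadata.
    FileStatus status = fs.getFileStatus(path);

    // "whole-file" resolves to S3AInputPolicy.Sequential in S3A, which
    // mapS3AInputPolicyToAAL() maps to AAL's InputPolicy.Sequential,
    // turning off the parquet-specific optimisations for this stream.
    try (FSDataInputStream in = fs.openFile(path)
        .opt(FS_OPTION_OPENFILE_READ_POLICY, "whole-file")
        .withFileStatus(status)
        .build().get()) {
      byte[] buffer = new byte[512];
      int bytesRead = in.read(buffer, 0, buffer.length);
      System.out.println("read " + bytesRead + " bytes");
    }
  }
}

Passing the file status down this way means the content length and etag that buildOpenFileInformation() packs into AAL's ObjectMetadata should come from metadata the caller already holds rather than from an extra HEAD request.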