This is an automated email from the ASF dual-hosted git repository. ahmar pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new cc14236c9d9 HADOOP-19348. Integrate analytics accelerator into S3A. (#7433) cc14236c9d9 is described below commit cc14236c9d96ffaea76b8046423f1415b71b8ea9 Author: ahmarsuhail <ahma...@amazon.co.uk> AuthorDate: Fri Feb 28 10:04:24 2025 +0000 HADOOP-19348. Integrate analytics accelerator into S3A. (#7433) Initial support for Analytics Accelerator Library, which provides a new parquet aware input stream. Contributed by: Ahmar Suhail Reviewed by: Steve Loughran --- .../hadoop/fs/statistics/StreamStatisticNames.java | 7 + .../AbstractContractMultipartUploaderTest.java | 2 +- hadoop-project/pom.xml | 6 + hadoop-tools/hadoop-aws/pom.xml | 5 + .../java/org/apache/hadoop/fs/s3a/Constants.java | 9 + .../hadoop/fs/s3a/DefaultS3ClientFactory.java | 3 +- .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 11 + .../apache/hadoop/fs/s3a/S3AInstrumentation.java | 19 +- .../org/apache/hadoop/fs/s3a/S3ClientFactory.java | 24 +++ .../java/org/apache/hadoop/fs/s3a/Statistic.java | 4 + .../streams/AbstractObjectInputStreamFactory.java | 4 +- .../fs/s3a/impl/streams/AnalyticsStream.java | 238 +++++++++++++++++++++ .../s3a/impl/streams/AnalyticsStreamFactory.java | 109 ++++++++++ .../fs/s3a/impl/streams/InputStreamType.java | 6 +- .../fs/s3a/impl/streams/StreamIntegration.java | 3 +- .../s3a/statistics/S3AInputStreamStatistics.java | 8 + .../statistics/impl/EmptyS3AStatisticsContext.java | 6 + ...TestS3AContractAnalyticsStreamVectoredRead.java | 55 +++++ .../fs/contract/s3a/ITestS3AContractCreate.java | 12 ++ .../fs/contract/s3a/ITestS3AContractDistCp.java | 15 ++ .../s3a/ITestS3AContractMultipartUploader.java | 7 + .../contract/s3a/ITestS3AContractVectoredRead.java | 12 ++ .../apache/hadoop/fs/s3a/AbstractS3AMockTest.java | 3 +- .../ITestS3AAnalyticsAcceleratorStreamReading.java | 225 +++++++++++++++++++ .../hadoop/fs/s3a/ITestS3AEncryptionSSEC.java | 4 + .../hadoop/fs/s3a/ITestS3AFSMainOperations.java | 23 ++ .../hadoop/fs/s3a/ITestS3AFileSystemContract.java | 13 ++ .../hadoop/fs/s3a/ITestS3AIOStatisticsContext.java | 6 + .../hadoop/fs/s3a/ITestS3AInputStreamLeakage.java | 6 + .../org/apache/hadoop/fs/s3a/ITestS3AMetrics.java | 6 + .../hadoop/fs/s3a/ITestS3ARequesterPays.java | 1 + .../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 28 +++ .../org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java | 4 + .../fs/s3a/commit/ITestCommitOperationCost.java | 12 +- .../fileContext/ITestS3AFileContextStatistics.java | 6 + .../fs/s3a/performance/ITestS3AOpenCost.java | 6 +- .../fs/s3a/performance/ITestUnbufferDraining.java | 1 - .../s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java | 3 + .../ITestS3AContractStreamIOStatistics.java | 9 + .../statistics/ITestS3AFileSystemStatistic.java | 6 + .../src/test/resources/malformed_footer.parquet | Bin 0 -> 451 bytes .../src/test/resources/multi_row_group.parquet | Bin 0 -> 2080 bytes 42 files changed, 913 insertions(+), 14 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java index 57c77572138..ab4838618da 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java @@ -97,6 +97,13 @@ public final class StreamStatisticNames { */ public static final String 
STREAM_READ_OPENED = "stream_read_opened"; + /** + * Total count of times an analytics input stream was opened. + * + * Value: {@value}. + */ + public static final String STREAM_READ_ANALYTICS_OPENED = "stream_read_analytics_opened"; + /** * Count of exceptions raised during input stream reads. * Value: {@value}. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java index 16482915bdf..4c4514b249c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java @@ -507,7 +507,7 @@ public void testMultipartUpload() throws Exception { @Test public void testMultipartUploadEmptyPart() throws Exception { FileSystem fs = getFileSystem(); - Path file = path("testMultipartUpload"); + Path file = path("testMultipartUploadEmptyPart"); try (MultipartUploader uploader = fs.createMultipartUploader(file).build()) { UploadHandle uploadHandle = uploader.startUpload(file).get(); diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index df0d9bccf08..f6dc71288ed 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -206,6 +206,7 @@ <aws-java-sdk.version>1.12.720</aws-java-sdk.version> <aws-java-sdk-v2.version>2.25.53</aws-java-sdk-v2.version> <amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version> + <amazon-s3-analyticsaccelerator-s3.version>0.0.4</amazon-s3-analyticsaccelerator-s3.version> <aws.eventstream.version>1.0.1</aws.eventstream.version> <hsqldb.version>2.7.1</hsqldb.version> <frontend-maven-plugin.version>1.11.2</frontend-maven-plugin.version> @@ -1113,6 +1114,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>software.amazon.s3.analyticsaccelerator</groupId> + <artifactId>analyticsaccelerator-s3</artifactId> + <version>${amazon-s3-analyticsaccelerator-s3.version}</version> + </dependency> <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-core</artifactId> diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index 4c36d5b4653..f10eea7a4d3 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -484,6 +484,11 @@ <artifactId>amazon-s3-encryption-client-java</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>software.amazon.s3.analyticsaccelerator</groupId> + <artifactId>analyticsaccelerator-s3</artifactId> + <scope>compile</scope> + </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 6796c29d348..2b019e1fe4c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1827,4 +1827,13 @@ private Constants() { * Value: {@value}. */ public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit"; + + + /** + * Prefix to configure Analytics Accelerator Library. + * Value: {@value}. 
+ */ + public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX = + "fs.s3a.analytics.accelerator"; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java index 556bd351075..7b686130ce3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java @@ -166,7 +166,8 @@ public S3AsyncClient createS3AsyncClient( .httpClientBuilder(httpClientBuilder); // multipart upload pending with HADOOP-19326. - if (!parameters.isClientSideEncryptionEnabled()) { + if (!parameters.isClientSideEncryptionEnabled() && + !parameters.isAnalyticsAcceleratorEnabled()) { s3AsyncClientBuilder.multipartConfiguration(multipartConfiguration) .multipartEnabled(parameters.isMultipartCopy()); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index b3863f23f07..2e4475063df 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -147,9 +147,11 @@ import org.apache.hadoop.fs.s3a.impl.StoreContextFactory; import org.apache.hadoop.fs.s3a.impl.UploadContentProviders; import org.apache.hadoop.fs.s3a.impl.CSEUtils; +import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType; import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters; import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamCallbacks; import org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements; +import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration; import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations; import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl; import org.apache.hadoop.fs.statistics.DurationTracker; @@ -440,6 +442,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, */ private boolean isCSEEnabled; + /** + * Is this S3A FS instance using analytics accelerator? + */ + private boolean isAnalyticsAcceleratorEnabled; + /** * Bucket AccessPoint. */ @@ -629,6 +636,9 @@ public void initialize(URI name, Configuration originalConf) // If encryption method is set to CSE-KMS or CSE-CUSTOM then CSE is enabled. isCSEEnabled = CSEUtils.isCSEEnabled(getS3EncryptionAlgorithm().getMethod()); + isAnalyticsAcceleratorEnabled = StreamIntegration.determineInputStreamType(conf) + .equals(InputStreamType.Analytics); + // Create the appropriate fsHandler instance using a factory method fsHandler = createFileSystemHandler(); fsHandler.setCSEGauge((IOStatisticsStore) getIOStatistics()); @@ -1156,6 +1166,7 @@ private ClientManager createClientManager(URI fsURI, boolean dtEnabled) throws I conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT)) .withClientSideEncryptionEnabled(isCSEEnabled) .withClientSideEncryptionMaterials(cseMaterials) + .withAnalyticsAcceleratorEnabled(isAnalyticsAcceleratorEnabled) .withKMSRegion(conf.get(S3_ENCRYPTION_CSE_KMS_REGION)); // this is where clients and the transfer manager are created on demand. 
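Taken together, the changes above make the new stream purely configuration-driven: selecting the analytics stream type switches the input stream factory, and keys under the fs.s3a.analytics.accelerator prefix are handed through to the library. A rough sketch of the client-side wiring, not part of the patch: the property keys are taken from this diff (the stream type key and its "Analytics" value are quoted from the AnalyticsStreamFactory javadoc further down, written lower-case here on the assumption the parser is case-insensitive); the bucket URI and tuning values are placeholders.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Select the analytics input stream (InputStreamType.Analytics).
Configuration conf = new Configuration();
conf.set("fs.s3a.input.stream.type", "analytics");
// Keys under ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX are forwarded to the
// library via ConnectorConfiguration; these values are samples, not defaults.
conf.set("fs.s3a.analytics.accelerator.logicalio.prefetching.mode", "all");
conf.setInt("fs.s3a.analytics.accelerator.physicalio.blobstore.capacity", 1);
// Placeholder URI; any s3a:// filesystem instantiated with this
// configuration will use AnalyticsStream for reads.
FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);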
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index b84f19fcd87..1d26eb62750 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.impl.WeakRefMetricsSource; +import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType; import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.ChangeTrackerStatistics; import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics; @@ -840,6 +841,7 @@ private final class InputStreamStatistics private final AtomicLong closed; private final AtomicLong forwardSeekOperations; private final AtomicLong openOperations; + private final AtomicLong analyticsStreamOpenOperations; private final AtomicLong readExceptions; private final AtomicLong readsIncomplete; private final AtomicLong readOperations; @@ -888,7 +890,8 @@ private InputStreamStatistics( StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES, - StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE) + StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE, + StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED) .withGauges(STREAM_READ_GAUGE_INPUT_POLICY, STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(), STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(), @@ -927,6 +930,9 @@ private InputStreamStatistics( StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS); openOperations = st.getCounterReference( StreamStatisticNames.STREAM_READ_OPENED); + analyticsStreamOpenOperations = st.getCounterReference( + StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED + ); readExceptions = st.getCounterReference( StreamStatisticNames.STREAM_READ_EXCEPTIONS); readsIncomplete = st.getCounterReference( @@ -1030,6 +1036,17 @@ public long streamOpened() { return openOperations.getAndIncrement(); } + @Override + public long streamOpened(InputStreamType type) { + long count = openOperations.getAndIncrement(); + + if (type == InputStreamType.Analytics) { + count = analyticsStreamOpenOperations.getAndIncrement(); + } + + return count; + } + /** * {@inheritDoc}. * If the connection was aborted, increment {@link #aborted} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java index cda09b622ea..559cd49c345 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java @@ -202,6 +202,11 @@ final class S3ClientCreationParameters { */ private boolean fipsEnabled; + /** + * Is analytics accelerator enabled? + */ + private boolean isAnalyticsAcceleratorEnabled; + /** * List of execution interceptors to include in the chain * of interceptors in the SDK. @@ -457,6 +462,17 @@ public S3ClientCreationParameters withClientSideEncryptionEnabled(final boolean return this; } + /** + * Set the analytics accelerator enabled flag. 
+ * + * @param value new value + * @return the builder + */ + public S3ClientCreationParameters withAnalyticsAcceleratorEnabled(final boolean value) { + this.isAnalyticsAcceleratorEnabled = value; + return this; + } + /** * Set the KMS client region. * This is required for CSE-KMS @@ -477,6 +493,14 @@ public boolean isClientSideEncryptionEnabled() { return this.isCSEEnabled; } + /** + * Get the analytics accelerator enabled flag. + * @return analytics accelerator enabled flag. + */ + public boolean isAnalyticsAcceleratorEnabled() { + return this.isAnalyticsAcceleratorEnabled; + } + /** * Set the client side encryption materials. * diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index 07133a48e8e..ffd3f5e1155 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -322,6 +322,10 @@ public enum Statistic { TYPE_COUNTER), /* Stream Reads */ + STREAM_READ_ANALYTICS_OPENED( + StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED, + "Total count of times an analytics input stream to object store data was opened", + TYPE_COUNTER), STREAM_READ_BYTES( StreamStatisticNames.STREAM_READ_BYTES, "Bytes read from an input stream in read() calls", diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java index 09dfa27ff4b..205a4aaa858 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java @@ -18,6 +18,8 @@ package org.apache.hadoop.fs.s3a.impl.streams; +import java.io.IOException; + import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.statistics.StreamStatisticNames; import org.apache.hadoop.service.AbstractService; @@ -58,7 +60,7 @@ protected AbstractObjectInputStreamFactory(final String name) { * @param factoryBindingParameters parameters for the factory binding */ @Override - public void bind(final FactoryBindingParameters factoryBindingParameters) { + public void bind(final FactoryBindingParameters factoryBindingParameters) throws IOException { // must be on be invoked during service initialization Preconditions.checkState(isInState(STATE.INITED), "Input Stream factory %s is in wrong state: %s", diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java new file mode 100644 index 00000000000..6b910c65380 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.fs.s3a.impl.streams; + +import java.io.EOFException; +import java.io.IOException; + +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory; +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream; +import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; +import software.amazon.s3.analyticsaccelerator.util.InputPolicy; +import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; +import software.amazon.s3.analyticsaccelerator.util.S3URI; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.fs.s3a.S3AInputPolicy; +import org.apache.hadoop.fs.s3a.S3ObjectAttributes; + +/** + * Analytics stream creates a stream using aws-analytics-accelerator-s3. This stream supports + * parquet specific optimisations such as parquet-aware prefetching. For more details, see + * https://github.com/awslabs/analytics-accelerator-s3. + */ +public class AnalyticsStream extends ObjectInputStream implements StreamCapabilities { + + private S3SeekableInputStream inputStream; + private long lastReadCurrentPos = 0; + private volatile boolean closed; + + public static final Logger LOG = LoggerFactory.getLogger(AnalyticsStream.class); + + public AnalyticsStream(final ObjectReadParameters parameters, + final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException { + super(InputStreamType.Analytics, parameters); + S3ObjectAttributes s3Attributes = parameters.getObjectAttributes(); + this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), + s3Attributes.getKey()), buildOpenStreamInformation(parameters)); + getS3AStreamStatistics().streamOpened(InputStreamType.Analytics); + } + + @Override + public int read() throws IOException { + throwIfClosed(); + int bytesRead; + try { + bytesRead = inputStream.read(); + } catch (IOException ioe) { + onReadFailure(ioe); + throw ioe; + } + return bytesRead; + } + + @Override + public void seek(long pos) throws IOException { + throwIfClosed(); + if (pos < 0) { + throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + + " " + pos); + } + inputStream.seek(pos); + } + + + @Override + public synchronized long getPos() { + if (!closed) { + lastReadCurrentPos = inputStream.getPos(); + } + return lastReadCurrentPos; + } + + + /** + * Reads the last n bytes from the stream into a byte buffer. Blocks until end of stream is + * reached. Leaves the position of the stream unaltered. + * + * @param buf buffer to read data into + * @param off start position in buffer at which data is written + * @param len the number of bytes to read; the n-th byte should be the last byte of the stream. 
+ * @return the total number of bytes read into the buffer + * @throws IOException if an I/O error occurs + */ + public int readTail(byte[] buf, int off, int len) throws IOException { + throwIfClosed(); + int bytesRead; + try { + bytesRead = inputStream.readTail(buf, off, len); + } catch (IOException ioe) { + onReadFailure(ioe); + throw ioe; + } + return bytesRead; + } + + @Override + public int read(byte[] buf, int off, int len) throws IOException { + throwIfClosed(); + int bytesRead; + try { + bytesRead = inputStream.read(buf, off, len); + } catch (IOException ioe) { + onReadFailure(ioe); + throw ioe; + } + return bytesRead; + } + + + @Override + public boolean seekToNewSource(long l) throws IOException { + return false; + } + + @Override + public int available() throws IOException { + throwIfClosed(); + return super.available(); + } + + @Override + protected boolean isStreamOpen() { + return !isClosed(); + } + + protected boolean isClosed() { + return inputStream == null; + } + + @Override + protected void abortInFinalizer() { + try { + close(); + } catch (IOException ignored) { + + } + } + + @Override + public synchronized void close() throws IOException { + if (!closed) { + closed = true; + try { + inputStream.close(); + inputStream = null; + super.close(); + } catch (IOException ioe) { + LOG.debug("Failure closing stream {}: ", getKey()); + throw ioe; + } + } + } + + /** + * Close the stream on read failure. + * No attempt is made to recover from the failure. + * + * @param ioe exception caught. + */ + @Retries.OnceTranslated + private void onReadFailure(IOException ioe) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Got exception while trying to read from stream {}, " + + "not trying to recover:", + getKey(), ioe); + } else { + LOG.info("Got exception while trying to read from stream {}, " + + "not trying to recover:", + getKey(), ioe); + } + this.close(); + } + + private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) { + OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder = + OpenStreamInformation.builder() + .inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext() + .getInputPolicy())); + + if (parameters.getObjectAttributes().getETag() != null) { + openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder() + .contentLength(parameters.getObjectAttributes().getLen()) + .etag(parameters.getObjectAttributes().getETag()).build()); + } + + return openStreamInformationBuilder.build(); + } + + /** + * If S3A's input policy is Sequential, that is, if the file format to be read is sequential + * (CSV, JSON), or the file policy passed down is WHOLE_FILE, then AAL's parquet-specific + * optimisations will be turned off, regardless of the file extension. This is to allow for + * applications like DistCp that read parquet files but read them whole, and so do not + * follow the typical parquet read pattern of reading the footer first, and will not benefit + * from parquet optimisations. + * Otherwise, AAL decides which optimisations to apply based on the file extension: + * if the file ends in .par or .parquet, then parquet-specific optimisations are used.
+ * + * @param inputPolicy S3A's input file policy passed down when opening the file + * @return the AAL read policy + */ + private InputPolicy mapS3AInputPolicyToAAL(S3AInputPolicy inputPolicy) { + switch (inputPolicy) { + case Sequential: + return InputPolicy.Sequential; + default: + return InputPolicy.None; + } + } + + protected void throwIfClosed() throws IOException { + if (closed) { + throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED); + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java new file mode 100644 index 00000000000..e03a38ad2c0 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.fs.s3a.impl.streams; + +import java.io.IOException; + +import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient; +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration; +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory; +import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.VectoredIOContext; +import org.apache.hadoop.util.functional.CallableRaisingIOE; +import org.apache.hadoop.util.functional.LazyAutoCloseableReference; + +import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX; +import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.populateVectoredIOContext; + +/** + * A factory for {@link AnalyticsStream}. This class is instantiated during initialization of + * {@code S3AStore}, if fs.s3a.input.stream.type is set to Analytics. 
+ */ +public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory { + + private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration; + private LazyAutoCloseableReference<S3SeekableInputStreamFactory> s3SeekableInputStreamFactory; + private boolean requireCrt; + + public AnalyticsStreamFactory() { + super("AnalyticsStreamFactory"); + } + + @Override + protected void serviceInit(final Configuration conf) throws Exception { + super.serviceInit(conf); + ConnectorConfiguration configuration = new ConnectorConfiguration(conf, + ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX); + this.seekableInputStreamConfiguration = + S3SeekableInputStreamConfiguration.fromConfiguration(configuration); + this.requireCrt = false; + } + + @Override + public void bind(final FactoryBindingParameters factoryBindingParameters) throws IOException { + super.bind(factoryBindingParameters); + this.s3SeekableInputStreamFactory = + new LazyAutoCloseableReference<>(createS3SeekableInputStreamFactory()); + } + + @Override + public ObjectInputStream readObject(final ObjectReadParameters parameters) throws IOException { + return new AnalyticsStream( + parameters, + getOrCreateS3SeekableInputStreamFactory()); + } + + @Override + public InputStreamType streamType() { + return InputStreamType.Analytics; + } + + /** + * Calculate the stream factory requirements. + * @return the requirements of this factory. + */ + @Override + public StreamFactoryRequirements factoryRequirements() { + // fill in the vector context + final VectoredIOContext vectorContext = populateVectoredIOContext(getConfig()); + // and then disable range merging. + // this ensures that no reads are made for data which is then discarded... + // so the prefetch and block read code doesn't ever do wasteful fetches. + vectorContext.setMinSeekForVectoredReads(0); + + return new StreamFactoryRequirements(0, + 0, vectorContext, + StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests); + } + + private S3SeekableInputStreamFactory getOrCreateS3SeekableInputStreamFactory() + throws IOException { + return s3SeekableInputStreamFactory.eval(); + } + + private CallableRaisingIOE<S3SeekableInputStreamFactory> createS3SeekableInputStreamFactory() { + return () -> new S3SeekableInputStreamFactory( + new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)), + seekableInputStreamConfiguration); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java index 1775fa5f05c..a9c33a60fc6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java @@ -45,13 +45,11 @@ public enum InputStreamType { */ Prefetch(StreamIntegration.PREFETCH, 2, c -> new PrefetchingInputStreamFactory()), - /** * The analytics input stream. */ - Analytics(StreamIntegration.ANALYTICS, 3, c -> { - throw new IllegalArgumentException("not yet supported"); - }), + Analytics(StreamIntegration.ANALYTICS, 3, c -> + new AnalyticsStreamFactory()), /** * The a custom input stream.
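With the factory registered against InputStreamType.Analytics, reads go through the normal S3A openFile() path, and the read policy hint determines the AAL input policy via mapS3AInputPolicyToAAL above. A minimal sketch mirroring the integration tests later in this commit; the filesystem instance and object path are placeholders, and the parquet read policy is used so the sequential/whole-file mapping that would switch AAL's parquet optimisations off does not apply.

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;

// Open a parquet object with the parquet read policy so the analytics
// stream can keep its parquet-aware prefetching enabled.
try (FSDataInputStream in = fs.openFile(new Path("s3a://example-bucket/data.parquet"))
    .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET)
    .build().get()) {
  byte[] buffer = new byte[1024];
  // Assumes the object is at least 1 KB long; a sketch, not production code.
  in.readFully(buffer, 0, buffer.length);
}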
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java index 63f7053e0e4..bb35a0580a2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java @@ -113,7 +113,6 @@ private StreamIntegration() { * @throws RuntimeException any binding/loading/instantiation problem */ public static ObjectInputStreamFactory factoryFromConfig(final Configuration conf) { - // Construct the factory. return determineInputStreamType(conf) .factory() @@ -135,7 +134,7 @@ public static ObjectInputStreamFactory factoryFromConfig(final Configuration con * @param conf configuration * @return a stream factory. */ - static InputStreamType determineInputStreamType(final Configuration conf) { + public static InputStreamType determineInputStreamType(final Configuration conf) { // work out the default stream; this includes looking at the // deprecated prefetch enabled key to see if it is set. if (conf.getBoolean(PREFETCH_ENABLED_KEY, false)) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java index 1e28a27e708..7ad7cf75367 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3a.statistics; import org.apache.hadoop.fs.impl.prefetch.PrefetchingStatistics; +import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType; import org.apache.hadoop.fs.statistics.DurationTracker; /** @@ -53,6 +54,13 @@ public interface S3AInputStreamStatistics extends AutoCloseable, */ long streamOpened(); + /** + * A stream of the given type was opened. + * @param type type of input stream + * @return the previous count or zero if this is the first opening. + */ + long streamOpened(InputStreamType type); + /** * The inner stream was closed. 
* @param abortedConnection flag to indicate the stream was aborted, diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java index e47656efd18..a5d20095ba5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java @@ -22,6 +22,7 @@ import java.time.Duration; import org.apache.hadoop.fs.s3a.Statistic; +import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType; import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.ChangeTrackerStatistics; import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics; @@ -164,6 +165,11 @@ public long streamOpened() { return 0; } + @Override + public long streamOpened(InputStreamType type) { + return 0; + } + @Override public void streamClose(final boolean abortedConnection, final long remainingInCurrentRequest) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java new file mode 100644 index 00000000000..a2b053783bc --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator; + +/** + * S3A contract tests for vectored reads with the Analytics stream. The analytics stream does + * not explicitly implement the vectoredRead() method, nor does it currently do any + * vectored-read-specific optimisations (such as range coalescing). However, this test ensures + * that the base implementation of readVectored in {@link org.apache.hadoop.fs.PositionedReadable} + * still works. + */ +public class ITestS3AContractAnalyticsStreamVectoredRead extends AbstractContractVectoredReadTest { + + public ITestS3AContractAnalyticsStreamVectoredRead(String bufferType) { + super(bufferType); + } + + /** + * Create a configuration.
+ * @return a configuration + */ + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + enableAnalyticsAccelerator(conf); + conf.set("fs.contract.vector-io-early-eof-check", "false"); + return conf; + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java index 033c2d94c7b..f346064d4d2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java @@ -33,6 +33,7 @@ import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled; /** @@ -93,6 +94,17 @@ protected Configuration createConfiguration() { return conf; } + @Override + public void testOverwriteExistingFile() throws Throwable { + // Currently analytics accelerator does not support reading of files that have been overwritten. + // This is because the analytics accelerator library caches metadata, and when a file is + // overwritten, the old metadata continues to be used, until it is removed from the cache over + // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218. + skipIfAnalyticsAcceleratorEnabled(getContract().getConf(), + "Analytics Accelerator currently does not support reading of overwritten files"); + super.testOverwriteExistingFile(); + } + @Override public void testOverwriteNonEmptyDirectory() throws Throwable { try { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java index e761e0d14bf..f91479abe86 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageStatistics; @@ -78,6 +79,20 @@ public void testNonDirectWrite() throws Exception { getRenameOperationCount() - renames); } + @Override + public void testDistCpUpdateCheckFileSkip() throws Exception { + // Currently analytics accelerator does not support reading of files that have been overwritten. + // This is because the analytics accelerator library caches metadata and data, and when a + // file is overwritten, the old data continues to be used, until it is removed from the + // cache over time. This will be fixed in + // https://github.com/awslabs/analytics-accelerator-s3/issues/218.
+ // In this test case, the remote file is created, read, then deleted, and then created again + // with different contents, and read again, which leads to assertions failing. + skipIfAnalyticsAcceleratorEnabled(getContract().getConf(), + "Analytics Accelerator Library does not support updates to existing files"); + super.testDistCpUpdateCheckFileSkip(); + } + private long getRenameOperationCount() { return getFileSystem().getStorageStatistics() .getLong(StorageStatistics.CommonStatisticNames.OP_RENAME); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java index 5a92066e06d..9a945ad0ee7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java @@ -32,6 +32,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeNotS3ExpressFileSystem; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE; /** @@ -127,6 +128,12 @@ public void testMultipartUploadReverseOrderNonContiguousPartNumbers() throws Exc @Override public void testConcurrentUploads() throws Throwable { assumeNotS3ExpressFileSystem(getFileSystem()); + // Currently analytics accelerator does not support reading of files that have been overwritten. + // This is because the analytics accelerator library caches metadata and data, and when a file + // is overwritten, the old data continues to be used, until it is removed from the cache over + // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218. + skipIfAnalyticsAcceleratorEnabled(getContract().getConf(), + "Analytics Accelerator currently does not support reading of overwritten files"); + super.testConcurrentUploads(); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index fbb6d5a04d2..bf489fc44a5 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -63,6 +63,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE; import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; import static org.apache.hadoop.io.Sizes.S_1M; @@ -88,6 +89,17 @@ protected AbstractFSContract createContract(Configuration conf) { return new S3AContract(conf); } + /** + * Analytics Accelerator Library for Amazon S3 does not support Vectored Reads.
+ * @throws Exception + */ + @Override + public void setup() throws Exception { + super.setup(); + skipIfAnalyticsAcceleratorEnabled(getContract().getConf(), + "Analytics Accelerator does not support vectored reads"); + } + /** * Verify response to a vector read request which is beyond the * real length of the file. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java index e76b3046048..940e23026af 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java @@ -54,10 +54,11 @@ public abstract class AbstractS3AMockTest { protected S3AFileSystem fs; protected S3Client s3; + protected Configuration conf; @Before public void setup() throws Exception { - Configuration conf = createConfiguration(); + conf = createConfiguration(); fs = new S3AFileSystem(); URI uri = URI.create(FS_S3A + "://" + BUCKET); // unset S3CSE property from config to avoid pathIOE. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java new file mode 100644 index 00000000000..0ed0457e757 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/ + +package org.apache.hadoop.fs.s3a; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; + +import org.junit.Before; +import org.junit.Test; +import org.assertj.core.api.Assertions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType; +import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream; +import org.apache.hadoop.fs.statistics.IOStatistics; + +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration; +import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration; +import software.amazon.s3.analyticsaccelerator.util.PrefetchMode; + +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; +import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Tests integration of the + * <a href="https://github.com/awslabs/analytics-accelerator-s3">analytics accelerator library</a>. + * + * Certain tests in this class rely on reading local parquet files stored in resources. + * These files are copied from local to S3 and then read via the analytics stream. + * This is done to ensure AAL can read the parquet format and handle exceptions from malformed + * parquet files.
+ * + */ +public class ITestS3AAnalyticsAcceleratorStreamReading extends AbstractS3ATestBase { + + private static final String PHYSICAL_IO_PREFIX = "physicalio"; + private static final String LOGICAL_IO_PREFIX = "logicalio"; + + + private Path externalTestFile; + + @Before + public void setUp() throws Exception { + super.setup(); + externalTestFile = getExternalData(getConfiguration()); + } + + @Override + public Configuration createConfiguration() { + Configuration configuration = super.createConfiguration(); + enableAnalyticsAccelerator(configuration); + return configuration; + } + + @Test + public void testConnectorFrameWorkIntegration() throws Throwable { + describe("Verify S3 connector framework integration"); + + S3AFileSystem fs = + (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration()); + byte[] buffer = new byte[500]; + IOStatistics ioStats; + + try (FSDataInputStream inputStream = + fs.openFile(externalTestFile) + .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE) + .build().get()) { + ioStats = inputStream.getIOStatistics(); + inputStream.seek(5); + inputStream.read(buffer, 0, 500); + + final InputStream wrappedStream = inputStream.getWrappedStream(); + ObjectInputStream objectInputStream = (ObjectInputStream) wrappedStream; + + Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics); + Assertions.assertThat(objectInputStream.getInputPolicy()) + .isEqualTo(S3AInputPolicy.Sequential); + } + + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); + } + + @Test + public void testMalformedParquetFooter() throws IOException { + describe("Reading a malformed parquet file should not throw an exception"); + + // File with malformed footer taken from + // https://github.com/apache/parquet-testing/blob/master/bad_data/PARQUET-1481.parquet. + // This test ensures AAL does not throw exceptions if footer parsing fails. + // It will only emit a WARN log, "Unable to parse parquet footer for + // test/malformedFooter.parquet, parquet prefetch optimisations will be disabled for this key." + Path dest = path("malformed_footer.parquet"); + + File file = new File("src/test/resources/malformed_footer.parquet"); + + Path sourcePath = new Path(file.toURI().getPath()); + getFileSystem().copyFromLocalFile(false, true, sourcePath, dest); + + byte[] buffer = new byte[500]; + IOStatistics ioStats; + + try (FSDataInputStream inputStream = getFileSystem().open(dest)) { + ioStats = inputStream.getIOStatistics(); + inputStream.seek(5); + inputStream.read(buffer, 0, 500); + } + + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); + } + + /** + * This test reads a multi-row group parquet file. Each parquet file consists of at least one + * row group, which contains the column data for a subset of rows. A single parquet file + * can contain multiple row groups; this allows for further parallelisation, as each row group + * can be processed independently.
+ */ + @Test + public void testMultiRowGroupParquet() throws Throwable { + describe("A parquet file is read successfully"); + + Path dest = path("multi_row_group.parquet"); + + File file = new File("src/test/resources/multi_row_group.parquet"); + Path sourcePath = new Path(file.toURI().getPath()); + getFileSystem().copyFromLocalFile(false, true, sourcePath, dest); + + FileStatus fileStatus = getFileSystem().getFileStatus(dest); + + byte[] buffer = new byte[3000]; + IOStatistics ioStats; + + try (FSDataInputStream inputStream = getFileSystem().open(dest)) { + ioStats = inputStream.getIOStatistics(); + inputStream.readFully(buffer, 0, (int) fileStatus.getLen()); + } + + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); + + try (FSDataInputStream inputStream = getFileSystem().openFile(dest) + .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET) + .build().get()) { + ioStats = inputStream.getIOStatistics(); + inputStream.readFully(buffer, 0, (int) fileStatus.getLen()); + } + + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); + } + + @Test + public void testConnectorFrameworkConfigurable() { + describe("Verify S3 connector framework reads configuration"); + + Configuration conf = new Configuration(getConfiguration()); + + // Set the prefetching mode to ALL + conf.set(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + + "." + LOGICAL_IO_PREFIX + ".prefetching.mode", "all"); + + // Set the blobstore capacity + conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + + "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1); + + ConnectorConfiguration connectorConfiguration = + new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX); + + S3SeekableInputStreamConfiguration configuration = + S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration); + + Assertions.assertThat(configuration.getLogicalIOConfiguration().getPrefetchingMode()) + .as("AnalyticsStream configuration is not set to expected value") + .isSameAs(PrefetchMode.ALL); + + Assertions.assertThat(configuration.getPhysicalIOConfiguration().getBlobStoreCapacity()) + .as("AnalyticsStream configuration is not set to expected value") + .isEqualTo(1); + } + + @Test + public void testInvalidConfigurationThrows() throws Exception { + describe("Verify S3 connector framework throws with invalid configuration"); + + Configuration conf = new Configuration(getConfiguration()); + removeBaseAndBucketOverrides(conf); + // Set an invalid (negative) blobstore capacity + conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + + "."
+ PHYSICAL_IO_PREFIX + ".blobstore.capacity", -1); + + ConnectorConfiguration connectorConfiguration = + new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX); + + intercept(IllegalArgumentException.class, + () -> S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration)); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java index d22de3b06d8..8671d962175 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java @@ -38,9 +38,11 @@ import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY; import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM; import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY; + import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeSkipRootTests; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** @@ -93,6 +95,8 @@ protected Configuration createConfiguration() { @Override public void setup() throws Exception { super.setup(); + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support SSE-C"); assumeEnabled(); // although not a root dir test, this confuses paths enough it shouldn't be run in // parallel with other jobs diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java index 0281c57f5cb..3c405cb7c51 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java @@ -32,6 +32,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestPath; import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled; import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; /** * S3A Test suite for the FSMainOperationsBaseTest tests. @@ -78,6 +79,28 @@ public void testCopyToLocalWithUseRawLocalFileSystemOption() throws Exception { } + @Override + public void testWriteReadAndDeleteOneAndAHalfBlocks() throws Exception { + // Currently analytics accelerator does not support reading of files that have been overwritten. + // This is because the analytics accelerator library caches metadata, and when a file is + // overwritten, the old metadata continues to be used, until it is removed from the cache over + // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218. + skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(), + "Analytics Accelerator currently does not support reading of over written files"); + super.testWriteReadAndDeleteOneAndAHalfBlocks(); + } + + @Override + public void testWriteReadAndDeleteTwoBlocks() throws Exception { + // Currently analytics accelerator does not support reading of files that have been overwritten. 
+ // This is because the analytics accelerator library caches metadata, and when a file is + // overwritten, the old metadata continues to be used, until it is removed from the cache over + // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218. + skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(), + "Analytics Accelerator currently does not support reading of overwritten files"); + super.testWriteReadAndDeleteTwoBlocks(); + } + @Override public void testOverwrite() throws IOException { boolean createPerformance = isCreatePerformanceEnabled(fSys); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java index 48081457658..02d56795890 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java @@ -34,8 +34,10 @@ import org.apache.hadoop.fs.Path; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; + import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled; import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.junit.Assume.*; import static org.junit.Assert.*; @@ -160,4 +162,15 @@ public void testOverwrite() throws IOException { } } } + + @Override + public void testOverWriteAndRead() throws Exception { + // Currently analytics accelerator does not support reading of files that have been overwritten. + // This is because the analytics accelerator library caches metadata, and when a file is + // overwritten, the old metadata continues to be used, until it is removed from the cache over + // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
+ skipIfAnalyticsAcceleratorEnabled(fs.getConf(), + "Analytics Accelerator currently does not support reading of overwritten files"); + super.testOverWriteAndRead(); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java index 70dc5ee476c..daf5306dc39 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java @@ -43,6 +43,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; @@ -77,6 +78,11 @@ protected Configuration createConfiguration() { public void setup() throws Exception { super.setup(); executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS); + // Analytics accelerator currently does not support IOStatisticsContext, this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support IOStatisticsContext"); + } @Override diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java index 72c75162c9f..f26a585776a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java @@ -36,6 +36,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_LEAKS; import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs; @@ -87,6 +88,11 @@ public void setup() throws Exception { @Test public void testFinalizer() throws Throwable { Path path = methodPath(); + // Analytics accelerator currently does not support stream leak detection.
This work is tracked + // in https://issues.apache.org/jira/browse/HADOOP-19451 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support leak detection"); + final S3AFileSystem fs = getFileSystem(); ContractTestUtils.createFile(fs, path, true, DATASET); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java index 3bfe69c2bca..4ec579ce4f6 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.InputStream; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString; /** @@ -51,6 +52,11 @@ public void testMetricsRegister() @Test public void testStreamStatistics() throws IOException { + // Analytics accelerator currently does not support IOStatistics, this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support stream statistics"); + S3AFileSystem fs = getFileSystem(); Path file = path("testStreamStatistics"); byte[] data = "abcdefghijklmnopqrstuvwxyz".getBytes(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java index 049a698d208..eadc398e61a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java @@ -59,6 +59,7 @@ protected Configuration createConfiguration() { public void testRequesterPaysOptionSuccess() throws Throwable { describe("Test requester pays enabled case by reading last then first byte"); skipIfClientSideEncryption(); + Configuration conf = this.createConfiguration(); conf.setBoolean(ALLOW_REQUESTER_PAYS, true); // Enable bucket exists check, the first failure point people may encounter diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 14ab2b953d1..132401ce8ff 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -105,6 +105,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.impl.FlagSet.createFlagSet; +import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Analytics; import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Prefetch; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion; @@ -577,6 +578,27 @@ public static boolean isS3ExpressTestBucket(final Configuration conf) { return S3ExpressStorage.isS3ExpressStore(getTestBucketName(conf), ""); } + /** + * Skip a test if the Analytics Accelerator Library for Amazon S3 is enabled.
+ * @param configuration configuration to probe + * @param message message to use in the skip report + */ + public static void skipIfAnalyticsAcceleratorEnabled( + Configuration configuration, String message) { + assume(message, + !isAnalyticsAcceleratorEnabled(configuration)); + } + + /** + * Probe for the analytics accelerator stream being enabled. + * @param conf configuration to probe + * @return true if the analytics input stream type is configured + */ + public static boolean isAnalyticsAcceleratorEnabled(final Configuration conf) { + return conf.get(INPUT_STREAM_TYPE, + INPUT_STREAM_TYPE_CLASSIC).equals(INPUT_STREAM_TYPE_ANALYTICS); + } + /** * Skip a test if the filesystem lacks a required capability. * @param fs filesystem @@ -1804,6 +1826,18 @@ public static Configuration enablePrefetching(Configuration conf) { return conf; } + /** + * Enable the analytics stream for the S3A filesystem in tests. + * @param conf Configuration to update + * @return patched config + */ + public static Configuration enableAnalyticsAccelerator(Configuration conf) { + removeBaseAndBucketOverrides(conf, + INPUT_STREAM_TYPE); + conf.setEnum(INPUT_STREAM_TYPE, Analytics); + return conf; + } + /** * Probe for a filesystem having a specific stream type; * this is done through filesystem capabilities. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java index 643db02087b..eb7d5af9300 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java @@ -44,6 +44,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; + /** * Uses mocks to check that the {@link ResponseInputStream<GetObjectResponse>} is * closed when {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer} is called.
@@ -55,6 +57,8 @@ public class TestS3AUnbuffer extends AbstractS3AMockTest { @Test public void testUnbuffer() throws IOException { + skipIfAnalyticsAcceleratorEnabled(conf, + "Analytics Accelerator currently does not support unbuffer"); // Create mock ObjectMetadata for getFileStatus() Path path = new Path("/file"); HeadObjectResponse objectMetadata = HeadObjectResponse.builder() .contentLength(1L) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java index 8132b44cdb4..02f0251e8f0 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; import org.apache.hadoop.fs.statistics.IOStatisticsLogging; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_GET_REQUEST; import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_FILES_CREATED; import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT; @@ -167,7 +168,12 @@ private void abortActiveStream() throws IOException { @Test public void testCostOfCreatingMagicFile() throws Throwable { - describe("Files created under magic paths skip existence checks"); + describe("Files created under magic paths skip existence checks and marker deletes"); + + // Analytics accelerator currently does not support IOStatistics, this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path destFile = methodSubPath("file.txt"); fs.delete(destFile.getParent(), true); @@ -245,6 +251,10 @@ public void testCostOfCreatingMagicFile() throws Throwable { public void testCostOfSavingLoadingPendingFile() throws Throwable { describe("Verify costs of saving .pending file under a magic path"); + // Analytics accelerator currently does not support IOStatistics, this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path partDir = methodSubPath("file.pending"); Path destFile = new Path(partDir, "file.pending"); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java index 1724006a831..5b489c1c39c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java @@ -30,6 +30,8 @@ import org.junit.Assert; import org.junit.Before; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; + /** * S3a implementation of FCStatisticsBaseTest.
*/ @@ -44,6 +46,10 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest { @Before public void setUp() throws Exception { conf = new Configuration(); + // Analytics accelerator currently does not support IOStatistics, this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(conf, + "Analytics Accelerator currently does not support stream statistics"); fc = S3ATestUtils.createTestFileContext(conf); testRootPath = fileContextTestHelper.getTestRootPath(fc, "test"); fc.mkdir(testRootPath, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index e94bb156cd2..febc6bb82c4 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -57,6 +57,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.S3ATestUtils.streamType; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED; @@ -110,6 +111,10 @@ public Configuration createConfiguration() { @Override public void setup() throws Exception { super.setup(); + // Analytics accelerator currently does not support IOStatistics, this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); testFile = methodPath(); @@ -388,7 +393,6 @@ public void testPositionedReadableReadPastEOF() throws Throwable { describe("PositionedReadable.read() past the end of the file"); assumeNoPrefetching(); - verifyMetrics(() -> { try (FSDataInputStream in = openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java index caf723b95de..e68ea9a0315 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java @@ -117,7 +117,6 @@ public Configuration createConfiguration() { @Override public void setup() throws Exception { super.setup(); - // now create a new FS with minimal http capacity and recovery // a separate one is used to avoid test teardown suffering // from the lack of http connections and short timeouts. 
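Context for the repeated "overwritten files" skips in the overridden tests above: the analytics accelerator library caches object metadata per key, so a read issued after an overwrite can be served with the stale metadata of the earlier object until the cached entry is evicted (tracked in https://github.com/awslabs/analytics-accelerator-s3/issues/218). A minimal sketch of the failing sequence against a filesystem configured with the analytics stream follows; the sizes are illustrative and the helper calls, while taken from the test utilities used in this patch, are not part of the patch itself:

    // Hypothetical repro sketch; sizes are illustrative only.
    Path path = methodPath();
    S3AFileSystem fs = getFileSystem();
    // First version of the object: 1 KB.
    ContractTestUtils.createFile(fs, path, true, ContractTestUtils.dataset(1024, 'a', 26));
    // Overwrite the same key with a 2 KB object.
    ContractTestUtils.createFile(fs, path, true, ContractTestUtils.dataset(2048, 'a', 26));
    try (FSDataInputStream in = fs.open(path)) {
      // The analytics stream may still use the cached 1 KB metadata for this
      // key, so reads past the old EOF can fail until the entry is evicted.
      in.readFully(0, new byte[2048]);
    }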
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java index 6f19ba15c1c..0203b00caab 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java @@ -31,6 +31,7 @@ import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM; import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled; /** @@ -54,6 +55,8 @@ public class ITestS3AHugeFilesSSECDiskBlocks public void setup() throws Exception { try { super.setup(); + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support SSE-C"); } catch (AccessDeniedException | AWSUnsupportedFeatureException e) { skip("Bucket does not allow " + S3AEncryptionMethods.SSE_C + " encryption method"); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java index 0f6b69cd54d..da2a39a986e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.contract.s3a.S3AContract; import org.apache.hadoop.fs.statistics.StreamStatisticNames; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.*; /** @@ -78,4 +79,12 @@ public List<String> outputStreamStatisticKeys() { STREAM_WRITE_EXCEPTIONS); } + @Override + public void testInputStreamStatisticRead() throws Throwable { + // Analytics accelerator currently does not support IOStatistics, this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(getContract().getConf(), + "Analytics Accelerator currently does not support stream statistics"); + super.testInputStreamStatisticRead(); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java index 0d5d2a789a0..376dcdf727f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java @@ -31,6 +31,8 @@ import org.apache.hadoop.fs.statistics.IOStatisticAssertions; import org.apache.hadoop.fs.statistics.StreamStatisticNames; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; + public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase { private static final int ONE_KB = 1024; @@ -42,6 +44,10 @@ public class ITestS3AFileSystemStatistic extends 
AbstractS3ATestBase { */ @Test public void testBytesReadWithStream() throws IOException { + // Analytics accelerator currently does not support IOStatistics, this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path filePath = path(getMethodName()); byte[] oneKbBuf = new byte[ONE_KB]; diff --git a/hadoop-tools/hadoop-aws/src/test/resources/malformed_footer.parquet b/hadoop-tools/hadoop-aws/src/test/resources/malformed_footer.parquet new file mode 100644 index 00000000000..614912f630b Binary files /dev/null and b/hadoop-tools/hadoop-aws/src/test/resources/malformed_footer.parquet differ diff --git a/hadoop-tools/hadoop-aws/src/test/resources/multi_row_group.parquet b/hadoop-tools/hadoop-aws/src/test/resources/multi_row_group.parquet new file mode 100644 index 00000000000..b8d6a95cd44 Binary files /dev/null and b/hadoop-tools/hadoop-aws/src/test/resources/multi_row_group.parquet differ --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org
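The S3ATestUtils additions above define the single pattern the rest of this patch relies on for analytics-stream coverage: enableAnalyticsAccelerator() to opt a suite in to the new stream, and skipIfAnalyticsAcceleratorEnabled() to skip tests exercising features the stream does not yet implement. A minimal sketch of a suite combining both, assuming AbstractS3ATestBase as the base class; the class name and the feature named in the message are placeholders:

    // Hypothetical example suite; only the S3ATestUtils helpers are from this patch.
    import org.apache.hadoop.conf.Configuration;
    import org.junit.Test;

    import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
    import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;

    public class ITestExampleAnalyticsStream extends AbstractS3ATestBase {

      @Override
      protected Configuration createConfiguration() {
        // Force the analytics input stream for every filesystem this suite creates.
        return enableAnalyticsAccelerator(super.createConfiguration());
      }

      @Test
      public void testUnsupportedFeature() throws Throwable {
        // Skip, rather than fail, while the analytics stream lacks the feature.
        skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
            "Analytics Accelerator currently does not support <feature>");
        // ... exercise the feature against getFileSystem() ...
      }
    }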