This is an automated email from the ASF dual-hosted git repository.

ahmar pushed a commit to branch feature-HADOOP-19363-analytics-accelerator-s3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 6e0e5673d3fb79b86f8bc519488b6dca33d1cb7d Author: Ahmar Suhail <ahma...@amazon.co.uk> AuthorDate: Tue Feb 11 16:12:40 2025 +0000 Integrates AAL into S3A --- .../hadoop/fs/statistics/StreamStatisticNames.java | 7 + .../AbstractContractMultipartUploaderTest.java | 2 +- hadoop-project/pom.xml | 6 + hadoop-tools/hadoop-aws/pom.xml | 5 + .../java/org/apache/hadoop/fs/s3a/Constants.java | 8 + .../hadoop/fs/s3a/DefaultS3ClientFactory.java | 2 +- .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 10 + .../apache/hadoop/fs/s3a/S3AInstrumentation.java | 18 +- .../org/apache/hadoop/fs/s3a/S3ClientFactory.java | 24 +++ .../streams/AbstractObjectInputStreamFactory.java | 4 +- .../fs/s3a/impl/streams/AnalyticsStream.java | 206 +++++++++++++++++++++ .../s3a/impl/streams/AnalyticsStreamFactory.java | 99 ++++++++++ .../fs/s3a/impl/streams/InputStreamType.java | 6 +- .../fs/s3a/impl/streams/StreamIntegration.java | 3 +- .../s3a/statistics/S3AInputStreamStatistics.java | 8 + .../statistics/impl/EmptyS3AStatisticsContext.java | 6 + .../fs/contract/s3a/ITestS3AContractCreate.java | 16 +- .../fs/contract/s3a/ITestS3AContractDistCp.java | 14 ++ .../s3a/ITestS3AContractMultipartUploader.java | 7 + .../contract/s3a/ITestS3AContractVectoredRead.java | 12 ++ .../fs/s3a/ITestS3AAnalyticsAcceleratorStream.java | 201 ++++++++++++++++++++ .../hadoop/fs/s3a/ITestS3AEncryptionSSEC.java | 6 +- .../hadoop/fs/s3a/ITestS3AFSMainOperations.java | 26 ++- .../hadoop/fs/s3a/ITestS3AFileSystemContract.java | 14 +- .../hadoop/fs/s3a/ITestS3AIOStatisticsContext.java | 6 + .../hadoop/fs/s3a/ITestS3AInputStreamLeakage.java | 6 + .../org/apache/hadoop/fs/s3a/ITestS3AMetrics.java | 6 + .../hadoop/fs/s3a/ITestS3ARequesterPays.java | 6 + .../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 29 +++ .../fs/s3a/commit/ITestCommitOperationCost.java | 12 +- .../fileContext/ITestS3AFileContextStatistics.java | 6 + .../fs/s3a/performance/ITestS3AOpenCost.java | 11 +- .../fs/s3a/performance/ITestUnbufferDraining.java | 1 - .../s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java | 3 + .../ITestS3AContractStreamIOStatistics.java | 9 + .../statistics/ITestS3AFileSystemStatistic.java | 6 + .../src/test/resources/malformed_footer.parquet | Bin 0 -> 451 bytes .../src/test/resources/multi_row_group.parquet | Bin 0 -> 2080 bytes 38 files changed, 782 insertions(+), 29 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java index 57c77572138..ab4838618da 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java @@ -97,6 +97,13 @@ public final class StreamStatisticNames { */ public static final String STREAM_READ_OPENED = "stream_read_opened"; + /** + * Total count of times an analytics input stream was opened. + * + * Value: {@value}. + */ + public static final String STREAM_READ_ANALYTICS_OPENED = "stream_read_analytics_opened"; + /** * Count of exceptions raised during input stream reads. * Value: {@value}. 
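Downstream, the new counter surfaces through the public IOStatistics API on the input stream. Below is a minimal sketch of reading it back, assuming a filesystem already configured for the analytics stream; the helper class and method names are illustrative only and are not part of this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;

public final class AnalyticsOpenedCounterProbe {

  private AnalyticsOpenedCounterProbe() {
  }

  /**
   * Open the given path, read a byte, and return how many analytics
   * streams were opened (hypothetical helper for illustration).
   */
  public static long analyticsStreamsOpened(Configuration conf, Path path)
      throws Exception {
    try (FileSystem fs = FileSystem.newInstance(path.toUri(), conf);
         FSDataInputStream in = fs.open(path)) {
      in.read();
      // FSDataInputStream is an IOStatisticsSource; the counter added in
      // this commit is published as "stream_read_analytics_opened"
      IOStatistics stats = in.getIOStatistics();
      Long count = stats.counters()
          .get(StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED);
      return count == null ? 0 : count;
    }
  }
}

The same counter is asserted through verifyStatisticCounterValue() in the new ITestS3AAnalyticsAcceleratorStream tests further down.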
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java index 16482915bdf..4c4514b249c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java @@ -507,7 +507,7 @@ public void testMultipartUpload() throws Exception { @Test public void testMultipartUploadEmptyPart() throws Exception { FileSystem fs = getFileSystem(); - Path file = path("testMultipartUpload"); + Path file = path("testMultipartUploadEmptyPart"); try (MultipartUploader uploader = fs.createMultipartUploader(file).build()) { UploadHandle uploadHandle = uploader.startUpload(file).get(); diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 6dac90a5814..8038f538da2 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -206,6 +206,7 @@ <aws-java-sdk.version>1.12.720</aws-java-sdk.version> <aws-java-sdk-v2.version>2.25.53</aws-java-sdk-v2.version> <amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version> + <amazon-s3-analyticsaccelerator-s3.version>0.0.3</amazon-s3-analyticsaccelerator-s3.version> <aws.eventstream.version>1.0.1</aws.eventstream.version> <hsqldb.version>2.7.1</hsqldb.version> <frontend-maven-plugin.version>1.11.2</frontend-maven-plugin.version> @@ -1113,6 +1114,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>software.amazon.s3.analyticsaccelerator</groupId> + <artifactId>analyticsaccelerator-s3</artifactId> + <version>${amazon-s3-analyticsaccelerator-s3.version}</version> + </dependency> <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-core</artifactId> diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index 4c36d5b4653..f10eea7a4d3 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -484,6 +484,11 @@ <artifactId>amazon-s3-encryption-client-java</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>software.amazon.s3.analyticsaccelerator</groupId> + <artifactId>analyticsaccelerator-s3</artifactId> + <scope>compile</scope> + </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 6796c29d348..b2843d24c8a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1827,4 +1827,12 @@ private Constants() { * Value: {@value}. */ public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit"; + + + /** + * Prefix to configure Analytics Accelerator Library. 
+ */ + public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX = + "fs.s3a.analytics.accelerator"; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java index 556bd351075..a19e5f155f0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java @@ -166,7 +166,7 @@ public S3AsyncClient createS3AsyncClient( .httpClientBuilder(httpClientBuilder); // multipart upload pending with HADOOP-19326. - if (!parameters.isClientSideEncryptionEnabled()) { + if (!parameters.isClientSideEncryptionEnabled() && !parameters.isAnalyticsAcceleratorEnabled()) { s3AsyncClientBuilder.multipartConfiguration(multipartConfiguration) .multipartEnabled(parameters.isMultipartCopy()); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 5ac3dca08e3..cd34e17f346 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -147,9 +147,11 @@ import org.apache.hadoop.fs.s3a.impl.StoreContextFactory; import org.apache.hadoop.fs.s3a.impl.UploadContentProviders; import org.apache.hadoop.fs.s3a.impl.CSEUtils; +import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType; import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters; import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamCallbacks; import org.apache.hadoop.fs.s3a.impl.streams.StreamFactoryRequirements; +import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration; import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations; import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl; import org.apache.hadoop.fs.statistics.DurationTracker; @@ -440,6 +442,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, */ private boolean isCSEEnabled; + /** + * Is this S3A FS instance using analytics accelerator? + */ + private boolean isAnalyticsAcceleratorEnabled; + /** * Bucket AccessPoint. */ @@ -629,6 +636,8 @@ public void initialize(URI name, Configuration originalConf) // If encryption method is set to CSE-KMS or CSE-CUSTOM then CSE is enabled. isCSEEnabled = CSEUtils.isCSEEnabled(getS3EncryptionAlgorithm().getMethod()); + isAnalyticsAcceleratorEnabled = StreamIntegration.determineInputStreamType(conf).equals(InputStreamType.Analytics); + // Create the appropriate fsHandler instance using a factory method fsHandler = createFileSystemHandler(); fsHandler.setCSEGauge((IOStatisticsStore) getIOStatistics()); @@ -1156,6 +1165,7 @@ private ClientManager createClientManager(URI fsURI, boolean dtEnabled) throws I conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT)) .withClientSideEncryptionEnabled(isCSEEnabled) .withClientSideEncryptionMaterials(cseMaterials) + .withAnalyticsAcceleratorEnabled(isAnalyticsAcceleratorEnabled) .withKMSRegion(conf.get(S3_ENCRYPTION_CSE_KMS_REGION)); // this is where clients and the transfer manager are created on demand.
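For reference, a minimal configuration sketch of switching a client onto the new stream and passing options through the prefix above. It mirrors what S3ATestUtils.enableAnalyticsAccelerator() does in the test changes below; the two prefixed keys are the ones exercised in ITestS3AAnalyticsAcceleratorStream, the capacity value of 50 is arbitrary, and the example class name is illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;

import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_TYPE;

public final class EnableAnalyticsStreamExample {

  private EnableAnalyticsStreamExample() {
  }

  /**
   * Build a configuration which selects the analytics input stream
   * and tunes the library through the new prefix.
   */
  public static Configuration configure() {
    Configuration conf = new Configuration();
    // select the analytics stream type for S3A input streams
    conf.setEnum(INPUT_STREAM_TYPE, InputStreamType.Analytics);
    // keys under fs.s3a.analytics.accelerator are forwarded to the library
    // through ConnectorConfiguration; values shown here are illustrative
    conf.set("fs.s3a.analytics.accelerator.logicalio.prefetching.mode", "all");
    conf.setInt("fs.s3a.analytics.accelerator.physicalio.blobstore.capacity", 50);
    return conf;
  }
}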
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index b84f19fcd87..1eef0b404bc 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.impl.WeakRefMetricsSource; +import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType; import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.ChangeTrackerStatistics; import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics; @@ -840,6 +841,7 @@ private final class InputStreamStatistics private final AtomicLong closed; private final AtomicLong forwardSeekOperations; private final AtomicLong openOperations; + private final AtomicLong analyticsStreamOpenOperations; private final AtomicLong readExceptions; private final AtomicLong readsIncomplete; private final AtomicLong readOperations; @@ -888,7 +890,8 @@ private InputStreamStatistics( StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES, - StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE) + StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE, + StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED) .withGauges(STREAM_READ_GAUGE_INPUT_POLICY, STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(), STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(), @@ -927,6 +930,9 @@ private InputStreamStatistics( StreamStatisticNames.STREAM_READ_SEEK_FORWARD_OPERATIONS); openOperations = st.getCounterReference( StreamStatisticNames.STREAM_READ_OPENED); + analyticsStreamOpenOperations = st.getCounterReference( + StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED + ); readExceptions = st.getCounterReference( StreamStatisticNames.STREAM_READ_EXCEPTIONS); readsIncomplete = st.getCounterReference( @@ -1030,6 +1036,16 @@ public long streamOpened() { return openOperations.getAndIncrement(); } + @Override + public long streamOpened(InputStreamType type) { + switch (type) { + case Analytics: + return analyticsStreamOpenOperations.getAndIncrement(); + default: + return openOperations.getAndIncrement(); + } + } + /** * {@inheritDoc}. * If the connection was aborted, increment {@link #aborted} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java index cda09b622ea..3a9425de111 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java @@ -202,6 +202,11 @@ final class S3ClientCreationParameters { */ private boolean fipsEnabled; + /** + * Is analytics accelerator enabled? + */ + private boolean isAnalyticsAcceleratorEnabled; + /** * List of execution interceptors to include in the chain * of interceptors in the SDK. @@ -457,6 +462,17 @@ public S3ClientCreationParameters withClientSideEncryptionEnabled(final boolean return this; } + /** + * Set the analytics accelerator enabled flag.
+ * + * @param value new value + * @return the builder + */ + public S3ClientCreationParameters withAnalyticsAcceleratorEnabled(final boolean value) { + this.isAnalyticsAcceleratorEnabled = value; + return this; + } + /** * Set the KMS client region. * This is required for CSE-KMS @@ -477,6 +493,14 @@ public boolean isClientSideEncryptionEnabled() { return this.isCSEEnabled; } + /** + * Get the analytics accelerator enabled flag. + * @return analytics accelerator enabled flag. + */ + public boolean isAnalyticsAcceleratorEnabled() { + return this.isAnalyticsAcceleratorEnabled; + } + /** * Set the client side encryption materials. * diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java index 09dfa27ff4b..205a4aaa858 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java @@ -18,6 +18,8 @@ package org.apache.hadoop.fs.s3a.impl.streams; +import java.io.IOException; + import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.statistics.StreamStatisticNames; import org.apache.hadoop.service.AbstractService; @@ -58,7 +60,7 @@ protected AbstractObjectInputStreamFactory(final String name) { * @param factoryBindingParameters parameters for the factory binding */ @Override - public void bind(final FactoryBindingParameters factoryBindingParameters) { + public void bind(final FactoryBindingParameters factoryBindingParameters) throws IOException { // must only be invoked during service initialization Preconditions.checkState(isInState(STATE.INITED), "Input Stream factory %s is in wrong state: %s", diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java new file mode 100644 index 00000000000..9e6f98c5553 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.hadoop.fs.s3a.impl.streams; + +import java.io.EOFException; +import java.io.IOException; + +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.fs.s3a.S3ObjectAttributes; +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream; +import software.amazon.s3.analyticsaccelerator.util.S3URI; + +/** + * Analytics stream creates a stream using aws-analytics-accelerator-s3. This stream supports + * parquet specific optimisations such as parquet-aware prefetching. For more details, see + * https://github.com/awslabs/analytics-accelerator-s3. + */ +public class AnalyticsStream extends ObjectInputStream implements StreamCapabilities { + + private S3SeekableInputStream inputStream; + private long lastReadCurrentPos = 0; + private volatile boolean closed; + + public static final Logger LOG = LoggerFactory.getLogger(AnalyticsStream.class); + + public AnalyticsStream(final ObjectReadParameters parameters, final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException { + super(InputStreamType.Analytics, parameters); + S3ObjectAttributes s3Attributes = parameters.getObjectAttributes(); + this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey())); + getS3AStreamStatistics().streamOpened(InputStreamType.Analytics); + } + + /** + * Indicates whether the given {@code capability} is supported by this stream. + * + * @param capability the capability to check. + * @return true if the given {@code capability} is supported by this stream, false otherwise. + */ + @Override + public boolean hasCapability(String capability) { + return false; + } + + @Override + public int read() throws IOException { + throwIfClosed(); + int bytesRead; + try { + bytesRead = inputStream.read(); + } catch (IOException ioe) { + onReadFailure(ioe); + throw ioe; + } + return bytesRead; + } + + @Override + public void seek(long pos) throws IOException { + throwIfClosed(); + if (pos < 0) { + throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + + " " + pos); + } + inputStream.seek(pos); + } + + + @Override + public synchronized long getPos() { + if (!closed) { + lastReadCurrentPos = inputStream.getPos(); + } + return lastReadCurrentPos; + } + + + /** + * Reads the last n bytes from the stream into a byte buffer. Blocks until end of stream is + * reached. Leaves the position of the stream unaltered. + * + * @param buf buffer to read data into + * @param off start position in buffer at which data is written + * @param len the number of bytes to read; the n-th byte should be the last byte of the stream. 
+ * @return the total number of bytes read into the buffer + * @throws IOException if an I/O error occurs + */ + public int readTail(byte[] buf, int off, int len) throws IOException { + throwIfClosed(); + int bytesRead; + try { + bytesRead = inputStream.readTail(buf, off, len); + } catch (IOException ioe) { + onReadFailure(ioe); + throw ioe; + } + return bytesRead; + } + + @Override + public int read(byte[] buf, int off, int len) throws IOException { + throwIfClosed(); + int bytesRead; + try { + bytesRead = inputStream.read(buf, off, len); + } catch (IOException ioe) { + onReadFailure(ioe); + throw ioe; + } + return bytesRead; + } + + + @Override + public boolean seekToNewSource(long l) throws IOException { + return false; + } + + @Override + public int available() throws IOException { + throwIfClosed(); + return super.available(); + } + + @Override + protected boolean isStreamOpen() { + return !isClosed(); + } + + protected boolean isClosed() { + return inputStream == null; + } + + @Override + protected void abortInFinalizer() { + try { + close(); + } catch (IOException ignored) { + + } + } + + @Override + public synchronized void close() throws IOException { + if (!closed) { + closed = true; + try { + inputStream.close(); + inputStream = null; + super.close(); + } catch (IOException ioe) { + LOG.debug("Failure closing stream {}: ", getKey(), ioe); + throw ioe; + } + } + } + + /** + * Close the stream on read failure. + * No attempt is made to recover from the failure. + * + * @param ioe exception caught. + */ + @Retries.OnceTranslated + private void onReadFailure(IOException ioe) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Got exception while trying to read from stream {}, " + + "not trying to recover:", + getKey(), ioe); + } else { + LOG.info("Got exception while trying to read from stream {}, " + + "not trying to recover: {}", + getKey(), ioe.toString()); + } + this.close(); + } + + + protected void throwIfClosed() throws IOException { + if (closed) { + throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED); + } + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java new file mode 100644 index 00000000000..0ec6713d9ca --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
*/ + +package org.apache.hadoop.fs.s3a.impl.streams; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.VectoredIOContext; + +import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient; +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration; +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory; +import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration; + +import java.io.IOException; + +import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.populateVectoredIOContext; + +/** + * A factory for {@link AnalyticsStream}. This class is instantiated during initialization of + * {@code S3AStore}, if fs.s3a.input.stream.type is set to Analytics. + */ +public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory { + + private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration; + private S3SeekableInputStreamFactory s3SeekableInputStreamFactory; + private boolean requireCrt; + + public AnalyticsStreamFactory() { + super("AnalyticsStreamFactory"); + } + + @Override + protected void serviceInit(final Configuration conf) throws Exception { + super.serviceInit(conf); + ConnectorConfiguration configuration = new ConnectorConfiguration(conf, + ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX); + this.seekableInputStreamConfiguration = + S3SeekableInputStreamConfiguration.fromConfiguration(configuration); + this.requireCrt = false; + } + + @Override + public void bind(final FactoryBindingParameters factoryBindingParameters) throws IOException { + super.bind(factoryBindingParameters); + this.s3SeekableInputStreamFactory = new S3SeekableInputStreamFactory( + new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)), + seekableInputStreamConfiguration); + } + + @Override + public ObjectInputStream readObject(final ObjectReadParameters parameters) throws IOException { + return new AnalyticsStream( + parameters, + s3SeekableInputStreamFactory); + } + + + + @Override + public InputStreamType streamType() { + return InputStreamType.Analytics; + } + + /** + * Calculate the requirements of this stream factory. + * @return the stream factory requirements. + */ + @Override + public StreamFactoryRequirements factoryRequirements() { + // fill in the vector context + final VectoredIOContext vectorContext = populateVectoredIOContext(getConfig()); + // and then disable range merging. + // this ensures that no reads are made for data which is then discarded... + // so the prefetch and block read code doesn't ever do wasteful fetches. + vectorContext.setMinSeekForVectoredReads(0); + + return new StreamFactoryRequirements(0, + 0, vectorContext); + } + + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java index 1775fa5f05c..a9c33a60fc6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java @@ -45,13 +45,11 @@ public enum InputStreamType { */ Prefetch(StreamIntegration.PREFETCH, 2, c -> new PrefetchingInputStreamFactory()), - /** * The analytics input stream.
*/ - Analytics(StreamIntegration.ANALYTICS, 3, c -> { - throw new IllegalArgumentException("not yet supported"); - }), + Analytics(StreamIntegration.ANALYTICS, 3, c -> + new AnalyticsStreamFactory()), /** * A custom input stream. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java index 63f7053e0e4..bb35a0580a2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java @@ -113,7 +113,6 @@ private StreamIntegration() { * @throws RuntimeException any binding/loading/instantiation problem */ public static ObjectInputStreamFactory factoryFromConfig(final Configuration conf) { - // Construct the factory. return determineInputStreamType(conf) .factory() @@ -135,7 +134,7 @@ public static ObjectInputStreamFactory factoryFromConfig(final Configuration con * @param conf configuration * @return the input stream type. */ - static InputStreamType determineInputStreamType(final Configuration conf) { + public static InputStreamType determineInputStreamType(final Configuration conf) { // work out the default stream; this includes looking at the // deprecated prefetch enabled key to see if it is set. if (conf.getBoolean(PREFETCH_ENABLED_KEY, false)) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java index 1e28a27e708..7ad7cf75367 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3a.statistics; import org.apache.hadoop.fs.impl.prefetch.PrefetchingStatistics; +import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType; import org.apache.hadoop.fs.statistics.DurationTracker; /** @@ -53,6 +54,13 @@ public interface S3AInputStreamStatistics extends AutoCloseable, */ long streamOpened(); + /** + * A stream of the given type was opened. + * @param type type of input stream + * @return the previous count or zero if this is the first opening. + */ + long streamOpened(InputStreamType type); + /** * The inner stream was closed.
* @param abortedConnection flag to indicate the stream was aborted, diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java index e47656efd18..a5d20095ba5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java @@ -22,6 +22,7 @@ import java.time.Duration; import org.apache.hadoop.fs.s3a.Statistic; +import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType; import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.ChangeTrackerStatistics; import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics; @@ -164,6 +165,11 @@ public long streamOpened() { return 0; } + @Override + public long streamOpened(InputStreamType type) { + return 0; + } + @Override public void streamClose(final boolean abortedConnection, final long remainingInCurrentRequest) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java index 033c2d94c7b..3e87a2f7e5f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java @@ -31,9 +31,8 @@ import static org.apache.hadoop.fs.s3a.S3ATestConstants.KEY_PERFORMANCE_TESTS_ENABLED; import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled; + +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; /** * S3A contract tests creating files. @@ -93,6 +92,17 @@ protected Configuration createConfiguration() { return conf; } + @Override + public void testOverwriteExistingFile() throws Throwable { + // Currently analytics accelerator does not support reading of files that have been overwritten. + // This is because the analytics accelerator library caches metadata, and when a file is + // overwritten, the old metadata continues to be used, until it is removed from the cache over + // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218. 
+ skipIfAnalyticsAcceleratorEnabled(this.createConfiguration(), + "Analytics Accelerator currently does not support reading of overwritten files"); + super.testOverwriteExistingFile(); + } + @Override public void testOverwriteNonEmptyDirectory() throws Throwable { try { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java index e761e0d14bf..9c269d3ed44 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageStatistics; @@ -78,6 +79,19 @@ public void testNonDirectWrite() throws Exception { getRenameOperationCount() - renames); } + @Override + public void testDistCpUpdateCheckFileSkip() throws Exception { + // Currently analytics accelerator does not support reading of files that have been overwritten. + // This is because the analytics accelerator library caches metadata and data, and when a file is + // overwritten, the old data continues to be used, until it is removed from the cache over + // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218. + // In this test case, the remote file is created, read, then deleted, and then created again + // with different contents, and read again, which leads to assertions failing. + skipIfAnalyticsAcceleratorEnabled(createConfiguration(), + "Analytics Accelerator Library does not support updates to existing files"); + super.testDistCpUpdateCheckFileSkip(); + } + private long getRenameOperationCount() { return getFileSystem().getStorageStatistics() .getLong(StorageStatistics.CommonStatisticNames.OP_RENAME); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java index 5a92066e06d..54aca14798b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java @@ -32,6 +32,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeNotS3ExpressFileSystem; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE; /** @@ -127,6 +128,12 @@ public void testMultipartUploadReverseOrderNonContiguousPartNumbers() throws Exc @Override public void testConcurrentUploads() throws Throwable { assumeNotS3ExpressFileSystem(getFileSystem()); + // Currently analytics accelerator does not support reading of files that have been overwritten.
+ // This is because the analytics accelerator library caches metadata and data, and when a file is + // overwritten, the old data continues to be used, until it is removed from the cache over + // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218. + skipIfAnalyticsAcceleratorEnabled(getContract().getConf(), + "Analytics Accelerator currently does not support reading of overwritten files"); super.testConcurrentUploads(); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index fbb6d5a04d2..87e3b23cd71 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -63,6 +63,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE; import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; import static org.apache.hadoop.io.Sizes.S_1M; @@ -88,6 +89,17 @@ protected AbstractFSContract createContract(Configuration conf) { return new S3AContract(conf); } + /** + * Analytics Accelerator Library for Amazon S3 does not support Vectored Reads. + * @throws Exception failure during setup. + */ + @Override + public void setup() throws Exception { + skipIfAnalyticsAcceleratorEnabled(createConfiguration(), + "Analytics Accelerator does not support vectored reads"); + super.setup(); + } + /** * Verify response to a vector read request which is beyond the * real length of the file. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java new file mode 100644 index 00000000000..040a141586e --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
*/ + +package org.apache.hadoop.fs.s3a; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.UUID; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathHandle; +import org.apache.hadoop.fs.s3a.impl.AwsSdkWorkarounds; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.test.GenericTestUtils; + +import static org.apache.commons.io.FileUtils.ONE_KB; +import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetching; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData; +import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS; +import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs; + +import org.assertj.core.api.Assertions; + +import org.slf4j.LoggerFactory; +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration; +import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration; +import software.amazon.s3.analyticsaccelerator.io.logical.parquet.ParquetMetadataParsingTask; +import software.amazon.s3.analyticsaccelerator.util.PrefetchMode; + +public class ITestS3AAnalyticsAcceleratorStream extends AbstractS3ATestBase { + + private static final String PHYSICAL_IO_PREFIX = "physicalio"; + private static final String LOGICAL_IO_PREFIX = "logicalio"; + + + private Configuration conf; + private Path testFile; + + @Before + public void setUp() throws Exception { + super.setup(); + conf = createConfiguration(); + testFile = getExternalData(conf); + } + + @Override + public Configuration createConfiguration() { + Configuration configuration = super.createConfiguration(); + if (isUsingDefaultExternalDataFile(configuration)) { + S3ATestUtils.removeBaseAndBucketOverrides(configuration, + ENDPOINT); + } + enableAnalyticsAccelerator(configuration); + return configuration; + } + + @Test + public void testConnectorFrameWorkIntegration() throws IOException { + describe("Verify S3 connector framework integration"); + + S3AFileSystem fs = + (S3AFileSystem) FileSystem.get(testFile.toUri(), conf); + byte[] buffer = new byte[500]; + IOStatistics ioStats; + + try (FSDataInputStream inputStream = fs.open(testFile)) { + ioStats = inputStream.getIOStatistics(); + inputStream.seek(5); + inputStream.read(buffer, 0, 500); + } + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); + } + + @Test + public void testMalformedParquetFooter() throws IOException { + describe("Reading a malformed parquet file should not throw an exception"); + + // File with malformed footer taken from
https://github.com/apache/parquet-testing/blob/master/bad_data/PARQUET-1481.parquet. + // This test ensures AAL does not throw exceptions if footer parsing fails. It will only emit a WARN log, + // "Unable to parse parquet footer for test/malformedFooter.parquet, parquet prefetch optimisations will be disabled for this key." + Path dest = path("malformed_footer.parquet"); + + File file = new File("src/test/resources/malformed_footer.parquet"); + Path sourcePath = new Path(file.toURI().getPath()); + getFileSystem().copyFromLocalFile(false, true, sourcePath, dest); + + byte[] buffer = new byte[500]; + IOStatistics ioStats; + + try (FSDataInputStream inputStream = getFileSystem().open(dest)) { + ioStats = inputStream.getIOStatistics(); + inputStream.seek(5); + inputStream.read(buffer, 0, 500); + } + + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); + } + + @Test + public void testMultiRowGroupParquet() throws IOException { + describe("A parquet file is read successfully"); + + Path dest = path("multi_row_group.parquet"); + + File file = new File("src/test/resources/multi_row_group.parquet"); + Path sourcePath = new Path(file.toURI().getPath()); + getFileSystem().copyFromLocalFile(false, true, sourcePath, dest); + + FileStatus fileStatus = getFileSystem().getFileStatus(dest); + + byte[] buffer = new byte[3000]; + IOStatistics ioStats; + + try (FSDataInputStream inputStream = getFileSystem().open(dest)) { + ioStats = inputStream.getIOStatistics(); + inputStream.readFully(buffer, 0, (int) fileStatus.getLen()); + } + + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); + } + + @Test + public void testConnectorFrameworkConfigurable() { + describe("Verify S3 connector framework reads configuration"); + + Configuration conf = getConfiguration(); + removeBaseAndBucketOverrides(conf); + + // Disable predictive prefetching by prefetching everything + conf.set(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + + "." + LOGICAL_IO_PREFIX + ".prefetching.mode", "all"); + + // Set blobstore capacity + conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + + "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1); + + ConnectorConfiguration connectorConfiguration = + new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX); + + S3SeekableInputStreamConfiguration configuration = + S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration); + + Assertions.assertThat(configuration.getLogicalIOConfiguration().getPrefetchingMode()) + .as("AnalyticsStream configuration is not set to expected value") + .isSameAs(PrefetchMode.ALL); + + Assertions.assertThat(configuration.getPhysicalIOConfiguration().getBlobStoreCapacity()) + .as("AnalyticsStream configuration is not set to expected value") + .isEqualTo(1); + } + + @Test + public void testInvalidConfigurationThrows() throws Exception { + describe("Verify S3 connector framework throws with invalid configuration"); + + Configuration conf = getConfiguration(); + removeBaseAndBucketOverrides(conf); + // Set an invalid (negative) blobstore capacity + conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + + "."
+ PHYSICAL_IO_PREFIX + ".blobstore.capacity", -1); + + ConnectorConfiguration connectorConfiguration = + new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX); + Assertions.assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> + S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration)); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java index d22de3b06d8..169898df829 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java @@ -38,9 +38,7 @@ import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY; import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM; import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeSkipRootTests; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** @@ -93,6 +91,8 @@ protected Configuration createConfiguration() { @Override public void setup() throws Exception { super.setup(); + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support SSE-C"); assumeEnabled(); // although not a root dir test, this confuses paths enough it shouldn't be run in // parallel with other jobs diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java index 0281c57f5cb..b056dac5a85 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java @@ -29,9 +29,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.s3a.S3AContract; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestPath; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; /** * S3A Test suite for the FSMainOperationsBaseTest tests. @@ -78,6 +76,28 @@ public void testCopyToLocalWithUseRawLocalFileSystemOption() throws Exception { } + @Override + public void testWriteReadAndDeleteOneAndAHalfBlocks() throws Exception { + // Currently analytics accelerator does not support reading of files that have been overwritten. + // This is because the analytics accelerator library caches metadata, and when a file is + // overwritten, the old metadata continues to be used, until it is removed from the cache over + // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218. 
+ skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(), + "Analytics Accelerator currently does not support reading of overwritten files"); + super.testWriteReadAndDeleteOneAndAHalfBlocks(); + } + + @Override + public void testWriteReadAndDeleteTwoBlocks() throws Exception { + // Currently analytics accelerator does not support reading of files that have been overwritten. + // This is because the analytics accelerator library caches metadata, and when a file is + // overwritten, the old metadata continues to be used, until it is removed from the cache over + // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218. + skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(), + "Analytics Accelerator currently does not support reading of overwritten files"); + super.testWriteReadAndDeleteTwoBlocks(); + } + @Override public void testOverwrite() throws IOException { boolean createPerformance = isCreatePerformanceEnabled(fSys); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java index 48081457658..2034ea274e7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java @@ -34,8 +34,7 @@ import org.apache.hadoop.fs.Path; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.junit.Assume.*; import static org.junit.Assert.*; @@ -160,4 +159,15 @@ public void testOverwrite() throws IOException { } } } + + @Override + public void testOverWriteAndRead() throws Exception { + // Currently analytics accelerator does not support reading of files that have been overwritten. + // This is because the analytics accelerator library caches metadata, and when a file is + // overwritten, the old metadata continues to be used, until it is removed from the cache over + // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
+ skipIfAnalyticsAcceleratorEnabled(fs.getConf(), + "Analytics Accelerator currently does not support reading of overwritten files"); + super.testOverWriteAndRead(); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java index 70dc5ee476c..daf5306dc39 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java @@ -43,6 +43,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; @@ -77,6 +78,11 @@ protected Configuration createConfiguration() { public void setup() throws Exception { super.setup(); executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS); + // Analytics accelerator currently does not support IOStatisticsContext; this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support IOStatisticsContext"); + } @Override diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java index 72c75162c9f..f26a585776a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java @@ -36,6 +36,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_LEAKS; import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs; @@ -87,6 +88,11 @@ public void setup() throws Exception { @Test public void testFinalizer() throws Throwable { Path path = methodPath(); + // Analytics accelerator currently does not support stream leak detection.
This work is tracked + in https://issues.apache.org/jira/browse/HADOOP-19451 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support leak detection"); + final S3AFileSystem fs = getFileSystem(); ContractTestUtils.createFile(fs, path, true, DATASET); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java index 3bfe69c2bca..4ec579ce4f6 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.InputStream; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString; /** @@ -51,6 +52,11 @@ public void testMetricsRegister() @Test public void testStreamStatistics() throws IOException { + // Analytics accelerator currently does not support IOStatistics; this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support stream statistics"); + S3AFileSystem fs = getFileSystem(); Path file = path("testStreamStatistics"); byte[] data = "abcdefghijklmnopqrstuvwxyz".getBytes(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java index 049a698d208..725e54c7d85 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java @@ -33,6 +33,7 @@ import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS; import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE; import static org.apache.hadoop.fs.s3a.S3ATestUtils.streamType; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** @@ -59,6 +60,11 @@ protected Configuration createConfiguration() { public void testRequesterPaysOptionSuccess() throws Throwable { describe("Test requester pays enabled case by reading last then first byte"); skipIfClientSideEncryption(); + // Analytics accelerator currently does not support IOStatistics, which causes the + // STREAM_READ_OPENED assertion to fail; this will be added as part of + // https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support IOStatistics"); Configuration conf = this.createConfiguration(); conf.setBoolean(ALLOW_REQUESTER_PAYS, true); // Enable bucket exists check, the first failure point people may encounter diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 14ab2b953d1..bf804415133 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -105,6 +105,8 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static
org.apache.hadoop.fs.impl.FlagSet.createFlagSet; +import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Analytics; +import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.DEFAULT_STREAM_TYPE; import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Prefetch; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion; @@ -577,6 +579,21 @@ public static boolean isS3ExpressTestBucket(final Configuration conf) { return S3ExpressStorage.isS3ExpressStore(getTestBucketName(conf), ""); } + /** + * Skip a test if the Analytics Accelerator Library for Amazon S3 is enabled. + * @param configuration configuration to probe + * @param message message to use in the skip report + */ + public static void skipIfAnalyticsAcceleratorEnabled( + Configuration configuration, String message) { + assume(message, + !isAnalyticsAcceleratorEnabled(configuration)); + } + + /** + * Probe: is the analytics accelerator stream type configured? + * @param conf configuration to probe + * @return true if the input stream type is Analytics. + */ + public static boolean isAnalyticsAcceleratorEnabled(final Configuration conf) { + return conf.getEnum(INPUT_STREAM_TYPE, + InputStreamType.Classic) == InputStreamType.Analytics; + } + /** * Skip a test if the filesystem lacks a required capability. * @param fs filesystem @@ -1804,6 +1821,18 @@ public static Configuration enablePrefetching(Configuration conf) { return conf; } + /** + * Enable the analytics stream for the S3A filesystem in tests. + * @param conf Configuration to update + * @return patched config + */ + public static Configuration enableAnalyticsAccelerator(Configuration conf) { + removeBaseAndBucketOverrides(conf, + INPUT_STREAM_TYPE); + conf.setEnum(INPUT_STREAM_TYPE, Analytics); + return conf; + } + /** * Probe for a filesystem having a specific stream type; * this is done through filesystem capabilities. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java index 8132b44cdb4..02f0251e8f0 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; import org.apache.hadoop.fs.statistics.IOStatisticsLogging; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_GET_REQUEST; import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_FILES_CREATED; import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT; @@ -167,7 +168,12 @@ private void abortActiveStream() throws IOException { @Test public void testCostOfCreatingMagicFile() throws Throwable { - describe("Files created under magic paths skip existence checks"); + describe("Files created under magic paths skip existence checks and marker deletes"); + + // Analytics accelerator currently does not support IOStatistics; this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path destFile = methodSubPath("file.txt"); fs.delete(destFile.getParent(), true); @@ -245,6 +251,10 @@ public void testCostOfCreatingMagicFile() throws Throwable { public void testCostOfSavingLoadingPendingFile() throws Throwable {
describe("Verify costs of saving .pending file under a magic path"); + // Analytics accelerator currently does not support IOStatistics, this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path partDir = methodSubPath("file.pending"); Path destFile = new Path(partDir, "file.pending"); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java index 1724006a831..5b489c1c39c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java @@ -30,6 +30,8 @@ import org.junit.Assert; import org.junit.Before; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; + /** * S3a implementation of FCStatisticsBaseTest. */ @@ -44,6 +46,10 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest { @Before public void setUp() throws Exception { conf = new Configuration(); + // Analytics accelerator currently does not support IOStatistics, this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(conf, + "Analytics Accelerator currently does not support stream statistics"); fc = S3ATestUtils.createTestFileContext(conf); testRootPath = fileContextTestHelper.getTestRootPath(fc, "test"); fc.mkdir(testRootPath, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index e94bb156cd2..75689211ba8 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -53,11 +53,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile; import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.streamType; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED; @@ -110,6 +106,10 @@ public Configuration createConfiguration() { @Override public void setup() throws Exception { super.setup(); + // Analytics accelerator currently does not support IOStatistics, this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem 
fs = getFileSystem(); testFile = methodPath(); @@ -388,7 +388,6 @@ public void testPositionedReadableReadPastEOF() throws Throwable { describe("PositionedReadable.read() past the end of the file"); assumeNoPrefetching(); - verifyMetrics(() -> { try (FSDataInputStream in = openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java index caf723b95de..e68ea9a0315 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java @@ -117,7 +117,6 @@ public Configuration createConfiguration() { @Override public void setup() throws Exception { super.setup(); - // now create a new FS with minimal http capacity and recovery // a separate one is used to avoid test teardown suffering // from the lack of http connections and short timeouts. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java index 6f19ba15c1c..0203b00caab 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java @@ -31,6 +31,7 @@ import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM; import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled; /** @@ -54,6 +55,8 @@ public class ITestS3AHugeFilesSSECDiskBlocks public void setup() throws Exception { try { super.setup(); + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support SSE-C"); } catch (AccessDeniedException | AWSUnsupportedFeatureException e) { skip("Bucket does not allow " + S3AEncryptionMethods.SSE_C + " encryption method"); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java index 0f6b69cd54d..3c7644f2323 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.contract.s3a.S3AContract; import org.apache.hadoop.fs.statistics.StreamStatisticNames; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.*; /** @@ -78,4 +79,12 @@ public List<String> outputStreamStatisticKeys() { STREAM_WRITE_EXCEPTIONS); } + @Override + public void testInputStreamStatisticRead() throws Throwable { + // Analytics accelerator currently does not support IOStatistics; this will be added as + // part of
https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(createConfiguration(), + "Analytics Accelerator currently does not support stream statistics"); + super.testInputStreamStatisticRead(); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java index 0d5d2a789a0..8cb0de1b83f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java @@ -31,6 +31,8 @@ import org.apache.hadoop.fs.statistics.IOStatisticAssertions; import org.apache.hadoop.fs.statistics.StreamStatisticNames; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; + public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase { private static final int ONE_KB = 1024; @@ -42,6 +44,10 @@ public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase { */ @Test public void testBytesReadWithStream() throws IOException { + // Analytics accelerator currently does not support IOStatistics; this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(createConfiguration(), + "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path filePath = path(getMethodName()); byte[] oneKbBuf = new byte[ONE_KB]; diff --git a/hadoop-tools/hadoop-aws/src/test/resources/malformed_footer.parquet b/hadoop-tools/hadoop-aws/src/test/resources/malformed_footer.parquet new file mode 100644 index 00000000000..614912f630b Binary files /dev/null and b/hadoop-tools/hadoop-aws/src/test/resources/malformed_footer.parquet differ diff --git a/hadoop-tools/hadoop-aws/src/test/resources/multi_row_group.parquet b/hadoop-tools/hadoop-aws/src/test/resources/multi_row_group.parquet new file mode 100644 index 00000000000..b8d6a95cd44 Binary files /dev/null and b/hadoop-tools/hadoop-aws/src/test/resources/multi_row_group.parquet differ --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org
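Postscript for reviewers: a minimal sketch of how the new S3ATestUtils helpers in this patch are meant to be used together in a test suite. It is illustrative only, not part of the patch; the class ITestAnalyticsStreamExample and its test bodies are hypothetical, while enableAnalyticsAccelerator(), skipIfAnalyticsAcceleratorEnabled(), AbstractS3ATestBase and ContractTestUtils.createFile() are as used in this change.

    package org.apache.hadoop.fs.s3a;

    import java.io.IOException;

    import org.junit.Test;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.Path;

    import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
    import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
    import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;

    /** Hypothetical suite running entirely against the Analytics stream. */
    public class ITestAnalyticsStreamExample extends AbstractS3ATestBase {

      @Override
      protected Configuration createConfiguration() {
        // Clear any base/bucket overrides of fs.s3a.input.stream.type,
        // then select the Analytics input stream for every test here.
        return enableAnalyticsAccelerator(super.createConfiguration());
      }

      @Test
      public void testSimpleRead() throws Throwable {
        // Plain positioned reads are expected to work through the new stream.
        Path path = methodPath();
        byte[] data = "0123456789".getBytes();
        createFile(getFileSystem(), path, true, data);
        try (FSDataInputStream in = getFileSystem().open(path)) {
          byte[] buffer = new byte[data.length];
          in.readFully(0, buffer);
        }
      }

      @Test
      public void testStreamStatistics() throws IOException {
        // Stream IOStatistics are not collected yet (HADOOP-19364), so any
        // statistics assertion must skip while this stream type is active.
        skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
            "Analytics Accelerator currently does not support stream statistics");
        // ... IOStatistics assertions would follow here ...
      }
    }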