steveloughran commented on code in PR #7334:
URL: https://github.com/apache/hadoop/pull/7334#discussion_r1969584663
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java:
##########
@@ -127,6 +128,12 @@ public void testMultipartUploadReverseOrderNonContiguousPartNumbers() throws Exc
@Override
public void testConcurrentUploads() throws Throwable {
assumeNotS3ExpressFileSystem(getFileSystem());
+ // Currently analytics accelerator does not support reading of files that have been overwritten.
+ // This is because the analytics accelerator library caches metadata and data, and when a file is
+ // overwritten, the old data continues to be used, until it is removed from the cache over
+ // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
+ skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
Review Comment:
this is a whole-file read? what's cached: length?
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
+import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
+import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
+import software.amazon.s3.analyticsaccelerator.util.S3URI;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+
+/**
+ * Analytics stream creates a stream using aws-analytics-accelerator-s3. This stream supports
+ * parquet specific optimisations such as parquet-aware prefetching. For more details, see
+ * https://github.com/awslabs/analytics-accelerator-s3.
+ */
+public class AnalyticsStream extends ObjectInputStream implements StreamCapabilities {
+
+ private S3SeekableInputStream inputStream;
+ private long lastReadCurrentPos = 0;
+ private volatile boolean closed;
+
+ public static final Logger LOG = LoggerFactory.getLogger(AnalyticsStream.class);
+
+ public AnalyticsStream(final ObjectReadParameters parameters, final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
+ super(InputStreamType.Analytics, parameters);
+ S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
+ this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()), buildOpenStreamInformation(parameters));
+ getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
+ }
+
+ /**
+ * Indicates whether the given {@code capability} is supported by this stream.
+ *
+ * @param capability the capability to check.
+ * @return true if the given {@code capability} is supported by this stream, false otherwise.
+ */
+ @Override
+ public boolean hasCapability(String capability) {
+ return false;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throwIfClosed();
+ int bytesRead;
+ try {
+ bytesRead = inputStream.read();
+ } catch (IOException ioe) {
+ onReadFailure(ioe);
+ throw ioe;
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ throwIfClosed();
+ if (pos < 0) {
+ throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
+ + " " + pos);
+ }
+ inputStream.seek(pos);
+ }
+
+
+ @Override
+ public synchronized long getPos() {
+ if (!closed) {
+ lastReadCurrentPos = inputStream.getPos();
+ }
+ return lastReadCurrentPos;
+ }
+
+
+ /**
+ * Reads the last n bytes from the stream into a byte buffer. Blocks until end of stream is
+ * reached. Leaves the position of the stream unaltered.
+ *
+ * @param buf buffer to read data into
+ * @param off start position in buffer at which data is written
+ * @param len the number of bytes to read; the n-th byte should be the last byte of the stream.
+ * @return the total number of bytes read into the buffer
+ * @throws IOException if an I/O error occurs
+ */
+ public int readTail(byte[] buf, int off, int len) throws IOException {
+ throwIfClosed();
+ int bytesRead;
+ try {
+ bytesRead = inputStream.readTail(buf, off, len);
+ } catch (IOException ioe) {
+ onReadFailure(ioe);
+ throw ioe;
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public int read(byte[] buf, int off, int len) throws IOException {
+ throwIfClosed();
+ int bytesRead;
+ try {
+ bytesRead = inputStream.read(buf, off, len);
+ } catch (IOException ioe) {
+ onReadFailure(ioe);
+ throw ioe;
+ }
+ return bytesRead;
+ }
+
+
+ @Override
+ public boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
+
+ @Override
+ public int available() throws IOException {
+ throwIfClosed();
+ return super.available();
+ }
+
+ @Override
+ protected boolean isStreamOpen() {
+ return !isClosed();
+ }
+
+ protected boolean isClosed() {
+ return inputStream == null;
+ }
+
+ @Override
+ protected void abortInFinalizer() {
+ try {
+ close();
+ } catch (IOException ignored) {
+
+ }
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if(!closed) {
+ closed = true;
+ try {
+ inputStream.close();
+ inputStream = null;
+ super.close();
+ } catch (IOException ioe) {
+ LOG.debug("Failure closing stream {}: ", getKey());
+ throw ioe;
+ }
+ }
+ }
+
+ /**
+ * Close the stream on read failure.
+ * No attempt to recover from failure
+ *
+ * @param ioe exception caught.
+ */
+ @Retries.OnceTranslated
+ private void onReadFailure(IOException ioe) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got exception while trying to read from stream {}, " +
+ "not trying to recover:",
+ getKey(), ioe);
+ } else {
+ LOG.info("Got exception while trying to read from stream {}, " +
+ "not trying to recover:",
+ getKey(), ioe);
+ }
+ this.close();
+ }
+
+ private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) {
+ OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder =
+ OpenStreamInformation.builder()
+ .inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
+ .getInputPolicy()));
+
+ if (parameters.getObjectAttributes().getETag() != null) {
+ openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder()
+ .contentLength(parameters.getObjectAttributes().getLen())
+ .etag(parameters.getObjectAttributes().getETag()).build());
+ }
+
+ return openStreamInformationBuilder.build();
+ }
+
+ /**
+ * If S3A's input policy is Sequential, that is, if the file format to be read is sequential
+ * (CSV, JSON), or the file policy passed down is WHOLE_FILE, then AAL's parquet specific
+ * optimisations will be turned off, regardless of the file extension. This is to allow for
+ * applications like DISTCP that read parquet files, but will read them whole, and so do not
+ * follow the typical parquet read patterns of reading the footer first etc., and will not
+ * benefit from parquet optimisations.
+ * Otherwise, AAL decides which optimisations to apply based on the file extension:
+ * if the file ends in .par or .parquet, then parquet specific optimisations are used.
+ *
+ * @param inputPolicy S3A's input file policy passed down when opening the file
+ * @return the AAL read policy
+ */
+ private InputPolicy mapS3AInputPolicyToAAL(S3AInputPolicy inputPolicy) {
+ switch (inputPolicy) {
+ case Sequential:
+ return InputPolicy.Sequential;
+ default:
+ return InputPolicy.None;
+ }
+ }
+
+ protected void throwIfClosed() throws IOException {
+ if (closed) {
+ throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ }
+}
Review Comment:
nit; add a newline
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java:
##########
@@ -88,6 +89,17 @@ protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
+ /**
+ * Analytics Accelerator Library for Amazon S3 does not support Vectored Reads.
+ * @throws Exception
+ */
+ @Override
+ public void setup() throws Exception {
+ skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
+ "Analytics Accelerator does not support vectored reads");
+ super.setup();
Review Comment:
super.setup() must always be the first call in setup() unless you really need to get in first; and setup() must be the only place that calls createConfiguration().
I know, there's the extra cost of test dir setup/delete, but given the overall duration of a test run, it's not that expensive.
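A minimal sketch of that ordering (assuming the usual contract-test lifecycle, where getContract().getConf() is valid once super.setup() has run):
```
@Override
public void setup() throws Exception {
  // super.setup() runs first and is the only caller of createConfiguration()
  super.setup();
  // skip based on the configuration the contract was actually built with
  skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
      "Analytics Accelerator does not support vectored reads");
}
```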
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java:
##########
@@ -31,9 +31,8 @@
import static org.apache.hadoop.fs.s3a.S3ATestConstants.KEY_PERFORMANCE_TESTS_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
+
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
Review Comment:
prefer explicit static imports, not the `.*` wildcard
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java:
##########
@@ -1030,6 +1036,16 @@ public long streamOpened() {
return openOperations.getAndIncrement();
}
+ @Override
+ public long streamOpened(InputStreamType type) {
+ switch (type) {
+ case Analytics:
+ return analyticsStreamOpenOperations.getAndIncrement();
+ default:
+ return openOperations.getAndIncrement();
Review Comment:
proposed: always increment openOperations:
```
long count = openOperations.getAndIncrement();
if (type == InputStreamType.Analytics) {
  count = analyticsStreamOpenOperations.getAndIncrement();
}
return count;
```
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
+
+
+import org.assertj.core.api.Assertions;
+
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+public class ITestS3AAnalyticsAcceleratorStream extends AbstractS3ATestBase {
+
+ private static final String PHYSICAL_IO_PREFIX = "physicalio";
+ private static final String LOGICAL_IO_PREFIX = "logicalio";
+
+
+ private Configuration conf;
+ private Path testFile;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setup();
+ conf = createConfiguration();
+ testFile = getExternalData(conf);
+ }
+
+ @Override
+ public Configuration createConfiguration() {
+ Configuration configuration = super.createConfiguration();
+ if (isUsingDefaultExternalDataFile(configuration)) {
+ S3ATestUtils.removeBaseAndBucketOverrides(configuration,
+ ENDPOINT);
+ }
+ enableAnalyticsAccelerator(configuration);
+ return configuration;
+ }
+
+ @Test
+ public void testConnectorFrameWorkIntegration() throws IOException {
+ describe("Verify S3 connector framework integration");
+
+ removeBaseAndBucketOverrides(conf, INPUT_FADVISE);
+ conf.set(INPUT_FADVISE, "whole-file");
+
+ S3AFileSystem fs =
+ (S3AFileSystem) FileSystem.get(testFile.toUri(), conf);
+ byte[] buffer = new byte[500];
+ IOStatistics ioStats;
+
+ try (FSDataInputStream inputStream = fs.open(testFile)) {
+ ioStats = inputStream.getIOStatistics();
+ inputStream.seek(5);
+ inputStream.read(buffer, 0, 500);
+
+ final InputStream wrappedStream = inputStream.getWrappedStream();
+ ObjectInputStream objectInputStream = (ObjectInputStream) wrappedStream;
+ assertEquals(objectInputStream.streamType(), InputStreamType.Analytics);
Review Comment:
use AssertJ
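e.g., something like:
```
Assertions.assertThat(objectInputStream.streamType())
    .as("stream type of %s", objectInputStream)
    .isEqualTo(InputStreamType.Analytics);
```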
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
+
+
+import org.assertj.core.api.Assertions;
+
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+public class ITestS3AAnalyticsAcceleratorStream extends AbstractS3ATestBase {
+
+ private static final String PHYSICAL_IO_PREFIX = "physicalio";
+ private static final String LOGICAL_IO_PREFIX = "logicalio";
+
+
+ private Configuration conf;
+ private Path testFile;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setup();
+ conf = createConfiguration();
+ testFile = getExternalData(conf);
+ }
+
+ @Override
+ public Configuration createConfiguration() {
+ Configuration configuration = super.createConfiguration();
+ if (isUsingDefaultExternalDataFile(configuration)) {
+ S3ATestUtils.removeBaseAndBucketOverrides(configuration,
+ ENDPOINT);
+ }
+ enableAnalyticsAccelerator(configuration);
+ return configuration;
+ }
+
+ @Test
+ public void testConnectorFrameWorkIntegration() throws IOException {
+ describe("Verify S3 connector framework integration");
+
+ removeBaseAndBucketOverrides(conf, INPUT_FADVISE);
+ conf.set(INPUT_FADVISE, "whole-file");
+
+ S3AFileSystem fs =
+ (S3AFileSystem) FileSystem.get(testFile.toUri(), conf);
+ byte[] buffer = new byte[500];
+ IOStatistics ioStats;
+
+ try (FSDataInputStream inputStream = fs.open(testFile)) {
+ ioStats = inputStream.getIOStatistics();
+ inputStream.seek(5);
+ inputStream.read(buffer, 0, 500);
+
+ final InputStream wrappedStream = inputStream.getWrappedStream();
+ ObjectInputStream objectInputStream = (ObjectInputStream) wrappedStream;
+ assertEquals(objectInputStream.streamType(), InputStreamType.Analytics);
+ assertEquals(objectInputStream.getInputPolicy(), S3AInputPolicy.Sequential);
+ }
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+ }
+
+ @Test
+ public void testMalformedParquetFooter() throws IOException {
+ describe("Reading a malformed parquet file should not throw an exception");
+
+ // File with malformed footer taken from https://github.com/apache/parquet-testing/blob/master/bad_data/PARQUET-1481.parquet.
+ // This test ensures AAL does not throw exceptions if footer parsing fails. It will only emit a WARN log,
+ // "Unable to parse parquet footer for test/malformedFooter.parquet, parquet prefetch optimisations will be disabled for this key."
+ Path dest = path("malformed_footer.parquet");
+
+ File file = new File("src/test/resources/malformed_footer.parquet");
+ Path sourcePath = new Path(file.toURI().getPath());
+ getFileSystem().copyFromLocalFile(false, true, sourcePath, dest);
+
+ byte[] buffer = new byte[500];
+ IOStatistics ioStats;
+
+ try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+ ioStats = inputStream.getIOStatistics();
+ inputStream.seek(5);
+ inputStream.read(buffer, 0, 500);
+ }
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+ }
+
+ @Test
+ public void testMultiRowGroupParquet() throws IOException {
+ describe("A parquet file is read successfully");
+
+ Path dest = path("multi_row_group.parquet");
+
+ File file = new File("src/test/resources/multi_row_group.parquet");
+ Path sourcePath = new Path(file.toURI().getPath());
+ getFileSystem().copyFromLocalFile(false, true, sourcePath, dest);
+
+ FileStatus fileStatus = getFileSystem().getFileStatus(dest);
+
+ byte[] buffer = new byte[3000];
+ IOStatistics ioStats;
+
+ try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+ ioStats = inputStream.getIOStatistics();
+ inputStream.readFully(buffer, 0, (int) fileStatus.getLen());
+ }
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+ }
+
+ @Test
+ public void testConnectorFrameworkConfigurable() {
+ describe("Verify S3 connector framework reads configuration");
+
+ Configuration conf = getConfiguration();
+ removeBaseAndBucketOverrides(conf);
Review Comment:
this is a no-op. did you want to add some options?
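A sketch of what may have been intended here, assuming the goal is to clear per-bucket overrides of the two options set just below (key names taken from the test body):
```
removeBaseAndBucketOverrides(conf,
    ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + LOGICAL_IO_PREFIX + ".prefetching.mode",
    ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity");
```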
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
+
+
+import org.assertj.core.api.Assertions;
+
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+public class ITestS3AAnalyticsAcceleratorStream extends AbstractS3ATestBase {
+
+ private static final String PHYSICAL_IO_PREFIX = "physicalio";
+ private static final String LOGICAL_IO_PREFIX = "logicalio";
+
+
+ private Configuration conf;
+ private Path testFile;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setup();
+ conf = createConfiguration();
+ testFile = getExternalData(conf);
+ }
+
+ @Override
+ public Configuration createConfiguration() {
+ Configuration configuration = super.createConfiguration();
+ if (isUsingDefaultExternalDataFile(configuration)) {
+ S3ATestUtils.removeBaseAndBucketOverrides(configuration,
+ ENDPOINT);
+ }
+ enableAnalyticsAccelerator(configuration);
+ return configuration;
+ }
+
+ @Test
+ public void testConnectorFrameWorkIntegration() throws IOException {
+ describe("Verify S3 connector framework integration");
+
+ removeBaseAndBucketOverrides(conf, INPUT_FADVISE);
+ conf.set(INPUT_FADVISE, "whole-file");
+
+ S3AFileSystem fs =
+ (S3AFileSystem) FileSystem.get(testFile.toUri(), conf);
+ byte[] buffer = new byte[500];
+ IOStatistics ioStats;
+
+ try (FSDataInputStream inputStream = fs.open(testFile)) {
+ ioStats = inputStream.getIOStatistics();
+ inputStream.seek(5);
+ inputStream.read(buffer, 0, 500);
+
+ final InputStream wrappedStream = inputStream.getWrappedStream();
+ ObjectInputStream objectInputStream = (ObjectInputStream) wrappedStream;
+ assertEquals(objectInputStream.streamType(), InputStreamType.Analytics);
+ assertEquals(objectInputStream.getInputPolicy(), S3AInputPolicy.Sequential);
+ }
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+ }
+
+ @Test
+ public void testMalformedParquetFooter() throws IOException {
+ describe("Reading a malformed parquet file should not throw an exception");
+
+ // File with malformed footer taken from https://github.com/apache/parquet-testing/blob/master/bad_data/PARQUET-1481.parquet.
+ // This test ensures AAL does not throw exceptions if footer parsing fails. It will only emit a WARN log,
+ // "Unable to parse parquet footer for test/malformedFooter.parquet, parquet prefetch optimisations will be disabled for this key."
+ Path dest = path("malformed_footer.parquet");
+
+ File file = new File("src/test/resources/malformed_footer.parquet");
+ Path sourcePath = new Path(file.toURI().getPath());
+ getFileSystem().copyFromLocalFile(false, true, sourcePath, dest);
+
+ byte[] buffer = new byte[500];
+ IOStatistics ioStats;
+
+ try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+ ioStats = inputStream.getIOStatistics();
+ inputStream.seek(5);
+ inputStream.read(buffer, 0, 500);
+ }
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+ }
+
+ @Test
+ public void testMultiRowGroupParquet() throws IOException {
+ describe("A parquet file is read successfully");
+
+ Path dest = path("multi_row_group.parquet");
+
+ File file = new File("src/test/resources/multi_row_group.parquet");
+ Path sourcePath = new Path(file.toURI().getPath());
+ getFileSystem().copyFromLocalFile(false, true, sourcePath, dest);
+
+ FileStatus fileStatus = getFileSystem().getFileStatus(dest);
+
+ byte[] buffer = new byte[3000];
+ IOStatistics ioStats;
+
+ try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+ ioStats = inputStream.getIOStatistics();
+ inputStream.readFully(buffer, 0, (int) fileStatus.getLen());
+ }
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+ }
+
+ @Test
+ public void testConnectorFrameworkConfigurable() {
+ describe("Verify S3 connector framework reads configuration");
+
+ Configuration conf = getConfiguration();
+ removeBaseAndBucketOverrides(conf);
+
+ //Disable Predictive Prefetching
+ conf.set(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+ "." + LOGICAL_IO_PREFIX + ".prefetching.mode", "all");
+
+ //Set Blobstore Capacity
+ conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+ "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1);
+
+ ConnectorConfiguration connectorConfiguration =
+ new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+
+ S3SeekableInputStreamConfiguration configuration =
+ S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
+
+ Assertions.assertThat(configuration.getLogicalIOConfiguration().getPrefetchingMode())
+ .as("AnalyticsStream configuration is not set to expected value")
+ .isSameAs(PrefetchMode.ALL);
Review Comment:
this is actually the kind of thing that hasCapability() should export, e.g. with a capability `ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + LOGICAL_IO_PREFIX + ".prefetching.mode.all"`
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java:
##########
@@ -202,6 +202,11 @@ final class S3ClientCreationParameters {
*/
private boolean fipsEnabled;
+ /**
+ * Is analytics accelerator enabled
Review Comment:
nit, add ?
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -53,11 +53,7 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.streamType;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
Review Comment:
revert
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -440,6 +442,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private boolean isCSEEnabled;
+ /**
+ * Is this S3A FS instance using analytics accelerator?
+ */
+ private boolean isAnalyticsAccelaratorEnabled;
Review Comment:
FWIW I've got a draft plan in my head to move all these bools and other
config options into some config class under store...probably start with the
Conditional Create stuff where the new flags go in. Microsoft's
reflection-based resolution is what I'm looking at.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
+import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
+import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
+import software.amazon.s3.analyticsaccelerator.util.S3URI;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+
+/**
+ * Analytics stream creates a stream using aws-analytics-accelerator-s3. This stream supports
+ * parquet specific optimisations such as parquet-aware prefetching. For more details, see
+ * https://github.com/awslabs/analytics-accelerator-s3.
+ */
+public class AnalyticsStream extends ObjectInputStream implements StreamCapabilities {
+
+ private S3SeekableInputStream inputStream;
+ private long lastReadCurrentPos = 0;
+ private volatile boolean closed;
+
+ public static final Logger LOG = LoggerFactory.getLogger(AnalyticsStream.class);
+
+ public AnalyticsStream(final ObjectReadParameters parameters, final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
+ super(InputStreamType.Analytics, parameters);
+ S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
+ this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()), buildOpenStreamInformation(parameters));
+ getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
Review Comment:
IDE is unhappy; make superclass getS3AStreamStatistics() final
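i.e. something like this in the ObjectInputStream superclass (the field name is assumed here):
```
public final S3AInputStreamStatistics getS3AStreamStatistics() {
  return streamStatistics;
}
```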
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
+
+
+import org.assertj.core.api.Assertions;
+
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+public class ITestS3AAnalyticsAcceleratorStream extends AbstractS3ATestBase {
+
+ private static final String PHYSICAL_IO_PREFIX = "physicalio";
+ private static final String LOGICAL_IO_PREFIX = "logicalio";
+
+
+ private Configuration conf;
+ private Path testFile;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setup();
+ conf = createConfiguration();
+ testFile = getExternalData(conf);
+ }
+
+ @Override
+ public Configuration createConfiguration() {
+ Configuration configuration = super.createConfiguration();
+ if (isUsingDefaultExternalDataFile(configuration)) {
+ S3ATestUtils.removeBaseAndBucketOverrides(configuration,
Review Comment:
the test data endpoint should be in hadoop-tools/hadoop-aws/src/test/resources/core-site.xml; is the problem here that you need to turn it off for all the other tests?
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
+
+
+import org.assertj.core.api.Assertions;
+
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+public class ITestS3AAnalyticsAcceleratorStream extends AbstractS3ATestBase {
+
+ private static final String PHYSICAL_IO_PREFIX = "physicalio";
+ private static final String LOGICAL_IO_PREFIX = "logicalio";
+
+
+ private Configuration conf;
+ private Path testFile;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setup();
+ conf = createConfiguration();
Review Comment:
getConfiguration everywhere; no need to cache it on L60
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
+import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
+import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
+import software.amazon.s3.analyticsaccelerator.util.S3URI;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+
+/**
+ * Analytics stream creates a stream using aws-analytics-accelerator-s3. This stream supports
+ * parquet specific optimisations such as parquet-aware prefetching. For more details, see
+ * https://github.com/awslabs/analytics-accelerator-s3.
+ */
+public class AnalyticsStream extends ObjectInputStream implements StreamCapabilities {
+
+ private S3SeekableInputStream inputStream;
+ private long lastReadCurrentPos = 0;
+ private volatile boolean closed;
+
+ public static final Logger LOG = LoggerFactory.getLogger(AnalyticsStream.class);
+
+ public AnalyticsStream(final ObjectReadParameters parameters, final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
+ super(InputStreamType.Analytics, parameters);
+ S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
+ this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()), buildOpenStreamInformation(parameters));
+ getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
+ }
+
+ /**
+ * Indicates whether the given {@code capability} is supported by this stream.
+ *
+ * @param capability the capability to check.
+ * @return true if the given {@code capability} is supported by this stream, false otherwise.
+ */
+ @Override
+ public boolean hasCapability(String capability) {
Review Comment:
cut. the superclass does implement two capabilities now: leak detection and
IOStatistics
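If the override is kept at all, delegating keeps those superclass probes working; a sketch:
```
@Override
public boolean hasCapability(String capability) {
  // fall through to the superclass capabilities (leak detection, IOStatistics)
  return super.hasCapability(capability);
}
```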
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
+import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
+import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
+import software.amazon.s3.analyticsaccelerator.util.S3URI;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+
+/**
+ * Analytics stream creates a stream using aws-analytics-accelerator-s3. This stream supports
+ * parquet specific optimisations such as parquet-aware prefetching. For more details, see
+ * https://github.com/awslabs/analytics-accelerator-s3.
+ */
+public class AnalyticsStream extends ObjectInputStream implements StreamCapabilities {
+
+ private S3SeekableInputStream inputStream;
+ private long lastReadCurrentPos = 0;
+ private volatile boolean closed;
+
+ public static final Logger LOG = LoggerFactory.getLogger(AnalyticsStream.class);
+
+ public AnalyticsStream(final ObjectReadParameters parameters, final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
+ super(InputStreamType.Analytics, parameters);
+ S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
+ this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()), buildOpenStreamInformation(parameters));
+ getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
+ }
+
+ /**
+ * Indicates whether the given {@code capability} is supported by this stream.
+ *
+ * @param capability the capability to check.
+ * @return true if the given {@code capability} is supported by this stream, false otherwise.
+ */
+ @Override
+ public boolean hasCapability(String capability) {
+ return false;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throwIfClosed();
+ int bytesRead;
+ try {
+ bytesRead = inputStream.read();
+ } catch (IOException ioe) {
+ onReadFailure(ioe);
+ throw ioe;
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ throwIfClosed();
+ if (pos < 0) {
+ throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
+ + " " + pos);
+ }
+ inputStream.seek(pos);
+ }
+
+
+ @Override
+ public synchronized long getPos() {
+ if (!closed) {
+ lastReadCurrentPos = inputStream.getPos();
+ }
+ return lastReadCurrentPos;
+ }
+
+
+ /**
+ * Reads the last n bytes from the stream into a byte buffer. Blocks until end of stream is
+ * reached. Leaves the position of the stream unaltered.
+ *
+ * @param buf buffer to read data into
+ * @param off start position in buffer at which data is written
+ * @param len the number of bytes to read; the n-th byte should be the last byte of the stream.
+ * @return the total number of bytes read into the buffer
+ * @throws IOException if an I/O error occurs
+ */
+ public int readTail(byte[] buf, int off, int len) throws IOException {
+ throwIfClosed();
+ int bytesRead;
+ try {
+ bytesRead = inputStream.readTail(buf, off, len);
+ } catch (IOException ioe) {
+ onReadFailure(ioe);
+ throw ioe;
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public int read(byte[] buf, int off, int len) throws IOException {
+ throwIfClosed();
+ int bytesRead;
+ try {
+ bytesRead = inputStream.read(buf, off, len);
+ } catch (IOException ioe) {
+ onReadFailure(ioe);
+ throw ioe;
+ }
+ return bytesRead;
+ }
+
+
+ @Override
+ public boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
+
+ @Override
+ public int available() throws IOException {
+ throwIfClosed();
+ return super.available();
+ }
+
+ @Override
+ protected boolean isStreamOpen() {
+ return !isClosed();
+ }
+
+ protected boolean isClosed() {
+ return inputStream == null;
+ }
+
+ @Override
+ protected void abortInFinalizer() {
+ try {
+ close();
+ } catch (IOException ignored) {
+
+ }
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if(!closed) {
+ closed = true;
+ try {
+ inputStream.close();
+ inputStream = null;
+ super.close();
+ } catch (IOException ioe) {
+ LOG.debug("Failure closing stream {}: ", getKey());
+ throw ioe;
+ }
+ }
+ }
+
+ /**
+ * Close the stream on read failure.
+ * No attempt to recover from failure
+ *
+ * @param ioe exception caught.
+ */
+ @Retries.OnceTranslated
+ private void onReadFailure(IOException ioe) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got exception while trying to read from stream {}, " +
+ "not trying to recover:",
+ getKey(), ioe);
+ } else {
+ LOG.info("Got exception while trying to read from stream {}, " +
+ "not trying to recover:",
+ getKey(), ioe);
+ }
+ this.close();
+ }
+
+ private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) {
+ OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder =
+ OpenStreamInformation.builder()
+ .inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
+ .getInputPolicy()));
+
+ if (parameters.getObjectAttributes().getETag() != null) {
+ openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder()
+ .contentLength(parameters.getObjectAttributes().getLen())
+ .etag(parameters.getObjectAttributes().getETag()).build());
+ }
+
+ return openStreamInformationBuilder.build();
+ }
+
+ /**
+ * If S3A's input policy is Sequential, that is, if the file format to be read is sequential
+ * (CSV, JSON), or the file policy passed down is WHOLE_FILE, then AAL's parquet specific
+ * optimisations will be turned off, regardless of the file extension. This is to allow for
+ * applications like DISTCP that read parquet files, but will read them whole, and so do not
+ * follow the typical parquet read patterns of reading the footer first etc., and will not
+ * benefit from parquet optimisations.
+ * Otherwise, AAL decides which optimisations to apply based on the file extension:
+ * if the file ends in .par or .parquet, then parquet specific optimisations are used.
+ *
+ * @param inputPolicy S3A's input file policy passed down when opening the file
+ * @return the AAL read policy
+ */
+ private InputPolicy mapS3AInputPolicyToAAL(S3AInputPolicy inputPolicy) {
Review Comment:
👍🏽
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
+import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
+import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
+import software.amazon.s3.analyticsaccelerator.util.S3URI;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+
+/**
+ * Analytics stream creates a stream using aws-analytics-accelerator-s3. This stream supports
+ * parquet specific optimisations such as parquet-aware prefetching. For more details, see
+ * https://github.com/awslabs/analytics-accelerator-s3.
+ */
+public class AnalyticsStream extends ObjectInputStream implements StreamCapabilities {
+
+ private S3SeekableInputStream inputStream;
+ private long lastReadCurrentPos = 0;
+ private volatile boolean closed;
+
+ public static final Logger LOG = LoggerFactory.getLogger(AnalyticsStream.class);
+
+ public AnalyticsStream(final ObjectReadParameters parameters, final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
+ super(InputStreamType.Analytics, parameters);
+ S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
+ this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()), buildOpenStreamInformation(parameters));
+ getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
+ }
+
+ /**
+ * Indicates whether the given {@code capability} is supported by this stream.
+ *
+ * @param capability the capability to check.
+ * @return true if the given {@code capability} is supported by this stream, false otherwise.
+ */
+ @Override
+ public boolean hasCapability(String capability) {
+ return false;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throwIfClosed();
+ int bytesRead;
+ try {
+ bytesRead = inputStream.read();
+ } catch (IOException ioe) {
+ onReadFailure(ioe);
+ throw ioe;
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ throwIfClosed();
+ if (pos < 0) {
+ throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
+ + " " + pos);
+ }
+ inputStream.seek(pos);
+ }
+
+
+ @Override
+ public synchronized long getPos() {
+ if (!closed) {
+ lastReadCurrentPos = inputStream.getPos();
+ }
+ return lastReadCurrentPos;
+ }
+
+
+ /**
+ * Reads the last n bytes from the stream into a byte buffer. Blocks until end of stream is
+ * reached. Leaves the position of the stream unaltered.
+ *
+ * @param buf buffer to read data into
+ * @param off start position in buffer at which data is written
+ * @param len the number of bytes to read; the n-th byte should be the last byte of the stream.
+ * @return the total number of bytes read into the buffer
+ * @throws IOException if an I/O error occurs
+ */
+ public int readTail(byte[] buf, int off, int len) throws IOException {
+ throwIfClosed();
+ int bytesRead;
+ try {
+ bytesRead = inputStream.readTail(buf, off, len);
+ } catch (IOException ioe) {
+ onReadFailure(ioe);
Review Comment:
I think recovery will need some future work here, either here or underneath.
Complex to do and test
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java:
##########
@@ -45,13 +45,11 @@ public enum InputStreamType {
*/
Prefetch(StreamIntegration.PREFETCH, 2, c ->
new PrefetchingInputStreamFactory()),
-
/**
* The analytics input stream.
*/
- Analytics(StreamIntegration.ANALYTICS, 3, c -> {
- throw new IllegalArgumentException("not yet supported");
- }),
+ Analytics(StreamIntegration.ANALYTICS, 3, c ->
Review Comment:
this is the most complicated/sophisticated enum I've ever done! hope it works. I don't see any reason for a CNFE to be raised if the analytics JAR is not on the classpath though - do you?
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java:
##########
@@ -93,6 +92,17 @@ protected Configuration createConfiguration() {
return conf;
}
+ @Override
+ public void testOverwriteExistingFile() throws Throwable {
+ // Currently analytics accelerator does not support reading of files that have been overwritten.
+ // This is because the analytics accelerator library caches metadata, and when a file is
+ // overwritten, the old metadata continues to be used, until it is removed from the cache over
+ // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
+ skipIfAnalyticsAcceleratorEnabled(this.createConfiguration(),
Review Comment:
use `getConfiguration()`, which is inited in setup()
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java:
##########
@@ -42,6 +44,10 @@ public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
*/
@Test
public void testBytesReadWithStream() throws IOException {
+ // Analytics accelerator currently does not support IOStatistics; this will be added as
+ // part of https://issues.apache.org/jira/browse/HADOOP-19364
+ skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
Review Comment:
getConfiguration()
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java:
##########
@@ -88,6 +89,17 @@ protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
+ /**
+ * Analytics Accelerator Library for Amazon S3 does not support Vectored Reads.
+ * @throws Exception
+ */
+ @Override
+ public void setup() throws Exception {
+ skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
Review Comment:
I understand why the s3a-specific tests are being skipped, but the base
contract tests should all work, and we do need an
AbstractContractVectoredReadTest to validate those.
Proposed: add a new subclass `ITestAnalyticsStreamVectoredRead` which just subclasses the contract tests and forces the analytics stream on.
Note also that somehow ITestS3AContractVectoredRead.testEOFRanges did get
executed for me.
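A hedged sketch of that proposed subclass; the constructor parameterisation follows the existing vectored-read contract test, and `enableAnalyticsAccelerator` is the test utility used elsewhere in this PR:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.s3a.S3AContract;

import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;

public class ITestAnalyticsStreamVectoredRead extends AbstractContractVectoredReadTest {

  public ITestAnalyticsStreamVectoredRead(String bufferType) {
    super(bufferType);
  }

  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    enableAnalyticsAccelerator(conf);  // force the analytics stream on
    return new S3AContract(conf);
  }
}
```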
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
+import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
+
+
+import org.assertj.core.api.Assertions;
+
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+public class ITestS3AAnalyticsAcceleratorStream extends AbstractS3ATestBase {
+
+ private static final String PHYSICAL_IO_PREFIX = "physicalio";
+ private static final String LOGICAL_IO_PREFIX = "logicalio";
+
+
+ private Configuration conf;
+ private Path testFile;
Review Comment:
rename to `externalTestFile`
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
+import static
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static
org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
+import static
org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile;
+import static
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
+
+
+import org.assertj.core.api.Assertions;
Review Comment:
should all be moved up to the junit block
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java:
##########
@@ -38,9 +38,7 @@
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeSkipRootTests;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
Review Comment:
revert, maybe change your ide settings
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java:
##########
@@ -29,9 +29,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestPath;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
Review Comment:
revert
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java:
##########
@@ -0,0 +1,202 @@
[... license header and imports as in the first quote of this file ...]
+
+public class ITestS3AAnalyticsAcceleratorStream extends AbstractS3ATestBase {
Review Comment:
0. give a more detailed name, e.g `ITestS3AAnalyticsAcceleratorStreamReading`
1. add a javadoc
1. emphasise this uses external file
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java:
##########
@@ -0,0 +1,202 @@
[... license header, imports, fields, setUp() and createConfiguration() as in the first quote of this file ...]
+
+ @Test
+ public void testConnectorFrameWorkIntegration() throws IOException {
+ describe("Verify S3 connector framework integration");
+
+ removeBaseAndBucketOverrides(conf, INPUT_FADVISE);
+ conf.set(INPUT_FADVISE, "whole-file");
+
+ S3AFileSystem fs =
+ (S3AFileSystem) FileSystem.get(testFile.toUri(), conf);
+ byte[] buffer = new byte[500];
+ IOStatistics ioStats;
+
+ try (FSDataInputStream inputStream = fs.open(testFile)) {
Review Comment:
use the builder here rather than play with bucket settings.
See `JsonSerialization.load()` for an example.
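For illustration, a sketch of a builder-based open using the standard `openFile()` read-policy option instead of mutating bucket settings:

```java
import org.apache.hadoop.util.functional.FutureIO;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;

// ...
try (FSDataInputStream inputStream = FutureIO.awaitFuture(
    getFileSystem().openFile(testFile)
        .opt(FS_OPTION_OPENFILE_READ_POLICY, "whole-file")
        .build())) {
  inputStream.seek(5);
  inputStream.read(buffer, 0, 500);
}
```

With the policy set per-open, the test can use the shared `getFileSystem()` instance rather than instantiating a new filesystem from a modified configuration.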
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java:
##########
@@ -0,0 +1,202 @@
[... license header, imports, setup and testConnectorFrameWorkIntegration() as in the earlier quotes ...]
+
+ @Test
+ public void testMalformedParquetFooter() throws IOException {
+ describe("Reading a malformed parquet file should not throw an exception");
+
+ // File with malformed footer taken from https://github.com/apache/parquet-testing/blob/master/bad_data/PARQUET-1481.parquet.
+ // This test ensures AAL does not throw exceptions if footer parsing fails. It will only emit a WARN log,
+ // "Unable to parse parquet footer for test/malformedFooter.parquet, parquet prefetch optimisations will be disabled for this key."
Review Comment:
This could potentially be very noisy
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java:
##########
@@ -0,0 +1,202 @@
[... earlier quote content through testMalformedParquetFooter() ...]
+
+ @Test
+ public void testMultiRowGroupParquet() throws IOException {
+ describe("A parquet file is read successfully");
Review Comment:
a bit more detail on a rowgroup in text or javadocs
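One possible wording for the requested detail (suggestion only):

```java
/**
 * Read a parquet file with multiple row groups. A row group is a horizontal
 * slice of the file holding a chunk of every column; multi-row-group files
 * exercise reads that span more than one independently prefetched region.
 */
```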
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java:
##########
@@ -0,0 +1,202 @@
[... license header, imports, setup and testConnectorFrameWorkIntegration() as in the earlier quotes ...]
+
+ @Test
+ public void testMalformedParquetFooter() throws IOException {
+ describe("Reading a malformed parquet file should not throw an exception");
+
+ // File with malformed footer taken from https://github.com/apache/parquet-testing/blob/master/bad_data/PARQUET-1481.parquet.
+ // This test ensures AAL does not throw exceptions if footer parsing fails. It will only emit a WARN log,
+ // "Unable to parse parquet footer for test/malformedFooter.parquet, parquet prefetch optimisations will be disabled for this key."
+ Path dest = path("malformed_footer.parquet");
+
+ File file = new File("src/test/resources/malformed_footer.parquet");
+ Path sourcePath = new Path(file.toURI().getPath());
+ getFileSystem().copyFromLocalFile(false, true, sourcePath, dest);
+
+ byte[] buffer = new byte[500];
+ IOStatistics ioStats;
+
+ try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
Review Comment:
follow with openFile() of format parquet and of format whole-file, to make
sure these are all good too; whole file read will not inc analytics counter
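A hedged sketch of those follow-up opens; whether "parquet" is accepted as a read-policy value here is an assumption taken from the comment (imports as in the earlier sketch):

```java
try (FSDataInputStream in = FutureIO.awaitFuture(
    getFileSystem().openFile(dest)
        .opt(FS_OPTION_OPENFILE_READ_POLICY, "parquet")
        .build())) {
  in.read(buffer, 0, 500);
}
try (FSDataInputStream in = FutureIO.awaitFuture(
    getFileSystem().openFile(dest)
        .opt(FS_OPTION_OPENFILE_READ_POLICY, "whole-file")
        .build())) {
  // per the comment above, this whole-file read is not expected to
  // increment the analytics-opened counter
  in.read(buffer, 0, 500);
}
```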
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java:
##########
@@ -1827,4 +1827,12 @@ private Constants() {
* Value: {@value}.
*/
public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit";
+
+
+ /**
+ * Prefix to configure Analytics Accelerator Library.
Review Comment:
{@value}
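i.e. extend the javadoc with the tag, as the file's other constants do; the literal below is a placeholder, not the real value:

```java
/**
 * Prefix to configure Analytics Accelerator Library.
 * Value: {@value}.
 */
public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
    "fs.s3a.analytics.accelerator";  // placeholder value
```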
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java:
##########
@@ -0,0 +1,202 @@
[... earlier quote content through testMultiRowGroupParquet() ...]
+
+ @Test
+ public void testConnectorFrameworkConfigurable() {
+ describe("Verify S3 connector framework reads configuration");
+
+ Configuration conf = getConfiguration();
Review Comment:
create a new `Configuration(getConfiguration())`
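i.e.:

```java
// copy the shared test configuration so the mutations below stay local
Configuration conf = new Configuration(getConfiguration());
removeBaseAndBucketOverrides(conf);
```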
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java:
##########
@@ -0,0 +1,202 @@
[... license header, imports, setup and testConnectorFrameWorkIntegration() as in the earlier quotes ...]
+
+ @Test
+ public void testMalformedParquetFooter() throws IOException {
+ describe("Reading a malformed parquet file should not throw an exception");
+
+ // File with malformed footer taken from https://github.com/apache/parquet-testing/blob/master/bad_data/PARQUET-1481.parquet.
+ // This test ensures AAL does not throw exceptions if footer parsing fails. It will only emit a WARN log,
+ // "Unable to parse parquet footer for test/malformedFooter.parquet, parquet prefetch optimisations will be disabled for this key."
+ Path dest = path("malformed_footer.parquet");
+
+ File file = new File("src/test/resources/malformed_footer.parquet");
Review Comment:
call getFileStatus here, pass as param to the openFile() calls I've proposed
before. It should succeed, hopefully skipping the HEAD and picking up file
length. Certainly *not* raise any IllegalArgumentException.
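A sketch of that flow, using the standard `withFileStatus()` builder option (imports as in the earlier sketch):

```java
FileStatus status = getFileSystem().getFileStatus(dest);
byte[] buffer = new byte[500];
try (FSDataInputStream in = FutureIO.awaitFuture(
    getFileSystem().openFile(dest)
        .withFileStatus(status)   // length comes from the status; HEAD can be skipped
        .build())) {
  in.read(buffer, 0, 500);
}
```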
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java:
##########
@@ -0,0 +1,202 @@
[... earlier quote content through testMultiRowGroupParquet() ...]
+
+ @Test
+ public void testConnectorFrameworkConfigurable() {
+ describe("Verify S3 connector framework reads configuration");
+
+ Configuration conf = getConfiguration();
+ removeBaseAndBucketOverrides(conf);
+
+ //Disable Predictive Prefetching
+ conf.set(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+ "." + LOGICAL_IO_PREFIX + ".prefetching.mode", "all");
+
+ //Set Blobstore Capacity
+ conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+ "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1);
+
+ ConnectorConfiguration connectorConfiguration =
+ new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+
+ S3SeekableInputStreamConfiguration configuration =
+ S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
+
+ Assertions.assertThat(configuration.getLogicalIOConfiguration().getPrefetchingMode())
+ .as("AnalyticsStream configuration is not set to expected value")
+ .isSameAs(PrefetchMode.ALL);
+
+ Assertions.assertThat(configuration.getPhysicalIOConfiguration().getBlobStoreCapacity())
+ .as("AnalyticsStream configuration is not set to expected value")
+ .isEqualTo(1);
+ }
+
+ @Test
+ public void testInvalidConfigurationThrows() throws Exception {
+ describe("Verify S3 connector framework throws with invalid
configuration");
+
+ Configuration conf = getConfiguration();
+ removeBaseAndBucketOverrides(conf);
+ // Set an invalid Blobstore Capacity
+ conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+ "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", -1);
+
+ ConnectorConfiguration connectorConfiguration =
+ new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+ Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
Review Comment:
LambdaTestUtils.intercept, as if an exception isn't raised you'll get the configuration.toString() of what was built
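i.e., a sketch:

```java
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

// ...
intercept(IllegalArgumentException.class, () ->
    S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
```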
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java:
##########
@@ -0,0 +1,202 @@
[... license header, imports, setUp() and createConfiguration() as in the first quote of this file ...]
+
+ @Test
+ public void testConnectorFrameWorkIntegration() throws IOException {
+ describe("Verify S3 connector framework integration");
+
+ removeBaseAndBucketOverrides(conf, INPUT_FADVISE);
+ conf.set(INPUT_FADVISE, "whole-file");
Review Comment:
use local var and init `new Configuration(getConfiguration())`
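i.e.:

```java
// take a private copy instead of mutating the shared test configuration
Configuration conf = new Configuration(getConfiguration());
removeBaseAndBucketOverrides(conf, INPUT_FADVISE);
conf.set(INPUT_FADVISE, "whole-file");
S3AFileSystem fs = (S3AFileSystem) FileSystem.get(testFile.toUri(), conf);
```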
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java:
##########
@@ -78,4 +79,12 @@ public List<String> outputStreamStatisticKeys() {
STREAM_WRITE_EXCEPTIONS);
}
+ @Override
+ public void testInputStreamStatisticRead() throws Throwable {
+ // Analytics accelerator currently does not support IOStatistics; this will be added as
+ // part of https://issues.apache.org/jira/browse/HADOOP-19364
+ skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
Review Comment:
getConfiguration().
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]