[
https://issues.apache.org/jira/browse/HADOOP-19364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18038067#comment-18038067
]
ASF GitHub Bot commented on HADOOP-19364:
-----------------------------------------
steveloughran commented on code in PR #8007:
URL: https://github.com/apache/hadoop/pull/8007#discussion_r2523839488
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -88,6 +101,12 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
     S3AFileSystem fs =
         (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration());
+
+ long fileLength = fs.getFileStatus(externalTestFile).getLen();
+
+ // Head request for the file length.
+    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 1);
Review Comment:
get the iostats and the current value on L104; assert that the execution count
matches the original + 1. This avoids a problem we've hit elsewhere.
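A minimal sketch of that pattern, assuming the live counters() map of the
filesystem's IOStatistics (absent keys default to zero):

    IOStatistics stats = fs.getIOStatistics();
    // capture the audit counter before the getFileStatus() call
    long requestsBefore = stats.counters().getOrDefault(AUDIT_REQUEST_EXECUTION, 0L);

    long fileLength = fs.getFileStatus(externalTestFile).getLen();

    // the HEAD issued for the file length should add exactly one audited request
    verifyStatisticCounterValue(stats, AUDIT_REQUEST_EXECUTION, requestsBefore + 1);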
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java:
##########
@@ -36,9 +37,20 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+
Review Comment:
nit: cut this line
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -105,17 +124,90 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
Assertions.assertThat(objectInputStream.getInputPolicy())
.isEqualTo(S3AInputPolicy.Sequential);
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, 500);
+ verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
+
+      long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
+      Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
+          .isEqualTo(500);
}
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+    // Since the policy is WHOLE_FILE, the whole file starts getting prefetched as soon as
+    // the stream is opened. So the prefetched bytes are fileLength - 5.
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, fileLength - 5);
+
fs.close();
    verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1);
    // Expect 5 audited requests: two HEADs (one for the getFileStatus() call above, one on
    // stream open) and 3 GETs. The 3 GETs are because the read policy is WHOLE_FILE, in
    // which case AAL will start prefetching till EoF on file open in 8MB chunks. The file
    // read here, s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz, has a size of ~21MB,
    // resulting in 3 GETs: [0-8388607, 8388608-16777215, 16777216-21511173].
-    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
+    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 5);
Review Comment:
do the same request counting as we now have in `testMultiRowGroupParquet()`; it's
less brittle
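The counting in `testMultiRowGroupParquet()` isn't shown in this diff; a sketch in
the same spirit, deriving the expected GET count from the file length rather than
hardcoding it (this assumes the 8MB default AAL part size mentioned above):

    // ceil(fileLength / partSize) prefetch GETs for a WHOLE_FILE read
    long partSize = 8 * 1024 * 1024;
    long expectedGets = (fileLength + partSize - 1) / partSize;
    // plus the two HEADs: getFileStatus() above and the stream open
    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION,
        2 + expectedGets);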
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -193,13 +204,19 @@ public void testStreamIsNotChecksummed() throws Throwable {
// open the stream.
in.read();
+
    // now examine the innermost stream and make sure it doesn't have a checksum
- assertStreamIsNotChecksummed(getS3AInputStream(in));
+ assertStreamIsNotChecksummed(getS3AInputStream(in));
Review Comment:
nit, revert
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java:
##########
@@ -247,10 +270,13 @@ private void onReadFailure(IOException ioe) throws IOException {
}
  private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) {
+
+    final RequestCallback requestCallback = new AnalyticsRequestCallback(getS3AStreamStatistics());
+
     OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder =
         OpenStreamInformation.builder()
             .inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
-                .getInputPolicy()));
+                .getInputPolicy())).requestCallback(requestCallback);
Review Comment:
nit: put on a new line
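i.e. something like:

    OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder =
        OpenStreamInformation.builder()
            .inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
                .getInputPolicy()))
            .requestCallback(requestCallback);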
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -105,17 +124,90 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
Assertions.assertThat(objectInputStream.getInputPolicy())
.isEqualTo(S3AInputPolicy.Sequential);
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, 500);
+ verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
+
+      long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
+      Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
+          .isEqualTo(500);
}
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+    // Since the policy is WHOLE_FILE, the whole file starts getting prefetched as soon as
+    // the stream is opened. So the prefetched bytes are fileLength - 5.
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, fileLength - 5);
+
fs.close();
    verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1);
    // Expect 5 audited requests: two HEADs (one for the getFileStatus() call above, one on
    // stream open) and 3 GETs. The 3 GETs are because the read policy is WHOLE_FILE, in
    // which case AAL will start prefetching till EoF on file open in 8MB chunks. The file
    // read here, s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz, has a size of ~21MB,
    // resulting in 3 GETs: [0-8388607, 8388608-16777215, 16777216-21511173].
-    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
+    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 5);
+ }
+
+ @Test
+ public void testSequentialPrefetching() throws IOException {
+
+ Configuration conf = getConfiguration();
+
+    // AAL uses a Caffeine cache, and by default expires any prefetched data for a key 1s
+    // after it was last accessed. While this works well when running on EC2, for local
+    // testing it can take more than 1s to download large chunks of data. Set this value
+    // higher for testing to prevent early cache evictions.
+    conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + AAL_CACHE_TIMEOUT, 10000);
+
+    S3AFileSystem fs =
+        (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration());
+ byte[] buffer = new byte[10 * ONE_MB];
+ IOStatistics ioStats;
+
+ long fileLength = fs.getFileStatus(externalTestFile).getLen();
+
+    // Here we read through the 21MB external test file, but do not pass in the WHOLE_FILE
+    // policy. Instead, we rely on AAL detecting a sequential read pattern and then
+    // prefetching bytes in a geometric progression. AAL's sequential prefetching fetches
+    // in increments of 4MB, 8MB, 16MB, etc., depending on how many sequential reads happen.
+ try (FSDataInputStream inputStream = fs.open(externalTestFile)) {
+ ioStats = inputStream.getIOStatistics();
+
+ inputStream.readFully(buffer, 0, ONE_MB);
+ // The first sequential read, so prefetch the next 4MB.
+ inputStream.readFully(buffer, 0, ONE_MB);
+
+      // Since ONE_MB was requested by the reader, the prefetched bytes are 3MB.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 3 * ONE_MB);
+
+      // These next two reads are within the last prefetched bytes, so no further bytes are prefetched.
+      inputStream.readFully(buffer, 0, 2 * ONE_MB);
+      inputStream.readFully(buffer, 0, ONE_MB);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 3 * ONE_MB);
+ // Two cache hits, as the previous two reads were already prefetched.
+ verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 2);
+
+      // Another sequential read; the geometric progression will now prefetch the next 8MB of data.
+      inputStream.readFully(buffer, 0, ONE_MB);
+      // The cache hit count is still 2, as the previous read was outside the previously
+      // fetched 4MB and so required a new GET request.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 2);
+      // A total of 10MB is prefetched: 3MB and then 7MB.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 10 * ONE_MB);
Review Comment:
so we are reading 10 MB of data. I wonder if we should consider this a scale test?
I'd say no, as:
* it's reading, not writing
* everyone's networks should be faster in the decade+ since the Huge tests were first written
* having scale off means it gets run more often
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java:
##########
@@ -64,6 +67,20 @@ public ITestS3AContractAnalyticsStreamVectoredRead(String bufferType) {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
+ // Set the coalesce tolerance to 1KB, default is 1MB.
+ conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
Review Comment:
still need those `removeBaseAndBufferOverrides()` calls + disable fs caching
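A sketch of that setup in createConfiguration(), assuming the S3ATestUtils helpers;
the coalescing option name here is a placeholder, the real key is whatever the test
actually sets:

    Configuration conf = super.createConfiguration();
    // placeholder key name, for illustration only
    String coalesceKey = ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + ".coalescing.tolerance";
    // clear base/bucket overrides so the explicit value below takes effect
    removeBaseAndBufferOverrides(conf, coalesceKey);
    conf.setInt(coalesceKey, 1024);
    // give the test its own FS instance so the modified conf is actually used
    disableFilesystemCaching(conf);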
##########
hadoop-project/pom.xml:
##########
@@ -211,7 +211,7 @@
<aws-java-sdk.version>1.12.720</aws-java-sdk.version>
<aws-java-sdk-v2.version>2.35.4</aws-java-sdk-v2.version>
<amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
-    <amazon-s3-analyticsaccelerator-s3.version>1.3.0</amazon-s3-analyticsaccelerator-s3.version>
+    <amazon-s3-analyticsaccelerator-s3.version>1.3.1</amazon-s3-analyticsaccelerator-s3.version>
Review Comment:
split this into its own patch and update LICENSE-binary in it too
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -439,18 +455,27 @@ public void testVectorReadPastEOF() throws Throwable {
byte[] buf = new byte[longLen];
ByteBuffer bb = ByteBuffer.wrap(buf);
final FileRange range = FileRange.createFileRange(0, longLen);
- in.readVectored(Arrays.asList(range), (i) -> bb);
- interceptFuture(EOFException.class,
- "",
- ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
- TimeUnit.SECONDS,
- range.getData());
- assertS3StreamClosed(in);
- return "vector read past EOF with " + in;
+
+        // For AAL, if there is no eTag, the provided length will not be passed in, and a
+        // HEAD request will be made. AAL requires the eTag to detect changes in the object
+        // and then do cache eviction if required.
+        if (isAnalyticsStream()) {
+          intercept(EOFException.class, () -> in.readVectored(Arrays.asList(range), (i) -> bb));
Review Comment:
nit, split in.readVectored() onto the next line
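i.e.:

    intercept(EOFException.class, () ->
        in.readVectored(Arrays.asList(range), (i) -> bb));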
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -261,10 +277,12 @@ public void testOpenFileLongerLengthReadFully() throws Throwable {
}
},
always(),
- // two GET calls were made, one for readFully,
- // the second on the read() past the EOF
+ // two GET calls were made, one for readFully,
Review Comment:
nit, revert
> S3A Analytics-Accelerator: Add IoStatistics support
> ---------------------------------------------------
>
> Key: HADOOP-19364
> URL: https://issues.apache.org/jira/browse/HADOOP-19364
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Reporter: Ahmar Suhail
> Priority: Major
> Labels: pull-request-available
>
> S3A provides InputStream statistics:
> [https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java]
> This helps track things like how many bytes were read from a stream, etc.
>
> The current integration does not implement statistics. To start off with, we
> should identify which of these statistics make sense for us to track in the new
> stream. Some examples are:
>
> 1/ bytesRead
> 2/ readOperationStarted
> 3/ initiateGetRequest
>
> Some of these (1 and 2) are more straightforward and should not require any
> changes to analytics-accelerator-s3, but tracking GET requests will require
> such changes.
> We should also add tests that make assertions on these statistics. See
> ITestS3APrefetchingInputStream for an example of how to do this.
> And see https://issues.apache.org/jira/browse/HADOOP-18190 for how this was
> done on the prefetching stream, and PR:
> https://github.com/apache/hadoop/pull/4458
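> A minimal sketch of such an assertion, reusing verifyStatisticCounterValue()
> from IOStatisticAssertions (the stream, buffer and expected value here are
> placeholders):
>
>     try (FSDataInputStream in = fs.open(testFile)) {
>       int bytesRead = in.read(buffer, 0, buffer.length);
>       // the stream statistics should record exactly the bytes returned
>       verifyStatisticCounterValue(in.getIOStatistics(),
>           StreamStatisticNames.STREAM_READ_BYTES, bytesRead);
>     }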