steveloughran commented on code in PR #8007: URL: https://github.com/apache/hadoop/pull/8007#discussion_r2417685380
########## hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl.streams; + +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import software.amazon.s3.analyticsaccelerator.util.RequestCallback; Review Comment: nit: import ordering ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java: ########## @@ -34,11 +35,11 @@ import org.junit.jupiter.params.ParameterizedClass; import org.junit.jupiter.params.provider.MethodSource; -import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; -import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipForAnyEncryptionExceptSSES3; +import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX; +import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.*; Review Comment: prefer not the .* except for lots of constants; can you stop the IDE from auto-enabling it. ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java: ########## @@ -64,6 +67,20 @@ public ITestS3AContractAnalyticsStreamVectoredRead(String bufferType) { @Override protected Configuration createConfiguration() { Configuration conf = super.createConfiguration(); + // Set the coalesce tolerance to 1KB, default is 1MB. + conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + + "." + "physicalio.request.coalesce.tolerance", 10 * ONE_KB); + + // Set the minimum block size to 32KB. AAL uses a default block size of 128KB, which means the minimum size a S3 + // request will be is 128KB. Since the file being read is 128KB, we need to use this here to demonstrate that + // separate GET requests are made for ranges that are not coalesced. + conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + + "." + "physicalio.readbuffersize", 32 * ONE_KB); Review Comment: S_32K ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java: ########## @@ -53,6 +54,8 @@ @MethodSource("params") public class ITestS3AContractAnalyticsStreamVectoredRead extends AbstractContractVectoredReadTest { + private static final int ONE_KB = 1024; Review Comment: org.apache.hadoop.io.Sizes.S_1K ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java: ########## @@ -64,6 +67,20 @@ public ITestS3AContractAnalyticsStreamVectoredRead(String bufferType) { @Override protected Configuration createConfiguration() { Configuration conf = super.createConfiguration(); + // Set the coalesce tolerance to 1KB, default is 1MB. + conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + + "." 
+ "physicalio.request.coalesce.tolerance", 10 * ONE_KB); Review Comment: create a new S_10K in Sizes for this, then use. ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java: ########## @@ -64,6 +67,20 @@ public ITestS3AContractAnalyticsStreamVectoredRead(String bufferType) { @Override protected Configuration createConfiguration() { Configuration conf = super.createConfiguration(); + // Set the coalesce tolerance to 1KB, default is 1MB. + conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + Review Comment: Add new strings in Constants, and use removeBaseAndBufferOverrides to make sure there's no manual overrrides there to break tests ########## hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java: ########## @@ -489,6 +489,16 @@ public final class StreamStatisticNames { public static final String STREAM_FILE_CACHE_EVICTION = "stream_file_cache_eviction"; + /** + * Bytes that were prefetched by the stream. 
+ */ + public static final String STREAM_READ_PREFETCHED_BYTES = "stream_read_prefetched_bytes"; Review Comment: Add entries in `org.apache.hadoop.fs.s3a.Statistic`; these are scanned and used to create the full filesystem instance stats which the input stream updates in close() ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java: ########## @@ -105,9 +112,21 @@ public void testConnectorFrameWorkIntegration() throws Throwable { Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics); Assertions.assertThat(objectInputStream.getInputPolicy()) .isEqualTo(S3AInputPolicy.Sequential); + + verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, 500); + verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1); + + long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead(); + Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read") .isEqualTo(500); } verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); + + // Total file size is: 21511173, and read starts from pos 5. Since policy is WHOLE_FILE, the whole file starts + getting prefetched as soon as the stream to it is opened. So prefetched bytes is 21511173 - 5 = 21511168 + verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 21511168); Review Comment: explicitly do the maths in the code "len - 5" for future maintenance. Leave that explanation. In fact, we should plan for the nightmare scenario of "file goes away" by not having any assumptions. We also need to handle test setups where it's on a third-party store. 
- grab its length - if too short, fail the test meaningfully - calculate the relevant values ########## hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java: ########## @@ -203,4 +310,59 @@ public void testInvalidConfigurationThrows() throws Exception { () -> S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration)); } + + @Test + public void testRandomSeekPatternGets() throws Throwable { + describe("Random seek pattern should optimize GET requests"); + + Path dest = path("seek-test.txt"); + byte[] data = dataset(5 * S_1M, 256, 255); + writeDataset(getFileSystem(), dest, data, 5 * S_1M, 1024, true); + + byte[] buffer = new byte[S_1M]; + try (FSDataInputStream inputStream = getFileSystem().open(dest)) { + IOStatistics ioStats = inputStream.getIOStatistics(); + + inputStream.read(buffer); + inputStream.seek(2 * S_1M); + inputStream.read(new byte[512 * S_1K]); + inputStream.seek(3 * S_1M); + inputStream.read(new byte[512 * S_1K]); + + verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1); Review Comment: really nice to see this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
