[ 
https://issues.apache.org/jira/browse/HADOOP-19364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18028866#comment-18028866
 ] 

ASF GitHub Bot commented on HADOOP-19364:
-----------------------------------------

steveloughran commented on code in PR #8007:
URL: https://github.com/apache/hadoop/pull/8007#discussion_r2417685380


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import software.amazon.s3.analyticsaccelerator.util.RequestCallback;

Review Comment:
   nit: import ordering



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java:
##########
@@ -34,11 +35,11 @@
 import org.junit.jupiter.params.ParameterizedClass;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
-import static 
org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
-import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.skipForAnyEncryptionExceptSSES3;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static 
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;

Review Comment:
   prefer not the .* except for lots of constants; can you stop the IDE from 
auto-enabling it.



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java:
##########
@@ -64,6 +67,20 @@ public ITestS3AContractAnalyticsStreamVectoredRead(String 
bufferType) {
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
+    // Set the coalesce tolerance to 1KB, default is 1MB.
+    conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+            "."  + "physicalio.request.coalesce.tolerance", 10 * ONE_KB);
+
+    // Set the minimum block size to 32KB. AAL uses a default block size of 
128KB, which means the minimum size a S3
+    // request will be is 128KB. Since the file being read is 128KB, we need 
to  use this here to demonstrate that
+    // separate GET requests are made for ranges that are not coalesced.
+    conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+            "."  + "physicalio.readbuffersize", 32 * ONE_KB);

Review Comment:
   S_32K



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java:
##########
@@ -53,6 +54,8 @@
 @MethodSource("params")
 public class ITestS3AContractAnalyticsStreamVectoredRead extends 
AbstractContractVectoredReadTest {
 
+  private static final int ONE_KB = 1024;

Review Comment:
   org.apache.hadoop.io.Sizes.S_1K



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java:
##########
@@ -64,6 +67,20 @@ public ITestS3AContractAnalyticsStreamVectoredRead(String 
bufferType) {
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
+    // Set the coalesce tolerance to 1KB, default is 1MB.
+    conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+            "."  + "physicalio.request.coalesce.tolerance", 10 * ONE_KB);

Review Comment:
   create a new S_10K in Sizes for this, then use. 



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java:
##########
@@ -64,6 +67,20 @@ public ITestS3AContractAnalyticsStreamVectoredRead(String 
bufferType) {
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
+    // Set the coalesce tolerance to 1KB, default is 1MB.
+    conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +

Review Comment:
   Add new strings in Constants, and use removeBaseAndBufferOverrides to make 
sure there's no manual overrrides there to break tests



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java:
##########
@@ -489,6 +489,16 @@ public final class StreamStatisticNames {
   public static final String STREAM_FILE_CACHE_EVICTION
       = "stream_file_cache_eviction";
 
+  /**
+   * Bytes that were prefetched by the stream.
+   */
+  public static final String STREAM_READ_PREFETCHED_BYTES = 
"stream_read_prefetched_bytes";

Review Comment:
   Add entries in `org.apache.hadoop.fs.s3a.Statistic`; these are scanned and 
used to create the full filesystem instance stats which the input stream 
updates in close()



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -105,9 +112,21 @@ public void testConnectorFrameWorkIntegration() throws 
Throwable {
       
Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
       Assertions.assertThat(objectInputStream.getInputPolicy())
           .isEqualTo(S3AInputPolicy.Sequential);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, 500);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
+
+      long streamBytesRead = 
objectInputStream.getS3AStreamStatistics().getBytesRead();
+      Assertions.assertThat(streamBytesRead).as("Stream statistics should 
track bytes read")
+              .isEqualTo(500);
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+    // Total file size is: 21511173, and read starts from pos 5. Since policy 
is WHOLE_FILE, the whole file starts
+    // getting prefetched as soon as the stream to it is opened. So prefetched 
bytes is 21511173 - 5 = 21511168
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 
21511168);

Review Comment:
   explicitly do the maths in the code "len - 5" for future maintenance. Leave 
that explanation. In fact, we should plan for the nightmare scenario of "file 
goes away" by not having any assumptions. We also need to handle test setups 
where its on a third-party store.
   
   - grab its length
   - if too short, fail the test meaningfully
   - calculate the relevant values



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -203,4 +310,59 @@ public void testInvalidConfigurationThrows() throws 
Exception {
         () -> 
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
   }
 
+
+  @Test
+  public void testRandomSeekPatternGets() throws Throwable {
+    describe("Random seek pattern should optimize GET requests");
+
+    Path dest = path("seek-test.txt");
+    byte[] data = dataset(5 * S_1M, 256, 255);
+    writeDataset(getFileSystem(), dest, data, 5 * S_1M, 1024, true);
+
+    byte[] buffer = new byte[S_1M];
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+
+      inputStream.read(buffer);
+      inputStream.seek(2 * S_1M);
+      inputStream.read(new byte[512 * S_1K]);
+      inputStream.seek(3 * S_1M);
+      inputStream.read(new byte[512 * S_1K]);
+
+      verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);

Review Comment:
   really nice to see this. 





> S3A Analytics-Accelerator: Add IoStatistics support
> ---------------------------------------------------
>
>                 Key: HADOOP-19364
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19364
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>            Reporter: Ahmar Suhail
>            Priority: Major
>              Labels: pull-request-available
>
> S3A provides InputStream statistics: 
> [https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java]
> This helps track things like how many bytes were read from a stream etc. 
>  
> The current integration does not currently implement statistics. To start off 
> with we should identify which of these statistics makes sense for us track in 
> the new stream. Some examples are:
>  
> 1/ bytesRead
> 2/ readOperationStarted
> 3/ initiateGetRequest
>  
> Some of these (1 and 2) are more straightforward, and should not require any 
> changes to analytics-accelerator-s3, but tracking GET requests will require 
> this. 
> We should also add tests that make assertions on these statistics. See 
> ITestS3APrefetchingInputStream for an example to do this. 
> And see https://issues.apache.org/jira/browse/HADOOP-18190 for how this was 
> done on the prefetching stream, and PR: 
> https://github.com/apache/hadoop/pull/4458



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to