This is an automated email from the ASF dual-hosted git repository.

ahmar pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 49dd88b041f HADOOP-19559. S3A: Updates S3A default stream to AAL, adds 
IoStats. (#8099) (#8007)
49dd88b041f is described below

commit 49dd88b041f1650d886ff3b229a31834435783aa
Author: ahmarsuhail <[email protected]>
AuthorDate: Fri Nov 21 13:30:48 2025 +0000

    HADOOP-19559. S3A: Updates S3A default stream to AAL, adds IoStats. (#8099) 
(#8007)
---
 .../hadoop/fs/statistics/StreamStatisticNames.java |  17 ++
 .../apache/hadoop/fs/s3a/S3AInstrumentation.java   |  36 +++-
 .../java/org/apache/hadoop/fs/s3a/Statistic.java   |  14 +-
 .../s3a/impl/streams/AnalyticsRequestCallback.java |  69 +++++++
 .../fs/s3a/impl/streams/AnalyticsStream.java       |  45 ++++-
 .../s3a/impl/streams/AnalyticsStreamFactory.java   |   2 -
 .../fs/s3a/impl/streams/StreamIntegration.java     |   2 +-
 .../s3a/statistics/S3AInputStreamStatistics.java   |  26 +++
 .../statistics/impl/EmptyS3AStatisticsContext.java |  25 +++
 ...TestS3AContractAnalyticsStreamVectoredRead.java |  71 ++++++-
 .../fs/contract/s3a/ITestS3AContractCreate.java    |   7 -
 .../fs/contract/s3a/ITestS3AContractDistCp.java    |  10 -
 .../s3a/ITestS3AContractMultipartUploader.java     |   7 -
 .../ITestS3AAnalyticsAcceleratorStreamReading.java | 214 ++++++++++++++++++++-
 .../hadoop/fs/s3a/ITestS3AFSMainOperations.java    |  13 --
 .../hadoop/fs/s3a/ITestS3AFileSystemContract.java  |   7 -
 .../hadoop/fs/s3a/ITestS3AIOStatisticsContext.java |   9 +-
 .../org/apache/hadoop/fs/s3a/ITestS3AMetrics.java  |   5 -
 .../org/apache/hadoop/fs/s3a/S3ATestConstants.java |  20 ++
 .../fs/s3a/commit/ITestCommitOperationCost.java    |   9 -
 .../fileContext/ITestS3AFileContextStatistics.java |   6 -
 .../fs/s3a/performance/ITestS3AOpenCost.java       |  92 ++++++---
 .../fs/s3a/performance/ITestUnbufferDraining.java  |   3 +
 .../s3a/scale/ITestS3AInputStreamPerformance.java  |   2 +
 .../ITestS3AContractStreamIOStatistics.java        |   5 -
 .../statistics/ITestS3AFileSystemStatistic.java    |   5 -
 26 files changed, 607 insertions(+), 114 deletions(-)

diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
index 09c19ad071a..e8cb9f6469a 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -489,6 +489,23 @@ public final class StreamStatisticNames {
   public static final String STREAM_FILE_CACHE_EVICTION
       = "stream_file_cache_eviction";
 
+  /**
+   * Bytes that were prefetched by the stream.
+   */
+  public static final String STREAM_READ_PREFETCHED_BYTES = 
"stream_read_prefetched_bytes";
+
+  /**
+   * Tracks failures in footer parsing.
+   */
+  public static final String STREAM_READ_PARQUET_FOOTER_PARSING_FAILED
+          = "stream_read_parquet_footer_parsing_failed";
+
+  /**
+   * A cache hit occurs when the request range can be satisfied by the data in 
the cache.
+   */
+  public static final String STREAM_READ_CACHE_HIT = "stream_read_cache_hit";
+
+
   private StreamStatisticNames() {
   }
 
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index b3c907428ac..bc2c83a6872 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -81,6 +81,9 @@
 import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
 import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_FAILURES;
 import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_UNBUFFERED;
+import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES;
+import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED;
+import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_CACHE_HIT;
 import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
 import static org.apache.hadoop.fs.s3a.Statistic.*;
 
@@ -891,7 +894,12 @@ private InputStreamStatistics(
               StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
               StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES,
               StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
-              StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED)
+              StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST,
+              StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED,
+              StreamStatisticNames.STREAM_READ_CACHE_HIT,
+              StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES,
+              StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED
+                  )
           .withGauges(STREAM_READ_GAUGE_INPUT_POLICY,
               STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(),
               STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(),
@@ -1128,6 +1136,32 @@ public void readVectoredBytesDiscarded(int discarded) {
       bytesDiscardedInVectoredIO.addAndGet(discarded);
     }
 
+    @Override
+    public void getRequestInitiated() {
+      increment(ACTION_HTTP_GET_REQUEST);
+    }
+
+    @Override
+    public void headRequestInitiated() {
+      increment(StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST);
+    }
+
+    @Override
+    public void bytesPrefetched(long size) {
+      increment(STREAM_READ_PREFETCHED_BYTES, size);
+    }
+
+    @Override
+    public void footerParsingFailed() {
+      increment(STREAM_READ_PARQUET_FOOTER_PARSING_FAILED);
+    }
+
+    @Override
+    public void streamReadCacheHit() {
+      increment(STREAM_READ_CACHE_HIT);
+    }
+
+
     @Override
     public void executorAcquired(Duration timeInQueue) {
       // update the duration fields in the IOStatistics.
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 6389742167d..4b09530ba9b 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -458,9 +458,21 @@ public enum Statistic {
       StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE,
       "Gauge of active memory in use",
       TYPE_GAUGE),
+  STREAM_READ_PREFETCH_BYTES(
+          StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES,
+          "Bytes prefetched by AAL stream",
+          TYPE_COUNTER),
+  STREAM_READ_PARQUET_FOOTER_PARSING_FAILED(
+          StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED,
+          "Count of Parquet footer parsing failures encountered by AAL",
+          TYPE_COUNTER),
+  STREAM_READ_CACHE_HIT(
+          StreamStatisticNames.STREAM_READ_CACHE_HIT,
+          "Count of cache hits in AAL stream",
+          TYPE_COUNTER),
 
-  /* Stream Write statistics */
 
+  /* Stream Write statistics */
   STREAM_WRITE_EXCEPTIONS(
       StreamStatisticNames.STREAM_WRITE_EXCEPTIONS,
       "Count of stream write failures reported",
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java
new file mode 100644
index 00000000000..d3940bd86e8
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+
+/**
+ * Implementation of AAL's RequestCallback interface that tracks analytics 
operations.
+ */
+public class AnalyticsRequestCallback implements RequestCallback {
+  private final S3AInputStreamStatistics statistics;
+
+  /**
+   * Create a new callback instance.
+   * @param statistics the statistics to update
+   */
+  public AnalyticsRequestCallback(S3AInputStreamStatistics statistics) {
+    this.statistics = statistics;
+  }
+
+  @Override
+    public void onGetRequest() {
+    statistics.getRequestInitiated();
+  }
+
+  @Override
+    public void onHeadRequest() {
+    statistics.headRequestInitiated();
+  }
+
+  @Override
+  public void onBlockPrefetch(long start, long end) {
+    statistics.bytesPrefetched(end - start + 1);
+  }
+
+  @Override
+  public void footerParsingFailed() {
+    statistics.footerParsingFailed();
+  }
+
+  @Override
+  public void onReadVectored(int numIncomingRanges, int numCombinedRanges) {
+    statistics.readVectoredOperationStarted(numIncomingRanges, 
numCombinedRanges);
+  }
+
+  @Override
+  public void onCacheHit() {
+    statistics.streamReadCacheHit();
+  }
+
+}
+
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
index 8920b5b2dfc..954ee3a0e48 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
@@ -40,6 +40,7 @@
 import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
 import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
 import software.amazon.s3.analyticsaccelerator.util.S3URI;
+import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,6 +73,7 @@ public AnalyticsStream(final ObjectReadParameters parameters,
       final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws 
IOException {
     super(InputStreamType.Analytics, parameters);
     S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
+
     this.inputStream = 
s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(),
         s3Attributes.getKey()), buildOpenStreamInformation(parameters));
     getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
@@ -80,6 +82,9 @@ public AnalyticsStream(final ObjectReadParameters parameters,
   @Override
   public int read() throws IOException {
     throwIfClosed();
+
+    getS3AStreamStatistics().readOperationStarted(getPos(), 1);
+
     int bytesRead;
     try {
       bytesRead = inputStream.read();
@@ -87,6 +92,11 @@ public int read() throws IOException {
       onReadFailure(ioe);
       throw ioe;
     }
+
+    if (bytesRead != -1) {
+      incrementBytesRead(1);
+    }
+
     return bytesRead;
   }
 
@@ -122,6 +132,8 @@ public synchronized long getPos() {
    */
   public int readTail(byte[] buf, int off, int len) throws IOException {
     throwIfClosed();
+    getS3AStreamStatistics().readOperationStarted(getPos(), len);
+
     int bytesRead;
     try {
       bytesRead = inputStream.readTail(buf, off, len);
@@ -129,12 +141,20 @@ public int readTail(byte[] buf, int off, int len) throws 
IOException {
       onReadFailure(ioe);
       throw ioe;
     }
+
+    if (bytesRead > 0) {
+      incrementBytesRead(bytesRead);
+    }
+
     return bytesRead;
   }
 
   @Override
   public int read(byte[] buf, int off, int len) throws IOException {
     throwIfClosed();
+
+    getS3AStreamStatistics().readOperationStarted(getPos(), len);
+
     int bytesRead;
     try {
       bytesRead = inputStream.read(buf, off, len);
@@ -142,6 +162,11 @@ public int read(byte[] buf, int off, int len) throws 
IOException {
       onReadFailure(ioe);
       throw ioe;
     }
+
+    if (bytesRead > 0) {
+      incrementBytesRead(bytesRead);
+    }
+
     return bytesRead;
   }
 
@@ -177,8 +202,6 @@ public void readVectored(final List<? extends FileRange> 
ranges,
       range.setData(result);
     }
 
-    // AAL does not do any range coalescing, so input and combined ranges are 
the same.
-    this.getS3AStreamStatistics().readVectoredOperationStarted(ranges.size(), 
ranges.size());
     inputStream.readVectored(objectRanges, allocate, release);
   }
 
@@ -247,10 +270,14 @@ private void onReadFailure(IOException ioe) throws 
IOException {
   }
 
   private OpenStreamInformation 
buildOpenStreamInformation(ObjectReadParameters parameters) {
+
+    final RequestCallback requestCallback = new 
AnalyticsRequestCallback(getS3AStreamStatistics());
+
     OpenStreamInformation.OpenStreamInformationBuilder 
openStreamInformationBuilder =
         OpenStreamInformation.builder()
             .inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
-            .getInputPolicy()));
+            .getInputPolicy()))
+            .requestCallback(requestCallback);
 
     if (parameters.getObjectAttributes().getETag() != null) {
       openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder()
@@ -300,4 +327,16 @@ protected void throwIfClosed() throws IOException {
       throw new IOException(getKey() + ": " + 
FSExceptionMessages.STREAM_IS_CLOSED);
     }
   }
+
+  /**
+   * Update the stream statistics; also increment the context's bytes read
+   * counter if there is a stats instance and more than zero bytes were read.
+   * @param bytesRead number of bytes read
+   */
+  private void incrementBytesRead(long bytesRead) {
+    getS3AStreamStatistics().bytesRead(bytesRead);
+    if (getContext().getStats() != null && bytesRead > 0) {
+      getContext().getStats().incrementBytesRead(bytesRead);
+    }
+  }
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
index 50333c68e0c..cff743242f1 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
@@ -48,7 +48,6 @@ public class AnalyticsStreamFactory extends 
AbstractObjectInputStreamFactory {
 
   private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration;
   private LazyAutoCloseableReference<S3SeekableInputStreamFactory>  
s3SeekableInputStreamFactory;
-  private boolean requireCrt;
 
   public AnalyticsStreamFactory() {
     super("AnalyticsStreamFactory");
@@ -61,7 +60,6 @@ protected void serviceInit(final Configuration conf) throws 
Exception {
                 ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
     this.seekableInputStreamConfiguration =
                 
S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
-    this.requireCrt = false;
   }
 
   @Override
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java
index bb35a0580a2..07159343d33 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java
@@ -77,7 +77,7 @@ public final class StreamIntegration {
   /**
    * What is the default type?
    */
-  public static final InputStreamType DEFAULT_STREAM_TYPE = 
InputStreamType.Classic;
+  public static final InputStreamType DEFAULT_STREAM_TYPE = 
InputStreamType.Analytics;
 
   /**
    * Configuration deprecation log for warning about use of the
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
index 7ad7cf75367..1e48b3ef850 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
@@ -119,6 +119,32 @@ void readVectoredOperationStarted(int numIncomingRanges,
    */
   void readVectoredBytesDiscarded(int discarded);
 
+  /**
+   * Number of S3 GET requests initiated by the stream.
+   */
+  void getRequestInitiated();
+
+  /**
+   * Number of S3 HEAD requests initiated by the stream.
+   */
+  void headRequestInitiated();
+
+  /**
+   * Number of bytes prefetched.
+   * @param size number of bytes prefetched.
+   */
+  void bytesPrefetched(long size);
+
+  /**
+   * Number of failures in footer parsing.
+   */
+  void footerParsingFailed();
+
+  /**
+   * Record a cache hit: the requested data was already in the data cache.
+   */
+  void streamReadCacheHit();
+
   @Override
   void close();
 
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
index 26b9f2b1568..cf339269549 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
@@ -212,6 +212,31 @@ public void readVectoredBytesDiscarded(int discarded) {
 
     }
 
+    @Override
+    public void getRequestInitiated() {
+
+    }
+
+    @Override
+    public void headRequestInitiated() {
+
+    }
+
+    @Override
+    public void bytesPrefetched(long size) {
+
+    }
+
+    @Override
+    public void footerParsingFailed() {
+
+    }
+
+    @Override
+    public void streamReadCacheHit() {
+
+    }
+
     @Override
     public void close() {
 
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
index f3a10b209c2..2df6bcf8e4e 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.contract.s3a;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.junit.Test;
@@ -28,14 +29,26 @@
 import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static 
org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static 
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.AAL_CACHE_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.AAL_READ_BUFFER_SIZE;
+import static 
org.apache.hadoop.fs.s3a.S3ATestConstants.AAL_REQUEST_COALESCE_TOLERANCE;
+import static 
org.apache.hadoop.fs.s3a.S3ATestConstants.AAL_SMALL_OBJECT_PREFETCH_ENABLED;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.skipForAnyEncryptionExceptSSES3;
 import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
+import static org.apache.hadoop.io.Sizes.S_16K;
+import static org.apache.hadoop.io.Sizes.S_1K;
+import static org.apache.hadoop.io.Sizes.S_32K;
+
 
 /**
  * S3A contract tests for vectored reads with the Analytics stream.
@@ -51,6 +64,18 @@ public ITestS3AContractAnalyticsStreamVectoredRead(String 
bufferType) {
     super(bufferType);
   }
 
+  private static final String REQUEST_COALESCE_TOLERANCE_KEY =
+          ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + 
AAL_REQUEST_COALESCE_TOLERANCE;
+
+  private static final String READ_BUFFER_SIZE_KEY =
+          ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + 
AAL_READ_BUFFER_SIZE;
+
+  private static final String SMALL_OBJECT_PREFETCH_ENABLED_KEY =
+          ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + 
AAL_SMALL_OBJECT_PREFETCH_ENABLED;
+
+  private static final String CACHE_TIMEOUT_KEY =
+          ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + AAL_CACHE_TIMEOUT;
+
   /**
    * Create a configuration.
    * @return a configuration
@@ -58,6 +83,28 @@ public ITestS3AContractAnalyticsStreamVectoredRead(String 
bufferType) {
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
+
+    S3ATestUtils.disableFilesystemCaching(conf);
+
+    removeBaseAndBucketOverrides(conf,
+            REQUEST_COALESCE_TOLERANCE_KEY,
+            READ_BUFFER_SIZE_KEY,
+            SMALL_OBJECT_PREFETCH_ENABLED_KEY,
+            CACHE_TIMEOUT_KEY);
+
+    // Set the coalesce tolerance to 16KB, default is 1MB.
+    conf.setInt(REQUEST_COALESCE_TOLERANCE_KEY, S_16K);
+
+    // Set the minimum block size to 32KB. AAL uses a default block size of 
128KB, which means the minimum size an S3
+    // request will be is 128KB. Since the file being read is 128KB, we need 
to use this here to demonstrate that
+    // separate GET requests are made for ranges that are not coalesced.
+    conf.setInt(READ_BUFFER_SIZE_KEY, S_32K);
+
+    // Disable small object prefetching, otherwise anything less than 8MB is 
fetched in a single GET.
+    conf.set(SMALL_OBJECT_PREFETCH_ENABLED_KEY, "false");
+
+    conf.setInt(CACHE_TIMEOUT_KEY, 5000);
+
     enableAnalyticsAccelerator(conf);
     // If encryption is set, some AAL tests will fail.
     // This is because AAL caches the head request response, and uses
@@ -96,21 +143,41 @@ public void testNullReleaseOperation()  {
 
   @Test
   public void testReadVectoredWithAALStatsCollection() throws Exception {
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(0, 100));
+    fileRanges.add(FileRange.createFileRange(800, 200));
+    fileRanges.add(FileRange.createFileRange(4 * S_1K, 4 * S_1K));
+    fileRanges.add(FileRange.createFileRange(80 * S_1K, 4 * S_1K));
 
-    List<FileRange> fileRanges = createSampleNonOverlappingRanges();
     try (FSDataInputStream in = openVectorFile()) {
       in.readVectored(fileRanges, getAllocate());
 
       validateVectoredReadResult(fileRanges, DATASET, 0);
       IOStatistics st = in.getIOStatistics();
 
-      // Statistics such as GET requests will be added after IoStats support.
       verifyStatisticCounterValue(st,
               StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED, 1);
 
       verifyStatisticCounterValue(st,
               StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
               1);
+
+      // Verify ranges are coalesced. We are using a coalescing tolerance of 
16KB, so [0-100, 800-1000, 4KB-8KB] will
+      // get coalesced into a single range.
+      verifyStatisticCounterValue(st, 
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES, 4);
+      verifyStatisticCounterValue(st, 
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES, 2);
+
+      verifyStatisticCounterValue(st, ACTION_HTTP_GET_REQUEST, 2);
+
+      // read the same ranges again to demonstrate that the data is cached, 
and no new GETs are made.
+      in.readVectored(fileRanges, getAllocate());
+      verifyStatisticCounterValue(st, ACTION_HTTP_GET_REQUEST, 2);
+
+      // Because of how AAL is currently written, it is not possible to track 
cache hits that originate from a
+      // readVectored() accurately. For this reason, cache hits from 
readVectored are currently not tracked, for more
+      // details see: 
https://github.com/awslabs/analytics-accelerator-s3/issues/359
+      verifyStatisticCounterValue(st, 
StreamStatisticNames.STREAM_READ_CACHE_HIT, 0);
     }
+
   }
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
index df9cadc51d9..cdd6e11b08f 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
@@ -32,7 +32,6 @@
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE;
 import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
-import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
 
 
@@ -93,12 +92,6 @@ protected Configuration createConfiguration() {
 
   @Override
   public void testOverwriteExistingFile() throws Throwable {
-    // Currently analytics accelerator does not support reading of files that 
have been overwritten.
-    // This is because the analytics accelerator library caches metadata, and 
when a file is
-    // overwritten, the old metadata continues to be used, until it is removed 
from the cache over
-    // time. This will be fixed in 
https://github.com/awslabs/analytics-accelerator-s3/issues/218.
-    skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
-        "Analytics Accelerator currently does not support reading of over 
written files");
     super.testOverwriteExistingFile();
   }
 
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
index f91479abe86..944b98b4d98 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
@@ -20,7 +20,6 @@
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static 
org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS;
-import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageStatistics;
@@ -81,15 +80,6 @@ public void testNonDirectWrite() throws Exception {
 
   @Override
   public void testDistCpUpdateCheckFileSkip() throws Exception {
-    // Currently analytics accelerator does not support reading of files that 
have been overwritten.
-    // This is because the analytics accelerator library caches metadata and 
data, and when a
-    // file is overwritten, the old data continues to be used, until it is 
removed from the
-    // cache over time. This will be fixed in
-    // https://github.com/awslabs/analytics-accelerator-s3/issues/218.
-    // In this test case, the remote file is created, read, then deleted, and 
then created again
-    // with different contents, and read again, which leads to assertions 
failing.
-    skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
-        "Analytics Accelerator Library does not support update to existing 
files");
     super.testDistCpUpdateCheckFileSkip();
   }
 
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
index 27c8732b0c7..8fcc366d7cf 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
@@ -42,7 +42,6 @@
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes;
 import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static 
org.apache.hadoop.fs.s3a.impl.ChecksumSupport.getChecksumAlgorithm;
 import static 
org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE;
 
@@ -160,12 +159,6 @@ public void 
testMultipartUploadReverseOrderNonContiguousPartNumbers() throws Exc
   @Override
   public void testConcurrentUploads() throws Throwable {
     assumeNotS3ExpressFileSystem(getFileSystem());
-    // Currently analytics accelerator does not support reading of files that 
have been overwritten.
-    // This is because the analytics accelerator library caches metadata and 
data, and when a file
-    // is overwritten, the old data continues to be used, until it is removed 
from the cache over
-    // time. This will be fixed in 
https://github.com/awslabs/analytics-accelerator-s3/issues/218.
-    skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
-        "Analytics Accelerator currently does not support reading of over 
written files");
     super.testConcurrentUploads();
   }
 
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
index 209e78b822b..e3f0f8ea4b5 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
@@ -43,14 +43,27 @@
 import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
 import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
 import static 
org.apache.hadoop.fs.audit.AuditStatisticNames.AUDIT_REQUEST_EXECUTION;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static 
org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
 import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static 
org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
 import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
-import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
+import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
+import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST;
 import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.ANALYTICS_STREAM_FACTORY_CLOSED;
+import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
+import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
+import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_CACHE_HIT;
+import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPERATIONS;
+import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED;
+import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES;
+import static org.apache.hadoop.io.Sizes.S_1K;
+import static org.apache.hadoop.io.Sizes.S_1M;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static software.amazon.s3.analyticsaccelerator.util.Constants.ONE_KB;
+import static software.amazon.s3.analyticsaccelerator.util.Constants.ONE_MB;
 
 /**
  * Tests integration of the
@@ -88,6 +101,16 @@ public void testConnectorFrameWorkIntegration() throws 
Throwable {
 
     S3AFileSystem fs =
         (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), 
getConfiguration());
+
+    final long initialAuditCount = fs.getIOStatistics().counters()
+            .getOrDefault(AUDIT_REQUEST_EXECUTION, 0L);
+
+   long fileLength = fs.getFileStatus(externalTestFile).getLen();
+
+   // Head request for the file length.
+    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION,
+            initialAuditCount + 1);
+
     byte[] buffer = new byte[500];
     IOStatistics ioStats;
 
@@ -105,9 +128,21 @@ public void testConnectorFrameWorkIntegration() throws 
Throwable {
       
Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
       Assertions.assertThat(objectInputStream.getInputPolicy())
           .isEqualTo(S3AInputPolicy.Sequential);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, 500);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
+
+      long streamBytesRead = 
objectInputStream.getS3AStreamStatistics().getBytesRead();
+      Assertions.assertThat(streamBytesRead).as("Stream statistics should 
track bytes read")
+              .isEqualTo(500);
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+    // Since policy is WHOLE_FILE, the whole file starts getting prefetched as 
soon as the stream to it is opened.
+    // So prefetched bytes is fileLen - 5
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 
fileLength - 5);
+
     fs.close();
     verifyStatisticCounterValue(fs.getIOStatistics(), 
ANALYTICS_STREAM_FACTORY_CLOSED, 1);
 
@@ -115,7 +150,70 @@ public void testConnectorFrameWorkIntegration() throws 
Throwable {
     // in which case, AAL will start prefetching till EoF on file open in 8MB 
chunks. The file read here
     // s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz, has a size of 
~21MB, resulting in 3 GETS:
     // [0-8388607, 8388608-16777215, 16777216-21511173].
-    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 
4);
+    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION,
+            initialAuditCount + 1 + 4);
+  }
+
+  @Test
+  public void testSequentialPrefetching() throws IOException {
+
+    Configuration conf = getConfiguration();
+
+    // AAL uses a caffeine cache, and expires any prefetched data for a key 1s 
after it was last accessed by default.
+    // While this works well when running on EC2, for local testing, it can 
take more than 1s to download large chunks
+    // of data. Set this to a higher value for testing to prevent early cache 
evictions.
+    conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+            "."  + AAL_CACHE_TIMEOUT, 10000);
+
+    S3AFileSystem fs =
+            (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), 
getConfiguration());
+    byte[] buffer = new byte[10 * ONE_MB];
+    IOStatistics ioStats;
+
+    long fileLength = fs.getFileStatus(externalTestFile).getLen();
+
+    // Here we read through the 21MB external test file, but do not pass in 
the WHOLE_FILE policy. Instead, we rely
+    // on AAL detecting a sequential pattern being read, and then prefetching 
bytes in a geometrical progression.
+    // AAL's sequential prefetching fetches data in increments of 4MB, 8MB, 
16MB etc. depending on how many
+    // sequential reads happen.
+    try (FSDataInputStream inputStream = fs.open(externalTestFile)) {
+      ioStats = inputStream.getIOStatistics();
+
+      inputStream.readFully(buffer, 0, ONE_MB);
+      // The first sequential read, so prefetch the next 4MB.
+      inputStream.readFully(buffer,   0, ONE_MB);
+
+      // Since ONE_MB was requested by the reader, the prefetched bytes are 
3MB.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 3 * 
ONE_MB);
+
+      // These next two reads are within the last prefetched bytes, so no 
further bytes are prefetched.
+      inputStream.readFully(buffer, 0, 2 *  ONE_MB);
+      inputStream.readFully(buffer, 0, ONE_MB);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 3 * 
ONE_MB);
+      // Two cache hits, as the previous two reads were already prefetched.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 2);
+
+      // Another sequential read, GP will now prefetch the next 8MB of data.
+      inputStream.readFully(buffer, 0, ONE_MB);
+      // Cache hit is still 2, as the previous read required a new GET request 
as it was outside the previously fetched
+      // 4MB.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 2);
+      // A total of 10MB is prefetched - 3MB and then 7MB.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 10 * 
ONE_MB);
+      long bytesRemainingForPrefetch = fileLength - (inputStream.getPos() + 10 
* ONE_MB);
+      inputStream.readFully(buffer, 0, 10 * ONE_MB);
+
+
+      // Though the next GP should prefetch 16MB, since the file is ~21MB, 
only the bytes till EoF are prefetched.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES,
+              10 * ONE_MB + bytesRemainingForPrefetch);
+      inputStream.readFully(buffer, 0, 3 * ONE_MB);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 3);
+    }
+
+    // verify all AAL stats are passed to the FS.
+    verifyStatisticCounterValue(fs.getIOStatistics(), STREAM_READ_CACHE_HIT, 
3);
+    verifyStatisticCounterValue(fs.getIOStatistics(), 
STREAM_READ_PARQUET_FOOTER_PARSING_FAILED, 0);
   }
 
   @Test
@@ -134,9 +232,33 @@ public void testMalformedParquetFooter() throws 
IOException {
     Path sourcePath = new Path(file.toURI().getPath());
     getFileSystem().copyFromLocalFile(false, true, sourcePath, dest);
 
+    long fileLength = getFileSystem().getFileStatus(dest).getLen();
+
     byte[] buffer = new byte[500];
     IOStatistics ioStats;
+    int bytesRead;
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      ioStats = inputStream.getIOStatistics();
+      inputStream.seek(5);
+      bytesRead = inputStream.read(buffer, 0, 500);
+
+      ObjectInputStream objectInputStream = (ObjectInputStream) 
inputStream.getWrappedStream();
+      long streamBytesRead = 
objectInputStream.getS3AStreamStatistics().getBytesRead();
+      Assertions.assertThat(streamBytesRead).as("Stream statistics should 
track bytes read")
+              .isEqualTo(bytesRead);
 
+    }
+
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
+    // This file has a content length of 451. Since it's a parquet file, AAL 
will prefetch the footer bytes (last 32KB),
+    // as soon as the file is opened, but because the file is < 32KB, the 
whole file is prefetched.
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 
fileLength);
+    
+    // Open a second stream to the same object, verifying that data is cached and 
that streams to the same object do not
+    // prefetch the same data twice.
     try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
       ioStats = inputStream.getIOStatistics();
       inputStream.seek(5);
@@ -144,6 +266,10 @@ public void testMalformedParquetFooter() throws 
IOException {
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 0);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
+    // No data is prefetched, as it already exists in the cache from the 
previous stream.
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 0);
   }
 
   /**
@@ -167,6 +293,7 @@ public void testMultiRowGroupParquet() throws Throwable {
     final int size = 3000;
     byte[] buffer = new byte[size];
     int readLimit = Math.min(size, (int) fileStatus.getLen());
+
     IOStatistics ioStats;
 
     final IOStatistics fsIostats = getFileSystem().getIOStatistics();
@@ -179,6 +306,13 @@ public void testMultiRowGroupParquet() throws Throwable {
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+
+    // S3A makes a HEAD request on the stream open(), and then AAL makes a GET 
request to get the object, so the audit
+    // operation count increases by 2.
+    long currentAuditCount = initialAuditCount + 2;
+    verifyStatisticCounterValue(getFileSystem().getIOStatistics(),
+            AUDIT_REQUEST_EXECUTION, currentAuditCount);
 
     try (FSDataInputStream inputStream = getFileSystem().openFile(dest)
         .withFileStatus(fileStatus)
@@ -186,11 +320,17 @@ public void testMultiRowGroupParquet() throws Throwable {
         .build().get()) {
       ioStats = inputStream.getIOStatistics();
       inputStream.readFully(buffer, 0, readLimit);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, (int) 
fileStatus.getLen());
+      verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
 
-    verifyStatisticCounterValue(fsIostats, AUDIT_REQUEST_EXECUTION, 
initialAuditCount + 2);
+    // S3A passes in the meta-data(content length) on file open,
+    // we expect AAL to make no HEAD requests
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 0);
   }
 
   @Test
@@ -210,4 +350,72 @@ public void testInvalidConfigurationThrows() throws 
Exception {
         () -> 
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
   }
 
+
+  @Test
+  public void testRandomSeekPatternGets() throws Throwable {
+    describe("Random seek pattern should optimize GET requests");
+
+    Path dest = path("seek-test.txt");
+    byte[] data = dataset(5 * S_1M, 256, 255);
+    writeDataset(getFileSystem(), dest, data, 5 * S_1M, 1024, true);
+
+    byte[] buffer = new byte[S_1M];
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+
+      inputStream.read(buffer);
+      inputStream.seek(2 * S_1M);
+      inputStream.read(new byte[512 * S_1K]);
+      inputStream.seek(3 * S_1M);
+      inputStream.read(new byte[512 * S_1K]);
+
+      verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+      verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
+    }
+
+    // We did 3 reads, and all of them were served from the cache
+    verifyStatisticCounterValue(getFileSystem().getIOStatistics(), 
STREAM_READ_CACHE_HIT, 3);
+  }
+
+
+  @Test
+  public void testSequentialStreamsNoDuplicateGets() throws Throwable {
+    describe("Sequential streams reading same object should not duplicate 
GETs");
+
+    Path dest = path("sequential-test.txt");
+    int fileLen = S_1M;
+
+    byte[] data = dataset(fileLen, 256, 255);
+    writeDataset(getFileSystem(), dest, data, fileLen, 1024, true);
+
+    byte[] buffer = new byte[ONE_MB];
+    try (FSDataInputStream stream1 = getFileSystem().open(dest);
+         FSDataInputStream stream2 = getFileSystem().open(dest)) {
+
+      stream1.read(buffer, 0, 2 * ONE_KB);
+      stream2.read(buffer);
+      stream1.read(buffer, 0, 10 * ONE_KB);
+
+      IOStatistics stats1 = stream1.getIOStatistics();
+      IOStatistics stats2 = stream2.getIOStatistics();
+
+      verifyStatisticCounterValue(stats1, ACTION_HTTP_GET_REQUEST, 1);
+      verifyStatisticCounterValue(stats2, ACTION_HTTP_HEAD_REQUEST, 0);
+
+      // Since it's a small file (AAL will prefetch the whole file for size < 
8MB), the whole file is prefetched
+      // on the first read.
+      verifyStatisticCounterValue(stats1, STREAM_READ_PREFETCHED_BYTES, 
fileLen);
+
+      // The second stream will not prefetch any bytes, as they have already 
been prefetched by stream 1.
+      verifyStatisticCounterValue(stats2, STREAM_READ_PREFETCHED_BYTES, 0);
+    }
+
+    // verify value is passed up to the FS
+    verifyStatisticCounterValue(getFileSystem().getIOStatistics(),
+            STREAM_READ_PREFETCHED_BYTES, fileLen);
+
+    // We did 3 reads, all of them were served from the small object cache. In 
this case, the whole object was
+    // downloaded as soon as the stream to it was opened.
+    verifyStatisticCounterValue(getFileSystem().getIOStatistics(), 
STREAM_READ_CACHE_HIT, 3);
+  }
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
index 3c405cb7c51..6595eae1640 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
@@ -32,7 +32,6 @@
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestPath;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
-import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 
 /**
  * S3A Test suite for the FSMainOperationsBaseTest tests.
@@ -81,23 +80,11 @@ public void testCopyToLocalWithUseRawLocalFileSystemOption()
 
   @Override
   public void testWriteReadAndDeleteOneAndAHalfBlocks() throws Exception {
-    // Currently analytics accelerator does not support reading of files that 
have been overwritten.
-    // This is because the analytics accelerator library caches metadata, and 
when a file is
-    // overwritten, the old metadata continues to be used, until it is removed 
from the cache over
-    // time. This will be fixed in 
https://github.com/awslabs/analytics-accelerator-s3/issues/218.
-    skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(),
-        "Analytics Accelerator currently does not support reading of over 
written files");
     super.testWriteReadAndDeleteOneAndAHalfBlocks();
   }
 
   @Override
   public void testWriteReadAndDeleteTwoBlocks() throws Exception {
-    // Currently analytics accelerator does not support reading of files that 
have been overwritten.
-    // This is because the analytics accelerator library caches metadata, and 
when a file is
-    // overwritten, the old metadata continues to be used, until it is removed 
from the cache over
-    // time. This will be fixed in 
https://github.com/awslabs/analytics-accelerator-s3/issues/218.
-    skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(),
-        "Analytics Accelerator currently does not support reading of over 
written files");
     super.testWriteReadAndDeleteTwoBlocks();
   }
 
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
index 02d56795890..c62779d1562 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
@@ -37,7 +37,6 @@
 
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
-import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.junit.Assume.*;
 import static org.junit.Assert.*;
@@ -165,12 +164,6 @@ public void testOverwrite() throws IOException {
 
   @Override
   public void testOverWriteAndRead() throws Exception {
-    // Currently analytics accelerator does not support reading of files that 
have been overwritten.
-    // This is because the analytics accelerator library caches metadata, and 
when a file is
-    // overwritten, the old metadata continues to be used, until it is removed 
from the cache over
-    // time. This will be fixed in 
https://github.com/awslabs/analytics-accelerator-s3/issues/218.
-    skipIfAnalyticsAcceleratorEnabled(fs.getConf(),
-        "Analytics Accelerator currently does not support reading of over 
written files");
     super.testOverWriteAndRead();
   }
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
index daf5306dc39..5b38ba1cd87 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
@@ -42,8 +42,9 @@
 import static 
org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_TYPE;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching;
-import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
+import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Classic;
 import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
 import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
@@ -70,6 +71,7 @@ public class ITestS3AIOStatisticsContext extends 
AbstractS3ATestBase {
   protected Configuration createConfiguration() {
     Configuration configuration = super.createConfiguration();
     disablePrefetching(configuration);
+    configuration.setEnum(INPUT_STREAM_TYPE, Classic);
     enableIOStatisticsContext();
     return configuration;
   }
@@ -78,10 +80,7 @@ protected Configuration createConfiguration() {
   public void setup() throws Exception {
     super.setup();
     executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
-    // Analytics accelerator currently does not support IOStatisticsContext, 
this will be added as
-    // part of https://issues.apache.org/jira/browse/HADOOP-19364
-    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "Analytics Accelerator currently does not support 
IOStatisticsContext");
+
 
   }
 
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
index 4ec579ce4f6..6af6ceab8a5 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
@@ -28,7 +28,6 @@
 import java.io.IOException;
 import java.io.InputStream;
 
-import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static 
org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
 
 /**
@@ -52,10 +51,6 @@ public void testMetricsRegister()
 
   @Test
   public void testStreamStatistics() throws IOException {
-     // Analytics accelerator currently does not support IOStatistics, this 
will be added as
-    // part of https://issues.apache.org/jira/browse/HADOOP-19364
-    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "Analytics Accelerator currently does not support stream statistics");
 
     S3AFileSystem fs = getFileSystem();
     Path file = path("testStreamStatistics");
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
index 46610429f26..19f25aa8209 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
@@ -303,4 +303,24 @@ public interface S3ATestConstants {
    * Default value of {@link #MULTIPART_COMMIT_CONSUMES_UPLOAD_ID}: {@value}.
    */
   boolean DEFAULT_MULTIPART_COMMIT_CONSUMES_UPLOAD_ID = false;
+
+  /**
+   * Ranges within this distance of each other will be coalesced.
+   */
+  String AAL_REQUEST_COALESCE_TOLERANCE = 
"physicalio.request.coalesce.tolerance";
+
+  /**
+   * The minimum size of a block in AAL.
+   */
+  String AAL_READ_BUFFER_SIZE = "physicalio.readbuffersize";
+
+  /**
+   * Whether small objects are prefetched, i.e. downloaded completely.
+   */
+  String AAL_SMALL_OBJECT_PREFETCH_ENABLED = 
"physicalio.small.objects.prefetching.enabled";
+
+  /**
+   * Objects in AAL's cache will expire after this duration.
+   */
+  String AAL_CACHE_TIMEOUT = "physicalio.cache.timeout";
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
index 2b288bfec0a..8d9a5f3b46d 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
@@ -41,7 +41,6 @@
 import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
 
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads;
-import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_GET_REQUEST;
 import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_FILES_CREATED;
 import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
@@ -206,10 +205,6 @@ private void abortActiveStream() throws IOException {
   @Test
   public void testCostOfCreatingMagicFile() throws Throwable {
     describe("Files created under magic paths skip existence checks and marker 
deletes");
-    // Analytics accelerator currently does not support IOStatistics, this 
will be added as
-    // part of https://issues.apache.org/jira/browse/HADOOP-19364
-    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "Analytics Accelerator currently does not support stream statistics");
     S3AFileSystem fs = getFileSystem();
     Path destFile = methodSubPath("file.txt");
     fs.delete(destFile.getParent(), true);
@@ -289,10 +284,6 @@ public void testCostOfCreatingMagicFile() throws Throwable 
{
   public void testCostOfSavingLoadingPendingFile() throws Throwable {
     describe("Verify costs of saving .pending file under a magic path");
 
-    // Analytics accelerator currently does not support IOStatistics, this 
will be added as
-    // part of https://issues.apache.org/jira/browse/HADOOP-19364
-    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "Analytics Accelerator currently does not support stream statistics");
     S3AFileSystem fs = getFileSystem();
     Path partDir = methodSubPath("file.pending");
     Path destFile = new Path(partDir, "file.pending");
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
index 5b489c1c39c..1724006a831 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
@@ -30,8 +30,6 @@
 import org.junit.Assert;
 import org.junit.Before;
 
-import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
-
 /**
  * S3a implementation of FCStatisticsBaseTest.
  */
@@ -46,10 +44,6 @@ public class ITestS3AFileContextStatistics extends 
FCStatisticsBaseTest {
   @Before
   public void setUp() throws Exception {
     conf = new Configuration();
-    // Analytics accelerator currently does not support IOStatistics, this 
will be added as
-    // part of https://issues.apache.org/jira/browse/HADOOP-19364
-    skipIfAnalyticsAcceleratorEnabled(conf,
-        "Analytics Accelerator currently does not support stream statistics");
     fc = S3ATestUtils.createTestFileContext(conf);
     testRootPath = fileContextTestHelper.getTestRootPath(fc, "test");
     fc.mkdir(testRootPath,
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
index 57e63403449..7905126afc3 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
@@ -57,7 +57,6 @@
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
 import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.streamType;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
@@ -70,6 +69,7 @@
 import static 
org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
 import static 
org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
 import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
+import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
 
@@ -98,6 +98,16 @@ public ITestS3AOpenCost() {
     super(true);
   }
 
+  /**
+   * Is the analytics stream enabled?
+   */
+  private boolean analyticsStream;
+
+  /**
+   * Is the classic input stream enabled?
+   */
+  private boolean classicInputStream;
+
   @Override
   public Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
@@ -115,17 +125,14 @@ public Configuration createConfiguration() {
   @Override
   public void setup() throws Exception {
     super.setup();
-    // Analytics accelerator currently does not support IOStatistics, this 
will be added as
-    // part of https://issues.apache.org/jira/browse/HADOOP-19364
-    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "Analytics Accelerator currently does not support stream statistics");
     S3AFileSystem fs = getFileSystem();
     testFile = methodPath();
-
     writeTextFile(fs, testFile, TEXT, true);
     testFileStatus = fs.getFileStatus(testFile);
     fileLength = (int)testFileStatus.getLen();
     prefetching = prefetching();
+    analyticsStream = isAnalyticsStream();
+    classicInputStream = isClassicInputStream();
   }
 
   /**
@@ -179,6 +186,10 @@ public void testStreamIsNotChecksummed() throws Throwable {
 
     // if prefetching is enabled, skip this test
     assumeNoPrefetching();
+    // If AAL is enabled, skip this test. AAL uses S3A's default S3 client, 
and if checksumming is disabled on the
+    // client, then AAL will also not enforce it.
+    assumeNotAnalytics();
+
     S3AFileSystem fs = getFileSystem();
 
     // open the file
@@ -195,6 +206,7 @@ public void testStreamIsNotChecksummed() throws Throwable {
 
       // open the stream.
       in.read();
+
       // now examine the innermost stream and make sure it doesn't have a 
checksum
       assertStreamIsNotChecksummed(getS3AInputStream(in));
     }
@@ -202,6 +214,11 @@ public void testStreamIsNotChecksummed() throws Throwable {
 
   @Test
   public void testOpenFileShorterLength() throws Throwable {
+
+    // For AAL, since it makes the HEAD to get the file length if the eTag is 
not supplied,
+    // it is not able to use the file length supplied in the open() call, and 
the test fails.
+    assumeNotAnalytics();
+
     // do a second read with the length declared as short.
     // we now expect the bytes read to be shorter.
     S3AFileSystem fs = getFileSystem();
@@ -246,7 +263,6 @@ public void testOpenFileLongerLengthReadFully() throws 
Throwable {
     final int extra = 10;
     long longLen = fileLength + extra;
 
-
     // assert behaviors of seeking/reading past the file length.
     // there is no attempt at recovery.
     verifyMetrics(() -> {
@@ -266,7 +282,9 @@ public void testOpenFileLongerLengthReadFully() throws 
Throwable {
         // two GET calls were made, one for readFully,
         // the second on the read() past the EOF
         // the operation has got as far as S3
-        probe(!prefetching(), STREAM_READ_OPENED, 1 + 1));
+        probe(classicInputStream, STREAM_READ_OPENED, 1 + 1),
+        // For AAL, the seek past content length fails, before the GET is made.
+        probe(analyticsStream, STREAM_READ_OPENED, 1));
 
     // now on a new stream, try a full read from after the EOF
     verifyMetrics(() -> {
@@ -277,10 +295,6 @@ public void testOpenFileLongerLengthReadFully() throws 
Throwable {
         return in.toString();
       }
     },
-        // two GET calls were made, one for readFully,
-        // the second on the read() past the EOF
-        // the operation has got as far as S3
-
         with(STREAM_READ_OPENED, 1));
   }
 
@@ -350,7 +364,9 @@ public void testReadPastEOF() throws Throwable {
       }
     },
         always(),
-        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, extra));
+        probe(classicInputStream, Statistic.ACTION_HTTP_GET_REQUEST, extra),
+        // AAL won't make the GET call if trying to read beyond EOF
+        probe(analyticsStream, Statistic.ACTION_HTTP_GET_REQUEST, 0));
   }
 
   /**
@@ -441,18 +457,28 @@ public void testVectorReadPastEOF() throws Throwable {
         byte[] buf = new byte[longLen];
         ByteBuffer bb = ByteBuffer.wrap(buf);
         final FileRange range = FileRange.createFileRange(0, longLen);
-        in.readVectored(Arrays.asList(range), (i) -> bb);
-        interceptFuture(EOFException.class,
-            "",
-            ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
-            TimeUnit.SECONDS,
-            range.getData());
-        assertS3StreamClosed(in);
-        return "vector read past EOF with " + in;
+
+        // For AAL, if there is no eTag, the provided length will not be passed in, and a HEAD request will be made.
+        // AAL requires the etag to detect changes in the object and then do cache eviction if required.
+        if (isAnalyticsStream()) {
+          intercept(EOFException.class, () ->
+                  in.readVectored(Arrays.asList(range), (i) -> bb));
+          verifyStatisticCounterValue(in.getIOStatistics(), 
ACTION_HTTP_HEAD_REQUEST, 1);
+          return "vector read past EOF with " + in;
+        } else {
+          in.readVectored(Arrays.asList(range), (i) -> bb);
+          interceptFuture(EOFException.class,
+                  "",
+                  
ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+                  TimeUnit.SECONDS,
+                  range.getData());
+          assertS3StreamClosed(in);
+          return "vector read past EOF with " + in;
+        }
       }
     },
         always(),
-        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1));
+        probe(classicInputStream, Statistic.ACTION_HTTP_GET_REQUEST, 1));
   }
 
   /**
@@ -463,6 +489,22 @@ private boolean prefetching()  {
     return InputStreamType.Prefetch == streamType(getFileSystem());
   }
 
+  /**
+   * Is the current stream type Analytics?
+   * @return true if Analytics stream is enabled.
+   */
+  private boolean isAnalyticsStream() {
+    return InputStreamType.Analytics == streamType(getFileSystem());
+  }
+
+  /**
+   * Is the current input stream type S3AInputStream?
+   * @return true if the S3AInputStream is being used.
+   */
+  private boolean isClassicInputStream() {
+    return InputStreamType.Classic == streamType(getFileSystem());
+  }
+
   /**
    * Skip the test if prefetching is enabled.
    */
@@ -472,6 +514,12 @@ private void assumeNoPrefetching(){
     }
   }
 
+  private void assumeNotAnalytics() {
+    if (analyticsStream) {
+      skip("Analytics stream is enabled");
+    }
+  }
+
   /**
    * Assert that the inner S3 Stream is closed.
    * @param in input stream
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
index 45bf5c58f26..40b04e7ecf5 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
@@ -46,6 +46,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
 import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_TYPE;
 import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
 import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
 import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
@@ -55,6 +56,7 @@
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching;
 import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static 
org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.setDurationAsSeconds;
+import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Classic;
 import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static 
org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
 import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ABORTED;
@@ -106,6 +108,7 @@ public ITestUnbufferDraining() {
   @Override
   public Configuration createConfiguration() {
     Configuration conf = disablePrefetching(super.createConfiguration());
+    conf.setEnum(INPUT_STREAM_TYPE, Classic);
     removeBaseAndBucketOverrides(conf,
         ASYNC_DRAIN_THRESHOLD,
         CHECKSUM_VALIDATION,
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
index e0c71136e80..d4ad1eeccc6 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
@@ -61,6 +61,7 @@
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getInputStreamStatistics;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
+import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Classic;
 import static 
org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile;
 import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMinimum;
 import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMaximumStatistic;
@@ -102,6 +103,7 @@ public class ITestS3AInputStreamPerformance extends 
S3AScaleTestBase {
   @Override
   protected Configuration createScaleConfiguration() {
     Configuration conf = disablePrefetching(super.createScaleConfiguration());
+    conf.setEnum(INPUT_STREAM_TYPE, Classic);
     if (isUsingDefaultExternalDataFile(conf)) {
       S3ATestUtils.removeBaseAndBucketOverrides(
           conf,
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
index da2a39a986e..f6b3c97692a 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
@@ -27,7 +27,6 @@
 import org.apache.hadoop.fs.contract.s3a.S3AContract;
 import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 
-import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.*;
 
 /**
@@ -81,10 +80,6 @@ public List<String> outputStreamStatisticKeys() {
 
   @Override
   public void testInputStreamStatisticRead() throws Throwable {
-    // Analytics accelerator currently does not support IOStatistics, this will be added as
-    // part of https://issues.apache.org/jira/browse/HADOOP-19364
-    skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
-        "Analytics Accelerator currently does not support stream statistics");
     super.testInputStreamStatisticRead();
   }
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
index 376dcdf727f..997f641e8ac 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
@@ -31,7 +31,6 @@
 import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
 import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 
-import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 
 public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
 
@@ -44,10 +43,6 @@ public class ITestS3AFileSystemStatistic extends 
AbstractS3ATestBase {
    */
   @Test
   public void testBytesReadWithStream() throws IOException {
-    // Analytics accelerator currently does not support IOStatistics, this will be added as
-    // part of https://issues.apache.org/jira/browse/HADOOP-19364
-    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "Analytics Accelerator currently does not support stream statistics");
     S3AFileSystem fs = getFileSystem();
     Path filePath = path(getMethodName());
     byte[] oneKbBuf = new byte[ONE_KB];


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to