This is an automated email from the ASF dual-hosted git repository.

anujmodi2021 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 22008a241a8 HADOOP-1593. [ABFS] Add vectored read support in ABFS 
driver  (#8400)
22008a241a8 is described below

commit 22008a241a81a5f0a77cf55ead3546502c71df99
Author: Anmol Asrani <[email protected]>
AuthorDate: Fri May 15 11:56:44 2026 +0530

    HADOOP-1593. [ABFS] Add vectored read support in ABFS driver  (#8400)
    
    Contributed by Anmol Asrani
---
 hadoop-tools/hadoop-azure/pom.xml                  |   2 +
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  71 +++
 .../fs/azurebfs/constants/ConfigurationKeys.java   |  26 +
 .../constants/FileSystemConfigurations.java        |   4 +
 .../hadoop/fs/azurebfs/constants/ReadType.java     |  10 +
 .../hadoop/fs/azurebfs/enums/BufferType.java       |  36 ++
 .../fs/azurebfs/enums/VectoredReadStrategy.java    |  77 +++
 .../fs/azurebfs/services/AbfsInputStream.java      |  57 +-
 .../hadoop/fs/azurebfs/services/ReadBuffer.java    | 123 +++-
 .../fs/azurebfs/services/ReadBufferManager.java    | 124 ++++
 .../fs/azurebfs/services/ReadBufferManagerV1.java  | 281 ++++++--
 .../fs/azurebfs/services/ReadBufferManagerV2.java  | 237 ++++++-
 .../fs/azurebfs/services/VectoredReadHandler.java  | 696 ++++++++++++++++++++
 .../azurebfs/services/ITestReadBufferManager.java  |   2 +-
 .../fs/azurebfs/services/ITestVectoredRead.java    | 703 +++++++++++++++++++++
 15 files changed, 2375 insertions(+), 74 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/pom.xml 
b/hadoop-tools/hadoop-azure/pom.xml
index 4311a4a2a80..177fbdc2335 100644
--- a/hadoop-tools/hadoop-azure/pom.xml
+++ b/hadoop-tools/hadoop-azure/pom.xml
@@ -456,6 +456,7 @@
                     
<exclude>**/azurebfs/ITestSmallWriteOptimization.java</exclude>
                     
<exclude>**/azurebfs/ITestAbfsStreamStatistics*.java</exclude>
                     
<exclude>**/azurebfs/services/ITestReadBufferManager.java</exclude>
+                    
<exclude>**/azurebfs/services/ITestVectoredRead.java</exclude>
                     <exclude>**/azurebfs/commit/*.java</exclude>
                   </excludes>
 
@@ -497,6 +498,7 @@
                     
<include>**/azurebfs/ITestSmallWriteOptimization.java</include>
                     
<include>**/azurebfs/services/ITestReadBufferManager.java</include>
                     
<include>**/azurebfs/ITestAbfsStreamStatistics*.java</include>
+                    
<include>**/azurebfs/services/ITestVectoredRead.java</include>
                     <include>**/azurebfs/commit/*.java</include>
                   </includes>
                 </configuration>
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 0a32a050775..69362983ecb 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -51,6 +51,7 @@
 import 
org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator;
 import 
org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator;
 import org.apache.hadoop.fs.azurebfs.enums.Trilean;
+import org.apache.hadoop.fs.azurebfs.enums.VectoredReadStrategy;
 import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee;
 import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
 import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
@@ -623,6 +624,22 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_FS_AZURE_LOWEST_REQUEST_PRIORITY_VALUE)
   private int prefetchRequestPriorityValue;
 
+  @StringConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_VECTORED_READ_STRATEGY,
+      DefaultValue = DEFAULT_FS_AZURE_VECTORED_READ_STRATEGY)
+  private String vectoredReadStrategy;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_MIN_SEEK_FOR_VECTORED_READS,
+  DefaultValue = DEFAULT_FS_AZURE_MIN_SEEK_FOR_VECTORED_READS)
+  private int minSeekForVectoredReads;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_MAX_MERGED_READ_SIZE_FOR_VECTORED_READS,
+      DefaultValue = DEFAULT_FS_AZURE_MAX_MERGED_READ_SIZE_FOR_VECTORED_READS)
+  private int maxReadSizeForVectoredReads;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_MAX_MERGED_READ_SIZE_FOR_VECTORED_READS_THROUGHPUT,
+      DefaultValue = 
DEFAULT_FS_AZURE_MAX_MERGED_READ_SIZE_FOR_VECTORED_READS_THROUGHPUT)
+  private int maxReadSizeForVectoredReadsThroughput;
+
   @StringConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_READ_POLICY,
           DefaultValue = DEFAULT_AZURE_READ_POLICY)
   private String abfsReadPolicy;
@@ -2179,4 +2196,58 @@ public int getTailLatencyAnalysisWindowGranularity() {
   public int getTailLatencyMaxRetryCount() {
     return tailLatencyMaxRetryCount;
   }
+
+  /**
+   * Returns the configured vectored read strategy.
+   *
+   * <p>
+   * The configuration value is parsed in a case-insensitive manner and may be
+   * specified either using the enum name (for example,
+   * {@code TPS_OPTIMIZED}) or the short name (for example, {@code TPS}).
+   * </p>
+   *
+   * @return the resolved {@link VectoredReadStrategy}
+   *
+   * @throws IllegalArgumentException if the configured value is invalid
+   */
+  public VectoredReadStrategy getVectoredReadStrategy() {
+    try {
+      return VectoredReadStrategy.fromString(vectoredReadStrategy);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(
+          "Invalid value for " + FS_AZURE_VECTORED_READ_STRATEGY
+              + ": " + vectoredReadStrategy
+              + ". Expected one of: TPS, THROUGHPUT, TPS_OPTIMIZED, 
THROUGHPUT_OPTIMIZED",  e);
+    }
+  }
+
+  /**
+   * Returns the minimum seek distance for vectored reads; adjacent ranges
+   * separated by less than this gap are candidates for merging.
+   *
+   * @return minimum seek threshold used when merging ranges
+   */
+  public int getMinSeekForVectoredReads() {
+    return minSeekForVectoredReads;
+  }
+
+  /**
+   * Returns the maximum size of a single merged read produced when
+   * adjacent ranges are combined during vectored reads.
+   *
+   * @return maximum merged read size for vectored reads
+   */
+  public int getMaxReadSizeForVectoredReads() {
+    return maxReadSizeForVectoredReads;
+  }
+
+  /**
+   * Returns the maximum size of a single merged read produced when
+   * adjacent ranges are combined under the throughput-optimized strategy.
+   *
+   * @return maximum merged read size for throughput-optimized vectored reads
+   */
+  public int getMaxReadSizeForVectoredReadsThroughput() {
+    return maxReadSizeForVectoredReadsThroughput;
+  }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index d2dbe176508..5b98ea7901c 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -627,6 +627,32 @@ public static String containerProperty(String property, 
String fsName, String ac
    */
   public static final String FS_AZURE_TAIL_LATENCY_MAX_RETRY_COUNT = 
"fs.azure.tail.latency.max.retry.count";
 
+  /**
+   * Configuration key to control the vectored read strategy used by ABFS: 
{@value}
+   */
+  public static final String FS_AZURE_VECTORED_READ_STRATEGY = 
"fs.azure.vectored.read.strategy";
+
+  /**
+   * Configuration key that defines the minimum gap between adjacent read 
ranges
+   * for merging ranges during vectored reads in ABFS: {@value}.
+   */
+  public static final String FS_AZURE_MIN_SEEK_FOR_VECTORED_READS =
+      "fs.azure.min.seek.for.vectored.reads";
+
+  /**
+   * Configuration key that defines the maximum size of a merged read
+   * produced when combining ranges during vectored reads in ABFS: {@value}.
+   */
+  public static final String FS_AZURE_MAX_MERGED_READ_SIZE_FOR_VECTORED_READS =
+      "fs.azure.vectored.read.max.merged.size";
+
+  /**
+   * Configuration key that defines the maximum size of a merged read produced
+   * when combining ranges under the throughput-optimized strategy: {@value}.
+   */
+  public static final String 
FS_AZURE_MAX_MERGED_READ_SIZE_FOR_VECTORED_READS_THROUGHPUT =
+      "fs.azure.vectored.read.max.merged.size.throughput";
+
   /**
    * If true, restricts GPS (getPathStatus) calls on openFileforRead
    * Default: false
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 328c878401c..875dfb55768 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -306,6 +306,10 @@ public final class FileSystemConfigurations {
   public static final int 
MIN_FS_AZURE_TAIL_LATENCY_ANALYSIS_WINDOW_GRANULARITY = 1;
   public static final int 
DEFAULT_FS_AZURE_TAIL_LATENCY_PERCENTILE_COMPUTATION_INTERVAL_MILLIS = 500;
   public static final int DEFAULT_FS_AZURE_TAIL_LATENCY_MAX_RETRY_COUNT = 1;
+  public static final String DEFAULT_FS_AZURE_VECTORED_READ_STRATEGY = "TPS";
+  public static final int DEFAULT_FS_AZURE_MIN_SEEK_FOR_VECTORED_READS = 
ONE_MB;
+  public static final int 
DEFAULT_FS_AZURE_MAX_MERGED_READ_SIZE_FOR_VECTORED_READS = 4 * ONE_MB;
+  public static final int 
DEFAULT_FS_AZURE_MAX_MERGED_READ_SIZE_FOR_VECTORED_READS_THROUGHPUT = 8 * 
ONE_MB;
   public static final boolean DEFAULT_FS_AZURE_RESTRICT_GPS_ON_OPENFILE = 
false;
 
   private FileSystemConfigurations() {}
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java
index 51391cc7477..056a0aba498 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java
@@ -48,6 +48,16 @@ public enum ReadType {
    * Only triggered when small file read optimization kicks in.
    */
   SMALLFILE_READ("SR"),
+  /**
+   * Read multiple disjoint ranges from the storage service using vectored 
reads.
+   * Used to coalesce and execute non-contiguous reads efficiently.
+   */
+  VECTORED_READ("VR"),
+  /**
+   * Performs a vectored direct read by fetching multiple non-contiguous
+   * ranges in a single operation.
+   */
+  VECTORED_DIRECT_READ("VDR"),
   /**
    * Reads from Random Input Stream with read ahead up to readAheadRange
    */
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/BufferType.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/BufferType.java
new file mode 100644
index 00000000000..85c6d1e2e7d
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/BufferType.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.enums;
+
+import org.apache.hadoop.fs.azurebfs.services.ReadBuffer;
+
+/**
+ * Enum for buffer types.
+ * Used in {@link ReadBuffer} to separate normal vs vectored read.
+ */
+public enum BufferType {
+  /**
+   * Normal read buffer.
+   */
+  NORMAL,
+  /**
+   * Vectored read buffer.
+   */
+  VECTORED
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/VectoredReadStrategy.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/VectoredReadStrategy.java
new file mode 100644
index 00000000000..5fc9700c154
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/VectoredReadStrategy.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.enums;
+
+/**
+ * Defines the strategy used for vectored reads in ABFS.
+ *
+ * <p>
+ * The strategy controls how read ranges are planned and executed, trading off
+ * between request parallelism and per-request payload size.
+ * </p>
+ */
+public enum VectoredReadStrategy {
+
+  /**
+   * Optimizes for transactions per second (TPS).
+   */
+  TPS_OPTIMIZED("TPS"),
+
+  /**
+   * Optimizes for overall data throughput.
+   */
+  THROUGHPUT_OPTIMIZED("THROUGHPUT");
+
+  /** Short name used for configuration and logging. */
+  private final String name;
+
+  /**
+   * Constructs a vectored read strategy with a short, user-friendly name.
+   *
+   * @param name short identifier for the strategy
+   */
+  VectoredReadStrategy(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Returns the short name of the vectored read strategy.
+   *
+   * @return short strategy name
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Parses a configuration value into a {@link VectoredReadStrategy}.
+   * @param value configuration value
+   * @return matching vectored read strategy
+   * @throws IllegalArgumentException if the value is invalid
+   */
+  public static VectoredReadStrategy fromString(String value) {
+    for (VectoredReadStrategy strategy : values()) {
+      if (strategy.name().equalsIgnoreCase(value)
+          || strategy.getName().equalsIgnoreCase(value)) {
+        return strategy;
+      }
+    }
+    throw new IllegalArgumentException("Invalid vectored read strategy: " + 
value);
+  }
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index eba5b093377..acf71127c53 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -22,39 +22,41 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.HttpURLConnection;
+import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.UUID;
+import java.util.function.IntFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.constants.ReadType;
-import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
-import org.apache.hadoop.fs.impl.BackReference;
-import org.apache.hadoop.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
 import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
 import org.apache.hadoop.fs.azurebfs.utils.Listener;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.impl.BackReference;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.util.Preconditions;
 
 import static java.lang.Math.max;
 import static java.lang.Math.min;
-
 import static 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
 import static 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getRelativePath;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
@@ -62,8 +64,6 @@
 import static 
org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
 import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INVALID_RANGE;
 import static 
org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_READ_ON_DIRECTORY;
-import static org.apache.hadoop.io.Sizes.S_128K;
-import static org.apache.hadoop.io.Sizes.S_2M;
 import static org.apache.hadoop.util.StringUtils.toLowerCase;
 
 /**
@@ -133,6 +133,7 @@ public abstract class AbfsInputStream extends FSInputStream 
implements CanUnbuff
   private final AbfsInputStreamContext context;
   private IOStatistics ioStatistics;
   private String filePathIdentifier;
+
   /**
    * This is the actual position within the object, used by
    * lazy seek to decide whether to seek on the next read or not.
@@ -207,7 +208,7 @@ public AbfsInputStream(
           readAheadBlockSize, client.getAbfsConfiguration());
       readBufferManager = 
ReadBufferManagerV2.getBufferManager(client.getAbfsCounters());
     } else {
-      ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize);
+      ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize, 
client.getAbfsConfiguration());
       readBufferManager = ReadBufferManagerV1.getBufferManager();
     }
 
@@ -228,6 +229,14 @@ private String createInputStreamId() {
     return StringUtils.right(UUID.randomUUID().toString(), STREAM_ID_LEN);
   }
 
+  /**
+   * Retrieves the handler responsible for processing vectored read requests.
+   * @return the {@link VectoredReadHandler} instance associated with the 
buffer manager.
+   */
+  VectoredReadHandler getVectoredReadHandler() {
+    return getReadBufferManager().getVectoredReadHandler();
+  }
+
   @Override
   public int read(long position, byte[] buffer, int offset, int length)
       throws IOException {
@@ -344,6 +353,19 @@ public synchronized int read(final byte[] b, final int 
off, final int len) throw
     return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
   }
 
+  /**
+   * {@inheritDoc}
+   * Vectored read implementation for AbfsInputStream.
+   *
+   * @param ranges the byte ranges to read.
+   * @param allocate the function to allocate ByteBuffer.
+   */
+  @Override
+  public void readVectored(List<? extends FileRange> ranges,
+      IntFunction<ByteBuffer> allocate) throws EOFException {
+    getVectoredReadHandler().readVectored(this, ranges, allocate);
+  }
+
   private boolean shouldReadFully() {
     return this.firstRead && !shouldRestrictGpsOnOpenFile() && 
this.context.readSmallFilesCompletely()
         && this.contentLength <= this.bufferSize;
@@ -910,7 +932,10 @@ public synchronized void unbuffer() {
 
   @Override
   public boolean hasCapability(String capability) {
-    return StreamCapabilities.UNBUFFER.equals(toLowerCase(capability));
+    return switch (toLowerCase(capability)) {
+      case StreamCapabilities.UNBUFFER, StreamCapabilities.VECTOREDIO -> true;
+      default -> false;
+    };
   }
 
   /**
@@ -1184,7 +1209,7 @@ ReadBufferManager getReadBufferManager() {
    */
   @Override
   public int minSeekForVectorReads() {
-    return S_128K;
+    return client.getAbfsConfiguration().getMinSeekForVectoredReads();
   }
 
   /**
@@ -1193,7 +1218,7 @@ public int minSeekForVectorReads() {
    */
   @Override
   public int maxReadSizeForVectorReads() {
-    return S_2M;
+    return client.getAbfsConfiguration().getMaxReadSizeForVectoredReads();
   }
 
   /**
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
index a6aa75f59d2..c920531c17e 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
@@ -19,15 +19,22 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.IntFunction;
 
 import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import org.apache.hadoop.fs.azurebfs.enums.BufferType;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.impl.CombinedFileRange;
 
 import static 
org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus.READ_FAILED;
 
-class ReadBuffer {
+public class ReadBuffer {
 
   private AbfsInputStream stream;
   private String eTag;
@@ -48,6 +55,13 @@ class ReadBuffer {
   private boolean isLastByteConsumed = false;
   private boolean isAnyByteConsumed = false;
   private AtomicInteger refCount = new AtomicInteger(0);
+  private BufferType bufferType = BufferType.NORMAL;
+  // list of combined file ranges for vectored read.
+  private List<CombinedFileRange> vectoredUnits = new ArrayList<>();
+  // Allocator used for vectored fan-out; captured at queue time.
+  private IntFunction<ByteBuffer> allocator;
+  // Tracks whether fanOut has already been executed
+  private final AtomicBoolean fanOutDone = new AtomicBoolean(false);
 
   private IOException errException = null;
 
@@ -199,4 +213,111 @@ public void setAnyByteConsumed(boolean isAnyByteConsumed) 
{
   public boolean isFullyConsumed() {
     return isFirstByteConsumed() && isLastByteConsumed();
   }
+
+  /**
+   * Add a logical vectored read unit associated with this buffer.
+   *
+   * <p>Each {@link CombinedFileRange} represents a logical range requested
+   * by the caller that maps to this buffer-sized physical read. Multiple
+   * logical units may be attached to a single buffer when their ranges
+   * fall within the same physical read span.</p>
+   *
+   * @param u logical vectored read unit to associate with this buffer
+   */
+  void addVectoredUnit(CombinedFileRange u) {
+    vectoredUnits.add(u);
+  }
+
+  /**
+   * Get all logical vectored read units attached to this buffer.
+   *
+   * <p>The returned list contains the {@link CombinedFileRange}s whose
+   * requested data resides within the physical data stored in this buffer.
+   * These units will receive slices of the buffer during vectored fan-out.</p>
+   *
+   * @return list of logical vectored units mapped to this buffer
+   */
+  List<CombinedFileRange> getVectoredUnits() {
+    return vectoredUnits;
+  }
+
+  /**
+   * Clear all vectored units associated with this buffer.
+   *
+   * <p>This method removes all logical read units that were previously
+   * attached to this buffer. It is typically invoked when the buffer
+   * is being reset or returned to the buffer pool for reuse.</p>
+   */
+  void clearVectoredUnits() {
+    if (vectoredUnits != null) {
+      vectoredUnits.clear();
+    }
+  }
+
+  /**
+   * Return the type of this buffer.
+   *
+   * <p>The buffer type indicates how the buffer is used during the
+   * read pipeline (for example, normal read buffers versus vectored
+   * read buffers).</p>
+   *
+   * @return the {@link BufferType} of this buffer
+   */
+  public BufferType getBufferType() {
+    return bufferType;
+  }
+
+  /**
+   * Set the type of this buffer.
+   *
+   * <p>The buffer type determines the role of the buffer within the
+   * read pipeline, such as whether it is used for normal reads or
+   * vectored read operations.</p>
+   *
+   * @param bufferType buffer type to assign
+   */
+  public void setBufferType(final BufferType bufferType) {
+    this.bufferType = bufferType;
+  }
+
+  /**
+   * Set the allocator associated with this buffer.
+   *
+   * <p>The same allocator instance may be shared across multiple buffers
+   * belonging to a single vectored read operation. It is captured at
+   * queue time so it is available when the asynchronous read completes.</p>
+   *
+   * @param allocator allocator used for vectored fan-out
+   */
+  public void setAllocator(IntFunction<ByteBuffer> allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Return the allocator associated with this buffer.
+   *
+   * @return allocator used for vectored fan-out, or {@code null} for
+   *         non-vectored buffers
+   */
+  public IntFunction<ByteBuffer> getAllocator() {
+    return allocator;
+  }
+
+  /**
+   * Attempt to execute vectored fan-out exactly once for this buffer.
+   *
+   * @return {@code true} if the caller should perform fan-out; {@code false}
+   *         if fan-out has already been executed
+   */
+  boolean tryFanOut() {
+    return fanOutDone.compareAndSet(false, true);
+  }
+
+  /**
+   * @return {@code true} if vectored fan-out has already been executed
+   *         for this buffer; {@code false} otherwise
+   */
+  boolean isFanOutDone() {
+    return fanOutDone.get();
+  }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
index 5b53d641a20..87a86d6b0b9 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -19,18 +19,23 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.IntFunction;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import org.apache.hadoop.fs.azurebfs.enums.VectoredReadStrategy;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.impl.CombinedFileRange;
 
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_AHEAD_BLOCK_SIZE;
 
@@ -67,6 +72,18 @@ abstract void queueReadAhead(AbfsInputStream stream,
       int requestedLength,
       TracingContext tracingContext);
 
+  /**
+   * Queues a vectored read request from {@link AbfsInputStream}
+   * for the file span described by the given unit.
+   * @param stream the input stream requesting the vectored read
+   * @param unit buffer-sized vectored read unit to be queued
+   * @param tracingContext the tracing context for diagnostics
+   * @param allocator allocator used for vectored fan-out of the read data
+   */
+  abstract boolean queueVectoredRead(AbfsInputStream stream,
+      CombinedFileRange unit,
+      TracingContext tracingContext, IntFunction<ByteBuffer> allocator);
+
   /**
    * Gets a block of data from the prefetched data by ReadBufferManager.
    * {@link AbfsInputStream} calls this method read any bytes already 
available in a buffer (thereby saving a
@@ -127,6 +144,14 @@ abstract void doneReading(ReadBuffer buffer,
   @VisibleForTesting
   abstract int getNumBuffers();
 
+  abstract VectoredReadHandler getVectoredReadHandler();
+
+  abstract VectoredReadStrategy getVectoredReadStrategy();
+
+  abstract int getMaxReadSizeForVectoredReads();
+
+  abstract int getMaxReadSizeForeVectoredReadsThroughput();
+
   /**
    * Attempts to evict buffers based on the eviction policy.
    */
@@ -285,5 +310,104 @@ protected void 
testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
     completedReadList.add(buf);
   }
 
+  /**
+   * Looks up the {@link ReadBuffer} of the given stream whose requested range
+   * contains {@code requestedOffset}.
+   *
+   * <p>The read-ahead queue is consulted first, then the in-progress list,
+   * and finally the completed-read list; the first match wins.
+   *
+   * @param stream the {@link AbfsInputStream} associated with the read request
+   * @param requestedOffset logical file offset the buffer must cover
+   *
+   * @return the first matching {@link ReadBuffer}, or {@code null} if none of
+   *         the lists holds one
+   */
+  ReadBuffer findQueuedBuffer(final AbfsInputStream stream,
+      long requestedOffset) {
+    ReadBuffer match = findInList(getReadAheadQueue(), stream, requestedOffset);
+    if (match == null) {
+      match = findInList(getInProgressList(), stream, requestedOffset);
+    }
+    if (match == null) {
+      match = findInList(getCompletedReadList(), stream, requestedOffset);
+    }
+    return match;
+  }
+
+  /**
+   * Scans {@code buffers} in iteration order for the first {@link ReadBuffer}
+   * that was created for {@code stream} (compared by identity) and whose
+   * range {@code [offset, offset + requestedLength)} contains
+   * {@code requestedOffset}.
+   *
+   * @param buffers the collection of {@link ReadBuffer}s to search
+   * @param stream the {@link AbfsInputStream} associated with the read request
+   * @param requestedOffset logical file offset the buffer must cover
+   *
+   * @return the matching {@link ReadBuffer}, or {@code null} if none is found
+   */
+  ReadBuffer findInList(final Collection<ReadBuffer> buffers,
+      final AbfsInputStream stream, long requestedOffset) {
+    for (ReadBuffer candidate : buffers) {
+      if (candidate.getStream() != stream) {
+        // Belongs to another stream instance.
+        continue;
+      }
+      if (requestedOffset < candidate.getOffset()) {
+        // Offset lies before the start of this buffer's range.
+        continue;
+      }
+      if (requestedOffset
+          < candidate.getOffset() + candidate.getRequestedLength()) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Handle vectored-read completion for a buffer.
+   *
+   * <p>When {@code result} is {@link ReadBufferStatus#AVAILABLE} and at least
+   * one byte was read, the buffer is fanned out to its logical ranges through
+   * the {@link VectoredReadHandler}, but only if {@code buffer.tryFanOut()}
+   * grants the fan-out. Any other outcome, and any exception raised while
+   * fanning out, fails the futures attached to the buffer and marks the
+   * buffer {@link ReadBufferStatus#READ_FAILED}. Vectored units are cleared
+   * once the buffer reports that fan-out is done, so the buffer can be
+   * published or reused safely.</p>
+   *
+   * @param buffer the read buffer whose physical read has completed
+   * @param result the completion status of the physical read
+   * @param bytesActuallyRead number of bytes read from the backend
+   */
+  void handleVectoredCompletion(
+      ReadBuffer buffer,
+      ReadBufferStatus result,
+      int bytesActuallyRead) {
+    try {
+      if (result != ReadBufferStatus.AVAILABLE || bytesActuallyRead <= 0) {
+        LOGGER.debug(
+            "Handling vectored completion for buffer with path: {}, offset: {}, "
+                + "length: {}, result: {}, bytesActuallyRead: {}",
+            buffer.getPath(), buffer.getOffset(), buffer.getRequestedLength(),
+            result, bytesActuallyRead);
+        // Report the completion result and byte count that caused the
+        // failure; the buffer's own status field may not reflect them.
+        throw new IOException(
+            "Vectored read failed for path: " + buffer.getPath()
+                + ", offset=" + buffer.getOffset()
+                + ", status=" + result
+                + ", bytesActuallyRead=" + bytesActuallyRead);
+      }
+      if (buffer.tryFanOut()) {
+        getVectoredReadHandler().fanOut(buffer, bytesActuallyRead);
+      }
+    } catch (Exception e) {
+      // Fail all logical FileRange futures
+      getVectoredReadHandler().failBufferFutures(buffer, e);
+      buffer.setStatus(ReadBufferStatus.READ_FAILED);
+    } finally {
+      if (buffer.isFanOutDone()) {
+        // Must be cleared before publication / reuse
+        buffer.clearVectoredUnits();
+      }
+    }
+  }
+
   abstract void clearFreeList();
+
+  /**
+   * Emits a trace-level message through this class's logger; does nothing
+   * when trace logging is disabled.
+   *
+   * @param message SLF4J-style message pattern
+   * @param args arguments substituted into the pattern
+   */
+  void printTraceLog(String message, Object... args) {
+    if (!LOGGER.isTraceEnabled()) {
+      return;
+    }
+    LOGGER.trace(message, args);
+  }
+
+  /**
+   * Emits a debug-level message through this class's logger.
+   *
+   * @param message SLF4J-style message pattern
+   * @param args arguments substituted into the pattern
+   */
+  void printDebugLog(final String message, final Object... args) {
+    LOGGER.debug(message, args);
+  }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
index c034d856596..e03931411e0 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
@@ -17,9 +17,8 @@
  */
 package org.apache.hadoop.fs.azurebfs.services;
 
-import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
-
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -27,10 +26,17 @@
 import java.util.List;
 import java.util.Stack;
 import java.util.concurrent.CountDownLatch;
+import java.util.function.IntFunction;
 
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import org.apache.hadoop.fs.azurebfs.enums.BufferType;
+import org.apache.hadoop.fs.azurebfs.enums.VectoredReadStrategy;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.impl.CombinedFileRange;
 import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
-import org.apache.hadoop.classification.VisibleForTesting;
 
 /**
  * The Read Buffer Manager for Rest AbfsClient.
@@ -46,21 +52,37 @@ public final class ReadBufferManagerV1 extends 
ReadBufferManager {
   private byte[][] buffers;
   private Stack<Integer> freeList = new Stack<>();   // indices in buffers[] 
array that are available
   private static ReadBufferManagerV1 bufferManager;
+  private final VectoredReadHandler vectoredReadHandler;
+  private static VectoredReadStrategy vectoredReadStrategy;
+  private static int maxReadSizeForVectoredReads;
+  private static int maxReadSizeForeVectoredReadsThroughput;
 
   // hide instance constructor
   private ReadBufferManagerV1() {
-    LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
+    this.vectoredReadHandler = new VectoredReadHandler(this);
+    printTraceLog("Creating readbuffer manager with HADOOP-18546 patch");
+  }
+
+  /**
+   * Returns the {@link VectoredReadHandler} created for this manager in its
+   * constructor.
+   *
+   * @return the vectored read handler bound to this manager
+   */
+  @Override
+  VectoredReadHandler getVectoredReadHandler() {
+    return vectoredReadHandler;
   }
 
   /**
    * Sets the read buffer manager configurations.
+   *
    * @param readAheadBlockSize the size of the read-ahead block in bytes
+   * @param abfsConfiguration the configuration to set for the 
ReadBufferManagerV1.
    */
-  public static void setReadBufferManagerConfigs(int readAheadBlockSize) {
+  public static void setReadBufferManagerConfigs(int readAheadBlockSize,
+      AbfsConfiguration abfsConfiguration) {
     if (bufferManager == null) {
       LOGGER.debug(
           "ReadBufferManagerV1 not initialized yet. Overriding 
readAheadBlockSize as {}",
           readAheadBlockSize);
+      vectoredReadStrategy = abfsConfiguration.getVectoredReadStrategy();
+      maxReadSizeForVectoredReads = 
abfsConfiguration.getMaxReadSizeForVectoredReads();
+      maxReadSizeForeVectoredReadsThroughput
+          = abfsConfiguration.getMaxReadSizeForVectoredReadsThroughput();
       setReadAheadBlockSize(readAheadBlockSize);
       setThresholdAgeMilliseconds(DEFAULT_THRESHOLD_AGE_MILLISECONDS);
     }
@@ -111,10 +133,8 @@ void init() {
   @Override
   public void queueReadAhead(final AbfsInputStream stream, final long 
requestedOffset, final int requestedLength,
       TracingContext tracingContext) {
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
-          stream.getPath(), requestedOffset, requestedLength);
-    }
+    printTraceLog("Start Queueing readAhead for {} offset {} length {}",
+        stream.getPath(), requestedOffset, requestedLength);
     ReadBuffer buffer;
     synchronized (this) {
       if (isAlreadyQueued(stream, requestedOffset)) {
@@ -139,10 +159,127 @@ public void queueReadAhead(final AbfsInputStream stream, 
final long requestedOff
       buffer.setBufferindex(bufferIndex);
       getReadAheadQueue().add(buffer);
       notifyAll();
-      if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx 
{}",
+      printTraceLog("Done q-ing readAhead for file {} offset {} buffer idx {}",
             stream.getPath(), requestedOffset, buffer.getBufferindex());
+    }
+  }
+
+  /**
+   * Queue a vectored read for a buffer-sized physical read unit.
+   *
+   * <p>The method first attempts to attach the logical unit to a buffer this
+   * manager already tracks for the same stream and offset, provided that
+   * buffer covers the whole unit. If that buffer is already
+   * {@code AVAILABLE}, vectored completion is triggered immediately. If no
+   * suitable buffer exists, a free read buffer is acquired, attempting
+   * eviction if the free list is empty, and a new backend read is
+   * queued.</p>
+   *
+   * @param stream         input stream for the file being read
+   * @param unit           buffer-sized combined file range to be read
+   * @param tracingContext tracing context used for the backend read request
+   * @param allocator      allocator used to create buffers for vectored
+   *                       fan-out
+   * @return {@code true} if the read was queued or attached to an existing
+   *         buffer; {@code false} if no buffer was available
+   */
+  boolean queueVectoredRead(AbfsInputStream stream,
+      CombinedFileRange unit,
+      TracingContext tracingContext,
+      IntFunction<ByteBuffer> allocator) {
+    /* Create a child tracing context for vectored read-ahead requests */
+    TracingContext readAheadTracingContext =
+        new TracingContext(tracingContext);
+    readAheadTracingContext.setPrimaryRequestID();
+    readAheadTracingContext.setReadType(ReadType.VECTORED_READ);
+
+    synchronized (this) {
+      if (isAlreadyQueued(stream, unit.getOffset())) {
+        ReadBuffer existing = findQueuedBuffer(stream, unit.getOffset());
+        if (existing != null && existing.getStream().getETag() != null
+            && stream.getETag().equals(existing.getStream().getETag())) {
+          /*
+           * For AVAILABLE buffers use actual bytes read (getLength()) for
+           * coverage check. For READING_IN_PROGRESS buffers use
+           * requestedLength as an estimate — the short-read guard will be
+           * applied later in doneReading before dispatching completion.
+           *
+           * NOTE(review): findQueuedBuffer matches on stream identity, so
+           * existing.getStream() is the calling stream and the ETag
+           * comparison above compares the stream's ETag with itself.
+           */
+          long end = existing.getOffset() + (
+              existing.getStatus() == ReadBufferStatus.AVAILABLE
+                  ? existing.getLength()
+                  : existing.getRequestedLength());
+          if (end >= unit.getOffset() + unit.getLength()) {
+            existing.setBufferType(BufferType.VECTORED);
+            existing.addVectoredUnit(unit);
+            existing.setAllocator(allocator);
+            if (existing.getStatus() == ReadBufferStatus.AVAILABLE) {
+              /*
+               * Buffer is already AVAILABLE. Trigger completion immediately,
+               * passing getLength() (actual bytes read) as the byte count;
+               * the coverage check above already used the same value.
+               */
+              printTraceLog("Hitchhiking onto AVAILABLE buffer {}, length {}",
+                  existing, existing.getLength());
+              handleVectoredCompletion(existing,
+                  existing.getStatus(),
+                  existing.getLength());
+            }
+            /*
+             * The unit is now attached to the existing buffer, so no new
+             * physical read is queued. For buffers that are not yet
+             * AVAILABLE the coverage check relied on requestedLength; the
+             * short-read guard in doneReading() fails the unit if the bytes
+             * actually read turn out not to cover it.
+             */
+            return true;
+          }
+        }
+      }
+      /*
+       * Ensure a free buffer is available, attempting best-effort recovery
+       * through eviction if the free list is empty.
+       */
+      if (getFreeList().isEmpty() && !tryEvict()) {
+        return false;
       }
+      /*
+       * Create a logical ReadBuffer descriptor without binding pooled memory.
+       * This captures metadata required to schedule the physical read.
+       */
+      ReadBuffer buffer = new ReadBuffer();
+      buffer.setStream(stream);
+      buffer.setETag(stream.getETag());
+      buffer.setPath(stream.getPath());
+      buffer.setOffset(unit.getOffset());
+      buffer.setRequestedLength(unit.getLength());
+      buffer.setBufferType(BufferType.VECTORED);
+      buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+      buffer.setLatch(new CountDownLatch(1));
+      buffer.addVectoredUnit(unit);
+      buffer.setAllocator(allocator);
+      buffer.setTracingContext(readAheadTracingContext);
+      /*
+       * Perform a final free-list check before consuming pooled memory to
+       * ensure buffer availability.
+       */
+      if (getFreeList().isEmpty()) {
+        return false;
+      }
+      Integer bufferIndex = getFreeList().pop();
+      if (bufferIndex >= buffers.length) {
+        /*
+         * Defensive guard; should never occur.
+         * NOTE(review): the index popped above is not pushed back onto the
+         * free list on this path.
+         */
+        return false;
+      }
+      /*
+       * Bind the physical buffer and queue the read for asynchronous
+       * execution.
+       */
+      buffer.setBuffer(buffers[bufferIndex]);
+      buffer.setBufferindex(bufferIndex);
+
+      getReadAheadQueue().add(buffer);
+      notifyAll();
+      printTraceLog(
+          "Done q-ing vectored readAhead for file {} offset {} buffer idx {}",
+          stream.getPath(), unit.getOffset(), buffer.getBufferindex());
+      return true;
     }
   }
 
@@ -150,13 +287,14 @@ public void queueReadAhead(final AbfsInputStream stream, 
final long requestedOff
    * {@inheritDoc}
    */
   @Override
-  public int getBlock(final AbfsInputStream stream, final long position, final 
int length, final byte[] buffer)
+  public int getBlock(final AbfsInputStream stream,
+      final long position,
+      final int length,
+      final byte[] buffer)
       throws IOException {
     // not synchronized, so have to be careful with locking
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("getBlock for file {}  position {}  thread {}",
-          stream.getPath(), position, Thread.currentThread().getName());
-    }
+    printTraceLog("getBlock for file {}  position {}  thread {}",
+        stream.getPath(), position, Thread.currentThread().getName());
 
     waitForProcess(stream, position);
 
@@ -165,10 +303,8 @@ public int getBlock(final AbfsInputStream stream, final 
long position, final int
       bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
     }
     if (bytesRead > 0) {
-      if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace("Done read from Cache for {} position {} length {}",
-            stream.getPath(), position, bytesRead);
-      }
+      printTraceLog("Done read from Cache for {} position {} length {}",
+          stream.getPath(), position, bytesRead);
       return bytesRead;
     }
 
@@ -194,10 +330,8 @@ public ReadBuffer getNextBlockToRead() throws 
InterruptedException {
       buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
       getInProgressList().add(buffer);
     }
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
+   printTraceLog("ReadBufferWorker picked file {} for offset {}",
           buffer.getStream().getPath(), buffer.getOffset());
-    }
     return buffer;
   }
 
@@ -205,11 +339,75 @@ public ReadBuffer getNextBlockToRead() throws 
InterruptedException {
    * {@inheritDoc}
    */
   @Override
-  public void doneReading(final ReadBuffer buffer, final ReadBufferStatus 
result, final int bytesActuallyRead) {
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("ReadBufferWorker completed read file {} for offset {} 
outcome {} bytes {}",
-          buffer.getStream().getPath(),  buffer.getOffset(), result, 
bytesActuallyRead);
+  public void doneReading(final ReadBuffer buffer,
+      final ReadBufferStatus result,
+      final int bytesActuallyRead) {
+    printTraceLog(
+        "ReadBufferWorker completed read file {} for offset {} outcome {} 
bytes {}",
+        buffer.getStream().getPath(), buffer.getOffset(), result,
+        bytesActuallyRead);
+
+    List<CombinedFileRange> vectoredUnits = buffer.getVectoredUnits();
+    if (result == ReadBufferStatus.AVAILABLE
+        && (buffer.getBufferType() == BufferType.VECTORED
+        && !vectoredUnits.isEmpty())) {
+
+      /*
+       * Set length BEFORE handling vectored completion so that any
+       * hitchhiked units that call existing.getLength() see the correct
+       * actual value rather than 0.
+       */
+      buffer.setLength(bytesActuallyRead);
+
+      /*
+       * Guard against short reads: units hitchhiked while buffer was
+       * READING_IN_PROGRESS used requestedLength as coverage estimate.
+       * Now that actual bytes are known, fail any units not fully covered
+       * so their callers are not left hanging on the CompletableFuture.
+       */
+      long actualEnd = buffer.getOffset() + bytesActuallyRead;
+
+      /*
+       * Fast path: check if any unit exceeds actual bytes read before
+       * doing expensive stream/collect. Short reads are rare so this
+       * avoids unnecessary allocations in the common case.
+       */
+      boolean hasUncovered = false;
+      for (CombinedFileRange u : vectoredUnits) {
+        if ((u.getOffset() + u.getLength()) > actualEnd) {
+          hasUncovered = true;
+          break;
+        }
+      }
+
+      if (hasUncovered) {
+        /*
+         * Short read detected — fail uncovered units explicitly so callers
+         * are not left hanging on their CompletableFuture.
+         */
+        Iterator<CombinedFileRange> it = vectoredUnits.iterator();
+        while (it.hasNext()) {
+          CombinedFileRange u = it.next();
+          if ((u.getOffset() + u.getLength()) > actualEnd) {
+            it.remove();
+            printTraceLog("Vectored unit not covered by actual bytes read: 
unitEnd={} actualEnd={}, failing unit",
+                (u.getOffset() + u.getLength()), actualEnd);
+            u.getData().completeExceptionally(new IOException(
+                "Vectored read unit not covered by actual bytes read: "
+                    + "unitEnd=" + (u.getOffset() + u.getLength())
+                    + " actualEnd=" + actualEnd));
+          }
+        }
+      }
+
+      if (!vectoredUnits.isEmpty()) {
+        printTraceLog(
+            "Entering vectored read completion with buffer {}, result {}, 
bytesActuallyRead {}",
+            buffer, result, bytesActuallyRead);
+        handleVectoredCompletion(buffer, result, bytesActuallyRead);
+      }
     }
+
     synchronized (this) {
       // If this buffer has already been purged during
       // close of InputStream then we don't update the lists.
@@ -260,10 +458,8 @@ private void waitForProcess(final AbfsInputStream stream, 
final long position) {
     }
     if (readBuf != null) {         // if in in-progress queue, then block for 
it
       try {
-        if (LOGGER.isTraceEnabled()) {
-          LOGGER.trace("got a relevant read buffer for file {} offset {} 
buffer idx {}",
+       printTraceLog("got a relevant read buffer for file {} offset {} buffer 
idx {}",
               stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
-        }
         readBuf.getLatch().await();  // blocking wait on the caller stream's 
thread
         // Note on correctness: readBuf gets out of inProgressList only in 1 
place: after worker thread
         // is done processing it (in doneReading). There, the latch is set 
after removing the buffer from
@@ -274,10 +470,8 @@ private void waitForProcess(final AbfsInputStream stream, 
final long position) {
       } catch (InterruptedException ex) {
         Thread.currentThread().interrupt();
       }
-      if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace("latch done for file {} buffer idx {} length {}",
+      printTraceLog("latch done for file {} buffer idx {} length {}",
             stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
-      }
     }
   }
 
@@ -346,7 +540,7 @@ private synchronized boolean tryEvict() {
       return evict(nodeToEvict);
     }
 
-    LOGGER.trace("No buffer eligible for eviction");
+    printTraceLog("No buffer eligible for eviction");
     // nothing can be evicted
     return false;
   }
@@ -366,10 +560,8 @@ private boolean evict(final ReadBuffer buf) {
 
     getCompletedReadList().remove(buf);
     buf.setTracingContext(null);
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} 
length {}",
+    printTraceLog("Evicting buffer idx {}; was used for file {} offset {} 
length {}",
           buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), 
buf.getLength());
-    }
     return true;
   }
 
@@ -623,6 +815,21 @@ private static void setBufferManager(ReadBufferManagerV1 
manager) {
     bufferManager = manager;
   }
 
+  /**
+   * Returns the vectored read strategy stored by
+   * {@code setReadBufferManagerConfigs}.
+   *
+   * @return the stored strategy, or {@code null} if it was never set
+   */
+  @VisibleForTesting
+  @Override
+  public VectoredReadStrategy getVectoredReadStrategy() {
+    return vectoredReadStrategy;
+  }
+
+  /**
+   * Returns the maximum vectored read size stored by
+   * {@code setReadBufferManagerConfigs}.
+   *
+   * @return the stored value, or {@code 0} if it was never set
+   */
+  @VisibleForTesting
+  @Override
+  public int getMaxReadSizeForVectoredReads() {
+    return maxReadSizeForVectoredReads;
+  }
+
+  /**
+   * Returns the throughput-oriented maximum vectored read size stored by
+   * {@code setReadBufferManagerConfigs}.
+   *
+   * @return the stored value, or {@code 0} if it was never set
+   */
+  @VisibleForTesting
+  @Override
+  public int getMaxReadSizeForeVectoredReadsThroughput() {
+    return maxReadSizeForeVectoredReadsThroughput;
+  }
+
   @Override
   protected void clearFreeList() {
     getFreeList().clear();
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
index c271a5e354a..1da659c7515 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
@@ -19,14 +19,17 @@
 
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.JvmUniqueIdProvider;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
@@ -38,10 +41,14 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.IntFunction;
 
+import org.apache.hadoop.fs.azurebfs.enums.BufferType;
+import org.apache.hadoop.fs.azurebfs.enums.VectoredReadStrategy;
 import org.apache.hadoop.fs.azurebfs.utils.ResourceUtilizationUtils;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.impl.CombinedFileRange;
 import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
 
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
@@ -122,6 +129,10 @@ public final class ReadBufferManagerV2 extends 
ReadBufferManager {
   private volatile String lastScaleDirection = EMPTY_STRING;
   /* Maximum CPU utilization observed during the monitoring interval. */
   private volatile long maxJvmCpuUtilization = 0L;
+  private final VectoredReadHandler vectoredReadHandler;
+  private static VectoredReadStrategy vectoredReadStrategy;
+  private static int maxReadSizeForVectoredReads;
+  private static int maxReadSizeForeVectoredReadsThroughput;
 
   /**
    * Private constructor to prevent instantiation as this needs to be 
singleton.
@@ -131,6 +142,7 @@ public final class ReadBufferManagerV2 extends 
ReadBufferManager {
   private ReadBufferManagerV2(AbfsCounters abfsCounters) {
     this.abfsCounters = abfsCounters;
     readThreadPoolMetrics = 
abfsCounters.getAbfsReadResourceUtilizationMetrics();
+    vectoredReadHandler = new VectoredReadHandler(this);
     printTraceLog("Creating Read Buffer Manager V2 with HADOOP-18546 patch");
   }
 
@@ -160,6 +172,10 @@ static ReadBufferManagerV2 getBufferManager(AbfsCounters 
abfsCounters) {
     return bufferManager;
   }
 
+  /**
+   * Returns the {@link VectoredReadHandler} created for this manager in its
+   * constructor.
+   *
+   * @return the vectored read handler bound to this manager
+   */
+  @Override
+  VectoredReadHandler getVectoredReadHandler() {
+    return vectoredReadHandler;
+  }
+
   /**
    * Set the ReadBufferManagerV2 configurations based on the provided before 
singleton initialization.
    * @param readAheadBlockSize the read-ahead block size to set for the 
ReadBufferManagerV2.
@@ -194,6 +210,9 @@ public static void setReadBufferManagerConfigs(final int 
readAheadBlockSize,
               abfsConfiguration.getReadAheadV2CachedBufferTTLMillis());
           isDynamicScalingEnabled
               = abfsConfiguration.isReadAheadV2DynamicScalingEnabled();
+          vectoredReadStrategy = abfsConfiguration.getVectoredReadStrategy();
+          maxReadSizeForVectoredReads = 
abfsConfiguration.getMaxReadSizeForVectoredReads();
+          maxReadSizeForeVectoredReadsThroughput = 
abfsConfiguration.getMaxReadSizeForVectoredReadsThroughput();
           setReadAheadBlockSize(readAheadBlockSize);
           setIsConfigured(true);
         }
@@ -338,6 +357,124 @@ public void queueReadAhead(final AbfsInputStream stream,
     }
   }
 
+  /**
+   * Queue a vectored read for a buffer-sized physical read unit.
+   *
+   * <p>The method first attempts to attach the logical unit to a buffer this
+   * manager already tracks for the same stream and offset, provided that
+   * buffer covers the whole unit. If that buffer is already
+   * {@code AVAILABLE}, vectored completion is triggered immediately. If no
+   * suitable buffer exists, a free read buffer is acquired, attempting
+   * memory upscaling and then eviction if the free list is empty, and a new
+   * backend read is queued.</p>
+   *
+   * @param stream         input stream for the file being read
+   * @param unit           buffer-sized combined file range to be read
+   * @param tracingContext tracing context used for the backend read request
+   * @param allocator      allocator used to create buffers for vectored
+   *                       fan-out
+   * @return {@code true} if the read was queued or attached to an existing
+   *         buffer; {@code false} if no buffer was available
+   */
+  boolean queueVectoredRead(AbfsInputStream stream,
+      CombinedFileRange unit,
+      TracingContext tracingContext,
+      IntFunction<ByteBuffer> allocator) {
+    /* Create a child tracing context for vectored read-ahead requests */
+    TracingContext readAheadTracingContext =
+        new TracingContext(tracingContext);
+    readAheadTracingContext.setPrimaryRequestID();
+    readAheadTracingContext.setReadType(ReadType.VECTORED_READ);
+    synchronized (this) {
+      /*
+       * Attempt to hitchhike on a buffer that is already queued, being read
+       * or completed, if it covers the requested logical range completely.
+       *
+       * NOTE(review): isAlreadyQueued is called with the ETag, whereas
+       * findQueuedBuffer matches on stream identity, so the ETag comparison
+       * below compares the calling stream's ETag with itself. Confirm
+       * whether sharing buffers across streams of the same file is intended.
+       */
+      if (isAlreadyQueued(stream.getETag(), unit.getOffset())) {
+        ReadBuffer existing = findQueuedBuffer(stream, unit.getOffset());
+        if (existing != null && existing.getStream().getETag() != null  && 
stream.getETag()
+            .equals(existing.getStream().getETag())) {
+          long end = existing.getOffset() + (
+              existing.getStatus() == ReadBufferStatus.AVAILABLE
+                  ? existing.getLength()
+                  : existing.getRequestedLength());
+          if (end >= unit.getOffset() + unit.getLength()) {
+            existing.setBufferType(BufferType.VECTORED);
+            existing.addVectoredUnit(unit);
+            existing.setAllocator(allocator);
+            if (existing.getStatus() == ReadBufferStatus.AVAILABLE) {
+              /*
+               * Buffer is already AVAILABLE. Trigger completion immediately,
+               * passing getLength() (actual bytes read) as the byte count;
+               * the coverage check above already used the same value.
+               */
+              printTraceLog("Hitchhiking onto AVAILABLE buffer {}, length {}",
+                  existing, existing.getLength());
+              handleVectoredCompletion(existing,
+                  existing.getStatus(),
+                  existing.getLength());
+            }
+            /*
+             * The unit is now attached to the existing buffer, so no new
+             * physical read is queued. For buffers that are not yet
+             * AVAILABLE the coverage check relied on requestedLength; the
+             * short-read guard in doneReading() fails the unit if the bytes
+             * actually read turn out not to cover it.
+             */
+            return true;
+          }
+        }
+      }
+      /*
+       * Ensure a free buffer is available, attempting best-effort recovery
+       * through memory upscaling or eviction if necessary.
+       */
+      if (isFreeListEmpty() && !tryMemoryUpscale() && !tryEvict()) {
+        return false;
+      }
+      /*
+       * Create a logical ReadBuffer descriptor without binding pooled memory.
+       * This captures metadata required to schedule the physical read.
+       */
+      ReadBuffer buffer = new ReadBuffer();
+      buffer.setStream(stream);
+      buffer.setETag(stream.getETag());
+      buffer.setPath(stream.getPath());
+      buffer.setOffset(unit.getOffset());
+      buffer.setRequestedLength(unit.getLength());
+      buffer.setBufferType(BufferType.VECTORED);
+      buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+      buffer.setLatch(new CountDownLatch(1));
+      buffer.addVectoredUnit(unit);
+      buffer.setAllocator(allocator);
+      buffer.setTracingContext(readAheadTracingContext);
+      /*
+       * Perform a final free-list check before consuming pooled memory to
+       * ensure buffer availability.
+       */
+      if (isFreeListEmpty()) {
+        return false;
+      }
+      Integer bufferIndex = popFromFreeList();
+      if (bufferIndex >= bufferPool.length) {
+        /*
+         * Defensive guard; should never occur.
+         * NOTE(review): the index popped above is not pushed back onto the
+         * free list on this path.
+         */
+        return false;
+      }
+      /*
+       * Bind the physical buffer and queue the read for asynchronous
+       * execution.
+       */
+      buffer.setBuffer(bufferPool[bufferIndex]);
+      buffer.setBufferindex(bufferIndex);
+
+      getReadAheadQueue().add(buffer);
+      notifyAll();
+      printTraceLog(
+          "Done q-ing vectored readAhead for file: {}, with eTag:{}, offset: 
{}, "
+              + "buffer idx: {}, triggered by stream: {}",
+          stream.getPath(), stream.getETag(), unit.getOffset(),
+          buffer.getBufferindex(), stream.hashCode());
+      return true;
+    }
+  }
+
   /**
    * {@link AbfsInputStream} calls this method read any bytes already 
available in a buffer (thereby saving a
    * remote read). This returns the bytes if the data already exists in 
buffer. If there is a buffer that is reading
@@ -427,10 +564,65 @@ public ReadBuffer getNextBlockToRead() throws 
InterruptedException {
   public void doneReading(final ReadBuffer buffer,
       final ReadBufferStatus result,
       final int bytesActuallyRead) {
-    printTraceLog(
-        "ReadBufferWorker completed prefetch for file: {} with eTag: {}, for 
offset: {}, queued by stream: {}, with status: {} and bytes read: {}",
-        buffer.getPath(), buffer.getETag(), buffer.getOffset(),
-        buffer.getStream().hashCode(), result, bytesActuallyRead);
+    printTraceLog("ReadBufferWorker completed read file {} for offset {} 
outcome {} bytes {}",
+        buffer.getStream().getPath(),
+        buffer.getOffset(),
+        result,
+        bytesActuallyRead);
+    List<CombinedFileRange> vectoredUnits = buffer.getVectoredUnits();
+    if (result == ReadBufferStatus.AVAILABLE
+        && (buffer.getBufferType() == BufferType.VECTORED && 
!vectoredUnits.isEmpty())) {
+
+      /*
+       * Set length BEFORE handling vectored completion so that any
+       * hitchhiked units that call existing.getLength() see the correct
+       * actual value rather than 0.
+       */
+      buffer.setLength(bytesActuallyRead);
+      /*
+       * Guard against short reads: units hitchhiked while buffer was
+       * READING_IN_PROGRESS used requestedLength as coverage estimate.
+       * Now that actual bytes are known, fail any units not fully covered
+       * so their callers are not left hanging on the CompletableFuture.
+       */
+      long actualEnd = buffer.getOffset() + bytesActuallyRead;
+      /*
+       * Fast path: check if any unit exceeds actual bytes read before
+       * doing expensive stream/collect. Short reads are rare so this
+       * avoids unnecessary allocations in the common case.
+       */
+      boolean hasUncovered = false;
+      for (CombinedFileRange u : vectoredUnits) {
+        if ((u.getOffset() + u.getLength()) > actualEnd) {
+          hasUncovered = true;
+          break;
+        }
+      }
+      if (hasUncovered) {
+        /*
+         * Short read detected — fail uncovered units explicitly so callers
+         * are not left hanging on their CompletableFuture.
+         */
+        Iterator<CombinedFileRange> it = vectoredUnits.iterator();
+        while (it.hasNext()) {
+          CombinedFileRange u = it.next();
+          if ((u.getOffset() + u.getLength()) > actualEnd) {
+            it.remove();
+            printTraceLog("Vectored unit not covered by actual bytes read: 
unitEnd={} actualEnd={}, failing unit",
+                (u.getOffset() + u.getLength()), actualEnd);
+            u.getData().completeExceptionally(new IOException(
+                "Vectored read unit not covered by actual bytes read: "
+                    + "unitEnd=" + (u.getOffset() + u.getLength())
+                    + " actualEnd=" + actualEnd));
+          }
+        }
+      }
+      if (!vectoredUnits.isEmpty()) {
+        printTraceLog("Entering vectored read completion with buffer {}, 
result {}, bytesActuallyRead {}",
+            buffer, result, bytesActuallyRead);
+        handleVectoredCompletion(buffer, result, bytesActuallyRead);
+      }
+    }
     synchronized (this) {
       // If this buffer has already been purged during
       // close of InputStream then we don't update the lists.
@@ -438,16 +630,18 @@ public void doneReading(final ReadBuffer buffer,
         getInProgressList().remove(buffer);
         if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
           // Successful read, so update the buffer status and length
-          buffer.setStatus(ReadBufferStatus.AVAILABLE);
-          buffer.setLength(bytesActuallyRead);
+          if (!buffer.isFanOutDone()) {
+            buffer.setStatus(ReadBufferStatus.AVAILABLE);
+            buffer.setLength(bytesActuallyRead);
+          }
         } else {
           // Failed read, reuse buffer for next read, this buffer will be
           // evicted later based on eviction policy.
           pushToFreeList(buffer.getBufferindex());
+          buffer.setStatus(result);
         }
         // completed list also contains FAILED read buffers
         // for sending exception message to clients.
-        buffer.setStatus(result);
         buffer.setTimeStamp(currentTimeMillis());
         getCompletedReadList().add(buffer);
       }
@@ -506,7 +700,7 @@ private ReadBuffer getFromList(final Collection<ReadBuffer> 
list,
       final String eTag,
       final long requestedOffset) {
     for (ReadBuffer buffer : list) {
-      if (eTag.equals(buffer.getETag())) {
+      if (Objects.equals(eTag, buffer.getETag())) {
         if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
             && requestedOffset >= buffer.getOffset()
             && requestedOffset < buffer.getOffset() + buffer.getLength()) {
@@ -749,7 +943,7 @@ private ReadBuffer getBufferFromCompletedQueue(final String 
eTag,
     for (ReadBuffer buffer : getCompletedReadList()) {
       // Buffer is returned if the requestedOffset is at or above buffer's
       // offset but less than buffer's length or the actual requestedLength
-      if (eTag.equals(buffer.getETag())
+      if (Objects.equals(eTag, buffer.getETag())
           && (requestedOffset >= buffer.getOffset())
           && ((requestedOffset < buffer.getOffset() + buffer.getLength())
           || (requestedOffset
@@ -1062,16 +1256,6 @@ public Thread newThread(Runnable r) {
     }
   };
 
-  private void printTraceLog(String message, Object... args) {
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace(message, args);
-    }
-  }
-
-  private void printDebugLog(String message, Object... args) {
-    LOGGER.debug(message, args);
-  }
-
   @VisibleForTesting
   synchronized static ReadBufferManagerV2 getInstance() {
     return bufferManager;
@@ -1120,6 +1304,21 @@ public ScheduledExecutorService getCpuMonitoringThread() 
{
     return cpuMonitorThread;
   }
 
+  /**
+   * Returns the vectored read strategy held by this buffer manager.
+   *
+   * @return the current value of the {@code vectoredReadStrategy} field
+   */
+  @VisibleForTesting
+  public VectoredReadStrategy getVectoredReadStrategy() {
+    return vectoredReadStrategy;
+  }
+
+  /**
+   * Returns the maximum read size configured for vectored reads.
+   *
+   * @return the current value of the {@code maxReadSizeForVectoredReads} field
+   */
+  @VisibleForTesting
+  public int getMaxReadSizeForVectoredReads() {
+    return maxReadSizeForVectoredReads;
+  }
+
+  /**
+   * Returns the maximum read size configured for throughput-oriented
+   * vectored reads.
+   *
+   * @return the current value of the
+   *         {@code maxReadSizeForeVectoredReadsThroughput} field
+   */
+  @VisibleForTesting
+  public int getMaxReadSizeForeVectoredReadsThroughput() {
+    return maxReadSizeForeVectoredReadsThroughput;
+  }
+
   /**
    * Returns the maximum JVM CPU utilization observed during the current
    * monitoring interval or since the last reset.
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/VectoredReadHandler.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/VectoredReadHandler.java
new file mode 100644
index 00000000000..f8590ceee71
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/VectoredReadHandler.java
@@ -0,0 +1,696 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.IntFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.VectoredReadUtils;
+import org.apache.hadoop.fs.azurebfs.constants.ReadType;
+import org.apache.hadoop.fs.azurebfs.enums.VectoredReadStrategy;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.impl.CombinedFileRange;
+
+/**
+ * Handles vectored read operations by coordinating with a ReadBufferManager
+ * and applying the configured VectoredReadStrategy.
+ * This class acts as the orchestration layer that decides how vectored reads
+ * are executed, while delegating buffer management and read behavior to
+ * dedicated components.
+ */
+class VectoredReadHandler {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(VectoredReadHandler.class);
+
+  /**
+   * Manages allocation, lifecycle, and reuse of read buffers
+   * used during vectored read operations.
+   */
+  private final ReadBufferManager readBufferManager;
+
+  /**
+   * Strategy defining how vectored reads should be performed.
+   */
+  private final VectoredReadStrategy strategy;
+
+  /**
+   * Shared reconstruction buffers for ranges that span multiple chunks.
+   * Keyed by the original FileRange instance.
+   */
+  private final ConcurrentHashMap<RangeKey, ByteBuffer> partialBuffers =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Tracks remaining bytes to be received for each logical range.
+   * Keyed by the original FileRange instance.
+   */
+  private final ConcurrentHashMap<RangeKey, AtomicInteger> pendingBytes =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Creates a VectoredReadHandler using the provided ReadBufferManager.
+   * The vectored read strategy is obtained from the manager to ensure
+   * consistent configuration across the read pipeline.
+   *
+   * @param readBufferManager manager responsible for buffer handling
+   *                          and providing the vectored read strategy
+   */
+  VectoredReadHandler(ReadBufferManager readBufferManager) {
+    this.readBufferManager = readBufferManager;
+    /* Strategy is read from the manager once and kept in a final field */
+    this.strategy = readBufferManager.getVectoredReadStrategy();
+    LOG.debug("VectoredReadHandler initialized with strategy={}", strategy);
+  }
+
+  /**
+   * Perform a vectored read over multiple logical file ranges.
+   *
+   * <p>Each range is validated and given a fresh future. The accepted ranges
+   * are merged using a span-first strategy whose span limit depends on the
+   * configured {@link VectoredReadStrategy}, and every merged range is split
+   * into buffer-sized physical read units that are queued for asynchronous
+   * execution. A unit that cannot be queued is read synchronously through
+   * {@link #directRead} instead. A failure while handling one unit fails the
+   * logical ranges attached to that unit and processing continues with the
+   * next unit.</p>
+   *
+   * <p>Zero-length ranges never yield a physical read unit with a non-empty
+   * overlap, so they are completed up front with an empty buffer; otherwise
+   * their futures would never be completed.</p>
+   *
+   * @param stream    input stream for the file being read
+   * @param ranges    logical file ranges to read; each range will be completed
+   *                  with data or failure via its associated future
+   * @param allocator allocator used to create buffers for direct reads and
+   *                  vectored fan-out
+   * @throws EOFException propagated from range validation or range sorting
+   */
+  public void readVectored(
+      AbfsInputStream stream,
+      List<? extends FileRange> ranges,
+      IntFunction<ByteBuffer> allocator) throws EOFException {
+    LOG.debug("readVectored invoked: path={}, rangeCount={}",
+        stream.getPath(), ranges.size());
+
+    /* Validate every range and initialize a future for each of them */
+    long fileLength = stream.getContentLength();
+    List<FileRange> validRanges = validateAndPrepareRanges(ranges, fileLength);
+
+    /* Select the maximum allowed merge span based on the configured strategy */
+    int maxSpan =
+        (strategy == VectoredReadStrategy.TPS_OPTIMIZED)
+            ? readBufferManager.getMaxReadSizeForVectoredReads()
+            : readBufferManager.getMaxReadSizeForeVectoredReadsThroughput();
+
+    LOG.debug("readVectored: path={}, strategy={}, maxSpan={}",
+        stream.getPath(), strategy, maxSpan);
+
+    /* Merge logical ranges using a span-first coalescing strategy */
+    List<CombinedFileRange> merged =
+        mergeBySpanAndGap(validRanges, maxSpan, fileLength);
+
+    LOG.debug("readVectored: path={}, mergedRangeCount={}",
+        stream.getPath(), merged.size());
+
+    /* Read buffer size acts as a hard upper bound for physical reads */
+    int readBufferSize = ReadBufferManager.getReadAheadBlockSize();
+
+    /* Split merged ranges into buffer-sized chunks and queue each for read */
+    for (CombinedFileRange unit : merged) {
+      /*
+       * Empty ranges are settled here, before splitting: no chunk will ever
+       * carry bytes for them, so nothing downstream would complete them.
+       */
+      completeZeroLengthRanges(unit, allocator);
+
+      List<CombinedFileRange> chunks = splitByBufferSize(unit, readBufferSize);
+
+      LOG.debug("readVectored: path={}, mergedOffset={}, mergedLength={}, chunkCount={}",
+          stream.getPath(), unit.getOffset(), unit.getLength(), chunks.size());
+
+      for (CombinedFileRange chunk : chunks) {
+        try {
+          VectoredReadUtils.validateRangeRequest(chunk);
+          boolean queued = queueVectoredRead(stream, chunk, allocator);
+          if (!queued) {
+            LOG.debug("readVectored: buffer pool exhausted, falling back to directRead: path={}, offset={}, length={}",
+                stream.getPath(), chunk.getOffset(), chunk.getLength());
+            directRead(stream, chunk, allocator);
+          }
+        } catch (Exception e) {
+          /* Boundary catch: one failed chunk must not abort the other chunks */
+          LOG.warn("readVectored: chunk read failed, failing underlying ranges:"
+                  + " path={}, offset={}, length={}",
+              stream.getPath(), chunk.getOffset(), chunk.getLength(), e);
+          failUnit(chunk, e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Completes every still-pending zero-length range of the given unit with
+   * an empty buffer obtained from the allocator.
+   *
+   * @param unit      merged unit whose underlying ranges are inspected
+   * @param allocator allocator used to obtain the empty result buffer
+   */
+  private void completeZeroLengthRanges(
+      CombinedFileRange unit,
+      IntFunction<ByteBuffer> allocator) {
+    for (FileRange r : unit.getUnderlying()) {
+      CompletableFuture<ByteBuffer> future = r.getData();
+      if (r.getLength() != 0 || future == null || future.isDone()) {
+        continue;
+      }
+      try {
+        future.complete(allocator.apply(0));
+      } catch (RuntimeException e) {
+        /* Allocator failure is reported on the range, not thrown to caller */
+        future.completeExceptionally(e);
+      }
+    }
+  }
+
+  /**
+   * Queues a vectored read request with the buffer manager.
+   *
+   * <p>The request is tagged with {@link ReadType#VECTORED_READ} on a private
+   * copy of the stream's tracing context, so the stream's own context is not
+   * modified. This mirrors what {@link #directRead} does for its read
+   * type.</p>
+   *
+   * @param stream    input stream the read belongs to
+   * @param unit      physical read unit to queue
+   * @param allocator allocator handed to the buffer manager together with
+   *                  the unit
+   * @return true if the buffer manager accepted the request, false if it did
+   *         not and the caller must fall back to another read path
+   */
+  @VisibleForTesting
+  boolean queueVectoredRead(AbfsInputStream stream, CombinedFileRange unit,
+      IntFunction<ByteBuffer> allocator) {
+    LOG.debug("queueVectoredRead: path={}, offset={}, length={}",
+        stream.getPath(), unit.getOffset(), unit.getLength());
+    /* Copy first: setting the read type must not leak into other reads */
+    TracingContext tracingContext =
+        new TracingContext(stream.getTracingContext());
+    tracingContext.setReadType(ReadType.VECTORED_READ);
+    return getReadBufferManager().queueVectoredRead(stream, unit,
+        tracingContext, allocator);
+  }
+
+  /**
+   * Returns the buffer manager this handler was constructed with.
+   *
+   * @return the {@link ReadBufferManager} instance.
+   */
+  public ReadBufferManager getReadBufferManager() {
+    return readBufferManager;
+  }
+
+  /**
+   * Validates the given {@link FileRange}s and prepares them for a vectored
+   * read.
+   *
+   * <p>For every input range, in order:
+   * <ol>
+   *   <li>{@link VectoredReadUtils#validateRangeRequest(FileRange)} is called;
+   *       anything it throws propagates to the caller</li>
+   *   <li>a new {@link CompletableFuture} is installed on the range</li>
+   *   <li>the range is checked against the file bounds</li>
+   * </ol>
+   *
+   * <p>A range failing the bounds check is left out of the result and its
+   * future is completed exceptionally with an {@link EOFException}.
+   *
+   * @param ranges     the logical file ranges to validate and prepare
+   * @param fileLength the total length of the file, used for bounds checking
+   * @return the ranges that passed the bounds check, in input order
+   * @throws EOFException if thrown by the per-range validation call
+   */
+  private List<FileRange> validateAndPrepareRanges(
+      List<? extends FileRange> ranges, long fileLength) throws EOFException {
+
+    List<FileRange> accepted = new ArrayList<>();
+
+    for (FileRange candidate : ranges) {
+      VectoredReadUtils.validateRangeRequest(candidate);
+      candidate.setData(new CompletableFuture<>());
+
+      long offset = candidate.getOffset();
+      long length = candidate.getLength();
+
+      /* Written as "length <= fileLength - offset" to avoid offset + length */
+      boolean withinFile = offset >= 0
+          && length >= 0
+          && offset <= fileLength
+          && length <= fileLength - offset;
+
+      if (withinFile) {
+        accepted.add(candidate);
+      } else {
+        candidate.getData().completeExceptionally(new EOFException(
+            "Invalid range: offset=" + offset + ", length=" + length + ", fileLength=" + fileLength));
+      }
+    }
+    return accepted;
+  }
+
+  /**
+   * Splits a merged logical {@link CombinedFileRange} into smaller
+   * buffer-sized physical read units.
+   *
+   * <p>Each resulting unit spans at most {@code bufferSize} bytes and carries
+   * exactly those underlying {@link FileRange}s that overlap it. A range
+   * crossing a unit boundary is therefore attached to several units;
+   * reassembling its data is left to the fan-out logic. A unit of length
+   * zero produces an empty list.</p>
+   *
+   * @param unit       the combined logical range to be split
+   * @param bufferSize the maximum size (in bytes) of each physical read unit;
+   *                   must be positive
+   * @return a list of buffer-sized {@link CombinedFileRange} instances
+   * @throws IllegalArgumentException if {@code bufferSize} is not positive
+   */
+  private List<CombinedFileRange> splitByBufferSize(
+      CombinedFileRange unit,
+      int bufferSize) {
+    LOG.debug("splitByBufferSize: offset={}, length={}, bufferSize={}",
+        unit.getOffset(), unit.getLength(), bufferSize);
+
+    /* A non-positive step would never advance and the loop would not end */
+    if (bufferSize <= 0) {
+      throw new IllegalArgumentException(
+          "bufferSize must be positive: " + bufferSize);
+    }
+
+    List<CombinedFileRange> parts = new ArrayList<>();
+    long unitStart = unit.getOffset();
+    long unitEnd = unitStart + unit.getLength();
+    long start = unitStart;
+
+    while (start < unitEnd) {
+      long partEnd = Math.min(start + bufferSize, unitEnd);
+
+      /*
+       * The constructor needs an initial underlying range; it is cleared
+       * right away so that only ranges overlapping this part are attached.
+       */
+      CombinedFileRange part =
+          new CombinedFileRange(start, partEnd, unit.getUnderlying().get(0));
+      part.getUnderlying().clear();
+
+      for (FileRange r : unit.getUnderlying()) {
+        long rStart = r.getOffset();
+        long rEnd = rStart + r.getLength();
+        if (rEnd > start && rStart < partEnd) {
+          part.getUnderlying().add(r);
+        }
+      }
+      parts.add(part);
+      start = partEnd;
+    }
+
+    LOG.debug("splitByBufferSize: offset={}, produced {} parts",
+        unit.getOffset(), parts.size());
+    return parts;
+  }
+
+  /**
+   * Coalesces logical {@link FileRange}s into {@link CombinedFileRange}s,
+   * bounded only by total span.
+   *
+   * <p>The ranges are first passed through
+   * {@code VectoredReadUtils.validateAndSortRanges}. Walking the result in
+   * order, a range joins the current combined range while the distance from
+   * the combined range's offset to the end of the new range stays within
+   * {@code maxSpan}; otherwise the current combined range is closed and a new
+   * one is started. The size of the gap between neighbouring ranges is not
+   * considered.</p>
+   *
+   * @param ranges     logical file ranges to merge
+   * @param maxSpan    maximum allowed span (in bytes) for a combined read
+   * @param fileLength file length handed to the validate-and-sort step
+   * @return merged {@link CombinedFileRange}s covering the input ranges
+   * @throws EOFException if thrown by the validate-and-sort step
+   */
+  private List<CombinedFileRange> mergeBySpanAndGap(
+      List<? extends FileRange> ranges,
+      int maxSpan, long fileLength) throws EOFException {
+
+    LOG.debug("mergeBySpanAndGap: rangeCount={}, maxSpan={}", ranges.size(), maxSpan);
+    List<? extends FileRange> ordered =
+        VectoredReadUtils.validateAndSortRanges(ranges, Optional.of(fileLength));
+
+    List<CombinedFileRange> combined = new ArrayList<>();
+    CombinedFileRange open = null;
+
+    for (FileRange next : ordered) {
+      long nextStart = next.getOffset();
+      long nextEnd = nextStart + next.getLength();
+
+      /* Span that the open combined range would have after absorbing next */
+      boolean fitsInOpen =
+          open != null && (nextEnd - open.getOffset()) <= maxSpan;
+
+      if (fitsInOpen) {
+        open.setLength((int) (nextEnd - open.getOffset()));
+        open.getUnderlying().add(next);
+      } else {
+        /* Close the open range (if there is one) and start a fresh one */
+        if (open != null) {
+          combined.add(open);
+        }
+        open = new CombinedFileRange(nextStart, nextEnd, next);
+      }
+    }
+
+    /* The last open range has not been emitted yet */
+    if (open != null) {
+      combined.add(open);
+    }
+
+    LOG.debug("mergeBySpanAndGap: produced {} combined ranges", combined.size());
+    return combined;
+  }
+
+  /**
+   * Distributes data from a physical read buffer into the corresponding
+   * logical {@link FileRange}s.
+   *
+   * <p>A single physical read (represented by {@link ReadBuffer}) may contain
+   * data for multiple logical ranges. For every range attached to the
+   * buffer's vectored units, the overlapping bytes are copied into a
+   * per-range buffer held in {@code partialBuffers}, and the range's counter
+   * in {@code pendingBytes} is reduced by the number of bytes copied. The
+   * range's future is completed once that counter reaches zero; the completed
+   * buffer is exposed with position 0 and limit equal to the range length.</p>
+   *
+   * <p>Ranges whose future is already done are skipped. For cancelled ranges
+   * the partial state is dropped as well. Any exception raised while
+   * processing one range fails that range only.</p>
+   *
+   * <p>Thread safety:</p>
+   * <ul>
+   *   <li>The copy and the counter update for a range run while holding the
+   *       monitor of that range's buffer</li>
+   *   <li>Bytes are written through a {@code duplicate()} of the range buffer,
+   *       so the position of the shared buffer is not moved by writes</li>
+   * </ul>
+   *
+   * @param buffer    the physical read buffer containing merged data
+   * @param bytesRead number of valid bytes in the buffer
+   */
+  void fanOut(ReadBuffer buffer, int bytesRead) {
+    LOG.debug("fanOut: path={}, bufferOffset={}, bytesRead={}",
+        buffer.getPath(), buffer.getOffset(), bytesRead);
+
+    List<CombinedFileRange> units = buffer.getVectoredUnits();
+    if (units == null) {
+      LOG.warn("fanOut: no vectored units found for path={}, offset={}",
+          buffer.getPath(), buffer.getOffset());
+      return;
+    }
+
+    long bufferStart = buffer.getOffset();
+    long bufferEnd = bufferStart + bytesRead;
+
+    /* Iterate over all combined logical units mapped to this buffer */
+    for (CombinedFileRange unit : units) {
+      /* Each unit may contain multiple logical FileRanges */
+      for (FileRange r : unit.getUnderlying()) {
+        CompletableFuture<ByteBuffer> future = r.getData();
+        RangeKey key = new RangeKey(r);
+
+        /* Skip already completed or cancelled ranges */
+        if (future.isCancelled()) {
+          LOG.debug("fanOut: range cancelled, cleaning up: path={}, rangeOffset={}",
+              buffer.getPath(), r.getOffset());
+          partialBuffers.remove(key);
+          pendingBytes.remove(key);
+          continue;
+        }
+        if (future.isDone()) {
+          continue;
+        }
+
+        try {
+          long rangeStart = r.getOffset();
+          long rangeEnd = rangeStart + r.getLength();
+
+          /* Compute overlap between buffer and logical range */
+          long overlapStart = Math.max(rangeStart, bufferStart);
+          long overlapEnd = Math.min(rangeEnd, bufferEnd);
+
+          /* No overlap, nothing to copy */
+          if (overlapStart >= overlapEnd) {
+            LOG.debug("fanOut: no overlap for path={}, rangeOffset={}, bufferOffset={}",
+                buffer.getPath(), r.getOffset(), bufferStart);
+            continue;
+          }
+
+          int srcOffset = (int) (overlapStart - bufferStart);
+          int destOffset = (int) (overlapStart - rangeStart);
+          int length = (int) (overlapEnd - overlapStart);
+
+          LOG.debug("fanOut: copying path={}, rangeOffset={}, rangeLength={},"
+                  + " bufferOffset={}, srcOffset={}, destOffset={}, length={}",
+              buffer.getPath(), r.getOffset(), r.getLength(),
+              bufferStart, srcOffset, destOffset, length);
+
+          /* Allocate or reuse the full buffer for this logical range */
+          ByteBuffer fullBuf = partialBuffers.computeIfAbsent(
+              key, k -> buffer.getAllocator().apply(r.getLength()));
+
+          /* Track remaining bytes required to complete this range */
+          AtomicInteger pending = pendingBytes.computeIfAbsent(
+              key, k -> new AtomicInteger(r.getLength()));
+
+          synchronized (fullBuf) {
+            /* Double-check completion inside lock */
+            if (future.isDone()) {
+              continue;
+            }
+
+            /* Write through a duplicate so fullBuf's own position stays put */
+            ByteBuffer dst = fullBuf.duplicate();
+            dst.position(destOffset);
+            dst.put(buffer.getBuffer(), srcOffset, length);
+
+            int left = pending.addAndGet(-length);
+
+            LOG.debug("fanOut: wrote chunk: path={}, rangeOffset={}, destOffset={},"
+                    + " length={}, pendingBytes={}",
+                buffer.getPath(), r.getOffset(), destOffset, length, left);
+
+            if (left < 0) {
+              LOG.error("fanOut: pending bytes went negative possible duplicate write:"
+                      + " path={}, rangeOffset={}, pending={}",
+                  buffer.getPath(), r.getOffset(), left);
+              future.completeExceptionally(new IllegalStateException(
+                  "Pending bytes negative for offset=" + r.getOffset()));
+              partialBuffers.remove(key);
+              pendingBytes.remove(key);
+              continue;
+            }
+
+            /* Complete future once all bytes are received */
+            if (left == 0 && !future.isDone()) {
+              if (fullBuf.capacity() != r.getLength()) {
+                LOG.warn("fanOut: buffer size mismatch: path={}, rangeOffset={},"
+                        + " expected={}, actual={}",
+                    buffer.getPath(), r.getOffset(), r.getLength(),
+                    fullBuf.capacity());
+              }
+
+              /*
+               * Prepare buffer for reading.
+               * DO NOT use flip() because writes may arrive out-of-order.
+               * Expose exactly the bytes of this range: an allocator may
+               * hand back a buffer larger than requested, and the bytes
+               * past the range length were never written. directRead
+               * applies the same limit.
+               */
+              fullBuf.position(0);
+              fullBuf.limit(r.getLength());
+
+              future.complete(fullBuf);
+              partialBuffers.remove(key);
+              pendingBytes.remove(key);
+
+              LOG.debug("fanOut: completed range: path={}, rangeOffset={}, rangeLength={}",
+                  buffer.getPath(), r.getOffset(), r.getLength());
+            }
+          }
+        } catch (Exception e) {
+          /* Boundary catch: a failure on one range must not affect the rest */
+          LOG.warn("fanOut: exception processing range: path={}, rangeOffset={}",
+              buffer.getPath(), r.getOffset(), e);
+          partialBuffers.remove(key);
+          pendingBytes.remove(key);
+          if (!future.isDone()) {
+            future.completeExceptionally(e);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Fails every logical {@link FileRange} attached to the given
+   * {@link CombinedFileRange}.
+   *
+   * <p>For each underlying range the entries in {@code partialBuffers} and
+   * {@code pendingBytes} are removed, and the range's
+   * {@link CompletableFuture} is completed exceptionally with {@code t}
+   * unless it is absent or already done.</p>
+   *
+   * @param unit the combined vectored read unit whose underlying ranges must be failed
+   * @param t    the exception that caused the failure
+   */
+  private void failUnit(CombinedFileRange unit, Throwable t) {
+    for (FileRange range : unit.getUnderlying()) {
+      /* Drop reassembly state before reporting the failure */
+      RangeKey rangeKey = new RangeKey(range);
+      partialBuffers.remove(rangeKey);
+      pendingBytes.remove(rangeKey);
+
+      CompletableFuture<ByteBuffer> result = range.getData();
+      if (result == null || result.isDone()) {
+        continue;
+      }
+      result.completeExceptionally(t);
+    }
+  }
+
+  /**
+   * Fails all {@link FileRange} futures associated with the given
+   * {@link ReadBuffer} and clears any partial state.
+   *
+   * <p>Every vectored unit attached to the buffer is handed to
+   * {@link #failUnit}. If the buffer has no unit list, a warning is logged
+   * and nothing else happens.</p>
+   *
+   * @param buffer the read buffer whose ranges should be failed
+   * @param t      the exception causing the failure
+   */
+  void failBufferFutures(ReadBuffer buffer, Throwable t) {
+    List<CombinedFileRange> units = buffer.getVectoredUnits();
+    if (units == null) {
+      LOG.warn("failBufferFutures: no vectored units found for path={}, offset={}",
+          buffer.getPath(), buffer.getOffset());
+      return;
+    }
+
+    for (CombinedFileRange unit : units) {
+      failUnit(unit, t);
+    }
+  }
+
+  /**
+   * Reads a {@link CombinedFileRange} synchronously on the calling thread and
+   * hands the bytes to the logical ranges attached to it.
+   *
+   * <p>The whole unit is first read into a temporary array by calling
+   * {@code stream.readRemote} until the requested length is reached. The
+   * bytes are then copied into the per-range buffers kept in
+   * {@code partialBuffers}, the same maps {@link #fanOut} uses, so a range
+   * spanning several chunks is reassembled correctly no matter which path
+   * served each chunk. A range's future is completed once its pending byte
+   * counter reaches zero.</p>
+   *
+   * @param stream    input stream to read from
+   * @param unit      combined range to read
+   * @param allocator buffer allocator for logical ranges
+   * @throws IOException if a remote read returns no bytes before the
+   *                     requested length is reached, or the read itself fails
+   */
+  void directRead(
+      AbfsInputStream stream,
+      CombinedFileRange unit,
+      IntFunction<ByteBuffer> allocator) throws IOException {
+
+    LOG.debug("directRead: path={}, offset={}, length={}",
+        stream.getPath(), unit.getOffset(), unit.getLength());
+
+    final int requested = unit.getLength();
+    final long unitStart = unit.getOffset();
+    final long unitEnd = unitStart + requested;
+
+    /* Temporary holder for the bytes of the entire combined range */
+    byte[] tmp = new byte[requested];
+
+    /* Work on a copy so the stream's own tracing context keeps its type */
+    TracingContext remoteContext = new TracingContext(stream.getTracingContext());
+    remoteContext.setReadType(ReadType.VECTORED_DIRECT_READ);
+
+    /* A remote read may return fewer bytes than asked for; loop until full */
+    int total = 0;
+    while (total < requested) {
+      int n = stream.readRemote(unitStart + total, tmp, total,
+          requested - total, remoteContext);
+      if (n <= 0) {
+        throw new IOException(
+            "Unexpected end of stream during direct read: path=" + stream.getPath()
+                + ", offset=" + (unitStart + total)
+                + ", requested=" + requested);
+      }
+      total += n;
+    }
+
+    LOG.debug("directRead: read complete: path={}, offset={}, bytesRead={}",
+        stream.getPath(), unitStart, total);
+
+    /* Hand the bytes out to every logical range attached to this unit */
+    for (FileRange range : unit.getUnderlying()) {
+      CompletableFuture<ByteBuffer> result = range.getData();
+      if (result == null || result.isDone()) {
+        continue;
+      }
+
+      long rangeStart = range.getOffset();
+      long rangeEnd = rangeStart + range.getLength();
+
+      /* Intersection of the unit with this logical range */
+      long copyFrom = Math.max(rangeStart, unitStart);
+      long copyTo = Math.min(rangeEnd, unitEnd);
+      if (copyFrom >= copyTo) {
+        continue;
+      }
+
+      int srcOffset = (int) (copyFrom - unitStart);
+      int destOffset = (int) (copyFrom - rangeStart);
+      int length = (int) (copyTo - copyFrom);
+
+      LOG.debug("directRead: copying: path={}, rangeOffset={}, rangeLength={},"
+              + " srcOffset={}, destOffset={}, length={}",
+          stream.getPath(), range.getOffset(), range.getLength(),
+          srcOffset, destOffset, length);
+
+      RangeKey key = new RangeKey(range);
+
+      /*
+       * Reassembly state lives in the maps shared with fanOut, so chunks of
+       * one range may arrive through either path.
+       */
+      ByteBuffer fullBuf = partialBuffers.computeIfAbsent(
+          key, k -> allocator.apply(range.getLength()));
+      AtomicInteger pending = pendingBytes.computeIfAbsent(
+          key, k -> new AtomicInteger(range.getLength()));
+
+      synchronized (fullBuf) {
+        /* Another chunk may have settled this range while we waited */
+        if (!result.isDone()) {
+          if (!fullBuf.hasArray()) {
+            /* No accessible backing array: write via a duplicate */
+            ByteBuffer dst = fullBuf.duplicate();
+            dst.position(destOffset);
+            dst.put(tmp, srcOffset, length);
+          } else {
+            System.arraycopy(tmp, srcOffset,
+                fullBuf.array(), fullBuf.arrayOffset() + destOffset,
+                length);
+          }
+
+          int left = pending.addAndGet(-length);
+
+          LOG.debug("directRead: wrote chunk: path={}, rangeOffset={}, destOffset={},"
+                  + " length={}, pendingBytes={}",
+              stream.getPath(), range.getOffset(), destOffset, length, left);
+
+          if (left < 0) {
+            LOG.error("directRead: pending bytes went negative  possible duplicate write:"
+                    + " path={}, rangeOffset={}, pending={}",
+                stream.getPath(), range.getOffset(), left);
+            result.completeExceptionally(new IllegalStateException(
+                "Pending bytes negative in directRead for offset=" + range.getOffset()));
+            partialBuffers.remove(key);
+            pendingBytes.remove(key);
+          } else if (left == 0) {
+            /* All bytes are in place: expose exactly the range's bytes */
+            fullBuf.position(0);
+            fullBuf.limit(range.getLength());
+            result.complete(fullBuf);
+            partialBuffers.remove(key);
+            pendingBytes.remove(key);
+            LOG.debug("directRead: completed range: path={}, rangeOffset={}, rangeLength={}",
+                stream.getPath(), range.getOffset(), range.getLength());
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Map key that identifies a {@link FileRange} by reference.
+   *
+   * <p>Two keys are equal only when they wrap the very same
+   * {@link FileRange} object ({@code ==}), and the hash code is
+   * {@link System#identityHashCode(Object)} of that object. Ranges with
+   * identical offsets and lengths that are distinct objects therefore map to
+   * distinct keys.</p>
+   */
+  static final class RangeKey {
+    private final FileRange range;
+
+    RangeKey(FileRange range) {
+      this.range = range;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof RangeKey)) {
+        return false;
+      }
+      RangeKey other = (RangeKey) o;
+      return range == other.range;
+    }
+
+    @Override
+    public int hashCode() {
+      return System.identityHashCode(range);
+    }
+  }
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
index e663db9bbdd..57a82ee7c40 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
@@ -174,7 +174,7 @@ private ReadBufferManager 
getBufferManager(AzureBlobFileSystem fs) {
           getConfiguration());
       return 
ReadBufferManagerV2.getBufferManager(fs.getAbfsStore().getClient().getAbfsCounters());
     }
-    ReadBufferManagerV1.setReadBufferManagerConfigs(blockSize);
+    ReadBufferManagerV1.setReadBufferManagerConfigs(blockSize, 
fs.getAbfsStore().getClient().getAbfsConfiguration());
     return ReadBufferManagerV1.getBufferManager();
   }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestVectoredRead.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestVectoredRead.java
new file mode 100644
index 00000000000..054fb9bbe37
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestVectoredRead.java
@@ -0,0 +1,703 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.FilterInputStream;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.IntFunction;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.enums.VectoredReadStrategy;
+import org.apache.hadoop.fs.impl.CombinedFileRange;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_VECTORED_READ_STRATEGY;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
+import static 
org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+
+public class ITestVectoredRead extends AbstractAbfsIntegrationTest {
+  // File sizes and range offsets expressed as whole multiples of ONE_MB.
+  private static final int DATA_2_MB = 2 * ONE_MB;
+  private static final int DATA_4_MB = 4 * ONE_MB;
+  private static final int DATA_8_MB = 8 * ONE_MB;
+  private static final int DATA_10_MB = 10 * ONE_MB;
+  private static final int DATA_12_MB = 12 * ONE_MB;
+  private static final int DATA_16_MB = 16 * ONE_MB;
+  private static final int DATA_32_MB = 32 * ONE_MB;
+  private static final int DATA_100_MB = 100 * ONE_MB;
+  // Byte offsets and lengths of the small ranges read by the
+  // disjoint-range tests.
+  private static final int OFFSET_100_B = 100;
+  private static final int OFFSET_15K_B = 15_000;
+  private static final int OFFSET_42K_B = 42_500;
+  private static final int LEN_10K_B = 10_000;
+  private static final int LEN_27K_B = 27_000;
+  private static final int LEN_40K_B = 40_000;
+  // Fractional multipliers applied to ONE_MB to build range offsets and
+  // lengths that do not fall on whole-megabyte boundaries.
+  private static final double MB_1_2 = 1.2;
+  private static final double MB_3_1 = 3.1;
+  private static final double MB_4_1 = 4.1;
+  private static final double MB_6_2 = 6.2;
+  private static final double MB_0_8 = 0.8;
+  private static final double MB_0_9 = 0.9;
+  private static final double MB_1_8 = 1.8;
+  private static final double MB_1_9 = 1.9;
+  private static final double MB_3_8 = 3.8;
+  private static final double MB_4_0 = 4.0;
+  private static final double MB_3_2 = 3.2;
+  private static final double MB_8_0 = 8.0;
+  private static final double MB_2_0 = 2.0;
+  private static final double MB_12_0 = 12.0;
+  private static final double MB_16_0 = 16.0;
+  // Range offsets and lengths (in bytes) used by testVectoredIOHugeFile.
+  private static final int HUGE_OFFSET_1 = 5_856_368;
+  private static final int HUGE_OFFSET_2 = 3_520_861;
+  private static final int HUGE_OFFSET_3 = 8_191_913;
+  private static final int HUGE_OFFSET_4 = 1_520_861;
+  private static final int HUGE_OFFSET_5 = 2_520_861;
+  private static final int HUGE_OFFSET_6 = 9_191_913;
+  private static final int HUGE_OFFSET_7 = 2_820_861;
+  private static final int HUGE_RANGE = 116_770;
+  private static final int HUGE_RANGE_LARGE = 156_770;
+  // Maximum number of polls while looking for an in-progress read buffer.
+  private static final int LOOKUP_RETRIES = 100;
+  // Size of the executor thread pool used by the in-progress buffer test.
+  private static final int EXEC_THREADS = 3;
+  // Number of sequential reads issued by
+  // testConcurrentStreamsOnDifferentFiles.
+  private static final int SEQ_READ_ITERATIONS = 5;
+  // Timeout, in seconds, applied when waiting on futures.
+  private static final int FUTURE_TIMEOUT_SEC = 50;
+  // Pause between polls; passed to Thread.sleep, so it is in milliseconds.
+  public static final int SLEEP_TIME = 10;
+  // Fixed seed so the shuffled read offsets are the same on every run.
+  private static final long SHUFFLE_SEED = 12345L;
+  // Vectored reads may take at most this multiple of the non-vectored time.
+  private static final double PERFORMANCE_TOLERANCE_FACTOR = 1.2;
+
+  /**
+   * Creates the test instance; all setup is delegated to the superclass
+   * constructor.
+   *
+   * @throws Exception if the superclass constructor fails
+   */
+  public ITestVectoredRead() throws Exception {
+    super();
+  }
+
+  /**
+   * Checks that a vectored read of two non-overlapping ranges returns the
+   * same bytes as a positional {@code readFully} of the whole file issued
+   * on the same stream.
+   */
+  @Test
+  public void testDisjointRangesWithVectoredRead() throws Throwable {
+    final AzureBlobFileSystem fs = getFileSystem();
+    final int fileSize = ONE_MB;
+    final String fileName = methodName.getMethodName() + 1;
+    final byte[] fileContent = getRandomBytesArray(fileSize);
+    final Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+
+    final List<FileRange> rangeList = new ArrayList<>(Arrays.asList(
+        FileRange.createFileRange(OFFSET_100_B, LEN_10K_B),
+        FileRange.createFileRange(OFFSET_15K_B, LEN_27K_B)));
+    final IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
+
+    try (FSDataInputStream in = fs.openFile(testFilePath).build().get()) {
+      in.readVectored(rangeList, allocate);
+
+      // Reference copy of the file, fetched through the same stream.
+      final byte[] readFullRes = new byte[fileSize];
+      in.readFully(0, readFullRes);
+
+      // Comparing vectored read results with read fully.
+      validateVectoredReadResult(rangeList, readFullRes, 0);
+    }
+  }
+
+  /**
+   * Issues five disjoint ranges over a {@code DATA_16_MB} file and checks
+   * that they are served with exactly two {@code readRemote} calls on the
+   * stream, while every range still carries the expected bytes.
+   */
+  @Test
+  public void testVectoredReadDisjointRangesExpectTwoBackendReads()
+      throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    final String fileName = methodName.getMethodName();
+    final byte[] fileContent = getRandomBytesArray(DATA_16_MB);
+    final Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+
+    // Each entry is (offset, length); the fractional multiples of ONE_MB are
+    // truncated to whole bytes by the casts.
+    final List<FileRange> ranges = new ArrayList<>(Arrays.asList(
+        FileRange.createFileRange(0L, ONE_MB),
+        FileRange.createFileRange((long) (MB_1_2 * ONE_MB),
+            (int) (MB_0_8 * ONE_MB)),
+        FileRange.createFileRange((long) (MB_3_1 * ONE_MB),
+            (int) (MB_0_9 * ONE_MB)),
+        FileRange.createFileRange((long) (MB_4_1 * ONE_MB),
+            (int) (MB_1_9 * ONE_MB)),
+        FileRange.createFileRange((long) (MB_6_2 * ONE_MB),
+            (int) (MB_1_8 * ONE_MB))));
+    final IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
+
+    try (FSDataInputStream in = fs.openFile(testFilePath).build().get()) {
+      // Spy on the underlying ABFS stream so remote reads can be counted.
+      final AbfsInputStream spyIn =
+          Mockito.spy((AbfsInputStream) in.getWrappedStream());
+      spyIn.readVectored(ranges, allocate);
+
+      // Block until every range has been populated.
+      final CompletableFuture<?>[] pending = ranges.stream()
+          .map(FileRange::getData)
+          .toArray(CompletableFuture[]::new);
+      CompletableFuture.allOf(pending).get();
+
+      validateVectoredReadResult(ranges, fileContent, 0);
+      Mockito.verify(spyIn, Mockito.times(2)).readRemote(
+          Mockito.anyLong(),
+          Mockito.any(byte[].class),
+          Mockito.anyInt(),
+          Mockito.anyInt(),
+          Mockito.any());
+    }
+  }
+
+  /**
+   * Forces {@code queueVectoredRead} on the handler to report failure and
+   * checks that {@code directRead} is then invoked at least once and that
+   * every requested range still completes with the expected bytes.
+   */
+  @Test
+  public void testVectoredReadFallsBackToDirectReadWhenQueuingFails()
+      throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    final String fileName = methodName.getMethodName();
+    final byte[] fileContent = getRandomBytesArray(DATA_4_MB);
+    final Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+
+    final List<FileRange> ranges = List.of(
+        FileRange.createFileRange(0, ONE_MB),
+        FileRange.createFileRange(DATA_2_MB, ONE_MB));
+    final IntFunction<ByteBuffer> allocator = ByteBuffer::allocate;
+
+    try (FSDataInputStream in = fs.openFile(testFilePath).build().get()) {
+      final AbfsInputStream abfsIn = (AbfsInputStream) in.getWrappedStream();
+      final AbfsInputStream spyIn = Mockito.spy(abfsIn);
+
+      // Hand the spied stream a spied handler whose queueing always fails.
+      final VectoredReadHandler spyHandler =
+          Mockito.spy(abfsIn.getVectoredReadHandler());
+      Mockito.doReturn(spyHandler).when(spyIn).getVectoredReadHandler();
+      Mockito.doReturn(false).when(spyHandler).queueVectoredRead(
+          Mockito.any(AbfsInputStream.class),
+          Mockito.any(CombinedFileRange.class),
+          ArgumentMatchers.<IntFunction<ByteBuffer>>any());
+
+      spyIn.readVectored(ranges, allocator);
+
+      // Block until every range has been populated.
+      final CompletableFuture<?>[] pending = ranges.stream()
+          .map(FileRange::getData)
+          .toArray(CompletableFuture[]::new);
+      CompletableFuture.allOf(pending).get();
+
+      Mockito.verify(spyHandler, Mockito.atLeastOnce()).directRead(
+          Mockito.any(AbfsInputStream.class),
+          Mockito.any(CombinedFileRange.class),
+          Mockito.eq(allocator));
+
+      validateVectoredReadResult(ranges, fileContent, 0);
+    }
+  }
+
+  /**
+   * Checks that a vectored read of three non-contiguous ranges returns the
+   * same bytes as a positional {@code readFully} of the whole file issued
+   * on the same stream.
+   */
+  @Test
+  public void testMultipleDisjointRangesWithVectoredRead() throws Throwable {
+    final AzureBlobFileSystem fs = getFileSystem();
+    final int fileSize = ONE_MB;
+    final String fileName = methodName.getMethodName() + 1;
+    final byte[] fileContent = getRandomBytesArray(fileSize);
+    final Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+
+    final List<FileRange> ranges = List.of(
+        FileRange.createFileRange(OFFSET_100_B, LEN_10K_B),
+        FileRange.createFileRange(OFFSET_15K_B, LEN_27K_B),
+        FileRange.createFileRange(OFFSET_42K_B, LEN_40K_B));
+    final IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
+
+    try (FSDataInputStream in = fs.openFile(testFilePath).build().get()) {
+      in.readVectored(ranges, allocate);
+
+      // Reference copy of the file, fetched through the same stream.
+      final byte[] readFullRes = new byte[fileSize];
+      in.readFully(0, readFullRes);
+
+      // Comparing vectored read results with read fully.
+      validateVectoredReadResult(ranges, readFullRes, 0);
+    }
+  }
+
+  /**
+   * Reads seven scattered ranges from a {@code DATA_100_MB} file through a
+   * vectored read and checks them against a positional {@code readFully}
+   * of the whole file issued on the same stream.
+   */
+  @Test
+  public void testVectoredIOHugeFile() throws Throwable {
+    final AzureBlobFileSystem fs = getFileSystem();
+    final int fileSize = DATA_100_MB;
+    final String fileName = methodName.getMethodName() + 1;
+    final byte[] fileContent = getRandomBytesArray(fileSize);
+    final Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+
+    // The offsets are deliberately not listed in ascending order.
+    final List<FileRange> ranges = List.of(
+        FileRange.createFileRange(HUGE_OFFSET_1, HUGE_RANGE),
+        FileRange.createFileRange(HUGE_OFFSET_2, HUGE_RANGE),
+        FileRange.createFileRange(HUGE_OFFSET_3, HUGE_RANGE),
+        FileRange.createFileRange(HUGE_OFFSET_4, HUGE_RANGE),
+        FileRange.createFileRange(HUGE_OFFSET_5, HUGE_RANGE),
+        FileRange.createFileRange(HUGE_OFFSET_6, HUGE_RANGE),
+        FileRange.createFileRange(HUGE_OFFSET_7, HUGE_RANGE_LARGE));
+    final IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
+
+    try (FSDataInputStream in = fs.openFile(testFilePath).build().get()) {
+      in.readVectored(ranges, allocate);
+
+      // Reference copy of the file, fetched through the same stream.
+      final byte[] readFullRes = new byte[fileSize];
+      in.readFully(0, readFullRes);
+
+      // Comparing vectored read results with read fully.
+      validateVectoredReadResult(ranges, readFullRes, 0);
+    }
+  }
+
+  /**
+   * Runs a vectored read and a positional read on the same stream from two
+   * asynchronous tasks released by a shared latch, then checks that both
+   * return the expected bytes.
+   */
+  @Test
+  public void testSimultaneousPrefetchAndVectoredRead() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    final String fileName = methodName.getMethodName();
+    final byte[] fileContent = getRandomBytesArray(DATA_16_MB);
+    final Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+
+    try (FSDataInputStream in = fs.openFile(testFilePath).build().get()) {
+      final AbfsInputStream abfsIn = (AbfsInputStream) in.getWrappedStream();
+      final IntFunction<ByteBuffer> allocator = ByteBuffer::allocate;
+
+      final List<FileRange> vRanges = new ArrayList<>(Arrays.asList(
+          FileRange.createFileRange(DATA_10_MB, ONE_MB),
+          FileRange.createFileRange(DATA_12_MB, ONE_MB)));
+      final byte[] seqBuffer = new byte[ONE_MB];
+
+      // Both tasks wait on this latch so their I/O starts together.
+      final CountDownLatch startSignal = new CountDownLatch(1);
+
+      final Runnable vectoredWork = () -> {
+        try {
+          startSignal.await();
+          abfsIn.readVectored(vRanges, allocator);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      };
+      final Runnable sequentialWork = () -> {
+        try {
+          startSignal.await();
+          abfsIn.read(0, seqBuffer, 0, ONE_MB);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      };
+
+      final CompletableFuture<Void> vectoredTask =
+          CompletableFuture.runAsync(vectoredWork);
+      final CompletableFuture<Void> sequentialTask =
+          CompletableFuture.runAsync(sequentialWork);
+      startSignal.countDown();
+      CompletableFuture.allOf(vectoredTask, sequentialTask).get();
+
+      // readVectored only schedules the work; wait for the range data too.
+      final CompletableFuture<?>[] vFutures = vRanges.stream()
+          .map(FileRange::getData)
+          .toArray(CompletableFuture[]::new);
+      CompletableFuture.allOf(vFutures).get();
+
+      assertArrayEquals(Arrays.copyOfRange(fileContent, 0, ONE_MB),
+          seqBuffer, "Sequential read data mismatch");
+      validateVectoredReadResult(vRanges, fileContent, 0);
+    }
+  }
+
+  /**
+   * Tests concurrent access using separate streams on different files.
+   * Ensures vectored reads on one file do not interfere with sequential
+   * reads and readahead on another file.
+   */
+  @Test
+  public void testConcurrentStreamsOnDifferentFiles() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    // Create two distinct files with random content
+    byte[] content1 = getRandomBytesArray(DATA_16_MB);
+    byte[] content2 = getRandomBytesArray(DATA_16_MB);
+    Path path1 = createFileWithContent(fs, "file1", content1);
+    Path path2 = createFileWithContent(fs, "file2", content2);
+
+    // Open two separate input streams for concurrent access
+    try (FSDataInputStream in1 = fs.openFile(path1).build().get();
+         FSDataInputStream in2 = fs.openFile(path2).build().get()) {
+
+      AbfsInputStream streamVectored = (AbfsInputStream) 
in1.getWrappedStream();
+      AbfsInputStream streamSequential
+          = (AbfsInputStream) in2.getWrappedStream();
+      IntFunction<ByteBuffer> allocator = ByteBuffer::allocate;
+
+      // Define non-contiguous ranges for the vectored read on file 1
+      List<FileRange> ranges = List.of(
+          FileRange.createFileRange(DATA_2_MB, ONE_MB),
+          FileRange.createFileRange(DATA_4_MB, ONE_MB));
+
+      // Use a latch to ensure both threads start their I/O at the same time
+      CountDownLatch latch = new CountDownLatch(1);
+
+      // Thread 1: Perform asynchronous vectored reads on file 1
+      CompletableFuture<Void> vectoredTask = CompletableFuture.runAsync(() -> {
+        try {
+          latch.await();
+          streamVectored.readVectored(ranges, allocator);
+        } catch (Exception e) {
+          throw new RuntimeException("Vectored read task failed", e);
+        }
+      });
+
+      // Thread 2: Perform multiple sequential reads on file 2 to trigger 
readahead
+      CompletableFuture<Void> sequentialTask = CompletableFuture.runAsync(
+          () -> {
+            try {
+              latch.await();
+              for (int i = 0; i < SEQ_READ_ITERATIONS; i++) {
+                byte[] tempBuf = new byte[(int) ONE_MB];
+                streamSequential.read(i * ONE_MB, tempBuf, 0, (int) ONE_MB);
+                // Validate data integrity for file 2 immediately
+                assertArrayEquals(Arrays.copyOfRange(content2, i * (int) 
ONE_MB,
+                        (i + 1) * (int) ONE_MB), tempBuf,
+                    "Sequential read mismatch in file 2 at block " + i);
+              }
+            } catch (Exception e) {
+              throw new RuntimeException("Sequential read task failed", e);
+            }
+          });
+      // Trigger simultaneous execution
+      latch.countDown();
+      // Wait for both high-level tasks to finish
+      CompletableFuture.allOf(vectoredTask, sequentialTask).get();
+
+      // Explicitly wait for the vectored read futures to complete their data 
transfer
+      CompletableFuture<?>[] vFutures = ranges.stream()
+          .map(FileRange::getData)
+          .toArray(CompletableFuture[]::new);
+      CompletableFuture.allOf(vFutures).get();
+
+      // Final validation of vectored read content for file 1
+      validateVectoredReadResult(ranges, content1, 0);
+    }
+  }
+
+  /**
+   * Issues a one-byte sequential read followed by a vectored read of the
+   * range starting at {@code ONE_MB}, then checks the vectored data and
+   * that the spied stream saw at most two {@code readRemote} calls.
+   */
+  @Test
+  public void testVectoredReadHitchhikesOnExistingPrefetch() throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    final String fileName = methodName.getMethodName();
+    final byte[] fileContent = getRandomBytesArray(DATA_8_MB);
+    final Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+
+    try (FSDataInputStream in = fs.openFile(testFilePath).build().get()) {
+      final AbfsInputStream abfsIn = (AbfsInputStream) in.getWrappedStream();
+
+      // Since client is final in AbfsInputStream, we cannot inject a spy
+      // into it. Instead, spy on abfsIn itself and stub readRemote to
+      // delegate to the real implementation while still being tracked by
+      // Mockito.
+      final AbfsInputStream spyIn = Mockito.spy(abfsIn);
+      Mockito.doCallRealMethod().when(spyIn).readRemote(
+          Mockito.anyLong(),
+          Mockito.any(byte[].class),
+          Mockito.anyInt(),
+          Mockito.anyInt(),
+          Mockito.any());
+
+      // Swap the stream wrapped by FSDataInputStream for the spy, so all
+      // subsequent calls made through 'in' go through spyIn.
+      final Field wrappedField =
+          FilterInputStream.class.getDeclaredField("in");
+      wrappedField.setAccessible(true);
+      wrappedField.set(in, spyIn);
+
+      // 1. Trigger sequential read -> starts readahead covering
+      //    [0, readAheadSize).
+      final byte[] seqBuf = new byte[1];
+      in.read(seqBuf, 0, 1);
+
+      // 2. Queue a vectored read fully inside the readahead window.
+      final List<FileRange> vRanges = new ArrayList<>();
+      vRanges.add(FileRange.createFileRange(ONE_MB, ONE_MB));
+      final IntFunction<ByteBuffer> allocator = ByteBuffer::allocate;
+      in.readVectored(vRanges, allocator);
+
+      // 3. Wait for completion.
+      vRanges.get(0).getData().get();
+
+      // 4. Validate data integrity.
+      validateVectoredReadResult(vRanges, fileContent, ZERO);
+
+      // 5. THE CRITICAL VALIDATION:
+      // Max 2 remote reads acceptable:
+      //   - Read #1: readahead triggered by the sequential read
+      //   - Read #2: only if vectored read just missed the prefetch window
+      //     (race edge)
+      // 3+ means hitchhiking is broken: the vectored read issued a
+      // redundant remote fetch.
+      final int maxExpectedRemoteReads = 2;
+      Mockito.verify(spyIn, Mockito.atMost(maxExpectedRemoteReads))
+          .readRemote(
+              Mockito.anyLong(),
+              Mockito.any(byte[].class),
+              Mockito.anyInt(),
+              Mockito.anyInt(),
+              Mockito.any());
+    }
+  }
+
+  /**
+   * Checks that several readers targeting a buffer whose remote read is
+   * still in flight all complete once that read is released.
+   *
+   * <p>The first {@code readRemote} call on the spied stream is held on a
+   * latch. While it is held, a second single-byte read and a vectored read
+   * of the same buffer range are submitted. The latch is then released and
+   * all three operations, plus the vectored range's future, must finish
+   * within {@code FUTURE_TIMEOUT_SEC} seconds and return the right data.</p>
+   *
+   * @throws Exception if the setup, any read, or the validation fails
+   */
+  @Test
+  public void testMultipleReadsWhileBufferInProgressEventuallyComplete()
+      throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    String fileName = methodName.getMethodName();
+    byte[] fileContent = getRandomBytesArray(DATA_8_MB);
+    Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+    CountDownLatch blockCompletion = new CountDownLatch(1);
+
+    try (FSDataInputStream in = fs.openFile(testFilePath).build().get()) {
+      AbfsInputStream abfsIn = (AbfsInputStream) in.getWrappedStream();
+      AbfsInputStream spyIn = Mockito.spy(abfsIn);
+
+      // Inject spy into FSDataInputStream so all calls go through spyIn,
+      // including those from background threads spawned during read().
+      Field wrappedField = FilterInputStream.class.getDeclaredField("in");
+      wrappedField.setAccessible(true);
+      wrappedField.set(in, spyIn);
+
+      ReadBufferManager rbm = spyIn.getReadBufferManager();
+      AtomicBoolean firstCall = new AtomicBoolean(true);
+
+      // Only the very first readRemote call is held back; later calls go
+      // straight to the real implementation.
+      Mockito.doAnswer(invocation -> {
+            if (firstCall.getAndSet(false)) {
+              blockCompletion.await();
+            }
+            return invocation.callRealMethod();
+          })
+          .when(spyIn)
+          .readRemote(Mockito.anyLong(), Mockito.any(byte[].class),
+              Mockito.anyInt(), Mockito.anyInt(), Mockito.any());
+
+      ExecutorService exec = Executors.newFixedThreadPool(EXEC_THREADS);
+      try {
+        // r1 triggers a readahead; the first readRemote call will block on
+        // blockCompletion, simulating an in-progress buffer.
+        Future<?> r1 = exec.submit(() -> {
+          try {
+            in.read(new byte[1], 0, 1);  // use 'in', not 'spyIn' directly
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+        // Poll until the buffer appears in the inProgressList.
+        ReadBuffer inProgress = null;
+        for (int i = 0; i < LOOKUP_RETRIES; i++) {
+          synchronized (rbm) {
+            inProgress = rbm.findInList(rbm.getInProgressList(), spyIn, 0);
+          }
+          if (inProgress != null) {
+            break;
+          }
+          Thread.sleep(SLEEP_TIME);
+        }
+        assertNotNull(inProgress,
+            "Expected buffer to be in inProgressList while completion is blocked");
+
+        // r2 reads the same offset; it should wait on the in-progress
+        // buffer, not issue a new remote read.
+        Future<?> r2 = exec.submit(() -> {
+          try {
+            in.read(new byte[1], 0, 1);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+        // Vectored read targeting the same in-progress buffer range;
+        // it should hitchhike rather than issue a new remote read.
+        long bufferOffset = inProgress.getOffset();
+        int length = (int) Math.min(ONE_MB, DATA_8_MB - bufferOffset);
+        List<FileRange> ranges = new ArrayList<>();
+        ranges.add(FileRange.createFileRange(bufferOffset, length));
+        Future<?> vr = exec.submit(() -> {
+          try {
+            in.readVectored(ranges, ByteBuffer::allocate);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+        // Give r2 and vr time to reach their wait state before unblocking.
+        // Thread.sleep takes milliseconds, so the pause is derived from the
+        // millisecond-based SLEEP_TIME instead of the seconds-based
+        // FUTURE_TIMEOUT_SEC; the resulting 50 ms pause is unchanged.
+        final long settleMillis = 5L * SLEEP_TIME;
+        Thread.sleep(settleMillis);
+        blockCompletion.countDown();
+
+        r1.get(FUTURE_TIMEOUT_SEC, TimeUnit.SECONDS);
+        r2.get(FUTURE_TIMEOUT_SEC, TimeUnit.SECONDS);
+        vr.get(FUTURE_TIMEOUT_SEC, TimeUnit.SECONDS);
+        ranges.get(0).getData().get(FUTURE_TIMEOUT_SEC, TimeUnit.SECONDS);
+
+        validateVectoredReadResult(ranges, fileContent, bufferOffset);
+      } finally {
+        // Always release the latch: countDown() does nothing once the count
+        // is zero, and releasing here stops a failed assertion or exception
+        // from leaving the stubbed readRemote call blocked forever.
+        blockCompletion.countDown();
+        exec.shutdownNow();
+        // Restore original stream reference to avoid affecting shared state.
+        wrappedField.set(in, abfsIn);
+      }
+    }
+  }
+
+  /**
+   * Opens a file system configured with the
+   * {@code THROUGHPUT_OPTIMIZED} vectored read strategy, reads five ranges
+   * from a {@code DATA_32_MB} file and checks both the returned bytes and
+   * that exactly five {@code readRemote} calls were made on the stream.
+   */
+  @Test
+  public void testThroughputOptimizedReadVectored() throws Exception {
+    final Configuration configuration = getRawConfiguration();
+    configuration.set(FS_AZURE_VECTORED_READ_STRATEGY,
+        VectoredReadStrategy.THROUGHPUT_OPTIMIZED.getName());
+    final FileSystem fileSystem = FileSystem.newInstance(configuration);
+
+    try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) {
+      final String fileName = methodName.getMethodName();
+      final byte[] fileContent = getRandomBytesArray(DATA_32_MB);
+      final Path testFilePath =
+          createFileWithContent(abfs, fileName, fileContent);
+
+      // Each entry is (offset, length); the multiples of ONE_MB are
+      // truncated to whole bytes by the casts.
+      final List<FileRange> fileRanges = new ArrayList<>(Arrays.asList(
+          FileRange.createFileRange(0L, (int) (MB_3_8 * ONE_MB)),
+          FileRange.createFileRange(
+              (long) (MB_4_0 * ONE_MB), (int) (MB_3_2 * ONE_MB)),
+          FileRange.createFileRange(
+              (long) (MB_8_0 * ONE_MB), (int) (MB_2_0 * ONE_MB)),
+          FileRange.createFileRange(
+              (long) (MB_12_0 * ONE_MB), (int) (MB_4_0 * ONE_MB)),
+          FileRange.createFileRange(
+              (long) (MB_16_0 * ONE_MB), (int) (MB_2_0 * ONE_MB))));
+      final IntFunction<ByteBuffer> allocate = ByteBuffer::allocate;
+
+      try (FSDataInputStream in = abfs.openFile(testFilePath).build().get()) {
+        // Spy on the underlying ABFS stream so remote reads can be counted.
+        final AbfsInputStream spyIn =
+            Mockito.spy((AbfsInputStream) in.getWrappedStream());
+        spyIn.readVectored(fileRanges, allocate);
+
+        // Block until every range has been populated.
+        final CompletableFuture<?>[] pending = fileRanges.stream()
+            .map(FileRange::getData)
+            .toArray(CompletableFuture[]::new);
+        CompletableFuture.allOf(pending).get();
+
+        validateVectoredReadResult(fileRanges, fileContent, 0);
+        Mockito.verify(spyIn, Mockito.times(5)).readRemote(
+            Mockito.anyLong(),
+            Mockito.any(byte[].class),
+            Mockito.anyInt(),
+            Mockito.anyInt(),
+            Mockito.any());
+      }
+    }
+  }
+
+  /**
+   * Times the same set of shuffled, non-overlapping reads twice on one
+   * stream: first as individual seek-and-read calls, then as a single
+   * vectored read.
+   *
+   * <p>The test passes when the vectored phase takes no longer than the
+   * non-vectored phase multiplied by
+   * {@code PERFORMANCE_TOLERANCE_FACTOR}. It is a relative check meant to
+   * catch regressions, not an absolute timing guarantee.</p>
+   */
+  @Test
+  public void testRandomReadsNonVectoredThenVectoredPerformance()
+      throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    final int fileSize = 128 * ONE_MB;
+    final int readSize = 64 * ONE_KB;
+    final int readCount = 512;
+    final byte[] fileContent = getRandomBytesArray(fileSize);
+    final Path testPath =
+        createFileWithContent(fs, methodName.getMethodName(), fileContent);
+
+    // Every readSize-aligned offset whose read fits inside the file.
+    final List<Long> alignedOffsets = new ArrayList<>();
+    long candidate = 0;
+    while (candidate + readSize <= fileSize) {
+      alignedOffsets.add(candidate);
+      candidate += readSize;
+    }
+    // Shuffling aligned offsets gives random access without overlap; the
+    // fixed seed keeps the order reproducible.
+    Collections.shuffle(alignedOffsets, new Random(SHUFFLE_SEED));
+    final List<Long> offsets = alignedOffsets.subList(
+        0, Math.min(readCount, alignedOffsets.size()));
+
+    // One vectored range per selected offset.
+    final List<FileRange> vectoredRanges = new ArrayList<>();
+    for (long offset : offsets) {
+      vectoredRanges.add(FileRange.createFileRange(offset, readSize));
+    }
+
+    try (FSDataInputStream in = fs.openFile(testPath).build().get()) {
+      // Phase 1: one seek + read per offset.
+      final byte[] buffer = new byte[readSize];
+      final long nonVectoredStartNs = System.nanoTime();
+      for (long offset : offsets) {
+        in.seek(offset);
+        in.read(buffer, 0, readSize);
+      }
+      final long nonVectoredTimeNs = System.nanoTime() - nonVectoredStartNs;
+
+      // Phase 2: a single vectored read, timed until every range is done.
+      final long vectoredStartNs = System.nanoTime();
+      in.readVectored(vectoredRanges, ByteBuffer::allocate);
+      final CompletableFuture<?>[] pending = vectoredRanges.stream()
+          .map(FileRange::getData)
+          .toArray(CompletableFuture[]::new);
+      CompletableFuture.allOf(pending).get();
+      final long vectoredTimeNs = System.nanoTime() - vectoredStartNs;
+
+      // Relative check with a tolerance factor to reduce flakiness.
+      final double allowedNs =
+          nonVectoredTimeNs * PERFORMANCE_TOLERANCE_FACTOR;
+      assertTrue(
+          vectoredTimeNs <= allowedNs,
+          String.format(
+              "Vectored read slower: vectored=%d ns, non-vectored=%d ns",
+              vectoredTimeNs, nonVectoredTimeNs));
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to