kerneltime commented on code in PR #7550:
URL: https://github.com/apache/ozone/pull/7550#discussion_r2153471570


##########
hadoop-hdds/common/src/main/resources/ozone-default.xml:
##########
@@ -4218,6 +4218,15 @@
     </description>
   </property>
 
+  <property>
+    <name>ozone.om.allocate.block.cache.enabled</name>
+    <value>false</value>

Review Comment:
   I think we should enable it by default. Any opinions?



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/OMBlockPrefetchClient.java:
##########
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.client;
+
+import static org.apache.hadoop.hdds.scm.net.NetConstants.NODE_COST_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MAX_BLOCKS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MAX_BLOCKS_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MIN_BLOCKS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MIN_BLOCKS_DEFAULT;
+import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.net.NodeImpl;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.TableMapping;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OMBlockPrefetchClient manages block prefetching for efficient write 
operations.
+ * It maintains a queue of allocated blocks per replication configuration and 
removes expired blocks lazily.
+ * The client refills the queue in the background to handle high-throughput 
scenarios efficiently.
+ */
+public class OMBlockPrefetchClient {
+  private static final Logger LOG = 
LoggerFactory.getLogger(OMBlockPrefetchClient.class);
+  private final ScmBlockLocationProtocol scmBlockLocationProtocol;
+  private int maxBlocks;
+  private int minBlocks;
+  private final boolean isAllocateBlockCacheEnabled;
+  private DNSToSwitchMapping dnsToSwitchMapping;
+  private final Map<ReplicationConfig, 
ConcurrentLinkedDeque<ExpiringAllocatedBlock>> blockQueueMap =
+      new ConcurrentHashMap<>();
+  private long expiryDuration;
+  private OMBlockPrefetchMetrics metrics;
+  private ExecutorService prefetchExecutor;
+  private final AtomicBoolean isPrefetching = new AtomicBoolean(false);
+
+  public OMBlockPrefetchClient(ScmBlockLocationProtocol scmBlockClient, 
boolean isAllocateBlockCacheEnabled) {
+    this.scmBlockLocationProtocol = scmBlockClient;
+    this.isAllocateBlockCacheEnabled = isAllocateBlockCacheEnabled;
+    initializeBlockQueueMap();
+  }
+
+  /**
+   * Enum representing the replication configurations that are tested.

Review Comment:
   ```suggestion
      * Enum representing the replication configurations that are supported by 
default. Clients requesting replication configurations outside of these will 
not benefit from the cache.
   ```



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/OMBlockPrefetchClient.java:
##########
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.client;
+
+import static org.apache.hadoop.hdds.scm.net.NetConstants.NODE_COST_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MAX_BLOCKS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MAX_BLOCKS_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MIN_BLOCKS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MIN_BLOCKS_DEFAULT;
+import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.net.NodeImpl;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.TableMapping;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OMBlockPrefetchClient manages block prefetching for efficient write 
operations.
+ * It maintains a queue of allocated blocks per replication configuration and 
removes expired blocks lazily.
+ * The client refills the queue in the background to handle high-throughput 
scenarios efficiently.
+ */
+public class OMBlockPrefetchClient {
+  private static final Logger LOG = 
LoggerFactory.getLogger(OMBlockPrefetchClient.class);
+  private final ScmBlockLocationProtocol scmBlockLocationProtocol;
+  private int maxBlocks;
+  private int minBlocks;
+  private final boolean isAllocateBlockCacheEnabled;
+  private DNSToSwitchMapping dnsToSwitchMapping;
+  private final Map<ReplicationConfig, 
ConcurrentLinkedDeque<ExpiringAllocatedBlock>> blockQueueMap =
+      new ConcurrentHashMap<>();
+  private long expiryDuration;
+  private OMBlockPrefetchMetrics metrics;
+  private ExecutorService prefetchExecutor;
+  private final AtomicBoolean isPrefetching = new AtomicBoolean(false);
+
+  public OMBlockPrefetchClient(ScmBlockLocationProtocol scmBlockClient, 
boolean isAllocateBlockCacheEnabled) {
+    this.scmBlockLocationProtocol = scmBlockClient;
+    this.isAllocateBlockCacheEnabled = isAllocateBlockCacheEnabled;
+    initializeBlockQueueMap();
+  }
+
+  /**
+   * Enum representing the replication configurations that are tested.
+   */
+  public enum TestedReplicationConfig {
+    RATIS_THREE(ReplicationConfig.fromProtoTypeAndFactor(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE)),
+
+    RATIS_ONE(ReplicationConfig.fromProtoTypeAndFactor(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE)),
+
+    RS_3_2_1024(ReplicationConfig.fromProto(
+        HddsProtos.ReplicationType.EC, null,
+        toProto(3, 2, ECReplicationConfig.EcCodec.RS, 1024))),
+
+    RS_6_3_1024(ReplicationConfig.fromProto(
+        HddsProtos.ReplicationType.EC, null,
+        toProto(6, 3, ECReplicationConfig.EcCodec.RS, 1024))),
+
+    XOR_10_4_4096(ReplicationConfig.fromProto(
+        HddsProtos.ReplicationType.EC, null,
+        toProto(10, 4, ECReplicationConfig.EcCodec.XOR, 4096)));
+
+    private final ReplicationConfig config;
+
+    TestedReplicationConfig(ReplicationConfig config) {
+      this.config = config;
+    }
+
+    public ReplicationConfig getConfig() {
+      return config;
+    }
+
+    public static HddsProtos.ECReplicationConfig toProto(int data, int parity, 
ECReplicationConfig.EcCodec codec,
+                                                         int ecChunkSize) {
+      return HddsProtos.ECReplicationConfig.newBuilder()
+          .setData(data)
+          .setParity(parity)
+          .setCodec(codec.toString())
+          .setEcChunkSize(ecChunkSize)
+          .build();
+    }
+  }
+
+  private void initializeBlockQueueMap() {
+    for (TestedReplicationConfig config : TestedReplicationConfig.values()) {
+      blockQueueMap.put(config.getConfig(), new ConcurrentLinkedDeque<>());
+    }
+  }
+
+  /**
+   * Tracks an allocated block to be cached with its cache expiry.
+   */
+  public static final class ExpiringAllocatedBlock {
+    private final AllocatedBlock block;
+    private final long expiryTime;
+
+    public ExpiringAllocatedBlock(AllocatedBlock block, long expiryTime) {
+      this.block = block;
+      this.expiryTime = expiryTime;
+    }
+
+    public AllocatedBlock getBlock() {
+      return block;
+    }
+
+    public long getExpiryTime() {
+      return expiryTime;
+    }
+  }
+
+  public void start(ConfigurationSource conf) throws IOException, 
InterruptedException, TimeoutException {
+    maxBlocks = conf.getInt(OZONE_OM_PREFETCH_MAX_BLOCKS, 
OZONE_OM_PREFETCH_MAX_BLOCKS_DEFAULT);
+    minBlocks = conf.getInt(OZONE_OM_PREFETCH_MIN_BLOCKS, 
OZONE_OM_PREFETCH_MIN_BLOCKS_DEFAULT);
+    expiryDuration = 
conf.getTimeDuration(OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL,
+        OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL_DEFAULT, 
TimeUnit.MILLISECONDS);
+
+    Class<? extends DNSToSwitchMapping> dnsToSwitchMappingClass =
+        conf.getClass(ScmConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+            TableMapping.class, DNSToSwitchMapping.class);
+    DNSToSwitchMapping newInstance = ReflectionUtils.newInstance(
+        dnsToSwitchMappingClass, OzoneConfiguration.of(conf));
+    dnsToSwitchMapping =
+        ((newInstance instanceof CachedDNSToSwitchMapping) ? newInstance
+            : new CachedDNSToSwitchMapping(newInstance));
+    metrics = OMBlockPrefetchMetrics.register();
+    prefetchExecutor = Executors.newSingleThreadExecutor(r -> {
+      Thread t = new Thread(r, "OMBlockPrefetchClient-AsyncPrefetcher");
+      t.setDaemon(true);
+      return t;
+    });
+    LOG.info("OMBlockPrefetchClient started with minBlocks={}, maxBlocks={}, 
expiryDuration={}ms. Prefetch executor " +
+            "initialized.", minBlocks, maxBlocks, expiryDuration);
+  }
+
+  public void stop() {
+    if (prefetchExecutor != null) {
+      prefetchExecutor.shutdown();
+      try {
+        if (!prefetchExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          prefetchExecutor.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted while shutting down executor service.", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+    OMBlockPrefetchMetrics.unregister();
+  }
+
+  @SuppressWarnings("parameternumber")
+  public List<AllocatedBlock> getBlocks(long scmBlockSize, int numBlocks, 
ReplicationConfig replicationConfig,
+                                        String serviceID, ExcludeList 
excludeList, String clientMachine,
+                                        NetworkTopology clusterMap) throws 
IOException {
+    if (isAllocateBlockCacheEnabled) {
+      long readStartTime = Time.monotonicNowNanos();
+      List<AllocatedBlock> allocatedBlocks = new ArrayList<>();
+      ConcurrentLinkedDeque<ExpiringAllocatedBlock> queue = 
blockQueueMap.get(replicationConfig);
+
+      // We redirect to the allocateBlock RPC call to SCM when we encounter an 
untested ReplicationConfig or a populated
+      // ExcludeList, otherwise we return blocks from cache.
+      if (queue != null && excludeList.isEmpty()) {
+        List<ExpiringAllocatedBlock> tempValidBlocks = new ArrayList<>();
+        long now = System.currentTimeMillis();
+        while (tempValidBlocks.size() < numBlocks) {
+          ExpiringAllocatedBlock expiringBlock = queue.poll();
+          if (expiringBlock == null) {
+            break;
+          }
+
+          if (now > expiringBlock.getExpiryTime()) {
+            continue;
+          }
+
+          tempValidBlocks.add(expiringBlock);
+        }
+
+        // If there aren't enough blocks in cache, we fallback to SCM.

Review Comment:
   This logic can be improved. We should prefer the partial list of blocks 
from the cache, since they are closer to expiration than freshly acquired 
blocks from SCM. Use the locally found blocks first, fetch only the additional 
blocks needed from SCM, and put any remaining blocks back in the cache.



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/OMBlockPrefetchClient.java:
##########
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.client;
+
+import static org.apache.hadoop.hdds.scm.net.NetConstants.NODE_COST_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MAX_BLOCKS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MAX_BLOCKS_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MIN_BLOCKS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MIN_BLOCKS_DEFAULT;
+import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.net.NodeImpl;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.TableMapping;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OMBlockPrefetchClient manages block prefetching for efficient write 
operations.
+ * It maintains a queue of allocated blocks per replication configuration and 
removes expired blocks lazily.
+ * The client refills the queue in the background to handle high-throughput 
scenarios efficiently.
+ */
+public class OMBlockPrefetchClient {
+  private static final Logger LOG = 
LoggerFactory.getLogger(OMBlockPrefetchClient.class);
+  private final ScmBlockLocationProtocol scmBlockLocationProtocol;
+  private int maxBlocks;
+  private int minBlocks;
+  private final boolean isAllocateBlockCacheEnabled;
+  private DNSToSwitchMapping dnsToSwitchMapping;
+  private final Map<ReplicationConfig, 
ConcurrentLinkedDeque<ExpiringAllocatedBlock>> blockQueueMap =
+      new ConcurrentHashMap<>();
+  private long expiryDuration;
+  private OMBlockPrefetchMetrics metrics;
+  private ExecutorService prefetchExecutor;
+  private final AtomicBoolean isPrefetching = new AtomicBoolean(false);
+
+  public OMBlockPrefetchClient(ScmBlockLocationProtocol scmBlockClient, 
boolean isAllocateBlockCacheEnabled) {
+    this.scmBlockLocationProtocol = scmBlockClient;
+    this.isAllocateBlockCacheEnabled = isAllocateBlockCacheEnabled;
+    initializeBlockQueueMap();
+  }
+
+  /**
+   * Enum representing the replication configurations that are tested.
+   */
+  public enum TestedReplicationConfig {
+    RATIS_THREE(ReplicationConfig.fromProtoTypeAndFactor(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE)),
+
+    RATIS_ONE(ReplicationConfig.fromProtoTypeAndFactor(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE)),
+
+    RS_3_2_1024(ReplicationConfig.fromProto(
+        HddsProtos.ReplicationType.EC, null,
+        toProto(3, 2, ECReplicationConfig.EcCodec.RS, 1024))),
+
+    RS_6_3_1024(ReplicationConfig.fromProto(
+        HddsProtos.ReplicationType.EC, null,
+        toProto(6, 3, ECReplicationConfig.EcCodec.RS, 1024))),
+
+    XOR_10_4_4096(ReplicationConfig.fromProto(
+        HddsProtos.ReplicationType.EC, null,
+        toProto(10, 4, ECReplicationConfig.EcCodec.XOR, 4096)));
+
+    private final ReplicationConfig config;
+
+    TestedReplicationConfig(ReplicationConfig config) {
+      this.config = config;
+    }
+
+    public ReplicationConfig getConfig() {
+      return config;
+    }
+
+    public static HddsProtos.ECReplicationConfig toProto(int data, int parity, 
ECReplicationConfig.EcCodec codec,
+                                                         int ecChunkSize) {
+      return HddsProtos.ECReplicationConfig.newBuilder()
+          .setData(data)
+          .setParity(parity)
+          .setCodec(codec.toString())
+          .setEcChunkSize(ecChunkSize)
+          .build();
+    }
+  }
+
+  private void initializeBlockQueueMap() {
+    for (TestedReplicationConfig config : TestedReplicationConfig.values()) {
+      blockQueueMap.put(config.getConfig(), new ConcurrentLinkedDeque<>());
+    }
+  }
+
+  /**
+   * Tracks an allocated block to be cached with its cache expiry.
+   */
+  public static final class ExpiringAllocatedBlock {
+    private final AllocatedBlock block;
+    private final long expiryTime;
+
+    public ExpiringAllocatedBlock(AllocatedBlock block, long expiryTime) {
+      this.block = block;
+      this.expiryTime = expiryTime;
+    }
+
+    public AllocatedBlock getBlock() {
+      return block;
+    }
+
+    public long getExpiryTime() {
+      return expiryTime;
+    }
+  }
+
+  public void start(ConfigurationSource conf) throws IOException, 
InterruptedException, TimeoutException {
+    maxBlocks = conf.getInt(OZONE_OM_PREFETCH_MAX_BLOCKS, 
OZONE_OM_PREFETCH_MAX_BLOCKS_DEFAULT);
+    minBlocks = conf.getInt(OZONE_OM_PREFETCH_MIN_BLOCKS, 
OZONE_OM_PREFETCH_MIN_BLOCKS_DEFAULT);
+    expiryDuration = 
conf.getTimeDuration(OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL,
+        OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL_DEFAULT, 
TimeUnit.MILLISECONDS);
+
+    Class<? extends DNSToSwitchMapping> dnsToSwitchMappingClass =

Review Comment:
   This code needs a Javadoc comment explaining what is going on. Seeing 
`DNSToSwitchMapping` can be confusing for someone who is not familiar with 
this code.



##########
hadoop-hdds/common/src/main/resources/ozone-default.xml:
##########
@@ -4218,6 +4218,15 @@
     </description>
   </property>
 
+  <property>
+    <name>ozone.om.allocate.block.cache.enabled</name>
+    <value>false</value>
+    <tag>OZONE, OM, SCM</tag>

Review Comment:
   Should this be just OM?



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/OMBlockPrefetchClient.java:
##########
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.client;
+
+import static org.apache.hadoop.hdds.scm.net.NetConstants.NODE_COST_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MAX_BLOCKS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MAX_BLOCKS_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MIN_BLOCKS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MIN_BLOCKS_DEFAULT;
+import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.net.NodeImpl;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.TableMapping;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OMBlockPrefetchClient manages block prefetching for efficient write 
operations.
+ * It maintains a queue of allocated blocks per replication configuration and 
removes expired blocks lazily.
+ * The client refills the queue in the background to handle high-throughput 
scenarios efficiently.
+ */
+public class OMBlockPrefetchClient {
+  private static final Logger LOG = 
LoggerFactory.getLogger(OMBlockPrefetchClient.class);
+  private final ScmBlockLocationProtocol scmBlockLocationProtocol;
+  private int maxBlocks;
+  private int minBlocks;
+  private final boolean isAllocateBlockCacheEnabled;
+  private DNSToSwitchMapping dnsToSwitchMapping;
+  private final Map<ReplicationConfig, 
ConcurrentLinkedDeque<ExpiringAllocatedBlock>> blockQueueMap =
+      new ConcurrentHashMap<>();
+  private long expiryDuration;
+  private OMBlockPrefetchMetrics metrics;
+  private ExecutorService prefetchExecutor;
+  private final AtomicBoolean isPrefetching = new AtomicBoolean(false);
+
+  public OMBlockPrefetchClient(ScmBlockLocationProtocol scmBlockClient, 
boolean isAllocateBlockCacheEnabled) {
+    this.scmBlockLocationProtocol = scmBlockClient;
+    this.isAllocateBlockCacheEnabled = isAllocateBlockCacheEnabled;
+    initializeBlockQueueMap();
+  }
+
+  /**
+   * Enum representing the replication configurations that are tested.
+   */
+  public enum TestedReplicationConfig {
+    RATIS_THREE(ReplicationConfig.fromProtoTypeAndFactor(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE)),
+
+    RATIS_ONE(ReplicationConfig.fromProtoTypeAndFactor(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE)),
+
+    RS_3_2_1024(ReplicationConfig.fromProto(
+        HddsProtos.ReplicationType.EC, null,
+        toProto(3, 2, ECReplicationConfig.EcCodec.RS, 1024))),
+
+    RS_6_3_1024(ReplicationConfig.fromProto(
+        HddsProtos.ReplicationType.EC, null,
+        toProto(6, 3, ECReplicationConfig.EcCodec.RS, 1024))),
+
+    XOR_10_4_4096(ReplicationConfig.fromProto(
+        HddsProtos.ReplicationType.EC, null,
+        toProto(10, 4, ECReplicationConfig.EcCodec.XOR, 4096)));
+
+    private final ReplicationConfig config;
+
+    TestedReplicationConfig(ReplicationConfig config) {
+      this.config = config;
+    }
+
+    public ReplicationConfig getConfig() {
+      return config;
+    }
+
+    public static HddsProtos.ECReplicationConfig toProto(int data, int parity, 
ECReplicationConfig.EcCodec codec,
+                                                         int ecChunkSize) {
+      return HddsProtos.ECReplicationConfig.newBuilder()
+          .setData(data)
+          .setParity(parity)
+          .setCodec(codec.toString())
+          .setEcChunkSize(ecChunkSize)
+          .build();
+    }
+  }
+
+  private void initializeBlockQueueMap() {
+    for (TestedReplicationConfig config : TestedReplicationConfig.values()) {
+      blockQueueMap.put(config.getConfig(), new ConcurrentLinkedDeque<>());
+    }
+  }
+
+  /**
+   * Tracks an allocated block to be cached with its cache expiry.
+   */
+  public static final class ExpiringAllocatedBlock {
+    private final AllocatedBlock block;
+    private final long expiryTime;
+
+    public ExpiringAllocatedBlock(AllocatedBlock block, long expiryTime) {
+      this.block = block;
+      this.expiryTime = expiryTime;
+    }
+
+    public AllocatedBlock getBlock() {
+      return block;
+    }
+
+    public long getExpiryTime() {
+      return expiryTime;
+    }
+  }
+
+  public void start(ConfigurationSource conf) throws IOException, 
InterruptedException, TimeoutException {
+    maxBlocks = conf.getInt(OZONE_OM_PREFETCH_MAX_BLOCKS, 
OZONE_OM_PREFETCH_MAX_BLOCKS_DEFAULT);
+    minBlocks = conf.getInt(OZONE_OM_PREFETCH_MIN_BLOCKS, 
OZONE_OM_PREFETCH_MIN_BLOCKS_DEFAULT);
+    expiryDuration = 
conf.getTimeDuration(OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL,
+        OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL_DEFAULT, 
TimeUnit.MILLISECONDS);
+
+    Class<? extends DNSToSwitchMapping> dnsToSwitchMappingClass =
+        conf.getClass(ScmConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+            TableMapping.class, DNSToSwitchMapping.class);
+    DNSToSwitchMapping newInstance = ReflectionUtils.newInstance(
+        dnsToSwitchMappingClass, OzoneConfiguration.of(conf));
+    dnsToSwitchMapping =
+        ((newInstance instanceof CachedDNSToSwitchMapping) ? newInstance
+            : new CachedDNSToSwitchMapping(newInstance));
+    metrics = OMBlockPrefetchMetrics.register();
+    prefetchExecutor = Executors.newSingleThreadExecutor(r -> {
+      Thread t = new Thread(r, "OMBlockPrefetchClient-AsyncPrefetcher");
+      t.setDaemon(true);
+      return t;
+    });
+    LOG.info("OMBlockPrefetchClient started with minBlocks={}, maxBlocks={}, 
expiryDuration={}ms. Prefetch executor " +
+            "initialized.", minBlocks, maxBlocks, expiryDuration);
+  }
+
+  public void stop() {
+    if (prefetchExecutor != null) {
+      prefetchExecutor.shutdown();
+      try {
+        if (!prefetchExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          prefetchExecutor.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted while shutting down executor service.", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+    OMBlockPrefetchMetrics.unregister();
+  }
+
+  @SuppressWarnings("parameternumber")
+  public List<AllocatedBlock> getBlocks(long scmBlockSize, int numBlocks, 
ReplicationConfig replicationConfig,
+                                        String serviceID, ExcludeList 
excludeList, String clientMachine,
+                                        NetworkTopology clusterMap) throws 
IOException {
+    if (isAllocateBlockCacheEnabled) {
+      long readStartTime = Time.monotonicNowNanos();
+      List<AllocatedBlock> allocatedBlocks = new ArrayList<>();
+      ConcurrentLinkedDeque<ExpiringAllocatedBlock> queue = 
blockQueueMap.get(replicationConfig);
+
+      // We redirect to the allocateBlock RPC call to SCM when we encounter an 
untested ReplicationConfig or a populated
+      // ExcludeList, otherwise we return blocks from cache.
+      if (queue != null && excludeList.isEmpty()) {
+        List<ExpiringAllocatedBlock> tempValidBlocks = new ArrayList<>();
+        long now = System.currentTimeMillis();
+        while (tempValidBlocks.size() < numBlocks) {
+          ExpiringAllocatedBlock expiringBlock = queue.poll();
+          if (expiringBlock == null) {
+            break;
+          }
+
+          if (now > expiringBlock.getExpiryTime()) {
+            continue;
+          }
+
+          tempValidBlocks.add(expiringBlock);
+        }
+
+        // If there aren't enough blocks in cache, we fallback to SCM.
+        if (tempValidBlocks.size() < numBlocks) {
+          List<AllocatedBlock> newBlocks = 
scmBlockLocationProtocol.allocateBlock(scmBlockSize, numBlocks,
+              replicationConfig, serviceID, excludeList, clientMachine);
+          allocatedBlocks.addAll(newBlocks);
+
+          // Return unused valid blocks back to the front of the queue 
(preserving original order).
+          for (int i = tempValidBlocks.size() - 1; i >= 0; i--) {
+            queue.addFirst(tempValidBlocks.get(i));
+          }
+          metrics.incrementCacheMisses();
+        } else {
+          for (ExpiringAllocatedBlock expiringBlock : tempValidBlocks) {
+            AllocatedBlock block = expiringBlock.getBlock();
+            List<DatanodeDetails> sortedNodes =
+                sortDatanodes(block.getPipeline().getNodes(), clientMachine, 
clusterMap);
+            if (!Objects.equals(sortedNodes, 
block.getPipeline().getNodesInOrder())) {
+              block = block.toBuilder()
+                  
.setPipeline(block.getPipeline().copyWithNodesInOrder(sortedNodes))
+                  .build();
+            }
+            allocatedBlocks.add(block);
+          }
+          metrics.incrementCacheHits();
+        }
+
+        int queueSize = queue.size();
+        if (queueSize < minBlocks) {
+          int blocksToPrefetch = maxBlocks - queueSize;
+          LOG.debug(
+              "Cache for {} is below threshold (size: {}, min: {}). Submitting 
async prefetch task for {} blocks.",
+              replicationConfig, queueSize, minBlocks, blocksToPrefetch);
+          submitPrefetchTask(scmBlockSize, blocksToPrefetch, 
replicationConfig, serviceID);
+        }
+
+        metrics.addReadFromQueueLatency(Time.monotonicNowNanos() - 
readStartTime);
+        return allocatedBlocks;
+
+      } else {
+        LOG.debug("Bypassing cache for {}. Reason: {}", replicationConfig, 
queue == null ?
+            "Unsupported replication config for caching." : "ExcludeList 
provided.");
+        metrics.addReadFromQueueLatency(Time.monotonicNowNanos() - 
readStartTime);
+        metrics.incrementCacheMisses();
+        return scmBlockLocationProtocol.allocateBlock(scmBlockSize, numBlocks, 
replicationConfig, serviceID,
+            excludeList, clientMachine);
+      }
+    } else {
+      return scmBlockLocationProtocol.allocateBlock(scmBlockSize, numBlocks, 
replicationConfig, serviceID, excludeList,
+          clientMachine);
+    }
+  }
+
+  private void submitPrefetchTask(long blockSize, int blocksToPrefetch, 
ReplicationConfig repConfig, String serviceID) {
+
+    if (!isPrefetching.compareAndSet(false, true)) {
+      LOG.debug("Prefetch already in progress. Skipping new task for {}.", 
repConfig);
+      return;
+    }
+
+    if (prefetchExecutor == null || prefetchExecutor.isShutdown()) {
+      LOG.warn("Async prefetch executor is not running or shutdown. Skipping 
prefetch task for {}.", repConfig);
+      return;
+    }
+
+    prefetchExecutor.submit(() -> {
+      try {
+        List<AllocatedBlock> prefetchedBlocks = 
captureLatencyNs(metrics.getPrefetchLatencyNs(),
+            () -> scmBlockLocationProtocol.allocateBlock(blockSize, 
blocksToPrefetch,
+            repConfig, serviceID, (ExcludeList) Collections.emptyList(), 
null));

Review Comment:
   ```suggestion
               repConfig, serviceID, new ExcludeList(), null));
   ```



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/OMBlockPrefetchClient.java:
##########
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.client;
+
+import static org.apache.hadoop.hdds.scm.net.NetConstants.NODE_COST_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MAX_BLOCKS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MAX_BLOCKS_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MIN_BLOCKS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MIN_BLOCKS_DEFAULT;
+import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.net.NodeImpl;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.TableMapping;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OMBlockPrefetchClient manages block prefetching for efficient write 
operations.
+ * It maintains a queue of allocated blocks per replication configuration and 
removes expired blocks lazily.
+ * The client refills the queue in the background to handle high-throughput 
scenarios efficiently.
+ */
+public class OMBlockPrefetchClient {
+  private static final Logger LOG = 
LoggerFactory.getLogger(OMBlockPrefetchClient.class);
+  private final ScmBlockLocationProtocol scmBlockLocationProtocol;
+  private int maxBlocks;
+  private int minBlocks;
+  private final boolean isAllocateBlockCacheEnabled;
+  private DNSToSwitchMapping dnsToSwitchMapping;
+  private final Map<ReplicationConfig, 
ConcurrentLinkedDeque<ExpiringAllocatedBlock>> blockQueueMap =
+      new ConcurrentHashMap<>();
+  private long expiryDuration;
+  private OMBlockPrefetchMetrics metrics;
+  private ExecutorService prefetchExecutor;
+  private final AtomicBoolean isPrefetching = new AtomicBoolean(false);
+
+  public OMBlockPrefetchClient(ScmBlockLocationProtocol scmBlockClient, 
boolean isAllocateBlockCacheEnabled) {
+    this.scmBlockLocationProtocol = scmBlockClient;
+    this.isAllocateBlockCacheEnabled = isAllocateBlockCacheEnabled;
+    initializeBlockQueueMap();
+  }
+
+  /**
+   * Enum representing the replication configurations that are tested.
+   */
+  public enum TestedReplicationConfig {

Review Comment:
   ```suggestion
     public enum DefaultReplicationConfigsSupported {
   ```
   



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/OMBlockPrefetchClient.java:
##########
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.client;
+
+import static org.apache.hadoop.hdds.scm.net.NetConstants.NODE_COST_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MAX_BLOCKS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MAX_BLOCKS_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MIN_BLOCKS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MIN_BLOCKS_DEFAULT;
+import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.net.NodeImpl;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.TableMapping;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OMBlockPrefetchClient manages block prefetching for efficient write 
operations.
+ * It maintains a queue of allocated blocks per replication configuration and 
removes expired blocks lazily.
+ * The client refills the queue in the background to handle high-throughput 
scenarios efficiently.
+ */
+public class OMBlockPrefetchClient {
+  private static final Logger LOG = 
LoggerFactory.getLogger(OMBlockPrefetchClient.class);
+  private final ScmBlockLocationProtocol scmBlockLocationProtocol;
+  private int maxBlocks;
+  private int minBlocks;
+  private final boolean isAllocateBlockCacheEnabled;
+  private DNSToSwitchMapping dnsToSwitchMapping;
+  private final Map<ReplicationConfig, 
ConcurrentLinkedDeque<ExpiringAllocatedBlock>> blockQueueMap =
+      new ConcurrentHashMap<>();
+  private long expiryDuration;
+  private OMBlockPrefetchMetrics metrics;
+  private ExecutorService prefetchExecutor;
+  private final AtomicBoolean isPrefetching = new AtomicBoolean(false);
+
+  public OMBlockPrefetchClient(ScmBlockLocationProtocol scmBlockClient, 
boolean isAllocateBlockCacheEnabled) {
+    this.scmBlockLocationProtocol = scmBlockClient;
+    this.isAllocateBlockCacheEnabled = isAllocateBlockCacheEnabled;
+    initializeBlockQueueMap();
+  }
+
+  /**
+   * Enum representing the replication configurations that are tested.
+   */
+  public enum TestedReplicationConfig {
+    RATIS_THREE(ReplicationConfig.fromProtoTypeAndFactor(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE)),
+
+    RATIS_ONE(ReplicationConfig.fromProtoTypeAndFactor(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE)),
+
+    RS_3_2_1024(ReplicationConfig.fromProto(
+        HddsProtos.ReplicationType.EC, null,
+        toProto(3, 2, ECReplicationConfig.EcCodec.RS, 1024))),
+
+    RS_6_3_1024(ReplicationConfig.fromProto(
+        HddsProtos.ReplicationType.EC, null,
+        toProto(6, 3, ECReplicationConfig.EcCodec.RS, 1024))),
+
+    XOR_10_4_4096(ReplicationConfig.fromProto(
+        HddsProtos.ReplicationType.EC, null,
+        toProto(10, 4, ECReplicationConfig.EcCodec.XOR, 4096)));
+
+    private final ReplicationConfig config;
+
+    TestedReplicationConfig(ReplicationConfig config) {
+      this.config = config;
+    }
+
+    public ReplicationConfig getConfig() {
+      return config;
+    }
+
+    public static HddsProtos.ECReplicationConfig toProto(int data, int parity, 
ECReplicationConfig.EcCodec codec,
+                                                         int ecChunkSize) {
+      return HddsProtos.ECReplicationConfig.newBuilder()
+          .setData(data)
+          .setParity(parity)
+          .setCodec(codec.toString())
+          .setEcChunkSize(ecChunkSize)
+          .build();
+    }
+  }
+
+  private void initializeBlockQueueMap() {
+    for (TestedReplicationConfig config : TestedReplicationConfig.values()) {
+      blockQueueMap.put(config.getConfig(), new ConcurrentLinkedDeque<>());
+    }
+  }
+
+  /**
+   * Tracks an allocated block to be cached with its cache expiry.
+   */
+  public static final class ExpiringAllocatedBlock {
+    private final AllocatedBlock block;
+    private final long expiryTime;
+
+    public ExpiringAllocatedBlock(AllocatedBlock block, long expiryTime) {
+      this.block = block;
+      this.expiryTime = expiryTime;
+    }
+
+    public AllocatedBlock getBlock() {
+      return block;
+    }
+
+    public long getExpiryTime() {
+      return expiryTime;
+    }
+  }
+
+  public void start(ConfigurationSource conf) throws IOException, 
InterruptedException, TimeoutException {
+    maxBlocks = conf.getInt(OZONE_OM_PREFETCH_MAX_BLOCKS, 
OZONE_OM_PREFETCH_MAX_BLOCKS_DEFAULT);
+    minBlocks = conf.getInt(OZONE_OM_PREFETCH_MIN_BLOCKS, 
OZONE_OM_PREFETCH_MIN_BLOCKS_DEFAULT);
+    expiryDuration = 
conf.getTimeDuration(OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL,
+        OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL_DEFAULT, 
TimeUnit.MILLISECONDS);
+
+    Class<? extends DNSToSwitchMapping> dnsToSwitchMappingClass =
+        conf.getClass(ScmConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+            TableMapping.class, DNSToSwitchMapping.class);
+    DNSToSwitchMapping newInstance = ReflectionUtils.newInstance(
+        dnsToSwitchMappingClass, OzoneConfiguration.of(conf));
+    dnsToSwitchMapping =
+        ((newInstance instanceof CachedDNSToSwitchMapping) ? newInstance
+            : new CachedDNSToSwitchMapping(newInstance));
+    metrics = OMBlockPrefetchMetrics.register();
+    prefetchExecutor = Executors.newSingleThreadExecutor(r -> {
+      Thread t = new Thread(r, "OMBlockPrefetchClient-AsyncPrefetcher");
+      t.setDaemon(true);
+      return t;
+    });
+    LOG.info("OMBlockPrefetchClient started with minBlocks={}, maxBlocks={}, 
expiryDuration={}ms. Prefetch executor " +
+            "initialized.", minBlocks, maxBlocks, expiryDuration);
+  }
+
+  public void stop() {
+    if (prefetchExecutor != null) {
+      prefetchExecutor.shutdown();
+      try {
+        if (!prefetchExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          prefetchExecutor.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted while shutting down executor service.", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+    OMBlockPrefetchMetrics.unregister();
+  }
+
+  @SuppressWarnings("parameternumber")
+  public List<AllocatedBlock> getBlocks(long scmBlockSize, int numBlocks, 
ReplicationConfig replicationConfig,
+                                        String serviceID, ExcludeList 
excludeList, String clientMachine,
+                                        NetworkTopology clusterMap) throws 
IOException {
+    if (isAllocateBlockCacheEnabled) {
+      long readStartTime = Time.monotonicNowNanos();
+      List<AllocatedBlock> allocatedBlocks = new ArrayList<>();
+      ConcurrentLinkedDeque<ExpiringAllocatedBlock> queue = 
blockQueueMap.get(replicationConfig);
+
+      // We redirect to the allocateBlock RPC call to SCM when we encounter an 
untested ReplicationConfig or a populated
+      // ExcludeList, otherwise we return blocks from cache.
+      if (queue != null && excludeList.isEmpty()) {
+        List<ExpiringAllocatedBlock> tempValidBlocks = new ArrayList<>();
+        long now = System.currentTimeMillis();
+        while (tempValidBlocks.size() < numBlocks) {
+          ExpiringAllocatedBlock expiringBlock = queue.poll();
+          if (expiringBlock == null) {
+            break;
+          }
+
+          if (now > expiringBlock.getExpiryTime()) {
+            continue;
+          }
+
+          tempValidBlocks.add(expiringBlock);
+        }
+
+        // If there aren't enough blocks in cache, we fallback to SCM.
+        if (tempValidBlocks.size() < numBlocks) {
+          List<AllocatedBlock> newBlocks = 
scmBlockLocationProtocol.allocateBlock(scmBlockSize, numBlocks,
+              replicationConfig, serviceID, excludeList, clientMachine);
+          allocatedBlocks.addAll(newBlocks);
+
+          // Return unused valid blocks back to the front of the queue 
(preserving original order).
+          for (int i = tempValidBlocks.size() - 1; i >= 0; i--) {
+            queue.addFirst(tempValidBlocks.get(i));
+          }
+          metrics.incrementCacheMisses();
+        } else {
+          for (ExpiringAllocatedBlock expiringBlock : tempValidBlocks) {
+            AllocatedBlock block = expiringBlock.getBlock();
+            List<DatanodeDetails> sortedNodes =
+                sortDatanodes(block.getPipeline().getNodes(), clientMachine, 
clusterMap);
+            if (!Objects.equals(sortedNodes, 
block.getPipeline().getNodesInOrder())) {
+              block = block.toBuilder()
+                  
.setPipeline(block.getPipeline().copyWithNodesInOrder(sortedNodes))
+                  .build();
+            }
+            allocatedBlocks.add(block);
+          }
+          metrics.incrementCacheHits();
+        }
+
+        int queueSize = queue.size();
+        if (queueSize < minBlocks) {
+          int blocksToPrefetch = maxBlocks - queueSize;
+          LOG.debug(
+              "Cache for {} is below threshold (size: {}, min: {}). Submitting 
async prefetch task for {} blocks.",
+              replicationConfig, queueSize, minBlocks, blocksToPrefetch);
+          submitPrefetchTask(scmBlockSize, blocksToPrefetch, 
replicationConfig, serviceID);
+        }
+
+        metrics.addReadFromQueueLatency(Time.monotonicNowNanos() - 
readStartTime);
+        return allocatedBlocks;
+
+      } else {
+        LOG.debug("Bypassing cache for {}. Reason: {}", replicationConfig, 
queue == null ?
+            "Unsupported replication config for caching." : "ExcludeList 
provided.");
+        metrics.addReadFromQueueLatency(Time.monotonicNowNanos() - 
readStartTime);
+        metrics.incrementCacheMisses();
+        return scmBlockLocationProtocol.allocateBlock(scmBlockSize, numBlocks, 
replicationConfig, serviceID,
+            excludeList, clientMachine);
+      }
+    } else {
+      return scmBlockLocationProtocol.allocateBlock(scmBlockSize, numBlocks, 
replicationConfig, serviceID, excludeList,
+          clientMachine);
+    }
+  }
+
+  private void submitPrefetchTask(long blockSize, int blocksToPrefetch, 
ReplicationConfig repConfig, String serviceID) {
+
+    if (!isPrefetching.compareAndSet(false, true)) {

Review Comment:
   We need to account for the replication configuration when prefetching. Not all 
prefetch tasks are the same. 



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/client/OMBlockPrefetchClient.java:
##########
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.client;
+
+import static org.apache.hadoop.hdds.scm.net.NetConstants.NODE_COST_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MAX_BLOCKS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MAX_BLOCKS_DEFAULT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MIN_BLOCKS;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_PREFETCH_MIN_BLOCKS_DEFAULT;
+import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.net.InnerNode;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.net.NodeImpl;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.TableMapping;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OMBlockPrefetchClient manages block prefetching for efficient write 
operations.
+ * It maintains a queue of allocated blocks per replication configuration and 
removes expired blocks lazily.
+ * The client refills the queue in the background to handle high-throughput 
scenarios efficiently.
+ */
+public class OMBlockPrefetchClient {
+  private static final Logger LOG = 
LoggerFactory.getLogger(OMBlockPrefetchClient.class);
+  private final ScmBlockLocationProtocol scmBlockLocationProtocol;
+  private int maxBlocks;
+  private int minBlocks;
+  private final boolean isAllocateBlockCacheEnabled;
+  private DNSToSwitchMapping dnsToSwitchMapping;
+  private final Map<ReplicationConfig, 
ConcurrentLinkedDeque<ExpiringAllocatedBlock>> blockQueueMap =
+      new ConcurrentHashMap<>();
+  private long expiryDuration;
+  private OMBlockPrefetchMetrics metrics;
+  private ExecutorService prefetchExecutor;
+  private final AtomicBoolean isPrefetching = new AtomicBoolean(false);
+
+  public OMBlockPrefetchClient(ScmBlockLocationProtocol scmBlockClient, 
boolean isAllocateBlockCacheEnabled) {
+    this.scmBlockLocationProtocol = scmBlockClient;
+    this.isAllocateBlockCacheEnabled = isAllocateBlockCacheEnabled;
+    initializeBlockQueueMap();
+  }
+
+  /**
+   * Enum representing the replication configurations that are tested.
+   */
+  public enum TestedReplicationConfig {
+    RATIS_THREE(ReplicationConfig.fromProtoTypeAndFactor(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE)),
+
+    RATIS_ONE(ReplicationConfig.fromProtoTypeAndFactor(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE)),
+
+    RS_3_2_1024(ReplicationConfig.fromProto(
+        HddsProtos.ReplicationType.EC, null,
+        toProto(3, 2, ECReplicationConfig.EcCodec.RS, 1024))),
+
+    RS_6_3_1024(ReplicationConfig.fromProto(
+        HddsProtos.ReplicationType.EC, null,
+        toProto(6, 3, ECReplicationConfig.EcCodec.RS, 1024))),
+
+    XOR_10_4_4096(ReplicationConfig.fromProto(
+        HddsProtos.ReplicationType.EC, null,
+        toProto(10, 4, ECReplicationConfig.EcCodec.XOR, 4096)));
+
+    private final ReplicationConfig config;
+
+    TestedReplicationConfig(ReplicationConfig config) {
+      this.config = config;
+    }
+
+    public ReplicationConfig getConfig() {
+      return config;
+    }
+
+    public static HddsProtos.ECReplicationConfig toProto(int data, int parity, 
ECReplicationConfig.EcCodec codec,
+                                                         int ecChunkSize) {
+      return HddsProtos.ECReplicationConfig.newBuilder()
+          .setData(data)
+          .setParity(parity)
+          .setCodec(codec.toString())
+          .setEcChunkSize(ecChunkSize)
+          .build();
+    }
+  }
+
+  private void initializeBlockQueueMap() {
+    for (TestedReplicationConfig config : TestedReplicationConfig.values()) {
+      blockQueueMap.put(config.getConfig(), new ConcurrentLinkedDeque<>());
+    }
+  }
+
+  /**
+   * Tracks an allocated block to be cached with its cache expiry.
+   */
+  public static final class ExpiringAllocatedBlock {
+    private final AllocatedBlock block;
+    private final long expiryTime;
+
+    public ExpiringAllocatedBlock(AllocatedBlock block, long expiryTime) {
+      this.block = block;
+      this.expiryTime = expiryTime;
+    }
+
+    public AllocatedBlock getBlock() {
+      return block;
+    }
+
+    public long getExpiryTime() {
+      return expiryTime;
+    }
+  }
+
+  public void start(ConfigurationSource conf) throws IOException, 
InterruptedException, TimeoutException {
+    maxBlocks = conf.getInt(OZONE_OM_PREFETCH_MAX_BLOCKS, 
OZONE_OM_PREFETCH_MAX_BLOCKS_DEFAULT);
+    minBlocks = conf.getInt(OZONE_OM_PREFETCH_MIN_BLOCKS, 
OZONE_OM_PREFETCH_MIN_BLOCKS_DEFAULT);
+    expiryDuration = 
conf.getTimeDuration(OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL,
+        OZONE_OM_PREFETCHED_BLOCKS_EXPIRY_INTERVAL_DEFAULT, 
TimeUnit.MILLISECONDS);
+
+    Class<? extends DNSToSwitchMapping> dnsToSwitchMappingClass =
+        conf.getClass(ScmConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+            TableMapping.class, DNSToSwitchMapping.class);
+    DNSToSwitchMapping newInstance = ReflectionUtils.newInstance(
+        dnsToSwitchMappingClass, OzoneConfiguration.of(conf));
+    dnsToSwitchMapping =
+        ((newInstance instanceof CachedDNSToSwitchMapping) ? newInstance
+            : new CachedDNSToSwitchMapping(newInstance));
+    metrics = OMBlockPrefetchMetrics.register();
+    prefetchExecutor = Executors.newSingleThreadExecutor(r -> {
+      Thread t = new Thread(r, "OMBlockPrefetchClient-AsyncPrefetcher");
+      t.setDaemon(true);
+      return t;
+    });
+    LOG.info("OMBlockPrefetchClient started with minBlocks={}, maxBlocks={}, 
expiryDuration={}ms. Prefetch executor " +
+            "initialized.", minBlocks, maxBlocks, expiryDuration);
+  }
+
+  public void stop() {
+    if (prefetchExecutor != null) {
+      prefetchExecutor.shutdown();
+      try {
+        if (!prefetchExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          prefetchExecutor.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted while shutting down executor service.", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+    OMBlockPrefetchMetrics.unregister();
+  }
+
+  @SuppressWarnings("parameternumber")
+  public List<AllocatedBlock> getBlocks(long scmBlockSize, int numBlocks, 
ReplicationConfig replicationConfig,
+                                        String serviceID, ExcludeList 
excludeList, String clientMachine,
+                                        NetworkTopology clusterMap) throws 
IOException {
+    if (isAllocateBlockCacheEnabled) {
+      long readStartTime = Time.monotonicNowNanos();
+      List<AllocatedBlock> allocatedBlocks = new ArrayList<>();
+      ConcurrentLinkedDeque<ExpiringAllocatedBlock> queue = 
blockQueueMap.get(replicationConfig);
+
+      // We redirect to the allocateBlock RPC call to SCM when we encounter an 
untested ReplicationConfig or a populated
+      // ExcludeList, otherwise we return blocks from cache.
+      if (queue != null && excludeList.isEmpty()) {
+        List<ExpiringAllocatedBlock> tempValidBlocks = new ArrayList<>();
+        long now = System.currentTimeMillis();
+        while (tempValidBlocks.size() < numBlocks) {
+          ExpiringAllocatedBlock expiringBlock = queue.poll();
+          if (expiringBlock == null) {
+            break;
+          }
+
+          if (now > expiringBlock.getExpiryTime()) {
+            continue;
+          }
+
+          tempValidBlocks.add(expiringBlock);
+        }
+
+        // If there aren't enough blocks in cache, we fallback to SCM.
+        if (tempValidBlocks.size() < numBlocks) {
+          List<AllocatedBlock> newBlocks = 
scmBlockLocationProtocol.allocateBlock(scmBlockSize, numBlocks,
+              replicationConfig, serviceID, excludeList, clientMachine);
+          allocatedBlocks.addAll(newBlocks);
+
+          // Return unused valid blocks back to the front of the queue 
(preserving original order).
+          for (int i = tempValidBlocks.size() - 1; i >= 0; i--) {
+            queue.addFirst(tempValidBlocks.get(i));
+          }
+          metrics.incrementCacheMisses();
+        } else {
+          for (ExpiringAllocatedBlock expiringBlock : tempValidBlocks) {
+            AllocatedBlock block = expiringBlock.getBlock();
+            List<DatanodeDetails> sortedNodes =
+                sortDatanodes(block.getPipeline().getNodes(), clientMachine, 
clusterMap);
+            if (!Objects.equals(sortedNodes, 
block.getPipeline().getNodesInOrder())) {
+              block = block.toBuilder()
+                  
.setPipeline(block.getPipeline().copyWithNodesInOrder(sortedNodes))
+                  .build();
+            }
+            allocatedBlocks.add(block);
+          }
+          metrics.incrementCacheHits();
+        }
+
+        int queueSize = queue.size();
+        if (queueSize < minBlocks) {
+          int blocksToPrefetch = maxBlocks - queueSize;
+          LOG.debug(
+              "Cache for {} is below threshold (size: {}, min: {}). Submitting 
async prefetch task for {} blocks.",
+              replicationConfig, queueSize, minBlocks, blocksToPrefetch);
+          submitPrefetchTask(scmBlockSize, blocksToPrefetch, 
replicationConfig, serviceID);
+        }
+
+        metrics.addReadFromQueueLatency(Time.monotonicNowNanos() - 
readStartTime);
+        return allocatedBlocks;
+
+      } else {
+        LOG.debug("Bypassing cache for {}. Reason: {}", replicationConfig, 
queue == null ?
+            "Unsupported replication config for caching." : "ExcludeList 
provided.");
+        metrics.addReadFromQueueLatency(Time.monotonicNowNanos() - 
readStartTime);
+        metrics.incrementCacheMisses();
+        return scmBlockLocationProtocol.allocateBlock(scmBlockSize, numBlocks, 
replicationConfig, serviceID,
+            excludeList, clientMachine);
+      }
+    } else {
+      return scmBlockLocationProtocol.allocateBlock(scmBlockSize, numBlocks, 
replicationConfig, serviceID, excludeList,
+          clientMachine);
+    }
+  }
+
+  private void submitPrefetchTask(long blockSize, int blocksToPrefetch, 
ReplicationConfig repConfig, String serviceID) {
+
+    if (!isPrefetching.compareAndSet(false, true)) {
+      LOG.debug("Prefetch already in progress. Skipping new task for {}.", 
repConfig);
+      return;
+    }
+
+    if (prefetchExecutor == null || prefetchExecutor.isShutdown()) {
+      LOG.warn("Async prefetch executor is not running or shutdown. Skipping 
prefetch task for {}.", repConfig);
+      return;
+    }
+
+    prefetchExecutor.submit(() -> {
+      try {
+        List<AllocatedBlock> prefetchedBlocks = 
captureLatencyNs(metrics.getPrefetchLatencyNs(),
+            () -> scmBlockLocationProtocol.allocateBlock(blockSize, 
blocksToPrefetch,
+            repConfig, serviceID, (ExcludeList) Collections.emptyList(), 
null));
+        if (prefetchedBlocks != null && !prefetchedBlocks.isEmpty()) {
+          ConcurrentLinkedDeque<ExpiringAllocatedBlock> queue = 
blockQueueMap.get(repConfig);
+          if (queue != null) {
+            long expiryTime = System.currentTimeMillis() + expiryDuration;
+            int addedCount = 0;
+            for (AllocatedBlock block : prefetchedBlocks) {
+              if (queue.size() < maxBlocks) {
+                queue.offer(new ExpiringAllocatedBlock(block, expiryTime));
+                addedCount++;
+              }
+            }
+            LOG.debug("Prefetched {} blocks for replication config {}.", 
addedCount, repConfig);
+          }
+        }
+      } catch (IOException e) {
+        LOG.error("Exception occurred while prefetching blocks.", e);
+      } finally {
+        isPrefetching.set(false);
+      }
+    });
+  }
+
+  public List<DatanodeDetails> sortDatanodes(List<DatanodeDetails> nodes, 
String clientMachine,
+                                             NetworkTopology clusterMap) {
+    long sortStartTime = Time.monotonicNowNanos();
+    final Node client = getClientNode(clientMachine, nodes, clusterMap);
+    List<DatanodeDetails> sortedNodes = clusterMap.sortByDistanceCost(client, 
nodes, nodes.size());
+    metrics.addSortingLogicLatency(Time.monotonicNowNanos() - sortStartTime);
+    return sortedNodes;
+  }
+
+  private Node getClientNode(String clientMachine, List<DatanodeDetails> 
nodes, NetworkTopology clusterMap) {
+    if (StringUtils.isEmpty(clientMachine)) {
+      return null;
+    }
+    List<DatanodeDetails> matchingNodes = new ArrayList<>();
+    for (DatanodeDetails node : nodes) {
+      if (node.getIpAddress().equals(clientMachine)) {
+        matchingNodes.add(node);
+      }
+    }
+    return !matchingNodes.isEmpty() ? matchingNodes.get(0) :
+        getOtherNode(clientMachine, clusterMap);
+  }
+
+  private Node getOtherNode(String clientMachine, NetworkTopology clusterMap) {
+    try {
+      String clientLocation = resolveNodeLocation(clientMachine);
+      if (clientLocation != null) {
+        Node rack = clusterMap.getNode(clientLocation);
+        if (rack instanceof InnerNode) {
+          return new NodeImpl(clientMachine, clientLocation,
+              (InnerNode) rack, rack.getLevel() + 1,
+              NODE_COST_DEFAULT);
+        }
+      }
+    } catch (Exception e) {
+      LOG.info("Could not resolve client {}: {}", clientMachine, 
e.getMessage());

Review Comment:
   ```suggestion
         LOG.warn("Could not resolve client {}: {}", clientMachine, 
e.getMessage());
   ```



##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java:
##########
@@ -681,6 +681,21 @@ public final class OzoneConfigKeys {
       "ozone.security.crypto.compliance.mode";
   public static final String 
OZONE_SECURITY_CRYPTO_COMPLIANCE_MODE_UNRESTRICTED = "unrestricted";
 
+  // Sets the upper limit on the number of prefetched blocks OM can hold.
+  public static final String OZONE_OM_PREFETCH_MAX_BLOCKS = 
"ozone.om.prefetch.max.blocks";
+  public static final int OZONE_OM_PREFETCH_MAX_BLOCKS_DEFAULT = 10000;
+
+  // Sets the lower limit on the number of prefetched blocks OM can hold.
+  public static final String OZONE_OM_PREFETCH_MIN_BLOCKS = 
"ozone.om.prefetch.min.blocks";

Review Comment:
   ```suggestion
     public static final String OZONE_OM_PREFETCH_MAX_BLOCKS = 
"ozone.om.prefetch.blocks.max";
     public static final int OZONE_OM_PREFETCH_MAX_BLOCKS_DEFAULT = 10000;
   
     // Sets the lower limit on the number of prefetched blocks OM can hold.
     public static final String OZONE_OM_PREFETCH_MIN_BLOCKS = 
"ozone.om.prefetch.blocks.min";
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to