aswinshakil commented on code in PR #10000:
URL: https://github.com/apache/ozone/pull/10000#discussion_r3030331856


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java:
##########
@@ -242,12 +262,75 @@ private ContainerInfo createContainer(Pipeline pipeline, 
String owner)
     return containerInfo;
   }
 
+  /**
+   * Check if a pipeline has sufficient space after considering pending 
allocations.
+   * Tracks containers scheduled but not yet written to DataNodes, preventing 
over-allocation.
+   * 
+   * @param pipeline The pipeline to check
+   * @return true if sufficient space is available, false otherwise
+   */
+  private boolean hasSpaceAfterPendingAllocations(Pipeline pipeline) {
+    try {
+      for (DatanodeDetails node : pipeline.getNodes()) {
+        // Get DN's storage statistics
+        DatanodeInfo datanodeInfo = pipelineManager.getDatanodeInfo(node);
+        if (datanodeInfo == null) {
+          LOG.warn("DatanodeInfo not found for node {}", node.getUuidString());
+          return false;
+        }
+
+        List<StorageReportProto> storageReports = 
datanodeInfo.getStorageReports();
+        if (storageReports == null || storageReports.isEmpty()) {
+          LOG.warn("No storage reports for node {}", node.getUuidString());
+          return false;
+        }
+
+        // Calculate total capacity and effective allocatable space
+        // For each disk, calculate how many containers can actually fit,
+        // since containers are written to individual disks, not spread across 
them.
+        // Example: disk1=9GB, disk2=14GB with 5GB containers
+        // (1*5GB) + (2*5GB) = 15GB → actually 3 containers
+        long totalCapacity = 0L;
+        long effectiveAllocatableSpace = 0L;
+        for (StorageReportProto report : storageReports) {
+          totalCapacity += report.getCapacity();
+          long usableSpace = VolumeUsage.getUsableSpace(report);
+          // Calculate how many containers can fit on this disk
+          long containersOnThisDisk = usableSpace / maxContainerSize;
+          // Add effective space (containers that fit * container size)
+          effectiveAllocatableSpace += containersOnThisDisk * maxContainerSize;
+        }
+
+        // Get pending allocations from tracker
+        long pendingAllocations = 
pendingContainerTracker.getPendingAllocationSize(node);
+        
+        // Calculate effective remaining space after pending allocations
+        long effectiveRemaining = effectiveAllocatableSpace - 
pendingAllocations;
+        
+        // Check if there's enough space for a new container
+        if (effectiveRemaining < maxContainerSize) {
+          LOG.info("Node {} insufficient space: capacity={}, effective 
allocatable={}, " +
+                  "pending allocations={}, effective remaining={}, 
required={}",
+              node.getUuidString(), totalCapacity, effectiveAllocatableSpace,
+              pendingAllocations, effectiveRemaining, maxContainerSize);
+          return false;
+        }
+      }
+      
+      return true;
+    } catch (Exception e) {
+      LOG.warn("Error checking space for pipeline {}", pipeline.getId(), e);
+      return true;

Review Comment:
   If we are not sure whether we can create a container here, should we still 
choose this pipeline? Instead of catching exceptions generically, we can specify 
what to do for each exception we might see. 



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/PendingContainerTracker.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks pending container allocations using a Two Window Tumbling Bucket 
pattern.
+ * Similar like HDFS HADOOP-3707.
+ *
+ * Two Window Tumbling Bucket for automatic aging and cleanup.
+ *
+ * How It Works:
+ *   <li>Each DataNode has two sets: <b>currentWindow</b> and 
<b>previousWindow</b></li>
+ *   <li>New allocations go into <b>currentWindow</b></li>
+ *   <li>Every <b>ROLL_INTERVAL</b> (default 10 minutes):
+ *     <ul>
+ *       <li>previousWindow = currentWindow (shift)</li>
+ *       <li>currentWindow = new empty set (reset)</li>
+ *       <li>Old previousWindow is discarded (automatic aging)</li>
+ *     </ul>
+ *   </li>
+ *   <li>When checking pending: return <b>union</b> of currentWindow + 
previousWindow</li>
+ *
+ *
+ * Example Timeline:
+ * <pre>
+ * Time  | Action                    | CurrentWindow | PreviousWindow | Total 
Pending
+ * 
------+---------------------------+---------------+----------------+--------------
+ * 00:00 | Allocate Container-1      | {C1}          | {}             | {C1}
+ * 00:05 | Allocate Container-2      | {C1, C2}      | {}             | {C1, 
C2}
+ * 00:10 | [ROLL] Window tumbles     | {}            | {C1, C2}       | {C1, 
C2}
+ * 00:12 | Allocate Container-3      | {C3}          | {C1, C2}       | {C1, 
C2, C3}
+ * 00:15 | Report confirms C1        | {C3}          | {C2}           | {C2, 
C3}
+ * 00:20 | [ROLL] Window tumbles     | {}            | {C3}           | {C3}
+ *       | (C2 aged out if not reported)
+ * </pre>
+ *
+ */
+public class PendingContainerTracker {
+  
+  private static final Logger LOG = 
LoggerFactory.getLogger(PendingContainerTracker.class);
+  
+  /**
+   * Roll interval in milliseconds.
+   * Configurable via hdds.scm.container.pending-allocation.roll-interval.
+   * Default: 10 minutes.
+   * Containers automatically age out after 2 × rollIntervalMs.
+   */
+  private final long rollIntervalMs;
+
+  /**
+   * Map of DataNode UUID to TwoWindowBucket.
+   */
+  private final ConcurrentHashMap<UUID, TwoWindowBucket> datanodeBuckets;
+
+  /**
+   * Maximum container size in bytes.
+   */
+  private final long maxContainerSize;
+
+  /**
+   * Metrics for tracking pending containers.
+   */
+  private final SCMContainerManagerMetrics metrics;
+  
+  /**
+   * Two-window bucket for a single DataNode.
+   * Contains current and previous window sets, plus last roll timestamp.
+   */
+  private static class TwoWindowBucket {
+    private Set<ContainerID> currentWindow = ConcurrentHashMap.newKeySet();
+    private Set<ContainerID> previousWindow = ConcurrentHashMap.newKeySet();
+    private long lastRollTime = Time.monotonicNow();
+    private final long rollIntervalMs;
+    
+    TwoWindowBucket(long rollIntervalMs) {
+      this.rollIntervalMs = rollIntervalMs;
+    }
+    
+    /**
+     * Roll the windows: previous = current, current = empty.
+     * Called when current time exceeds lastRollTime + rollIntervalMs.
+     */
+    synchronized void rollIfNeeded() {

Review Comment:
   Pending allocations can persist beyond 2× the roll interval after long idle 
periods because `rollIfNeeded()` only rolls once. A single roll doesn't clear 
entries older than two windows, which can incorrectly block new allocations.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java:
##########
@@ -175,6 +175,15 @@ public void onMessage(final ContainerReportFromDatanode 
reportFromDatanode,
           if (!alreadyInDn) {
             // This is a new Container not in the nodeManager -> dn map yet
             getNodeManager().addContainer(datanodeDetails, cid);
+            
+            // Remove from pending tracker when container is added to DN
+            // This container was just confirmed for the first time on this DN
+            // No need to remove on subsequent reports (it's already been 
removed)
+            if (container != null && getContainerManager() instanceof 
ContainerManagerImpl) {

Review Comment:
   Why not just add this to the ContainerManager interface? We can avoid these 
conversions. Is this because Recon uses the same code path and we don't want it 
to do this? For Recon, we can just make it a no-op. 



##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java:
##########
@@ -346,4 +346,9 @@ public int openContainerLimit(List<DatanodeDetails> 
datanodes) {
   public SCMPipelineMetrics getMetrics() {
     return null;
   }
+
+  @Override
+  public org.apache.hadoop.hdds.scm.node.DatanodeInfo 
getDatanodeInfo(DatanodeDetails datanodeDetails) {

Review Comment:
   Nit: We can import `DatanodeInfo` instead of using the fully qualified name.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java:
##########
@@ -242,12 +262,75 @@ private ContainerInfo createContainer(Pipeline pipeline, 
String owner)
     return containerInfo;
   }
 
+  /**
+   * Check if a pipeline has sufficient space after considering pending 
allocations.
+   * Tracks containers scheduled but not yet written to DataNodes, preventing 
over-allocation.
+   * 
+   * @param pipeline The pipeline to check
+   * @return true if sufficient space is available, false otherwise
+   */
+  private boolean hasSpaceAfterPendingAllocations(Pipeline pipeline) {
+    try {
+      for (DatanodeDetails node : pipeline.getNodes()) {
+        // Get DN's storage statistics
+        DatanodeInfo datanodeInfo = pipelineManager.getDatanodeInfo(node);
+        if (datanodeInfo == null) {
+          LOG.warn("DatanodeInfo not found for node {}", node.getUuidString());
+          return false;
+        }
+
+        List<StorageReportProto> storageReports = 
datanodeInfo.getStorageReports();
+        if (storageReports == null || storageReports.isEmpty()) {
+          LOG.warn("No storage reports for node {}", node.getUuidString());
+          return false;
+        }
+
+        // Calculate total capacity and effective allocatable space
+        // For each disk, calculate how many containers can actually fit,
+        // since containers are written to individual disks, not spread across 
them.
+        // Example: disk1=9GB, disk2=14GB with 5GB containers
+        // (1*5GB) + (2*5GB) = 15GB → actually 3 containers
+        long totalCapacity = 0L;
+        long effectiveAllocatableSpace = 0L;
+        for (StorageReportProto report : storageReports) {
+          totalCapacity += report.getCapacity();
+          long usableSpace = VolumeUsage.getUsableSpace(report);
+          // Calculate how many containers can fit on this disk
+          long containersOnThisDisk = usableSpace / maxContainerSize;
+          // Add effective space (containers that fit * container size)
+          effectiveAllocatableSpace += containersOnThisDisk * maxContainerSize;
+        }
+
+        // Get pending allocations from tracker
+        long pendingAllocations = 
pendingContainerTracker.getPendingAllocationSize(node);
+        
+        // Calculate effective remaining space after pending allocations
+        long effectiveRemaining = effectiveAllocatableSpace - 
pendingAllocations;
+        
+        // Check if there's enough space for a new container
+        if (effectiveRemaining < maxContainerSize) {

Review Comment:
   This makes the allocation a little aggressive, right? Even if we have just 
5GB, we allocate it. Should we leave some buffer when allocating a container?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/PendingContainerTracker.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks pending container allocations using a Two Window Tumbling Bucket 
pattern.
+ * Similar like HDFS HADOOP-3707.
+ *
+ * Two Window Tumbling Bucket for automatic aging and cleanup.
+ *
+ * How It Works:
+ *   <li>Each DataNode has two sets: <b>currentWindow</b> and 
<b>previousWindow</b></li>
+ *   <li>New allocations go into <b>currentWindow</b></li>
+ *   <li>Every <b>ROLL_INTERVAL</b> (default 10 minutes):
+ *     <ul>
+ *       <li>previousWindow = currentWindow (shift)</li>
+ *       <li>currentWindow = new empty set (reset)</li>
+ *       <li>Old previousWindow is discarded (automatic aging)</li>
+ *     </ul>
+ *   </li>
+ *   <li>When checking pending: return <b>union</b> of currentWindow + 
previousWindow</li>
+ *
+ *
+ * Example Timeline:
+ * <pre>
+ * Time  | Action                    | CurrentWindow | PreviousWindow | Total 
Pending
+ * 
------+---------------------------+---------------+----------------+--------------
+ * 00:00 | Allocate Container-1      | {C1}          | {}             | {C1}
+ * 00:05 | Allocate Container-2      | {C1, C2}      | {}             | {C1, 
C2}
+ * 00:10 | [ROLL] Window tumbles     | {}            | {C1, C2}       | {C1, 
C2}
+ * 00:12 | Allocate Container-3      | {C3}          | {C1, C2}       | {C1, 
C2, C3}
+ * 00:15 | Report confirms C1        | {C3}          | {C2}           | {C2, 
C3}
+ * 00:20 | [ROLL] Window tumbles     | {}            | {C3}           | {C3}
+ *       | (C2 aged out if not reported)
+ * </pre>
+ *
+ */
+public class PendingContainerTracker {
+  
+  private static final Logger LOG = 
LoggerFactory.getLogger(PendingContainerTracker.class);
+  
+  /**
+   * Roll interval in milliseconds.
+   * Configurable via hdds.scm.container.pending-allocation.roll-interval.
+   * Default: 10 minutes.
+   * Containers automatically age out after 2 × rollIntervalMs.
+   */
+  private final long rollIntervalMs;
+
+  /**
+   * Map of DataNode UUID to TwoWindowBucket.
+   */
+  private final ConcurrentHashMap<UUID, TwoWindowBucket> datanodeBuckets;
+
+  /**
+   * Maximum container size in bytes.
+   */
+  private final long maxContainerSize;
+
+  /**
+   * Metrics for tracking pending containers.
+   */
+  private final SCMContainerManagerMetrics metrics;
+  
+  /**
+   * Two-window bucket for a single DataNode.
+   * Contains current and previous window sets, plus last roll timestamp.
+   */
+  private static class TwoWindowBucket {
+    private Set<ContainerID> currentWindow = ConcurrentHashMap.newKeySet();
+    private Set<ContainerID> previousWindow = ConcurrentHashMap.newKeySet();
+    private long lastRollTime = Time.monotonicNow();
+    private final long rollIntervalMs;
+    
+    TwoWindowBucket(long rollIntervalMs) {
+      this.rollIntervalMs = rollIntervalMs;
+    }
+    
+    /**
+     * Roll the windows: previous = current, current = empty.
+     * Called when current time exceeds lastRollTime + rollIntervalMs.
+     */
+    synchronized void rollIfNeeded() {
+      long now = Time.monotonicNow();
+      if (now - lastRollTime >= rollIntervalMs) {
+        // Shift: current becomes previous
+        previousWindow = currentWindow;
+        // Reset: new empty current window
+        currentWindow = ConcurrentHashMap.newKeySet();
+        lastRollTime = now;
+        LOG.debug("Rolled window. Previous window size: {}, Current window 
reset to empty", previousWindow.size());
+      }
+    }
+    
+    /**
+     * Get union of both windows (all pending containers).
+     */
+    synchronized Set<ContainerID> getAllPending() {
+      Set<ContainerID> all = new HashSet<>();
+      all.addAll(currentWindow);
+      all.addAll(previousWindow);
+      return all;
+    }
+    
+    /**
+     * Add container to current window.
+     */
+    synchronized boolean add(ContainerID containerID) {
+      return currentWindow.add(containerID);
+    }
+    
+    /**
+     * Remove container from both windows.
+     */
+    synchronized boolean remove(ContainerID containerID) {
+      boolean removedFromCurrent = currentWindow.remove(containerID);
+      boolean removedFromPrevious = previousWindow.remove(containerID);
+      return removedFromCurrent || removedFromPrevious;
+    }
+    
+    /**
+     * Check if either window is non-empty.
+     */
+    synchronized boolean isEmpty() {
+      return currentWindow.isEmpty() && previousWindow.isEmpty();
+    }
+    
+    /**
+     * Get count of all pending containers (union).
+     */
+    synchronized int getCount() {
+      return getAllPending().size();
+    }
+  }
+  
+  public PendingContainerTracker(long maxContainerSize) {
+    this(maxContainerSize, 10 * 60 * 1000, null); // Default 10 minutes
+  }
+  
+  public PendingContainerTracker(long maxContainerSize, long rollIntervalMs, 
+      SCMContainerManagerMetrics metrics) {
+    this.datanodeBuckets = new ConcurrentHashMap<>();
+    this.maxContainerSize = maxContainerSize;
+    this.rollIntervalMs = rollIntervalMs;
+    this.metrics = metrics;
+    LOG.info("PendingContainerTracker initialized with maxContainerSize={}B, 
rollInterval={}ms",
+        maxContainerSize, rollIntervalMs);
+  }
+  
+  /**
+   * Record a pending container allocation for all DataNodes in the pipeline.
+   * Container is added to the current window.
+   * 
+   * @param pipeline The pipeline where container is allocated
+   * @param containerID The container being allocated
+   */
+  public void recordPendingAllocation(Pipeline pipeline, ContainerID 
containerID) {
+    if (pipeline == null || containerID == null) {
+      LOG.warn("Ignoring null pipeline or containerID");
+      return;
+    }
+    
+    for (DatanodeDetails node : pipeline.getNodes()) {
+      recordPendingAllocationForDatanode(node, containerID);
+    }
+  }
+  
+  /**
+   * Record a pending container allocation for a single DataNode.
+   * Container is added to the current window.
+   * 
+   * @param node The DataNode where container is being allocated/replicated
+   * @param containerID The container being allocated/replicated
+   */
+  public void recordPendingAllocationForDatanode(DatanodeDetails node, 
ContainerID containerID) {
+    if (node == null || containerID == null) {
+      LOG.warn("Ignoring null node or containerID");
+      return;
+    }
+    
+    TwoWindowBucket bucket = datanodeBuckets.computeIfAbsent(
+        node.getUuid(),
+        k -> new TwoWindowBucket(rollIntervalMs)
+    );
+    
+    // Roll window if needed before adding
+    bucket.rollIfNeeded();
+    
+    boolean added = bucket.add(containerID);
+    LOG.info("Recorded pending container {} on DataNode {}. Added={}, Total 
pending={}",

Review Comment:
   We can change all of these LOG statements to debug level. Logging every 
allocation and removal will become too noisy in the SCM log.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/PendingContainerTracker.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks pending container allocations using a Two Window Tumbling Bucket 
pattern.
+ * Similar like HDFS HADOOP-3707.
+ *
+ * Two Window Tumbling Bucket for automatic aging and cleanup.
+ *
+ * How It Works:
+ *   <li>Each DataNode has two sets: <b>currentWindow</b> and 
<b>previousWindow</b></li>
+ *   <li>New allocations go into <b>currentWindow</b></li>
+ *   <li>Every <b>ROLL_INTERVAL</b> (default 10 minutes):
+ *     <ul>
+ *       <li>previousWindow = currentWindow (shift)</li>
+ *       <li>currentWindow = new empty set (reset)</li>
+ *       <li>Old previousWindow is discarded (automatic aging)</li>
+ *     </ul>
+ *   </li>
+ *   <li>When checking pending: return <b>union</b> of currentWindow + 
previousWindow</li>
+ *
+ *
+ * Example Timeline:
+ * <pre>
+ * Time  | Action                    | CurrentWindow | PreviousWindow | Total 
Pending
+ * 
------+---------------------------+---------------+----------------+--------------
+ * 00:00 | Allocate Container-1      | {C1}          | {}             | {C1}
+ * 00:05 | Allocate Container-2      | {C1, C2}      | {}             | {C1, 
C2}
+ * 00:10 | [ROLL] Window tumbles     | {}            | {C1, C2}       | {C1, 
C2}
+ * 00:12 | Allocate Container-3      | {C3}          | {C1, C2}       | {C1, 
C2, C3}
+ * 00:15 | Report confirms C1        | {C3}          | {C2}           | {C2, 
C3}
+ * 00:20 | [ROLL] Window tumbles     | {}            | {C3}           | {C3}
+ *       | (C2 aged out if not reported)
+ * </pre>
+ *
+ */
+public class PendingContainerTracker {
+  
+  private static final Logger LOG = 
LoggerFactory.getLogger(PendingContainerTracker.class);
+  
+  /**
+   * Roll interval in milliseconds.
+   * Configurable via hdds.scm.container.pending-allocation.roll-interval.
+   * Default: 10 minutes.
+   * Containers automatically age out after 2 × rollIntervalMs.
+   */
+  private final long rollIntervalMs;
+
+  /**
+   * Map of DataNode UUID to TwoWindowBucket.
+   */
+  private final ConcurrentHashMap<UUID, TwoWindowBucket> datanodeBuckets;
+
+  /**
+   * Maximum container size in bytes.
+   */
+  private final long maxContainerSize;
+
+  /**
+   * Metrics for tracking pending containers.
+   */
+  private final SCMContainerManagerMetrics metrics;
+  
+  /**
+   * Two-window bucket for a single DataNode.
+   * Contains current and previous window sets, plus last roll timestamp.
+   */
+  private static class TwoWindowBucket {
+    private Set<ContainerID> currentWindow = ConcurrentHashMap.newKeySet();
+    private Set<ContainerID> previousWindow = ConcurrentHashMap.newKeySet();
+    private long lastRollTime = Time.monotonicNow();
+    private final long rollIntervalMs;
+    
+    TwoWindowBucket(long rollIntervalMs) {
+      this.rollIntervalMs = rollIntervalMs;
+    }
+    
+    /**
+     * Roll the windows: previous = current, current = empty.
+     * Called when current time exceeds lastRollTime + rollIntervalMs.
+     */
+    synchronized void rollIfNeeded() {
+      long now = Time.monotonicNow();
+      if (now - lastRollTime >= rollIntervalMs) {
+        // Shift: current becomes previous
+        previousWindow = currentWindow;
+        // Reset: new empty current window
+        currentWindow = ConcurrentHashMap.newKeySet();
+        lastRollTime = now;
+        LOG.debug("Rolled window. Previous window size: {}, Current window 
reset to empty", previousWindow.size());
+      }
+    }
+    
+    /**
+     * Get union of both windows (all pending containers).
+     */
+    synchronized Set<ContainerID> getAllPending() {
+      Set<ContainerID> all = new HashSet<>();
+      all.addAll(currentWindow);
+      all.addAll(previousWindow);
+      return all;
+    }
+    
+    /**
+     * Add container to current window.
+     */
+    synchronized boolean add(ContainerID containerID) {
+      return currentWindow.add(containerID);
+    }
+    
+    /**
+     * Remove container from both windows.
+     */
+    synchronized boolean remove(ContainerID containerID) {
+      boolean removedFromCurrent = currentWindow.remove(containerID);
+      boolean removedFromPrevious = previousWindow.remove(containerID);
+      return removedFromCurrent || removedFromPrevious;
+    }
+    
+    /**
+     * Check if either window is non-empty.
+     */
+    synchronized boolean isEmpty() {
+      return currentWindow.isEmpty() && previousWindow.isEmpty();
+    }
+    
+    /**
+     * Get count of all pending containers (union).
+     */
+    synchronized int getCount() {
+      return getAllPending().size();

Review Comment:
   Can we just return the combined count of both windows instead of adding them 
to a separate set just to get the count here?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java:
##########
@@ -89,6 +89,15 @@ protected void 
processICR(IncrementalContainerReportFromDatanode report,
         ContainerID id = ContainerID.valueOf(replicaProto.getContainerID());
         final ContainerInfo container;
         try {
+          // Check if container is already known to this DN before adding
+          boolean alreadyOnDn = false;

Review Comment:
   Agreed. Instead of checking and then removing, we can just attempt the remove 
on the pending list directly. The result will be the same, and we avoid one extra op. 



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/PendingContainerTracker.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks pending container allocations using a Two Window Tumbling Bucket 
pattern.
+ * Similar to the approach taken in HADOOP-3707.
+ *
+ * Two Window Tumbling Bucket for automatic aging and cleanup.
+ *
+ * How It Works:
+ * <ul>
+ *   <li>Each DataNode has two sets: <b>currentWindow</b> and <b>previousWindow</b></li>
+ *   <li>New allocations go into <b>currentWindow</b></li>
+ *   <li>Every <b>ROLL_INTERVAL</b> (default 10 minutes):
+ *     <ul>
+ *       <li>previousWindow = currentWindow (shift)</li>
+ *       <li>currentWindow = new empty set (reset)</li>
+ *       <li>Old previousWindow is discarded (automatic aging)</li>
+ *     </ul>
+ *   </li>
+ *   <li>When checking pending: return <b>union</b> of currentWindow + previousWindow</li>
+ * </ul>
+ *
+ *
+ * Example Timeline:
+ * <pre>
+ * Time  | Action                    | CurrentWindow | PreviousWindow | Total 
Pending
+ * 
------+---------------------------+---------------+----------------+--------------
+ * 00:00 | Allocate Container-1      | {C1}          | {}             | {C1}
+ * 00:05 | Allocate Container-2      | {C1, C2}      | {}             | {C1, 
C2}
+ * 00:10 | [ROLL] Window tumbles     | {}            | {C1, C2}       | {C1, 
C2}
+ * 00:12 | Allocate Container-3      | {C3}          | {C1, C2}       | {C1, 
C2, C3}
+ * 00:15 | Report confirms C1        | {C3}          | {C2}           | {C2, 
C3}
+ * 00:20 | [ROLL] Window tumbles     | {}            | {C3}           | {C3}
+ *       | (C2 aged out if not reported)
+ * </pre>
+ *
+ */
+public class PendingContainerTracker {
+  
+  private static final Logger LOG = 
LoggerFactory.getLogger(PendingContainerTracker.class);
+  
+  /**
+   * Roll interval in milliseconds.
+   * Configurable via hdds.scm.container.pending-allocation.roll-interval.
+   * Default: 10 minutes.
+   * Containers automatically age out after 2 × rollIntervalMs.
+   */
+  private final long rollIntervalMs;
+
+  /**
+   * Map of DataNode UUID to TwoWindowBucket.
+   */
+  private final ConcurrentHashMap<UUID, TwoWindowBucket> datanodeBuckets;
+
+  /**
+   * Maximum container size in bytes.
+   */
+  private final long maxContainerSize;
+
+  /**
+   * Metrics for tracking pending containers.
+   */
+  private final SCMContainerManagerMetrics metrics;
+  
+  /**
+   * Two-window bucket for a single DataNode.
+   * Contains current and previous window sets, plus last roll timestamp.
+   */
+  private static class TwoWindowBucket {
+    private Set<ContainerID> currentWindow = ConcurrentHashMap.newKeySet();
+    private Set<ContainerID> previousWindow = ConcurrentHashMap.newKeySet();
+    private long lastRollTime = Time.monotonicNow();
+    private final long rollIntervalMs;
+    
+    TwoWindowBucket(long rollIntervalMs) {
+      this.rollIntervalMs = rollIntervalMs;
+    }
+    
+    /**
+     * Roll the windows: previous = current, current = empty.
+     * Called when current time exceeds lastRollTime + rollIntervalMs.
+     */
+    synchronized void rollIfNeeded() {
+      long now = Time.monotonicNow();
+      if (now - lastRollTime >= rollIntervalMs) {
+        // Shift: current becomes previous
+        previousWindow = currentWindow;
+        // Reset: new empty current window
+        currentWindow = ConcurrentHashMap.newKeySet();
+        lastRollTime = now;
+        LOG.debug("Rolled window. Previous window size: {}, Current window 
reset to empty", previousWindow.size());
+      }
+    }
+    
+    /**
+     * Get union of both windows (all pending containers).
+     */
+    synchronized Set<ContainerID> getAllPending() {
+      Set<ContainerID> all = new HashSet<>();
+      all.addAll(currentWindow);
+      all.addAll(previousWindow);
+      return all;
+    }
+    
+    /**
+     * Add container to current window.
+     */
+    synchronized boolean add(ContainerID containerID) {
+      return currentWindow.add(containerID);
+    }
+    
+    /**
+     * Remove container from both windows.
+     */
+    synchronized boolean remove(ContainerID containerID) {
+      boolean removedFromCurrent = currentWindow.remove(containerID);
+      boolean removedFromPrevious = previousWindow.remove(containerID);
+      return removedFromCurrent || removedFromPrevious;
+    }
+    
+    /**
+     * Check if either window is non-empty.
+     */
+    synchronized boolean isEmpty() {
+      return currentWindow.isEmpty() && previousWindow.isEmpty();
+    }
+    
+    /**
+     * Get count of all pending containers (union).
+     */
+    synchronized int getCount() {
+      return getAllPending().size();
+    }
+  }
+  
/**
 * Create a tracker with the default 10 minute roll interval and no
 * metrics sink.
 *
 * @param maxContainerSize maximum container size in bytes
 */
public PendingContainerTracker(long maxContainerSize) {
  this(maxContainerSize, 600_000L, null); // default roll interval: 10 minutes
}
+  
/**
 * Create a tracker.
 *
 * @param maxContainerSize maximum container size in bytes
 * @param rollIntervalMs   window roll interval in milliseconds; pending
 *                         entries age out after at most two intervals
 * @param metrics          metrics sink for pending-container counters,
 *                         may be null
 */
public PendingContainerTracker(long maxContainerSize, long rollIntervalMs,
    SCMContainerManagerMetrics metrics) {
  this.maxContainerSize = maxContainerSize;
  this.rollIntervalMs = rollIntervalMs;
  this.metrics = metrics;
  this.datanodeBuckets = new ConcurrentHashMap<>();
  LOG.info("PendingContainerTracker initialized with maxContainerSize={}B, rollInterval={}ms",
      maxContainerSize, rollIntervalMs);
}
+  
+  /**
+   * Record a pending container allocation for all DataNodes in the pipeline.
+   * Container is added to the current window.
+   * 
+   * @param pipeline The pipeline where container is allocated
+   * @param containerID The container being allocated
+   */
+  public void recordPendingAllocation(Pipeline pipeline, ContainerID 
containerID) {
+    if (pipeline == null || containerID == null) {
+      LOG.warn("Ignoring null pipeline or containerID");
+      return;
+    }
+    
+    for (DatanodeDetails node : pipeline.getNodes()) {
+      recordPendingAllocationForDatanode(node, containerID);
+    }
+  }
+  
+  /**
+   * Record a pending container allocation for a single DataNode.
+   * Container is added to the current window.
+   * 
+   * @param node The DataNode where container is being allocated/replicated
+   * @param containerID The container being allocated/replicated
+   */
+  public void recordPendingAllocationForDatanode(DatanodeDetails node, 
ContainerID containerID) {
+    if (node == null || containerID == null) {
+      LOG.warn("Ignoring null node or containerID");
+      return;
+    }
+    
+    TwoWindowBucket bucket = datanodeBuckets.computeIfAbsent(
+        node.getUuid(),
+        k -> new TwoWindowBucket(rollIntervalMs)
+    );
+    
+    // Roll window if needed before adding
+    bucket.rollIfNeeded();
+    
+    boolean added = bucket.add(containerID);
+    LOG.info("Recorded pending container {} on DataNode {}. Added={}, Total 
pending={}",
+        containerID, node.getUuidString(), added, bucket.getCount());
+    
+    // Increment metrics counter
+    if (added && metrics != null) {
+      metrics.incNumPendingContainersAdded();
+    }
+  }
+
+  /**
+   * Remove a pending container allocation from a specific DataNode.
+   * Removes from both current and previous windows.
+   * Called when container is confirmed.
+   * 
+   * @param node The DataNode
+   * @param containerID The container to remove from pending
+   */
+  public void removePendingAllocation(DatanodeDetails node, ContainerID 
containerID) {
+    if (node == null || containerID == null) {
+      return;
+    }
+    
+    TwoWindowBucket bucket = datanodeBuckets.get(node.getUuid());
+    if (bucket != null) {
+      // Roll window if needed before removing
+      bucket.rollIfNeeded();
+      
+      boolean removed = bucket.remove(containerID);
+      LOG.info("Removed pending container {} from DataNode {}. Removed={}, 
Remaining={}",

Review Comment:
   Same as above for all the LOG in this class. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to