aswinshakil commented on code in PR #10000:
URL: https://github.com/apache/ozone/pull/10000#discussion_r3030331856
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java:
##########
@@ -242,12 +262,75 @@ private ContainerInfo createContainer(Pipeline pipeline,
String owner)
return containerInfo;
}
+ /**
+ * Check if a pipeline has sufficient space after considering pending
allocations.
+ * Tracks containers scheduled but not yet written to DataNodes, preventing
over-allocation.
+ *
+ * @param pipeline The pipeline to check
+ * @return true if sufficient space is available, false otherwise
+ */
+ private boolean hasSpaceAfterPendingAllocations(Pipeline pipeline) {
+ try {
+ for (DatanodeDetails node : pipeline.getNodes()) {
+ // Get DN's storage statistics
+ DatanodeInfo datanodeInfo = pipelineManager.getDatanodeInfo(node);
+ if (datanodeInfo == null) {
+ LOG.warn("DatanodeInfo not found for node {}", node.getUuidString());
+ return false;
+ }
+
+ List<StorageReportProto> storageReports =
datanodeInfo.getStorageReports();
+ if (storageReports == null || storageReports.isEmpty()) {
+ LOG.warn("No storage reports for node {}", node.getUuidString());
+ return false;
+ }
+
+ // Calculate total capacity and effective allocatable space
+ // For each disk, calculate how many containers can actually fit,
+ // since containers are written to individual disks, not spread across
them.
+ // Example: disk1=9GB, disk2=14GB with 5GB containers
+ // (1*5GB) + (2*5GB) = 15GB → actually 3 containers
+ long totalCapacity = 0L;
+ long effectiveAllocatableSpace = 0L;
+ for (StorageReportProto report : storageReports) {
+ totalCapacity += report.getCapacity();
+ long usableSpace = VolumeUsage.getUsableSpace(report);
+ // Calculate how many containers can fit on this disk
+ long containersOnThisDisk = usableSpace / maxContainerSize;
+ // Add effective space (containers that fit * container size)
+ effectiveAllocatableSpace += containersOnThisDisk * maxContainerSize;
+ }
+
+ // Get pending allocations from tracker
+ long pendingAllocations =
pendingContainerTracker.getPendingAllocationSize(node);
+
+ // Calculate effective remaining space after pending allocations
+ long effectiveRemaining = effectiveAllocatableSpace -
pendingAllocations;
+
+ // Check if there's enough space for a new container
+ if (effectiveRemaining < maxContainerSize) {
+ LOG.info("Node {} insufficient space: capacity={}, effective
allocatable={}, " +
+ "pending allocations={}, effective remaining={},
required={}",
+ node.getUuidString(), totalCapacity, effectiveAllocatableSpace,
+ pendingAllocations, effectiveRemaining, maxContainerSize);
+ return false;
+ }
+ }
+
+ return true;
+ } catch (Exception e) {
+ LOG.warn("Error checking space for pipeline {}", pipeline.getId(), e);
+ return true;
Review Comment:
If we are not sure whether we can create a container here, should we still
choose this pipeline? Instead of making it generic, we can specify what to do
for each exception we might see.
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/PendingContainerTracker.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks pending container allocations using a Two Window Tumbling Bucket
pattern.
+ * Similar like HDFS HADOOP-3707.
+ *
+ * Two Window Tumbling Bucket for automatic aging and cleanup.
+ *
+ * How It Works:
+ * <li>Each DataNode has two sets: <b>currentWindow</b> and
<b>previousWindow</b></li>
+ * <li>New allocations go into <b>currentWindow</b></li>
+ * <li>Every <b>ROLL_INTERVAL</b> (default 10 minutes):
+ * <ul>
+ * <li>previousWindow = currentWindow (shift)</li>
+ * <li>currentWindow = new empty set (reset)</li>
+ * <li>Old previousWindow is discarded (automatic aging)</li>
+ * </ul>
+ * </li>
+ * <li>When checking pending: return <b>union</b> of currentWindow +
previousWindow</li>
+ *
+ *
+ * Example Timeline:
+ * <pre>
+ * Time | Action | CurrentWindow | PreviousWindow | Total
Pending
+ *
------+---------------------------+---------------+----------------+--------------
+ * 00:00 | Allocate Container-1 | {C1} | {} | {C1}
+ * 00:05 | Allocate Container-2 | {C1, C2} | {} | {C1,
C2}
+ * 00:10 | [ROLL] Window tumbles | {} | {C1, C2} | {C1,
C2}
+ * 00:12 | Allocate Container-3 | {C3} | {C1, C2} | {C1,
C2, C3}
+ * 00:15 | Report confirms C1 | {C3} | {C2} | {C2,
C3}
+ * 00:20 | [ROLL] Window tumbles | {} | {C3} | {C3}
+ * | (C2 aged out if not reported)
+ * </pre>
+ *
+ */
+public class PendingContainerTracker {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PendingContainerTracker.class);
+
+ /**
+ * Roll interval in milliseconds.
+ * Configurable via hdds.scm.container.pending-allocation.roll-interval.
+ * Default: 10 minutes.
+ * Containers automatically age out after 2 × rollIntervalMs.
+ */
+ private final long rollIntervalMs;
+
+ /**
+ * Map of DataNode UUID to TwoWindowBucket.
+ */
+ private final ConcurrentHashMap<UUID, TwoWindowBucket> datanodeBuckets;
+
+ /**
+ * Maximum container size in bytes.
+ */
+ private final long maxContainerSize;
+
+ /**
+ * Metrics for tracking pending containers.
+ */
+ private final SCMContainerManagerMetrics metrics;
+
+ /**
+ * Two-window bucket for a single DataNode.
+ * Contains current and previous window sets, plus last roll timestamp.
+ */
+ private static class TwoWindowBucket {
+ private Set<ContainerID> currentWindow = ConcurrentHashMap.newKeySet();
+ private Set<ContainerID> previousWindow = ConcurrentHashMap.newKeySet();
+ private long lastRollTime = Time.monotonicNow();
+ private final long rollIntervalMs;
+
+ TwoWindowBucket(long rollIntervalMs) {
+ this.rollIntervalMs = rollIntervalMs;
+ }
+
+ /**
+ * Roll the windows: previous = current, current = empty.
+ * Called when current time exceeds lastRollTime + rollIntervalMs.
+ */
+ synchronized void rollIfNeeded() {
Review Comment:
Pending allocations can persist beyond 2× the roll interval after long idle
periods because `rollIfNeeded()` only rolls once. A single roll doesn't clear
entries older than two windows, which can incorrectly block new allocations.
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java:
##########
@@ -175,6 +175,15 @@ public void onMessage(final ContainerReportFromDatanode
reportFromDatanode,
if (!alreadyInDn) {
// This is a new Container not in the nodeManager -> dn map yet
getNodeManager().addContainer(datanodeDetails, cid);
+
+ // Remove from pending tracker when container is added to DN
+ // This container was just confirmed for the first time on this DN
+ // No need to remove on subsequent reports (it's already been
removed)
+ if (container != null && getContainerManager() instanceof
ContainerManagerImpl) {
Review Comment:
Why not just add this to the ContainerManager interface? We can avoid these
casts. Is this because Recon uses the same code path and we don't want it
to do this? For Recon we can just make it a no-op.
##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java:
##########
@@ -346,4 +346,9 @@ public int openContainerLimit(List<DatanodeDetails>
datanodes) {
public SCMPipelineMetrics getMetrics() {
return null;
}
+
+ @Override
+ public org.apache.hadoop.hdds.scm.node.DatanodeInfo
getDatanodeInfo(DatanodeDetails datanodeDetails) {
Review Comment:
Nit: We can import `DatanodeInfo`
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java:
##########
@@ -242,12 +262,75 @@ private ContainerInfo createContainer(Pipeline pipeline,
String owner)
return containerInfo;
}
+ /**
+ * Check if a pipeline has sufficient space after considering pending
allocations.
+ * Tracks containers scheduled but not yet written to DataNodes, preventing
over-allocation.
+ *
+ * @param pipeline The pipeline to check
+ * @return true if sufficient space is available, false otherwise
+ */
+ private boolean hasSpaceAfterPendingAllocations(Pipeline pipeline) {
+ try {
+ for (DatanodeDetails node : pipeline.getNodes()) {
+ // Get DN's storage statistics
+ DatanodeInfo datanodeInfo = pipelineManager.getDatanodeInfo(node);
+ if (datanodeInfo == null) {
+ LOG.warn("DatanodeInfo not found for node {}", node.getUuidString());
+ return false;
+ }
+
+ List<StorageReportProto> storageReports =
datanodeInfo.getStorageReports();
+ if (storageReports == null || storageReports.isEmpty()) {
+ LOG.warn("No storage reports for node {}", node.getUuidString());
+ return false;
+ }
+
+ // Calculate total capacity and effective allocatable space
+ // For each disk, calculate how many containers can actually fit,
+ // since containers are written to individual disks, not spread across
them.
+ // Example: disk1=9GB, disk2=14GB with 5GB containers
+ // (1*5GB) + (2*5GB) = 15GB → actually 3 containers
+ long totalCapacity = 0L;
+ long effectiveAllocatableSpace = 0L;
+ for (StorageReportProto report : storageReports) {
+ totalCapacity += report.getCapacity();
+ long usableSpace = VolumeUsage.getUsableSpace(report);
+ // Calculate how many containers can fit on this disk
+ long containersOnThisDisk = usableSpace / maxContainerSize;
+ // Add effective space (containers that fit * container size)
+ effectiveAllocatableSpace += containersOnThisDisk * maxContainerSize;
+ }
+
+ // Get pending allocations from tracker
+ long pendingAllocations =
pendingContainerTracker.getPendingAllocationSize(node);
+
+ // Calculate effective remaining space after pending allocations
+ long effectiveRemaining = effectiveAllocatableSpace -
pendingAllocations;
+
+ // Check if there's enough space for a new container
+ if (effectiveRemaining < maxContainerSize) {
Review Comment:
This makes the allocation a little aggressive, right? Even if we have just 5GB
left we allocate it. Should we leave some buffer when allocating a container?
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/PendingContainerTracker.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks pending container allocations using a Two Window Tumbling Bucket
pattern.
+ * Similar like HDFS HADOOP-3707.
+ *
+ * Two Window Tumbling Bucket for automatic aging and cleanup.
+ *
+ * How It Works:
+ * <li>Each DataNode has two sets: <b>currentWindow</b> and
<b>previousWindow</b></li>
+ * <li>New allocations go into <b>currentWindow</b></li>
+ * <li>Every <b>ROLL_INTERVAL</b> (default 10 minutes):
+ * <ul>
+ * <li>previousWindow = currentWindow (shift)</li>
+ * <li>currentWindow = new empty set (reset)</li>
+ * <li>Old previousWindow is discarded (automatic aging)</li>
+ * </ul>
+ * </li>
+ * <li>When checking pending: return <b>union</b> of currentWindow +
previousWindow</li>
+ *
+ *
+ * Example Timeline:
+ * <pre>
+ * Time | Action | CurrentWindow | PreviousWindow | Total
Pending
+ *
------+---------------------------+---------------+----------------+--------------
+ * 00:00 | Allocate Container-1 | {C1} | {} | {C1}
+ * 00:05 | Allocate Container-2 | {C1, C2} | {} | {C1,
C2}
+ * 00:10 | [ROLL] Window tumbles | {} | {C1, C2} | {C1,
C2}
+ * 00:12 | Allocate Container-3 | {C3} | {C1, C2} | {C1,
C2, C3}
+ * 00:15 | Report confirms C1 | {C3} | {C2} | {C2,
C3}
+ * 00:20 | [ROLL] Window tumbles | {} | {C3} | {C3}
+ * | (C2 aged out if not reported)
+ * </pre>
+ *
+ */
+public class PendingContainerTracker {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PendingContainerTracker.class);
+
+ /**
+ * Roll interval in milliseconds.
+ * Configurable via hdds.scm.container.pending-allocation.roll-interval.
+ * Default: 10 minutes.
+ * Containers automatically age out after 2 × rollIntervalMs.
+ */
+ private final long rollIntervalMs;
+
+ /**
+ * Map of DataNode UUID to TwoWindowBucket.
+ */
+ private final ConcurrentHashMap<UUID, TwoWindowBucket> datanodeBuckets;
+
+ /**
+ * Maximum container size in bytes.
+ */
+ private final long maxContainerSize;
+
+ /**
+ * Metrics for tracking pending containers.
+ */
+ private final SCMContainerManagerMetrics metrics;
+
+ /**
+ * Two-window bucket for a single DataNode.
+ * Contains current and previous window sets, plus last roll timestamp.
+ */
+ private static class TwoWindowBucket {
+ private Set<ContainerID> currentWindow = ConcurrentHashMap.newKeySet();
+ private Set<ContainerID> previousWindow = ConcurrentHashMap.newKeySet();
+ private long lastRollTime = Time.monotonicNow();
+ private final long rollIntervalMs;
+
+ TwoWindowBucket(long rollIntervalMs) {
+ this.rollIntervalMs = rollIntervalMs;
+ }
+
+ /**
+ * Roll the windows: previous = current, current = empty.
+ * Called when current time exceeds lastRollTime + rollIntervalMs.
+ */
+ synchronized void rollIfNeeded() {
+ long now = Time.monotonicNow();
+ if (now - lastRollTime >= rollIntervalMs) {
+ // Shift: current becomes previous
+ previousWindow = currentWindow;
+ // Reset: new empty current window
+ currentWindow = ConcurrentHashMap.newKeySet();
+ lastRollTime = now;
+ LOG.debug("Rolled window. Previous window size: {}, Current window
reset to empty", previousWindow.size());
+ }
+ }
+
+ /**
+ * Get union of both windows (all pending containers).
+ */
+ synchronized Set<ContainerID> getAllPending() {
+ Set<ContainerID> all = new HashSet<>();
+ all.addAll(currentWindow);
+ all.addAll(previousWindow);
+ return all;
+ }
+
+ /**
+ * Add container to current window.
+ */
+ synchronized boolean add(ContainerID containerID) {
+ return currentWindow.add(containerID);
+ }
+
+ /**
+ * Remove container from both windows.
+ */
+ synchronized boolean remove(ContainerID containerID) {
+ boolean removedFromCurrent = currentWindow.remove(containerID);
+ boolean removedFromPrevious = previousWindow.remove(containerID);
+ return removedFromCurrent || removedFromPrevious;
+ }
+
+ /**
+ * Check if either window is non-empty.
+ */
+ synchronized boolean isEmpty() {
+ return currentWindow.isEmpty() && previousWindow.isEmpty();
+ }
+
+ /**
+ * Get count of all pending containers (union).
+ */
+ synchronized int getCount() {
+ return getAllPending().size();
+ }
+ }
+
+ public PendingContainerTracker(long maxContainerSize) {
+ this(maxContainerSize, 10 * 60 * 1000, null); // Default 10 minutes
+ }
+
+ public PendingContainerTracker(long maxContainerSize, long rollIntervalMs,
+ SCMContainerManagerMetrics metrics) {
+ this.datanodeBuckets = new ConcurrentHashMap<>();
+ this.maxContainerSize = maxContainerSize;
+ this.rollIntervalMs = rollIntervalMs;
+ this.metrics = metrics;
+ LOG.info("PendingContainerTracker initialized with maxContainerSize={}B,
rollInterval={}ms",
+ maxContainerSize, rollIntervalMs);
+ }
+
+ /**
+ * Record a pending container allocation for all DataNodes in the pipeline.
+ * Container is added to the current window.
+ *
+ * @param pipeline The pipeline where container is allocated
+ * @param containerID The container being allocated
+ */
+ public void recordPendingAllocation(Pipeline pipeline, ContainerID
containerID) {
+ if (pipeline == null || containerID == null) {
+ LOG.warn("Ignoring null pipeline or containerID");
+ return;
+ }
+
+ for (DatanodeDetails node : pipeline.getNodes()) {
+ recordPendingAllocationForDatanode(node, containerID);
+ }
+ }
+
+ /**
+ * Record a pending container allocation for a single DataNode.
+ * Container is added to the current window.
+ *
+ * @param node The DataNode where container is being allocated/replicated
+ * @param containerID The container being allocated/replicated
+ */
+ public void recordPendingAllocationForDatanode(DatanodeDetails node,
ContainerID containerID) {
+ if (node == null || containerID == null) {
+ LOG.warn("Ignoring null node or containerID");
+ return;
+ }
+
+ TwoWindowBucket bucket = datanodeBuckets.computeIfAbsent(
+ node.getUuid(),
+ k -> new TwoWindowBucket(rollIntervalMs)
+ );
+
+ // Roll window if needed before adding
+ bucket.rollIfNeeded();
+
+ boolean added = bucket.add(containerID);
+ LOG.info("Recorded pending container {} on DataNode {}. Added={}, Total
pending={}",
Review Comment:
We can change all the LOG statements here to debug level; logging every
allocation and removal will become too noisy in the SCM log.
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/PendingContainerTracker.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks pending container allocations using a Two Window Tumbling Bucket
pattern.
+ * Similar like HDFS HADOOP-3707.
+ *
+ * Two Window Tumbling Bucket for automatic aging and cleanup.
+ *
+ * How It Works:
+ * <li>Each DataNode has two sets: <b>currentWindow</b> and
<b>previousWindow</b></li>
+ * <li>New allocations go into <b>currentWindow</b></li>
+ * <li>Every <b>ROLL_INTERVAL</b> (default 10 minutes):
+ * <ul>
+ * <li>previousWindow = currentWindow (shift)</li>
+ * <li>currentWindow = new empty set (reset)</li>
+ * <li>Old previousWindow is discarded (automatic aging)</li>
+ * </ul>
+ * </li>
+ * <li>When checking pending: return <b>union</b> of currentWindow +
previousWindow</li>
+ *
+ *
+ * Example Timeline:
+ * <pre>
+ * Time | Action | CurrentWindow | PreviousWindow | Total
Pending
+ *
------+---------------------------+---------------+----------------+--------------
+ * 00:00 | Allocate Container-1 | {C1} | {} | {C1}
+ * 00:05 | Allocate Container-2 | {C1, C2} | {} | {C1,
C2}
+ * 00:10 | [ROLL] Window tumbles | {} | {C1, C2} | {C1,
C2}
+ * 00:12 | Allocate Container-3 | {C3} | {C1, C2} | {C1,
C2, C3}
+ * 00:15 | Report confirms C1 | {C3} | {C2} | {C2,
C3}
+ * 00:20 | [ROLL] Window tumbles | {} | {C3} | {C3}
+ * | (C2 aged out if not reported)
+ * </pre>
+ *
+ */
+public class PendingContainerTracker {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PendingContainerTracker.class);
+
+ /**
+ * Roll interval in milliseconds.
+ * Configurable via hdds.scm.container.pending-allocation.roll-interval.
+ * Default: 10 minutes.
+ * Containers automatically age out after 2 × rollIntervalMs.
+ */
+ private final long rollIntervalMs;
+
+ /**
+ * Map of DataNode UUID to TwoWindowBucket.
+ */
+ private final ConcurrentHashMap<UUID, TwoWindowBucket> datanodeBuckets;
+
+ /**
+ * Maximum container size in bytes.
+ */
+ private final long maxContainerSize;
+
+ /**
+ * Metrics for tracking pending containers.
+ */
+ private final SCMContainerManagerMetrics metrics;
+
+ /**
+ * Two-window bucket for a single DataNode.
+ * Contains current and previous window sets, plus last roll timestamp.
+ */
+ private static class TwoWindowBucket {
+ private Set<ContainerID> currentWindow = ConcurrentHashMap.newKeySet();
+ private Set<ContainerID> previousWindow = ConcurrentHashMap.newKeySet();
+ private long lastRollTime = Time.monotonicNow();
+ private final long rollIntervalMs;
+
+ TwoWindowBucket(long rollIntervalMs) {
+ this.rollIntervalMs = rollIntervalMs;
+ }
+
+ /**
+ * Roll the windows: previous = current, current = empty.
+ * Called when current time exceeds lastRollTime + rollIntervalMs.
+ */
+ synchronized void rollIfNeeded() {
+ long now = Time.monotonicNow();
+ if (now - lastRollTime >= rollIntervalMs) {
+ // Shift: current becomes previous
+ previousWindow = currentWindow;
+ // Reset: new empty current window
+ currentWindow = ConcurrentHashMap.newKeySet();
+ lastRollTime = now;
+ LOG.debug("Rolled window. Previous window size: {}, Current window
reset to empty", previousWindow.size());
+ }
+ }
+
+ /**
+ * Get union of both windows (all pending containers).
+ */
+ synchronized Set<ContainerID> getAllPending() {
+ Set<ContainerID> all = new HashSet<>();
+ all.addAll(currentWindow);
+ all.addAll(previousWindow);
+ return all;
+ }
+
+ /**
+ * Add container to current window.
+ */
+ synchronized boolean add(ContainerID containerID) {
+ return currentWindow.add(containerID);
+ }
+
+ /**
+ * Remove container from both windows.
+ */
+ synchronized boolean remove(ContainerID containerID) {
+ boolean removedFromCurrent = currentWindow.remove(containerID);
+ boolean removedFromPrevious = previousWindow.remove(containerID);
+ return removedFromCurrent || removedFromPrevious;
+ }
+
+ /**
+ * Check if either window is non-empty.
+ */
+ synchronized boolean isEmpty() {
+ return currentWindow.isEmpty() && previousWindow.isEmpty();
+ }
+
+ /**
+ * Get count of all pending containers (union).
+ */
+ synchronized int getCount() {
+ return getAllPending().size();
Review Comment:
Can we just return the combined count of both windows instead of adding them
to a separate set only to get its size here?
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java:
##########
@@ -89,6 +89,15 @@ protected void
processICR(IncrementalContainerReportFromDatanode report,
ContainerID id = ContainerID.valueOf(replicaProto.getContainerID());
final ContainerInfo container;
try {
+ // Check if container is already known to this DN before adding
+ boolean alreadyOnDn = false;
Review Comment:
Agreed. Instead of checking and then removing, we can just attempt the removal
from the pending list directly. The result is the same, and we avoid one
extra operation.
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/PendingContainerTracker.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks pending container allocations using a Two Window Tumbling Bucket
pattern.
+ * Similar like HDFS HADOOP-3707.
+ *
+ * Two Window Tumbling Bucket for automatic aging and cleanup.
+ *
+ * How It Works:
+ * <li>Each DataNode has two sets: <b>currentWindow</b> and
<b>previousWindow</b></li>
+ * <li>New allocations go into <b>currentWindow</b></li>
+ * <li>Every <b>ROLL_INTERVAL</b> (default 10 minutes):
+ * <ul>
+ * <li>previousWindow = currentWindow (shift)</li>
+ * <li>currentWindow = new empty set (reset)</li>
+ * <li>Old previousWindow is discarded (automatic aging)</li>
+ * </ul>
+ * </li>
+ * <li>When checking pending: return <b>union</b> of currentWindow +
previousWindow</li>
+ *
+ *
+ * Example Timeline:
+ * <pre>
+ * Time | Action | CurrentWindow | PreviousWindow | Total
Pending
+ *
------+---------------------------+---------------+----------------+--------------
+ * 00:00 | Allocate Container-1 | {C1} | {} | {C1}
+ * 00:05 | Allocate Container-2 | {C1, C2} | {} | {C1,
C2}
+ * 00:10 | [ROLL] Window tumbles | {} | {C1, C2} | {C1,
C2}
+ * 00:12 | Allocate Container-3 | {C3} | {C1, C2} | {C1,
C2, C3}
+ * 00:15 | Report confirms C1 | {C3} | {C2} | {C2,
C3}
+ * 00:20 | [ROLL] Window tumbles | {} | {C3} | {C3}
+ * | (C2 aged out if not reported)
+ * </pre>
+ *
+ */
+public class PendingContainerTracker {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PendingContainerTracker.class);
+
+ /**
+ * Roll interval in milliseconds.
+ * Configurable via hdds.scm.container.pending-allocation.roll-interval.
+ * Default: 10 minutes.
+ * Containers automatically age out after 2 × rollIntervalMs.
+ */
+ private final long rollIntervalMs;
+
+ /**
+ * Map of DataNode UUID to TwoWindowBucket.
+ */
+ private final ConcurrentHashMap<UUID, TwoWindowBucket> datanodeBuckets;
+
+ /**
+ * Maximum container size in bytes.
+ */
+ private final long maxContainerSize;
+
+ /**
+ * Metrics for tracking pending containers.
+ */
+ private final SCMContainerManagerMetrics metrics;
+
+ /**
+ * Two-window bucket for a single DataNode.
+ * Contains current and previous window sets, plus last roll timestamp.
+ */
+ private static class TwoWindowBucket {
+ private Set<ContainerID> currentWindow = ConcurrentHashMap.newKeySet();
+ private Set<ContainerID> previousWindow = ConcurrentHashMap.newKeySet();
+ private long lastRollTime = Time.monotonicNow();
+ private final long rollIntervalMs;
+
+ TwoWindowBucket(long rollIntervalMs) {
+ this.rollIntervalMs = rollIntervalMs;
+ }
+
+ /**
+ * Roll the windows: previous = current, current = empty.
+ * Called when current time exceeds lastRollTime + rollIntervalMs.
+ */
+ synchronized void rollIfNeeded() {
+ long now = Time.monotonicNow();
+ if (now - lastRollTime >= rollIntervalMs) {
+ // Shift: current becomes previous
+ previousWindow = currentWindow;
+ // Reset: new empty current window
+ currentWindow = ConcurrentHashMap.newKeySet();
+ lastRollTime = now;
+ LOG.debug("Rolled window. Previous window size: {}, Current window
reset to empty", previousWindow.size());
+ }
+ }
+
+ /**
+ * Get union of both windows (all pending containers).
+ */
+ synchronized Set<ContainerID> getAllPending() {
+ Set<ContainerID> all = new HashSet<>();
+ all.addAll(currentWindow);
+ all.addAll(previousWindow);
+ return all;
+ }
+
+ /**
+ * Add container to current window.
+ */
+ synchronized boolean add(ContainerID containerID) {
+ return currentWindow.add(containerID);
+ }
+
+ /**
+ * Remove container from both windows.
+ */
+ synchronized boolean remove(ContainerID containerID) {
+ boolean removedFromCurrent = currentWindow.remove(containerID);
+ boolean removedFromPrevious = previousWindow.remove(containerID);
+ return removedFromCurrent || removedFromPrevious;
+ }
+
+ /**
+ * Check if either window is non-empty.
+ */
+ synchronized boolean isEmpty() {
+ return currentWindow.isEmpty() && previousWindow.isEmpty();
+ }
+
+ /**
+ * Get count of all pending containers (union).
+ */
+ synchronized int getCount() {
+ return getAllPending().size();
+ }
+ }
+
+ public PendingContainerTracker(long maxContainerSize) {
+ this(maxContainerSize, 10 * 60 * 1000, null); // Default 10 minutes
+ }
+
+ public PendingContainerTracker(long maxContainerSize, long rollIntervalMs,
+ SCMContainerManagerMetrics metrics) {
+ this.datanodeBuckets = new ConcurrentHashMap<>();
+ this.maxContainerSize = maxContainerSize;
+ this.rollIntervalMs = rollIntervalMs;
+ this.metrics = metrics;
+ LOG.info("PendingContainerTracker initialized with maxContainerSize={}B,
rollInterval={}ms",
+ maxContainerSize, rollIntervalMs);
+ }
+
+ /**
+ * Record a pending container allocation for all DataNodes in the pipeline.
+ * Container is added to the current window.
+ *
+ * @param pipeline The pipeline where container is allocated
+ * @param containerID The container being allocated
+ */
+ public void recordPendingAllocation(Pipeline pipeline, ContainerID
containerID) {
+ if (pipeline == null || containerID == null) {
+ LOG.warn("Ignoring null pipeline or containerID");
+ return;
+ }
+
+ for (DatanodeDetails node : pipeline.getNodes()) {
+ recordPendingAllocationForDatanode(node, containerID);
+ }
+ }
+
+ /**
+ * Record a pending container allocation for a single DataNode.
+ * Container is added to the current window.
+ *
+ * @param node The DataNode where container is being allocated/replicated
+ * @param containerID The container being allocated/replicated
+ */
+ public void recordPendingAllocationForDatanode(DatanodeDetails node,
ContainerID containerID) {
+ if (node == null || containerID == null) {
+ LOG.warn("Ignoring null node or containerID");
+ return;
+ }
+
+ TwoWindowBucket bucket = datanodeBuckets.computeIfAbsent(
+ node.getUuid(),
+ k -> new TwoWindowBucket(rollIntervalMs)
+ );
+
+ // Roll window if needed before adding
+ bucket.rollIfNeeded();
+
+ boolean added = bucket.add(containerID);
+ LOG.info("Recorded pending container {} on DataNode {}. Added={}, Total
pending={}",
+ containerID, node.getUuidString(), added, bucket.getCount());
+
+ // Increment metrics counter
+ if (added && metrics != null) {
+ metrics.incNumPendingContainersAdded();
+ }
+ }
+
+ /**
+ * Remove a pending container allocation from a specific DataNode.
+ * Removes from both current and previous windows.
+ * Called when container is confirmed.
+ *
+ * @param node The DataNode
+ * @param containerID The container to remove from pending
+ */
+ public void removePendingAllocation(DatanodeDetails node, ContainerID
containerID) {
+ if (node == null || containerID == null) {
+ return;
+ }
+
+ TwoWindowBucket bucket = datanodeBuckets.get(node.getUuid());
+ if (bucket != null) {
+ // Roll window if needed before removing
+ bucket.rollIfNeeded();
+
+ boolean removed = bucket.remove(containerID);
+ LOG.info("Removed pending container {} from DataNode {}. Removed={},
Remaining={}",
Review Comment:
Same as above for all the LOG in this class.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]