sumitagrawl commented on code in PR #10000:
URL: https://github.com/apache/ozone/pull/10000#discussion_r3021666985
##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java:
##########
@@ -138,10 +138,31 @@ public class ScmConfig extends ReconfigurableConfig {
)
private int transactionToDNsCommitMapLimit = 5000000;
+ @Config(key = "hdds.scm.container.pending-allocation.roll-interval",
Review Comment:
we can have config as hdds.scm.container.pending.allocation.roll.interval
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java:
##########
@@ -89,6 +89,15 @@ protected void
processICR(IncrementalContainerReportFromDatanode report,
ContainerID id = ContainerID.valueOf(replicaProto.getContainerID());
final ContainerInfo container;
try {
+ // Check if container is already known to this DN before adding
+ boolean alreadyOnDn = false;
Review Comment:
Do we really need check if container exist? or just remove if exist as
single call ?
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/PendingContainerTracker.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks pending container allocations using a Two Window Tumbling Bucket
pattern.
+ * Similar like HDFS HADOOP-3707.
+ *
+ * Two Window Tumbling Bucket for automatic aging and cleanup.
+ *
+ * How It Works:
+ * <li>Each DataNode has two sets: <b>currentWindow</b> and
<b>previousWindow</b></li>
+ * <li>New allocations go into <b>currentWindow</b></li>
+ * <li>Every <b>ROLL_INTERVAL</b> (default 10 minutes):
+ * <ul>
+ * <li>previousWindow = currentWindow (shift)</li>
+ * <li>currentWindow = new empty set (reset)</li>
+ * <li>Old previousWindow is discarded (automatic aging)</li>
+ * </ul>
+ * </li>
+ * <li>When checking pending: return <b>union</b> of currentWindow +
previousWindow</li>
+ *
+ *
+ * Example Timeline:
+ * <pre>
+ * Time | Action | CurrentWindow | PreviousWindow | Total
Pending
+ *
------+---------------------------+---------------+----------------+--------------
+ * 00:00 | Allocate Container-1 | {C1} | {} | {C1}
+ * 00:05 | Allocate Container-2 | {C1, C2} | {} | {C1,
C2}
+ * 00:10 | [ROLL] Window tumbles | {} | {C1, C2} | {C1,
C2}
+ * 00:12 | Allocate Container-3 | {C3} | {C1, C2} | {C1,
C2, C3}
+ * 00:15 | Report confirms C1 | {C3} | {C2} | {C2,
C3}
+ * 00:20 | [ROLL] Window tumbles | {} | {C3} | {C3}
+ * | (C2 aged out if not reported)
+ * </pre>
+ *
+ */
+public class PendingContainerTracker {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PendingContainerTracker.class);
+
+ /**
+ * Roll interval in milliseconds.
+ * Configurable via hdds.scm.container.pending-allocation.roll-interval.
+ * Default: 10 minutes.
+ * Containers automatically age out after 2 × rollIntervalMs.
+ */
+ private final long rollIntervalMs;
+
+ /**
+ * Map of DataNode UUID to TwoWindowBucket.
+ */
+ private final ConcurrentHashMap<UUID, TwoWindowBucket> datanodeBuckets;
+
+ /**
+ * Maximum container size in bytes.
+ */
+ private final long maxContainerSize;
+
+ /**
+ * Metrics for tracking pending containers.
+ */
+ private final SCMContainerManagerMetrics metrics;
+
+ /**
+ * Two-window bucket for a single DataNode.
+ * Contains current and previous window sets, plus last roll timestamp.
+ */
+ private static class TwoWindowBucket {
+ private Set<ContainerID> currentWindow = ConcurrentHashMap.newKeySet();
+ private Set<ContainerID> previousWindow = ConcurrentHashMap.newKeySet();
+ private long lastRollTime = Time.monotonicNow();
+ private final long rollIntervalMs;
+
+ TwoWindowBucket(long rollIntervalMs) {
+ this.rollIntervalMs = rollIntervalMs;
+ }
+
+ /**
+ * Roll the windows: previous = current, current = empty.
+ * Called when current time exceeds lastRollTime + rollIntervalMs.
+ */
+ synchronized void rollIfNeeded() {
+ long now = Time.monotonicNow();
+ if (now - lastRollTime >= rollIntervalMs) {
+ // Shift: current becomes previous
+ previousWindow = currentWindow;
+ // Reset: new empty current window
+ currentWindow = ConcurrentHashMap.newKeySet();
+ lastRollTime = now;
+ LOG.debug("Rolled window. Previous window size: {}, Current window
reset to empty", previousWindow.size());
+ }
+ }
+
+ /**
+ * Get union of both windows (all pending containers).
+ */
+ synchronized Set<ContainerID> getAllPending() {
+ Set<ContainerID> all = new HashSet<>();
+ all.addAll(currentWindow);
+ all.addAll(previousWindow);
+ return all;
+ }
+
+ /**
+ * Add container to current window.
+ */
+ synchronized boolean add(ContainerID containerID) {
+ return currentWindow.add(containerID);
+ }
+
+ /**
+ * Remove container from both windows.
+ */
+ synchronized boolean remove(ContainerID containerID) {
+ boolean removedFromCurrent = currentWindow.remove(containerID);
+ boolean removedFromPrevious = previousWindow.remove(containerID);
+ return removedFromCurrent || removedFromPrevious;
+ }
+
+ /**
+ * Check if either window is non-empty.
+ */
+ synchronized boolean isEmpty() {
+ return currentWindow.isEmpty() && previousWindow.isEmpty();
+ }
+
+ /**
+ * Get count of all pending containers (union).
+ */
+ synchronized int getCount() {
+ return getAllPending().size();
+ }
+ }
+
+ public PendingContainerTracker(long maxContainerSize) {
+ this(maxContainerSize, 10 * 60 * 1000, null); // Default 10 minutes
+ }
+
+ public PendingContainerTracker(long maxContainerSize, long rollIntervalMs,
+ SCMContainerManagerMetrics metrics) {
+ this.datanodeBuckets = new ConcurrentHashMap<>();
+ this.maxContainerSize = maxContainerSize;
+ this.rollIntervalMs = rollIntervalMs;
+ this.metrics = metrics;
+ LOG.info("PendingContainerTracker initialized with maxContainerSize={}B,
rollInterval={}ms",
+ maxContainerSize, rollIntervalMs);
+ }
+
+ /**
+ * Record a pending container allocation for all DataNodes in the pipeline.
+ * Container is added to the current window.
+ *
+ * @param pipeline The pipeline where container is allocated
+ * @param containerID The container being allocated
+ */
+ public void recordPendingAllocation(Pipeline pipeline, ContainerID
containerID) {
Review Comment:
This needs to be part of SCMNodeManager, more specific to SCMNodeStat.
Reason,
1. need handle even like stale node / dead node handler as cleanup
2. May need report this when reporting to CLI for available space in the DN
3. To be used for pipeline allocation policy, where container manager does
not come in role
Its datanode space, just trying to identify already allocated space. And
needs to be part of committed space at SCM when reporting to CLI, or other
breakup.
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java:
##########
@@ -242,12 +262,75 @@ private ContainerInfo createContainer(Pipeline pipeline,
String owner)
return containerInfo;
}
+ /**
+ * Check if a pipeline has sufficient space after considering pending
allocations.
+ * Tracks containers scheduled but not yet written to DataNodes, preventing
over-allocation.
+ *
+ * @param pipeline The pipeline to check
+ * @return true if sufficient space is available, false otherwise
+ */
+ private boolean hasSpaceAfterPendingAllocations(Pipeline pipeline) {
+ try {
+ for (DatanodeDetails node : pipeline.getNodes()) {
+ // Get DN's storage statistics
+ DatanodeInfo datanodeInfo = pipelineManager.getDatanodeInfo(node);
+ if (datanodeInfo == null) {
+ LOG.warn("DatanodeInfo not found for node {}", node.getUuidString());
+ return false;
+ }
+
+ List<StorageReportProto> storageReports =
datanodeInfo.getStorageReports();
+ if (storageReports == null || storageReports.isEmpty()) {
+ LOG.warn("No storage reports for node {}", node.getUuidString());
+ return false;
+ }
+
+ // Calculate total capacity and effective allocatable space
+ // For each disk, calculate how many containers can actually fit,
+ // since containers are written to individual disks, not spread across
them.
+ // Example: disk1=9GB, disk2=14GB with 5GB containers
+ // (1*5GB) + (2*5GB) = 15GB → actually 3 containers
+ long totalCapacity = 0L;
+ long effectiveAllocatableSpace = 0L;
+ for (StorageReportProto report : storageReports) {
Review Comment:
Instead of calcuating all available and then removing, we can do progressive
base, like,
required=pending+newAllocation
for each report
required = required - volumeUsage in roundoff value
if (required <= 0)
return true
But we need to reserve also, can do first add and check, if not present,
remove containerId
OR other way,
when DN report storage handling, total consolidate value can also be added
to memory to avoid looping on every call.
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java:
##########
@@ -103,6 +112,13 @@ protected void
processICR(IncrementalContainerReportFromDatanode report,
}
if (ContainerReportValidator.validate(container, dd, replicaProto)) {
processContainerReplica(dd, container, replicaProto, publisher,
detailsForLogging);
+
+ // Remove from pending tracker when container is added to DN
+ if (!alreadyOnDn && getContainerManager() instanceof
ContainerManagerImpl) {
Review Comment:
Please check if node report is also send in ICR, this is for reason that
node information should be updated with ICR at same time.
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/PendingContainerTracker.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks pending container allocations using a Two Window Tumbling Bucket
pattern.
+ * Similar like HDFS HADOOP-3707.
+ *
+ * Two Window Tumbling Bucket for automatic aging and cleanup.
+ *
+ * How It Works:
+ * <li>Each DataNode has two sets: <b>currentWindow</b> and
<b>previousWindow</b></li>
+ * <li>New allocations go into <b>currentWindow</b></li>
+ * <li>Every <b>ROLL_INTERVAL</b> (default 10 minutes):
+ * <ul>
+ * <li>previousWindow = currentWindow (shift)</li>
+ * <li>currentWindow = new empty set (reset)</li>
+ * <li>Old previousWindow is discarded (automatic aging)</li>
+ * </ul>
+ * </li>
+ * <li>When checking pending: return <b>union</b> of currentWindow +
previousWindow</li>
+ *
+ *
+ * Example Timeline:
+ * <pre>
+ * Time | Action | CurrentWindow | PreviousWindow | Total
Pending
+ *
------+---------------------------+---------------+----------------+--------------
+ * 00:00 | Allocate Container-1 | {C1} | {} | {C1}
+ * 00:05 | Allocate Container-2 | {C1, C2} | {} | {C1,
C2}
+ * 00:10 | [ROLL] Window tumbles | {} | {C1, C2} | {C1,
C2}
+ * 00:12 | Allocate Container-3 | {C3} | {C1, C2} | {C1,
C2, C3}
+ * 00:15 | Report confirms C1 | {C3} | {C2} | {C2,
C3}
+ * 00:20 | [ROLL] Window tumbles | {} | {C3} | {C3}
+ * | (C2 aged out if not reported)
+ * </pre>
+ *
+ */
+public class PendingContainerTracker {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PendingContainerTracker.class);
+
+ /**
+ * Roll interval in milliseconds.
+ * Configurable via hdds.scm.container.pending-allocation.roll-interval.
+ * Default: 10 minutes.
+ * Containers automatically age out after 2 × rollIntervalMs.
+ */
+ private final long rollIntervalMs;
+
+ /**
+ * Map of DataNode UUID to TwoWindowBucket.
+ */
+ private final ConcurrentHashMap<UUID, TwoWindowBucket> datanodeBuckets;
+
+ /**
+ * Maximum container size in bytes.
+ */
+ private final long maxContainerSize;
+
+ /**
+ * Metrics for tracking pending containers.
+ */
+ private final SCMContainerManagerMetrics metrics;
+
+ /**
+ * Two-window bucket for a single DataNode.
+ * Contains current and previous window sets, plus last roll timestamp.
+ */
+ private static class TwoWindowBucket {
+ private Set<ContainerID> currentWindow = ConcurrentHashMap.newKeySet();
+ private Set<ContainerID> previousWindow = ConcurrentHashMap.newKeySet();
+ private long lastRollTime = Time.monotonicNow();
+ private final long rollIntervalMs;
+
+ TwoWindowBucket(long rollIntervalMs) {
+ this.rollIntervalMs = rollIntervalMs;
+ }
+
+ /**
+ * Roll the windows: previous = current, current = empty.
+ * Called when current time exceeds lastRollTime + rollIntervalMs.
+ */
+ synchronized void rollIfNeeded() {
+ long now = Time.monotonicNow();
+ if (now - lastRollTime >= rollIntervalMs) {
+ // Shift: current becomes previous
+ previousWindow = currentWindow;
+ // Reset: new empty current window
+ currentWindow = ConcurrentHashMap.newKeySet();
+ lastRollTime = now;
+ LOG.debug("Rolled window. Previous window size: {}, Current window
reset to empty", previousWindow.size());
+ }
+ }
+
+ /**
+ * Get union of both windows (all pending containers).
+ */
+ synchronized Set<ContainerID> getAllPending() {
+ Set<ContainerID> all = new HashSet<>();
+ all.addAll(currentWindow);
+ all.addAll(previousWindow);
+ return all;
+ }
+
+ /**
+ * Add container to current window.
+ */
+ synchronized boolean add(ContainerID containerID) {
+ return currentWindow.add(containerID);
+ }
+
+ /**
+ * Remove container from both windows.
+ */
+ synchronized boolean remove(ContainerID containerID) {
+ boolean removedFromCurrent = currentWindow.remove(containerID);
+ boolean removedFromPrevious = previousWindow.remove(containerID);
+ return removedFromCurrent || removedFromPrevious;
+ }
+
+ /**
+ * Check if either window is non-empty.
+ */
+ synchronized boolean isEmpty() {
+ return currentWindow.isEmpty() && previousWindow.isEmpty();
+ }
+
+ /**
+ * Get count of all pending containers (union).
+ */
+ synchronized int getCount() {
+ return getAllPending().size();
+ }
+ }
+
+ public PendingContainerTracker(long maxContainerSize) {
+ this(maxContainerSize, 10 * 60 * 1000, null); // Default 10 minutes
+ }
+
+ public PendingContainerTracker(long maxContainerSize, long rollIntervalMs,
+ SCMContainerManagerMetrics metrics) {
+ this.datanodeBuckets = new ConcurrentHashMap<>();
+ this.maxContainerSize = maxContainerSize;
+ this.rollIntervalMs = rollIntervalMs;
+ this.metrics = metrics;
+ LOG.info("PendingContainerTracker initialized with maxContainerSize={}B,
rollInterval={}ms",
+ maxContainerSize, rollIntervalMs);
+ }
+
+ /**
+ * Record a pending container allocation for all DataNodes in the pipeline.
+ * Container is added to the current window.
+ *
+ * @param pipeline The pipeline where container is allocated
+ * @param containerID The container being allocated
+ */
+ public void recordPendingAllocation(Pipeline pipeline, ContainerID
containerID) {
+ if (pipeline == null || containerID == null) {
+ LOG.warn("Ignoring null pipeline or containerID");
+ return;
+ }
+
+ for (DatanodeDetails node : pipeline.getNodes()) {
+ recordPendingAllocationForDatanode(node, containerID);
+ }
+ }
+
+ /**
+ * Record a pending container allocation for a single DataNode.
+ * Container is added to the current window.
+ *
+ * @param node The DataNode where container is being allocated/replicated
+ * @param containerID The container being allocated/replicated
+ */
+ public void recordPendingAllocationForDatanode(DatanodeDetails node,
ContainerID containerID) {
+ if (node == null || containerID == null) {
+ LOG.warn("Ignoring null node or containerID");
+ return;
+ }
+
+ TwoWindowBucket bucket = datanodeBuckets.computeIfAbsent(
+ node.getUuid(),
+ k -> new TwoWindowBucket(rollIntervalMs)
+ );
+
+ // Roll window if needed before adding
+ bucket.rollIfNeeded();
+
+ boolean added = bucket.add(containerID);
+ LOG.info("Recorded pending container {} on DataNode {}. Added={}, Total
pending={}",
+ containerID, node.getUuidString(), added, bucket.getCount());
+
+ // Increment metrics counter
+ if (added && metrics != null) {
+ metrics.incNumPendingContainersAdded();
+ }
+ }
+
+ /**
+ * Remove a pending container allocation from a specific DataNode.
+ * Removes from both current and previous windows.
+ * Called when container is confirmed.
+ *
+ * @param node The DataNode
+ * @param containerID The container to remove from pending
+ */
+ public void removePendingAllocation(DatanodeDetails node, ContainerID
containerID) {
+ if (node == null || containerID == null) {
+ return;
+ }
+
+ TwoWindowBucket bucket = datanodeBuckets.get(node.getUuid());
+ if (bucket != null) {
+ // Roll window if needed before removing
+ bucket.rollIfNeeded();
+
+ boolean removed = bucket.remove(containerID);
+ LOG.info("Removed pending container {} from DataNode {}. Removed={},
Remaining={}",
+ containerID, node.getUuidString(), removed, bucket.getCount());
+
+ // Increment metrics counter
+ if (removed && metrics != null) {
+ metrics.incNumPendingContainersRemoved();
+ }
+
+ // Cleanup empty buckets to prevent memory leak
+ if (bucket.isEmpty()) {
+ LOG.info("Cleanup pending bucket");
+ datanodeBuckets.remove(node.getUuid(), bucket);
+ }
+ }
+ }
+
+ /**
+ * Get the total size of pending allocations on a DataNode.
+ * Returns union of current and previous windows.
+ *
+ * @param node The DataNode
+ * @return Total bytes of pending container allocations
+ */
+ public long getPendingAllocationSize(DatanodeDetails node) {
+ if (node == null) {
+ return 0;
+ }
+
+ TwoWindowBucket bucket = datanodeBuckets.get(node.getUuid());
+ LOG.info("Get pending from DataNode {}",
+ node.getUuidString());
+ if (bucket == null) {
+ LOG.info("Get pending from DataNode {} is null",
+ node.getUuidString());
+ return 0;
+ }
+
+ // Roll window if needed before querying
+ bucket.rollIfNeeded();
+
+ // Each pending container assumes max size
+ return (long) bucket.getCount() * maxContainerSize;
Review Comment:
This is costly operation as first it combine 2 list, and then it performs
count.
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/PendingContainerTracker.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks pending container allocations using a Two Window Tumbling Bucket
pattern.
+ * Similar like HDFS HADOOP-3707.
+ *
+ * Two Window Tumbling Bucket for automatic aging and cleanup.
+ *
+ * How It Works:
+ * <li>Each DataNode has two sets: <b>currentWindow</b> and
<b>previousWindow</b></li>
+ * <li>New allocations go into <b>currentWindow</b></li>
+ * <li>Every <b>ROLL_INTERVAL</b> (default 10 minutes):
+ * <ul>
+ * <li>previousWindow = currentWindow (shift)</li>
+ * <li>currentWindow = new empty set (reset)</li>
+ * <li>Old previousWindow is discarded (automatic aging)</li>
+ * </ul>
+ * </li>
+ * <li>When checking pending: return <b>union</b> of currentWindow +
previousWindow</li>
+ *
+ *
+ * Example Timeline:
+ * <pre>
+ * Time | Action | CurrentWindow | PreviousWindow | Total
Pending
+ *
------+---------------------------+---------------+----------------+--------------
+ * 00:00 | Allocate Container-1 | {C1} | {} | {C1}
+ * 00:05 | Allocate Container-2 | {C1, C2} | {} | {C1,
C2}
+ * 00:10 | [ROLL] Window tumbles | {} | {C1, C2} | {C1,
C2}
+ * 00:12 | Allocate Container-3 | {C3} | {C1, C2} | {C1,
C2, C3}
+ * 00:15 | Report confirms C1 | {C3} | {C2} | {C2,
C3}
+ * 00:20 | [ROLL] Window tumbles | {} | {C3} | {C3}
+ * | (C2 aged out if not reported)
+ * </pre>
+ *
+ */
+public class PendingContainerTracker {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PendingContainerTracker.class);
+
+ /**
+ * Roll interval in milliseconds.
+ * Configurable via hdds.scm.container.pending-allocation.roll-interval.
+ * Default: 10 minutes.
+ * Containers automatically age out after 2 × rollIntervalMs.
+ */
+ private final long rollIntervalMs;
+
+ /**
+ * Map of DataNode UUID to TwoWindowBucket.
+ */
+ private final ConcurrentHashMap<UUID, TwoWindowBucket> datanodeBuckets;
+
+ /**
+ * Maximum container size in bytes.
+ */
+ private final long maxContainerSize;
+
+ /**
+ * Metrics for tracking pending containers.
+ */
+ private final SCMContainerManagerMetrics metrics;
+
+ /**
+ * Two-window bucket for a single DataNode.
+ * Contains current and previous window sets, plus last roll timestamp.
+ */
+ private static class TwoWindowBucket {
+ private Set<ContainerID> currentWindow = ConcurrentHashMap.newKeySet();
Review Comment:
if we use synchronized in all methods, then set need not be threadsafe.
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java:
##########
@@ -175,6 +175,15 @@ public void onMessage(final ContainerReportFromDatanode
reportFromDatanode,
if (!alreadyInDn) {
// This is a new Container not in the nodeManager -> dn map yet
getNodeManager().addContainer(datanodeDetails, cid);
+
+ // Remove from pending tracker when container is added to DN
+ // This container was just confirmed for the first time on this DN
+ // No need to remove on subsequent reports (it's already been
removed)
+ if (container != null && getContainerManager() instanceof
ContainerManagerImpl) {
Review Comment:
code handling different from ICR and FCR, can be same only.
##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java:
##########
@@ -138,10 +138,31 @@ public class ScmConfig extends ReconfigurableConfig {
)
private int transactionToDNsCommitMapLimit = 5000000;
+ @Config(key = "hdds.scm.container.pending-allocation.roll-interval",
+ defaultValue = "10m",
+ type = ConfigType.TIME,
+ tags = { ConfigTag.SCM, ConfigTag.CONTAINER },
+ description =
+ "Time interval for rolling the pending container allocation window.
" +
+ "Pending container allocations are tracked in a two-window
tumbling bucket " +
+ "pattern. Each window has this duration. " +
+ "After 2x this interval, allocations that haven't been confirmed
via " +
+ "container reports will automatically age out. Default is 10
minutes."
+ )
+ private Duration pendingContainerAllocationRollInterval =
Duration.ofMinutes(10);
Review Comment:
rolling period is 5 min or 10 min ? mean previous bucket to be there for
additional 5 min to capture containerList IN-Progress
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]