szetszwo commented on code in PR #10000:
URL: https://github.com/apache/ozone/pull/10000#discussion_r3328959533
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java:
##########
@@ -158,52 +177,36 @@ public PendingContainerTracker(long maxContainerSize,
long rollIntervalMs, SCMNo
}
/**
- * Whether the datanode can fit another container of {@link
#maxContainerSize} after accounting for
- * SCM pending allocations for {@code node} (this tracker) and usable space
across volumes on
- * {@code datanodeInfo}. Pending bytes are count × {@code maxContainerSize};
- * effective allocatable space sums full-container slots per storage report.
+ * Atomically checks if the datanode has space for a new container and
records the allocation
+ * if space is available. The check-and-add atomicity is enforced inside
+ * {@link TwoWindowBucket#checkSpaceAndAdd}.
*
- * @param datanodeInfo storage reports for the datanode
+ * @param datanodeInfo datanode whose storage reports and pending bucket
+ * @param containerID the container being allocated
+ * @return true if space was available and the allocation was recorded,
false otherwise
*/
- public boolean hasEffectiveAllocatableSpaceForNewContainer(DatanodeInfo
datanodeInfo) {
+ public boolean checkSpaceAndRecordAllocation(DatanodeInfo datanodeInfo,
ContainerID containerID) {
Objects.requireNonNull(datanodeInfo, "datanodeInfo == null");
+ Objects.requireNonNull(containerID, "containerID == null");
- long pendingAllocationSize =
datanodeInfo.getPendingContainerAllocations().getCount() * maxContainerSize;
List<StorageReportProto> storageReports = datanodeInfo.getStorageReports();
Objects.requireNonNull(storageReports, "storageReports == null");
if (storageReports.isEmpty()) {
return false;
}
- long effectiveAllocatableSpace = 0L;
- for (StorageReportProto report : storageReports) {
- long usableSpace = VolumeUsage.getUsableSpace(report);
- long containersOnThisDisk = usableSpace / maxContainerSize;
- effectiveAllocatableSpace += containersOnThisDisk * maxContainerSize;
- if (effectiveAllocatableSpace - pendingAllocationSize >=
maxContainerSize) {
- return true;
- }
- }
- if (metrics != null) {
- metrics.incNumSkippedFullNodeContainerAllocation();
- }
- return false;
- }
- /**
- * Record a pending container allocation for a single DataNode.
- * Container is added to the current window.
- *
- * @param containerID The container being allocated/replicated
- */
- public void recordPendingAllocationForDatanode(DatanodeInfo datanodeInfo,
ContainerID containerID) {
- Objects.requireNonNull(containerID, "containerID == null");
- if (datanodeInfo == null) {
- return;
- }
- final boolean added =
datanodeInfo.getPendingContainerAllocations().add(containerID);
- if (added && metrics != null) {
- metrics.incNumPendingContainersAdded();
+ boolean added = datanodeInfo.getPendingContainerAllocations()
+ .checkSpaceAndAdd(storageReports, maxContainerSize, containerID);
+ if (added) {
+ if (metrics != null) {
+ metrics.incNumPendingContainersAdded();
+ }
+ } else {
+ if (metrics != null) {
+ metrics.incNumSkippedFullNodeContainerAllocation();
+ }
}
Review Comment:
Check `metrics` for null once, before branching on `added`:
```java
if (metrics != null) {
if (added) {
metrics.incNumPendingContainersAdded();
} else {
metrics.incNumSkippedFullNodeContainerAllocation();
}
}
```
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java:
##########
@@ -634,22 +636,23 @@ private boolean isOpenWithUnregisteredNodes(Pipeline
pipeline) {
}
@Override
- public boolean hasEnoughSpace(Pipeline pipeline) {
- for (DatanodeDetails node : pipeline.getNodes()) {
- if (!nodeManager.hasSpaceForNewContainerAllocation(node.getID())) {
+ public boolean checkSpaceAndRecordAllocation(Pipeline pipeline, ContainerID
containerID) {
+ List<DatanodeDetails> successfulNodes = new ArrayList<>();
+ for (DatanodeDetails dn : pipeline.getNodes()) {
Review Comment:
We should look up the `DatanodeInfo` for every node first (returning false if any is missing) before recording any allocation:
```java
public boolean checkSpaceAndRecordAllocation(Pipeline pipeline,
ContainerID containerID) {
final Set<DatanodeDetails> datanodeDetails = pipeline.getNodeSet();
final List<DatanodeInfo> datanodeInfos = new
ArrayList<>(datanodeDetails.size());
for (DatanodeDetails dn : datanodeDetails) {
final DatanodeInfo info = nodeManager.getDatanodeInfo(dn);
if (info == null) {
LOG.warn("DatanodeInfo not found for {}", dn.getID());
return false;
}
datanodeInfos.add(info);
}
final List<DatanodeInfo> successfulNodes = new
ArrayList<>(datanodeInfos.size());
for (DatanodeInfo dn : datanodeInfos) {
if (!nodeManager.checkSpaceAndRecordAllocation(dn, containerID)) {
for (DatanodeInfo rollbackNode : successfulNodes) {
nodeManager.removePendingAllocationForDatanode(rollbackNode,
containerID);
}
return false;
}
successfulNodes.add(dn);
}
return true;
}
```
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java:
##########
@@ -148,6 +139,34 @@ synchronized boolean remove(ContainerID containerID) {
synchronized int getCount() {
return currentWindow.size() + previousWindow.size();
}
+
+ /**
+ * Atomically checks whether there is allocatable space for one more
container of
+ * {@code maxContainerSize} given the current pending count, and adds
{@code containerID}
+ * to the current window if so.
+ *
+ * @param storageReports storage reports for the datanode
+ * @param maxContainerSize maximum size of a single container in bytes
+ * @param containerID the container being allocated
+ * @return true if space was available and the container was recorded,
false otherwise
+ */
+ synchronized boolean checkSpaceAndAdd(
+ List<StorageReportProto> storageReports, long maxContainerSize,
ContainerID containerID) {
+ long pendingAllocationSize = getCount() * maxContainerSize;
+ long effectiveAllocatableSpace = 0L;
+ for (StorageReportProto report : storageReports) {
+ long usableSpace = VolumeUsage.getUsableSpace(report);
+ long containersOnThisDisk = usableSpace / maxContainerSize;
+ effectiveAllocatableSpace += containersOnThisDisk * maxContainerSize;
+ if (effectiveAllocatableSpace - pendingAllocationSize >=
maxContainerSize) {
Review Comment:
Compare container counts instead of byte sizes (Cc @rakeshadr):
```java
synchronized boolean checkSpaceAndAdd(
List<StorageReportProto> storageReports, long maxContainerSize,
ContainerID containerID) {
final int pendingAllocationCount = getCount();
long allocatableCount = 0;
for (StorageReportProto report : storageReports) {
final long allocatableCountOnThisDisk =
VolumeUsage.getUsableSpace(report) / maxContainerSize;
allocatableCount += allocatableCountOnThisDisk;
if (allocatableCount > pendingAllocationCount) {
final boolean added = currentWindow.add(containerID);
LOG.debug("Recorded pending container {} on DataNode {}. Added={},
Total pending={}",
containerID, datanodeID, added, getCount());
return added;
}
}
return false;
}
}
```
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/PendingContainerTracker.java:
##########
@@ -148,6 +139,34 @@ synchronized boolean remove(ContainerID containerID) {
synchronized int getCount() {
return currentWindow.size() + previousWindow.size();
}
+
+ /**
+ * Atomically checks whether there is allocatable space for one more
container of
+ * {@code maxContainerSize} given the current pending count, and adds
{@code containerID}
+ * to the current window if so.
+ *
+ * @param storageReports storage reports for the datanode
+ * @param maxContainerSize maximum size of a single container in bytes
+ * @param containerID the container being allocated
+ * @return true if space was available and the container was recorded,
false otherwise
+ */
+ synchronized boolean checkSpaceAndAdd(
+ List<StorageReportProto> storageReports, long maxContainerSize,
ContainerID containerID) {
+ long pendingAllocationSize = getCount() * maxContainerSize;
+ long effectiveAllocatableSpace = 0L;
+ for (StorageReportProto report : storageReports) {
+ long usableSpace = VolumeUsage.getUsableSpace(report);
+ long containersOnThisDisk = usableSpace / maxContainerSize;
+ effectiveAllocatableSpace += containersOnThisDisk * maxContainerSize;
+ if (effectiveAllocatableSpace - pendingAllocationSize >=
maxContainerSize) {
+ boolean added = currentWindow.add(containerID);
+ LOG.debug("Recorded pending container {} on DataNode {}. Added={},
Total pending={}",
+ containerID, datanodeID, added, getCount());
+ return true;
Review Comment:
This should be `return added;` rather than `return true;`.
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java:
##########
@@ -185,17 +185,23 @@ default int getAllNodeCount() {
DatanodeInfo getDatanodeInfo(DatanodeDetails dn);
/**
- * True if the node can accept another container of the given size.
+ * Atomically checks if the datanode has space for a new container and
records the allocation
+ * if space is available. This prevents race conditions where multiple
threads check space
+ * concurrently and over-allocate.
+ *
+ * @param datanodeID the ID of the DataNode receiving the allocation
+ * @param containerID the container being allocated
+ * @return true if space was available and allocation was recorded, false
otherwise
*/
- boolean hasSpaceForNewContainerAllocation(DatanodeID datanodeID);
+ boolean checkSpaceAndRecordAllocation(DatanodeID datanodeID, ContainerID
containerID);
Review Comment:
Pass a `DatanodeInfo` instead of a `DatanodeID`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]