This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-5713 by this push:
new 2f2d7304aa HDDS-12437. [DiskBalancer] Estimate the total size pending to move before disk usage becomes even (#8056)
2f2d7304aa is described below
commit 2f2d7304aa35661d7ea9f7e26d45ea5ca7558222
Author: Gargi Jaiswal <[email protected]>
AuthorDate: Mon Mar 24 13:10:09 2025 +0530
HDDS-12437. [DiskBalancer] Estimate the total size pending to move before disk usage becomes even (#8056)
---
.../container/diskbalancer/DiskBalancerInfo.java | 6 +-
.../diskbalancer/DiskBalancerService.java | 41 +++++++++++-
.../diskbalancer/TestDiskBalancerService.java | 72 ++++++++++++++++++++++
.../interface-client/src/main/proto/hdds.proto | 1 +
.../proto/ScmServerDatanodeHeartbeatProtocol.proto | 1 +
.../hadoop/hdds/scm/node/DiskBalancerManager.java | 11 ++--
.../hadoop/hdds/scm/node/DiskBalancerStatus.java | 8 ++-
.../cli/datanode/DiskBalancerStatusSubcommand.java | 30 ++++++++-
.../cli/datanode/TestDiskBalancerSubCommand.java | 4 +-
9 files changed, 162 insertions(+), 12 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
index 87269f2a68..0a296d5d9f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
@@ -34,6 +34,7 @@ public class DiskBalancerInfo {
private DiskBalancerVersion version;
private long successCount;
private long failureCount;
+ private long bytesToMove;
public DiskBalancerInfo(boolean shouldRun, double threshold,
long bandwidthInMB, int parallelThread) {
@@ -50,9 +51,10 @@ public DiskBalancerInfo(boolean shouldRun, double threshold,
this.version = version;
}
+ @SuppressWarnings("checkstyle:ParameterNumber")
public DiskBalancerInfo(boolean shouldRun, double threshold,
long bandwidthInMB, int parallelThread, DiskBalancerVersion version,
- long successCount, long failureCount) {
+ long successCount, long failureCount, long bytesToMove) {
this.shouldRun = shouldRun;
this.threshold = threshold;
this.bandwidthInMB = bandwidthInMB;
@@ -60,6 +62,7 @@ public DiskBalancerInfo(boolean shouldRun, double threshold,
this.version = version;
this.successCount = successCount;
this.failureCount = failureCount;
+ this.bytesToMove = bytesToMove;
}
public DiskBalancerInfo(boolean shouldRun,
@@ -94,6 +97,7 @@ public StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto toDiskBala
builder.setDiskBalancerConf(confProto);
builder.setSuccessMoveCount(successCount);
builder.setFailureMoveCount(failureCount);
+ builder.setBytesToMove(bytesToMove);
return builder.build();
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
index 9897a0acdb..5bad084835 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
@@ -103,6 +103,7 @@ public class DiskBalancerService extends BackgroundService {
private final File diskBalancerInfoFile;
private DiskBalancerServiceMetrics metrics;
+ private long bytesToMove;
public DiskBalancerService(OzoneContainer ozoneContainer,
long serviceCheckInterval, long serviceCheckTimeout, TimeUnit timeUnit,
@@ -351,7 +352,10 @@ public BackgroundTaskQueue getTasks() {
if (queue.isEmpty()) {
metrics.incrIdleLoopNoAvailableVolumePairCount();
+ } else {
+ bytesToMove = calculateBytesToMove(volumeSet);
}
+
return queue;
}
@@ -505,7 +509,42 @@ private void postCall() {
public DiskBalancerInfo getDiskBalancerInfo() {
return new DiskBalancerInfo(shouldRun, threshold, bandwidthInMB,
- parallelThread, version, metrics.getSuccessCount(), metrics.getFailureCount());
+ parallelThread, version, metrics.getSuccessCount(),
+ metrics.getFailureCount(), bytesToMove);
+ }
+
+ public long calculateBytesToMove(MutableVolumeSet inputVolumeSet) {
+ long bytesPendingToMove = 0;
+ long totalUsedSpace = 0;
+ long totalCapacity = 0;
+
+ for (HddsVolume volume : StorageVolumeUtil.getHddsVolumesList(inputVolumeSet.getVolumesList())) {
+ totalUsedSpace += volume.getCurrentUsage().getUsedSpace();
+ totalCapacity += volume.getCurrentUsage().getCapacity();
+ }
+
+ if (totalCapacity == 0) {
+ return 0;
+ }
+
+ double datanodeUtilization = (double) totalUsedSpace / totalCapacity;
+
+ double thresholdFraction = threshold / 100.0;
+ double upperLimit = datanodeUtilization + thresholdFraction;
+
+ // Calculate excess data in overused volumes
+ for (HddsVolume volume : StorageVolumeUtil.getHddsVolumesList(inputVolumeSet.getVolumesList())) {
+ long usedSpace = volume.getCurrentUsage().getUsedSpace();
+ long capacity = volume.getCurrentUsage().getCapacity();
+ double volumeUtilization = (double) usedSpace / capacity;
+
+ // Consider only volumes exceeding the upper threshold
+ if (volumeUtilization > upperLimit) {
+ long excessData = usedSpace - (long) (upperLimit * capacity);
+ bytesPendingToMove += excessData;
+ }
+ }
+ return bytesPendingToMove;
}
private Path getDiskBalancerTmpDir(HddsVolume hddsVolume) {
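[For readers following the new calculateBytesToMove logic above, here is a minimal standalone sketch of the same excess-above-threshold computation. The volume sizes and the threshold of 10 percent are made-up figures for illustration, not values taken from this commit.

    // Sketch of the calculateBytesToMove logic with hypothetical figures.
    public final class BytesToMoveExample {
      public static void main(String[] args) {
        long[] used     = {80L << 30, 20L << 30};   // 80 GB and 20 GB used
        long[] capacity = {100L << 30, 100L << 30}; // two 100 GB volumes
        double threshold = 10.0;                    // percent, hypothetical

        long totalUsed = 0;
        long totalCapacity = 0;
        for (int i = 0; i < used.length; i++) {
          totalUsed += used[i];
          totalCapacity += capacity[i];
        }
        // Average datanode utilization plus the threshold is the upper limit.
        double upperLimit = (double) totalUsed / totalCapacity + threshold / 100.0;

        long pending = 0;
        for (int i = 0; i < used.length; i++) {
          // Only volumes above the upper limit contribute excess data.
          if ((double) used[i] / capacity[i] > upperLimit) {
            pending += used[i] - (long) (upperLimit * capacity[i]);
          }
        }
        // avg 0.5 + 0.1 threshold => limit 0.6; only volume 0 exceeds it:
        // 80 GB - 60 GB = 20 GB pending to move.
        System.out.println("bytesToMove = " + pending);
      }
    }

Note that only the overused side is summed; the underused volumes are the implicit destinations, so counting both sides would double the estimate.]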
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
index 34e1c634ab..b5eb50f69a 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
@@ -28,7 +28,9 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.List;
import java.util.UUID;
+import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -37,6 +39,7 @@
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy;
@@ -50,6 +53,9 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
/**
* This is a test class for DiskBalancerService.
@@ -89,6 +95,7 @@ public void init() throws IOException {
public void cleanup() throws IOException {
BlockUtils.shutdownCache(conf);
FileUtils.deleteDirectory(testRoot);
+ volumeSet.shutdown();
}
@ContainerTestVersionInfo.ContainerTest
@@ -152,6 +159,10 @@ public void testPolicyClassInitialization(ContainerTestVersionInfo versionInfo)
}
private String generateVolumeLocation(String base, int volumeCount) {
+ if (volumeCount == 0) {
+ return "";
+ }
+
StringBuilder sb = new StringBuilder();
for (int i = 0; i < volumeCount; i++) {
sb.append(base + "/vol" + i);
@@ -170,6 +181,67 @@ private DiskBalancerServiceTestImpl getDiskBalancerService(
threadCount);
}
+ public static Stream<Arguments> values() {
+ return Stream.of(
+ Arguments.arguments(0, 0, 0),
+ Arguments.arguments(1, 0, 0),
+ Arguments.arguments(1, 50, 0),
+ Arguments.arguments(2, 0, 0),
+ Arguments.arguments(2, 10, 0),
+ Arguments.arguments(2, 50, 40), // one disk is 50% above average, the other disk is 50% below average
+ Arguments.arguments(3, 0, 0),
+ Arguments.arguments(3, 10, 0),
+ Arguments.arguments(4, 0, 0),
+ Arguments.arguments(4, 50, 40) // two disks are 50% above average, the other two disks are 50% below average
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("values")
+ public void testCalculateBytesToMove(int volumeCount, int deltaUsagePercent,
+ long expectedBytesToMovePercent) throws IOException {
+ int updatedVolumeCount = volumeCount == 0 ? 1 : volumeCount;
+ conf.set(ScmConfigKeys.HDDS_DATANODE_DIR_KEY,
+ generateVolumeLocation(testRoot.getAbsolutePath(), updatedVolumeCount));
+ volumeSet = new MutableVolumeSet(datanodeUuid, scmId, conf, null,
+ StorageVolume.VolumeType.DATA_VOLUME, null);
+ createDbInstancesForTestIfNeeded(volumeSet, scmId, scmId, conf);
+ if (volumeCount == 0) {
+ volumeSet.failVolume(((HddsVolume) volumeSet.getVolumesList().get(0)).getHddsRootDir().getAbsolutePath());
+ }
+
+ double avgUtilization = 0.5;
+ int totalOverUtilisedVolumes = 0;
+
+ List<StorageVolume> volumes = volumeSet.getVolumesList();
+ for (int i = 0; i < volumes.size(); i++) {
+ StorageVolume vol = volumes.get(i);
+ long totalCapacityPerVolume = vol.getCurrentUsage().getCapacity();
+ if (i % 2 == 0) {
+ vol.incrementUsedSpace((long) (totalCapacityPerVolume * (avgUtilization + deltaUsagePercent / 100.0)));
+ totalOverUtilisedVolumes++;
+ } else {
+ vol.incrementUsedSpace((long) (totalCapacityPerVolume * (avgUtilization - deltaUsagePercent / 100.0)));
+ }
+ }
+
+ ContainerSet containerSet = new ContainerSet(1000);
+ ContainerMetrics metrics = ContainerMetrics.create(conf);
+ KeyValueHandler keyValueHandler =
+ new KeyValueHandler(conf, datanodeUuid, containerSet, volumeSet,
+ metrics, c -> {
+ });
+ DiskBalancerServiceTestImpl svc =
+ getDiskBalancerService(containerSet, conf, keyValueHandler, null, 1);
+
+ long totalCapacity = volumes.isEmpty() ? 0 : volumes.get(0).getCurrentUsage().getCapacity();
+ long expectedBytesToMove = (long) Math.ceil(
+ (totalCapacity * expectedBytesToMovePercent) / 100.0 * totalOverUtilisedVolumes);
+
+ // allow an off-by-one tolerance for double-precision rounding in the calculation
+ assertEquals(Math.abs(expectedBytesToMove - svc.calculateBytesToMove(volumeSet)) <= 1, true);
+ }
+
private OzoneContainer mockDependencies(ContainerSet containerSet,
KeyValueHandler keyValueHandler, ContainerController controller) {
OzoneContainer ozoneContainer = mock(OzoneContainer.class);
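[To see where the expected 40 percent in the (2, 50, 40) case above comes from, assume the balancer's default threshold of 10 percent (an assumption, but one consistent with the (2, 10, 0) case also expecting zero): the two volumes sit at 100% and 0% usage, so the datanode average is 50%, the upper limit is 60%, and the overused volume must shed the difference. A short arithmetic sketch with these assumed figures:

    // Hypothetical walk-through of the (2, 50, 40) expectation; the 10%
    // threshold is an assumption, not a value taken from this commit.
    public final class ExpectedBytesToMoveExample {
      public static void main(String[] args) {
        double avg = 0.5;                       // datanode-wide utilization
        double upperLimit = avg + 0.10;         // 0.6 with a 10% threshold
        double overused = avg + 0.5;            // avg + 50% delta = fully used
        double excess = overused - upperLimit;  // 1.0 - 0.6 = 0.4
        System.out.println(excess * 100 + "% of the overused volume moves");
      }
    }
]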
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 27ca714e8a..74934fe938 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -563,4 +563,5 @@ message DatanodeDiskBalancerInfoProto {
optional DiskBalancerConfigurationProto diskBalancerConf = 4;
optional uint64 successMoveCount = 5;
optional uint64 failureMoveCount = 6;
+ optional uint64 bytesToMove = 7;
}
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index 2f0bb37b59..487ac8a136 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -501,6 +501,7 @@ message DiskBalancerReportProto {
optional DiskBalancerConfigurationProto diskBalancerConf = 3;
optional uint64 successMoveCount = 4;
optional uint64 failureMoveCount = 5;
+ optional uint64 bytesToMove = 6;
}
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
index 6ca5e610d8..5567a61b03 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
@@ -267,7 +267,8 @@ private HddsProtos.DatanodeDiskBalancerInfoProto getInfoProto(
.setCurrentVolumeDensitySum(volumeDensitySum)
.setRunningStatus(status.getRunningStatus())
.setSuccessMoveCount(status.getSuccessMoveCount())
- .setFailureMoveCount(status.getFailureMoveCount());
+ .setFailureMoveCount(status.getFailureMoveCount())
+ .setBytesToMove(status.getBytesToMove());
if (status.getRunningStatus() != DiskBalancerRunningStatus.UNKNOWN) {
builder.setDiskBalancerConf(statusMap.get(dn)
.getDiskBalancerConfiguration().toProtobufBuilder());
@@ -306,13 +307,14 @@ private double getVolumeDataDensitySumForDatanodeDetails(
private DiskBalancerStatus getStatus(DatanodeDetails datanodeDetails) {
return statusMap.computeIfAbsent(datanodeDetails,
- dn -> new DiskBalancerStatus(DiskBalancerRunningStatus.UNKNOWN, new DiskBalancerConfiguration(), 0, 0));
+ dn -> new DiskBalancerStatus(DiskBalancerRunningStatus.UNKNOWN,
+ new DiskBalancerConfiguration(), 0, 0, 0));
}
@VisibleForTesting
public void addRunningDatanode(DatanodeDetails datanodeDetails) {
statusMap.put(datanodeDetails, new DiskBalancerStatus(DiskBalancerRunningStatus.RUNNING,
- new DiskBalancerConfiguration(), 0, 0));
+ new DiskBalancerConfiguration(), 0, 0, 0));
}
public void processDiskBalancerReport(DiskBalancerReportProto reportProto,
@@ -325,9 +327,10 @@ public void processDiskBalancerReport(DiskBalancerReportProto reportProto,
new DiskBalancerConfiguration();
long successMoveCount = reportProto.getSuccessMoveCount();
long failureMoveCount = reportProto.getFailureMoveCount();
+ long bytesToMove = reportProto.getBytesToMove();
statusMap.put(dn, new DiskBalancerStatus(
isRunning ? DiskBalancerRunningStatus.RUNNING :
DiskBalancerRunningStatus.STOPPED,
- diskBalancerConfiguration, successMoveCount, failureMoveCount));
+ diskBalancerConfiguration, successMoveCount, failureMoveCount, bytesToMove));
if (reportProto.hasBalancedBytes()) {
balancedBytesMap.put(dn, reportProto.getBalancedBytes());
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
index dc8f4836ab..0b99bfbaa3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
@@ -33,13 +33,15 @@ public class DiskBalancerStatus {
private DiskBalancerConfiguration diskBalancerConfiguration;
private long successMoveCount;
private long failureMoveCount;
+ private long bytesToMove;
public DiskBalancerStatus(DiskBalancerRunningStatus isRunning,
DiskBalancerConfiguration conf,
- long successMoveCount, long failureMoveCount) {
+ long successMoveCount, long failureMoveCount, long bytesToMove) {
this.isRunning = isRunning;
this.diskBalancerConfiguration = conf;
this.successMoveCount = successMoveCount;
this.failureMoveCount = failureMoveCount;
+ this.bytesToMove = bytesToMove;
}
public DiskBalancerRunningStatus getRunningStatus() {
@@ -57,4 +59,8 @@ public long getSuccessMoveCount() {
public long getFailureMoveCount() {
return failureMoveCount;
}
+
+ public long getBytesToMove() {
+ return bytesToMove;
+ }
}
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStatusSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStatusSubcommand.java
index 75d3759ffd..2f14079327 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStatusSubcommand.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/datanode/DiskBalancerStatusSubcommand.java
@@ -60,7 +60,7 @@ public void execute(ScmClient scmClient) throws IOException {
private String generateStatus(
List<HddsProtos.DatanodeDiskBalancerInfoProto> protos) {
StringBuilder formatBuilder = new StringBuilder("Status result:%n" +
- "%-40s %-20s %-10s %-10s %-15s %-15s %-15s %-15s%n");
+ "%-35s %-25s %-15s %-15s %-15s %-12s %-12s %-12s %-12s %-12s%n");
List<String> contentList = new ArrayList<>();
contentList.add("Datanode");
@@ -71,11 +71,16 @@ private String generateStatus(
contentList.add("Threads");
contentList.add("SuccessMove");
contentList.add("FailureMove");
+ contentList.add("EstBytesToMove(MB)");
+ contentList.add("EstTimeLeft(min)");
for (HddsProtos.DatanodeDiskBalancerInfoProto proto: protos) {
- formatBuilder.append("%-40s %-20s %-10s %-10s %-15s %-15s %-15s %-15s%n");
+ formatBuilder.append("%-35s %-25s %-15s %-15s %-15s %-12s %-12s %-12s %-12s %-12s%n");
+ long estimatedTimeLeft = calculateEstimatedTimeLeft(proto);
+ long bytesToMoveMB = proto.getBytesToMove() / (1024 * 1024);
+
contentList.add(proto.getNode().getHostName());
- contentList.add(String.valueOf(proto.getCurrentVolumeDensitySum()));
+ contentList.add(String.format("%.18f", proto.getCurrentVolumeDensitySum()));
contentList.add(proto.getRunningStatus().name());
contentList.add(
String.format("%.4f", proto.getDiskBalancerConf().getThreshold()));
@@ -85,9 +90,28 @@ private String generateStatus(
String.valueOf(proto.getDiskBalancerConf().getParallelThread()));
contentList.add(String.valueOf(proto.getSuccessMoveCount()));
contentList.add(String.valueOf(proto.getFailureMoveCount()));
+ contentList.add(String.valueOf(bytesToMoveMB));
+ contentList.add(estimatedTimeLeft >= 0 ? String.valueOf(estimatedTimeLeft) : "N/A");
}
+ formatBuilder.append("%nNote: Estimated time left is calculated" +
+ " based on the estimated bytes to move and the configured disk
bandwidth.");
+
return String.format(formatBuilder.toString(),
contentList.toArray(new String[0]));
}
+
+ private long calculateEstimatedTimeLeft(HddsProtos.DatanodeDiskBalancerInfoProto proto) {
+ long bytesToMove = proto.getBytesToMove();
+
+ if (bytesToMove == 0) {
+ return 0;
+ }
+ long bandwidth = proto.getDiskBalancerConf().getDiskBandwidthInMB();
+
+ // Convert estimated data from bytes to MB
+ double estimatedDataPendingMB = bytesToMove / (1024.0 * 1024.0);
+ double estimatedTimeLeft = (bandwidth > 0) ? (estimatedDataPendingMB / bandwidth) / 60 : -1;
+ return (long) Math.ceil(estimatedTimeLeft);
+ }
}
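[As a unit check on calculateEstimatedTimeLeft above, assuming getDiskBandwidthInMB() is expressed in MB per second (which is what the division by 60 to reach minutes implies), a worked example with made-up numbers:

    // Hypothetical figures; assumes the configured bandwidth is MB per
    // second, as the division by 60 in the method above implies.
    public final class EstimatedTimeLeftExample {
      public static void main(String[] args) {
        long bytesToMove = 6L * 1024 * 1024 * 1024;          // 6 GB pending
        long bandwidth = 50;                                 // 50 MB/s
        double pendingMB = bytesToMove / (1024.0 * 1024.0);  // 6144 MB
        double minutes = (pendingMB / bandwidth) / 60;       // ~2.05 minutes
        System.out.println((long) Math.ceil(minutes) + " min"); // prints "3 min"
      }
    }

The ceiling keeps the EstTimeLeft(min) column from reporting 0 while data is still pending, and the -1 sentinel for an unset bandwidth is what the subcommand renders as "N/A".]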
diff --git a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestDiskBalancerSubCommand.java b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestDiskBalancerSubCommand.java
index 49ce4f3cdd..897eccba53 100644
--- a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestDiskBalancerSubCommand.java
+++ b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/datanode/TestDiskBalancerSubCommand.java
@@ -100,8 +100,8 @@ public void testDiskBalancerStatusSubcommand()
statusCmd.execute(scmClient);
- // 2 Headers + 10 results
- assertEquals(12, newLineCount(outContent.toString(DEFAULT_ENCODING)));
+ // 2 Headers + 10 results + 1 note
+ assertEquals(13, newLineCount(outContent.toString(DEFAULT_ENCODING)));
}
@Test