This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-5713 by this push:
new 63689bd24bf HDDS-13054. [DiskBalancer] Stop DiskBalancer when DN enters DECOMMISSIONING or MAINTENANCE states (#8488)
63689bd24bf is described below
commit 63689bd24bf044bd49d8a2a100aa24ffa7f680b8
Author: Gargi Jaiswal <[email protected]>
AuthorDate: Wed Jun 18 16:46:37 2025 +0530
HDDS-13054. [DiskBalancer] Stop DiskBalancer when DN enters DECOMMISSIONING or MAINTENANCE states (#8488)
---
.../common/statemachine/DatanodeStateMachine.java | 2 +-
.../commandhandler/DiskBalancerCommandHandler.java | 12 +-
.../SetNodeOperationalStateCommandHandler.java | 11 +-
.../container/diskbalancer/DiskBalancerInfo.java | 43 ++-
.../diskbalancer/DiskBalancerService.java | 102 +++++-
.../container/diskbalancer/DiskBalancerYaml.java | 19 +-
.../states/endpoint/TestHeartbeatEndpointTask.java | 4 +-
.../diskbalancer/TestDiskBalancerService.java | 12 +-
.../diskbalancer/TestDiskBalancerYaml.java | 4 +-
...skBalancerDuringDecommissionAndMaintenance.java | 343 +++++++++++++++++++++
10 files changed, 502 insertions(+), 50 deletions(-)
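
In short: this patch replaces DiskBalancerService's boolean shouldRun flag with a three-valued DiskBalancerOperationalState enum (STOPPED, RUNNING, PAUSED_BY_NODE_STATE) and wires the service into SetNodeOperationalStateCommandHandler, so the balancer is paused when the datanode leaves IN_SERVICE and resumed when it returns. A condensed, self-contained Java sketch of the transition rules follows (simplified from the diff below; the real service also persists every transition to the diskbalancer info YAML file and updates metrics):

    // Condensed sketch of the state transitions introduced by this patch (not the
    // actual classes): the real DiskBalancerService also persists each transition
    // to the diskbalancer info YAML file and tracks metrics.
    enum DiskBalancerOperationalState { STOPPED, RUNNING, PAUSED_BY_NODE_STATE }

    // Stand-in for HddsProtos.NodeOperationalState, reduced to the states used here.
    enum NodeState { IN_SERVICE, DECOMMISSIONING, ENTERING_MAINTENANCE }

    class DiskBalancerStateSketch {
      private DiskBalancerOperationalState state = DiskBalancerOperationalState.STOPPED;

      // Admin START (DiskBalancerCommandHandler): run only if the node is IN_SERVICE,
      // otherwise park the balancer in PAUSED_BY_NODE_STATE.
      void start(NodeState nodeState) {
        state = (nodeState == NodeState.IN_SERVICE)
            ? DiskBalancerOperationalState.RUNNING
            : DiskBalancerOperationalState.PAUSED_BY_NODE_STATE;
      }

      // Admin STOP always wins; a later recommission will not restart the balancer.
      void stop() {
        state = DiskBalancerOperationalState.STOPPED;
      }

      // Node-state callback (DiskBalancerService#nodeStateUpdated): pause a RUNNING
      // balancer when the node leaves IN_SERVICE, resume a paused one when it returns.
      void nodeStateUpdated(NodeState nodeState) {
        if ((nodeState == NodeState.DECOMMISSIONING
            || nodeState == NodeState.ENTERING_MAINTENANCE)
            && state == DiskBalancerOperationalState.RUNNING) {
          state = DiskBalancerOperationalState.PAUSED_BY_NODE_STATE;
        } else if (nodeState == NodeState.IN_SERVICE
            && state == DiskBalancerOperationalState.PAUSED_BY_NODE_STATE) {
          state = DiskBalancerOperationalState.RUNNING;
        }
      }

      // Only RUNNING schedules balancing work and reports isRunning=true.
      boolean shouldScheduleWork() {
        return state == DiskBalancerOperationalState.RUNNING;
      }
    }
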
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 36c10eed470..fbcc994ba9d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -256,7 +256,7 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService,
.addHandler(new CreatePipelineCommandHandler(conf,
pipelineCommandExecutorService))
.addHandler(new SetNodeOperationalStateCommandHandler(conf,
- supervisor::nodeStateUpdated))
+ supervisor::nodeStateUpdated, container.getDiskBalancerService()::nodeStateUpdated))
.addHandler(new FinalizeNewLayoutVersionCommandHandler())
.addHandler(new RefreshVolumeUsageCommandHandler())
.addHandler(new DiskBalancerCommandHandler())
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java
index 38398d391c9..fb4e0dfcab9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DiskBalancerCommandHandler.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerInfo;
+import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerService.DiskBalancerOperationalState;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.DiskBalancerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@@ -74,11 +75,18 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
try {
switch (opType) {
case START:
- diskBalancerInfo.setShouldRun(true);
+ HddsProtos.NodeOperationalState state = context.getParent().getDatanodeDetails().getPersistedOpState();
+
+ if (state == HddsProtos.NodeOperationalState.IN_SERVICE) {
+ diskBalancerInfo.setOperationalState(DiskBalancerOperationalState.RUNNING);
+ } else {
+ LOG.warn("Cannot start DiskBalancer as node is in {} state. Pausing instead.", state);
+ diskBalancerInfo.setOperationalState(DiskBalancerOperationalState.PAUSED_BY_NODE_STATE);
+ }
diskBalancerInfo.updateFromConf(diskBalancerConf);
break;
case STOP:
- diskBalancerInfo.setShouldRun(false);
+ diskBalancerInfo.setOperationalState(DiskBalancerOperationalState.STOPPED);
break;
case UPDATE:
diskBalancerInfo.updateFromConf(diskBalancerConf);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java
index 25a158bb45d..f6663bfc438 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java
@@ -50,6 +50,7 @@ public class SetNodeOperationalStateCommandHandler implements CommandHandler {
LoggerFactory.getLogger(SetNodeOperationalStateCommandHandler.class);
private final ConfigurationSource conf;
private final Consumer<HddsProtos.NodeOperationalState> replicationSupervisor;
+ private final Consumer<HddsProtos.NodeOperationalState> diskBalancerService;
private final AtomicInteger invocationCount = new AtomicInteger(0);
private final MutableRate opsLatencyMs;
@@ -59,9 +60,11 @@ public class SetNodeOperationalStateCommandHandler implements CommandHandler {
* @param conf - Configuration for the datanode.
*/
public SetNodeOperationalStateCommandHandler(ConfigurationSource conf,
- Consumer<HddsProtos.NodeOperationalState> replicationSupervisor) {
+ Consumer<HddsProtos.NodeOperationalState> replicationSupervisor,
+ Consumer<HddsProtos.NodeOperationalState> diskBalancerService) {
this.conf = conf;
this.replicationSupervisor = replicationSupervisor;
+ this.diskBalancerService = diskBalancerService;
MetricsRegistry registry = new MetricsRegistry(
SetNodeOperationalStateCommandHandler.class.getSimpleName());
this.opsLatencyMs = registry.newRate(Type.setNodeOperationalStateCommand + "Ms");
@@ -102,6 +105,12 @@ public void handle(SCMCommand<?> command, OzoneContainer container,
// TODO - this should probably be raised, but it will break the command
// handler interface.
}
+
+ // Handle DiskBalancerService state changes
+ if (diskBalancerService != null) {
+ diskBalancerService.accept(state);
+ }
+
replicationSupervisor.accept(state);
this.opsLatencyMs.add(Time.monotonicNow() - startTime);
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
index 60eabf2376c..eaad116dcf3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
@@ -22,12 +22,13 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerService.DiskBalancerOperationalState;
/**
* DiskBalancer's information to persist.
*/
public class DiskBalancerInfo {
- private boolean shouldRun;
+ private DiskBalancerOperationalState operationalState;
private double threshold;
private long bandwidthInMB;
private int parallelThread;
@@ -38,15 +39,15 @@ public class DiskBalancerInfo {
private long bytesToMove;
private long balancedBytes;
- public DiskBalancerInfo(boolean shouldRun, double threshold,
+ public DiskBalancerInfo(DiskBalancerOperationalState operationalState, double threshold,
long bandwidthInMB, int parallelThread, boolean stopAfterDiskEven) {
- this(shouldRun, threshold, bandwidthInMB, parallelThread, stopAfterDiskEven,
+ this(operationalState, threshold, bandwidthInMB, parallelThread, stopAfterDiskEven,
DiskBalancerVersion.DEFAULT_VERSION);
}
- public DiskBalancerInfo(boolean shouldRun, double threshold,
+ public DiskBalancerInfo(DiskBalancerOperationalState operationalState, double threshold,
long bandwidthInMB, int parallelThread, boolean stopAfterDiskEven,
DiskBalancerVersion version) {
- this.shouldRun = shouldRun;
+ this.operationalState = operationalState;
this.threshold = threshold;
this.bandwidthInMB = bandwidthInMB;
this.parallelThread = parallelThread;
@@ -55,10 +56,10 @@ public DiskBalancerInfo(boolean shouldRun, double threshold,
}
@SuppressWarnings("checkstyle:ParameterNumber")
- public DiskBalancerInfo(boolean shouldRun, double threshold,
+ public DiskBalancerInfo(DiskBalancerOperationalState operationalState, double threshold,
long bandwidthInMB, int parallelThread, boolean stopAfterDiskEven,
DiskBalancerVersion version,
long successCount, long failureCount, long bytesToMove, long balancedBytes) {
- this.shouldRun = shouldRun;
+ this.operationalState = operationalState;
this.threshold = threshold;
this.bandwidthInMB = bandwidthInMB;
this.parallelThread = parallelThread;
@@ -72,7 +73,11 @@ public DiskBalancerInfo(boolean shouldRun, double threshold,
public DiskBalancerInfo(boolean shouldRun,
DiskBalancerConfiguration diskBalancerConf) {
- this.shouldRun = shouldRun;
+ if (shouldRun) {
+ this.operationalState = DiskBalancerOperationalState.RUNNING;
+ } else {
+ this.operationalState = DiskBalancerOperationalState.STOPPED;
+ }
this.threshold = diskBalancerConf.getThreshold();
this.bandwidthInMB = diskBalancerConf.getDiskBandwidthInMB();
this.parallelThread = diskBalancerConf.getParallelThread();
@@ -102,7 +107,7 @@ public StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto toDiskBala
StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto.Builder builder =
StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto.newBuilder();
- builder.setIsRunning(shouldRun);
+ builder.setIsRunning(this.operationalState == DiskBalancerOperationalState.RUNNING);
builder.setDiskBalancerConf(confProto);
builder.setSuccessMoveCount(successCount);
builder.setFailureMoveCount(failureCount);
@@ -111,12 +116,16 @@ public StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto toDiskBala
return builder.build();
}
- public boolean isShouldRun() {
- return shouldRun;
+ public DiskBalancerOperationalState getOperationalState() {
+ return operationalState;
}
- public void setShouldRun(boolean shouldRun) {
- this.shouldRun = shouldRun;
+ public void setOperationalState(DiskBalancerOperationalState operationalState) {
+ this.operationalState = operationalState;
+ }
+
+ public boolean isShouldRun() {
+ return this.operationalState == DiskBalancerOperationalState.RUNNING;
}
public double getThreshold() {
@@ -151,6 +160,10 @@ public void setStopAfterDiskEven(boolean stopAfterDiskEven) {
this.stopAfterDiskEven = stopAfterDiskEven;
}
+ public boolean isPaused() {
+ return this.operationalState == DiskBalancerOperationalState.PAUSED_BY_NODE_STATE;
+ }
+
public DiskBalancerVersion getVersion() {
return version;
}
@@ -168,7 +181,7 @@ public boolean equals(Object o) {
return false;
}
DiskBalancerInfo that = (DiskBalancerInfo) o;
- return shouldRun == that.shouldRun &&
+ return operationalState == that.operationalState &&
Double.compare(that.threshold, threshold) == 0 &&
bandwidthInMB == that.bandwidthInMB &&
parallelThread == that.parallelThread &&
@@ -178,7 +191,7 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
- return Objects.hash(shouldRun, threshold, bandwidthInMB, parallelThread, stopAfterDiskEven,
+ return Objects.hash(operationalState, threshold, bandwidthInMB, parallelThread, stopAfterDiskEven,
version);
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
index 610fe816a85..be7f092852e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
@@ -78,14 +78,16 @@ public class DiskBalancerService extends BackgroundService {
private OzoneContainer ozoneContainer;
private final ConfigurationSource conf;
- private boolean shouldRun = false;
private double threshold;
private long bandwidthInMB;
private int parallelThread;
private boolean stopAfterDiskEven;
-
private DiskBalancerVersion version;
+ // State field using the new enum
+ private volatile DiskBalancerOperationalState operationalState =
+ DiskBalancerOperationalState.STOPPED;
+
private AtomicLong totalBalancedBytes = new AtomicLong(0L);
private AtomicLong balancedBytesInLastWindow = new AtomicLong(0L);
private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow());
@@ -112,6 +114,31 @@ public class DiskBalancerService extends BackgroundService {
private DiskBalancerServiceMetrics metrics;
private long bytesToMove;
+ /**
+ * Defines the operational states of the DiskBalancerService.
+ */
+ public enum DiskBalancerOperationalState {
+ /**
+ * DiskBalancer is stopped and will not run unless explicitly started.
+ * This is the initial state, can be set by admin STOP commands,
+ * or if the balancer stops itself after disks are even.
+ */
+ STOPPED,
+
+ /**
+ * DiskBalancer is running normally.
+ * The service is actively performing disk balancing operations.
+ */
+ RUNNING,
+
+ /**
+ * DiskBalancer was running but is temporarily paused due to node state changes
+ * (e.g., node entering maintenance or decommissioning).
+ * When the node returns to IN_SERVICE, it can resume to RUNNING state.
+ */
+ PAUSED_BY_NODE_STATE
+ }
+
public DiskBalancerService(OzoneContainer ozoneContainer,
long serviceCheckInterval, long serviceCheckTimeout, TimeUnit timeUnit,
int workerSize, ConfigurationSource conf) throws IOException {
@@ -153,7 +180,7 @@ public DiskBalancerService(OzoneContainer ozoneContainer,
* @param diskBalancerInfo
* @throws IOException
*/
- public void refresh(DiskBalancerInfo diskBalancerInfo) throws IOException {
+ public synchronized void refresh(DiskBalancerInfo diskBalancerInfo) throws IOException {
applyDiskBalancerInfo(diskBalancerInfo);
}
@@ -203,7 +230,8 @@ private void applyDiskBalancerInfo(DiskBalancerInfo diskBalancerInfo)
// First store in local file, then update in memory variables
writeDiskBalancerInfoTo(diskBalancerInfo, diskBalancerInfoFile);
- setShouldRun(diskBalancerInfo.isShouldRun());
+ updateOperationalStateFromInfo(diskBalancerInfo);
+
setThreshold(diskBalancerInfo.getThreshold());
setBandwidthInMB(diskBalancerInfo.getBandwidthInMB());
setParallelThread(diskBalancerInfo.getParallelThread());
@@ -218,6 +246,24 @@ private void applyDiskBalancerInfo(DiskBalancerInfo diskBalancerInfo)
}
}
+ /**
+ * Determines the new operational state based on the provided DiskBalancerInfo
+ * and updates the service's operationalState if it has changed.
+ *
+ * @param diskBalancerInfo The DiskBalancerInfo containing shouldRun and paused flags.
+ */
+ private void updateOperationalStateFromInfo(DiskBalancerInfo diskBalancerInfo) {
+ DiskBalancerOperationalState newOperationalState = diskBalancerInfo.getOperationalState();
+
+ if (this.operationalState != newOperationalState) {
+ LOG.info("DiskBalancer operational state changing from {} to {} " +
+ "based on DiskBalancerInfo (derived: shouldRun={}, paused={}).",
+ this.operationalState, newOperationalState,
+ diskBalancerInfo.isShouldRun(), diskBalancerInfo.isPaused());
+ this.operationalState = newOperationalState;
+ }
+ }
+
private String getDiskBalancerInfoPath() {
String diskBalancerInfoDir =
conf.getObject(DiskBalancerConfiguration.class)
@@ -281,10 +327,6 @@ private synchronized void writeDiskBalancerInfoTo(
DiskBalancerYaml.createDiskBalancerInfoFile(diskBalancerInfo, path);
}
- public void setShouldRun(boolean shouldRun) {
- this.shouldRun = shouldRun;
- }
-
public void setThreshold(double threshold) {
this.threshold = threshold;
}
@@ -308,7 +350,7 @@ public void setVersion(DiskBalancerVersion version) {
public DiskBalancerReportProto getDiskBalancerReportProto() {
DiskBalancerReportProto.Builder builder =
DiskBalancerReportProto.newBuilder();
- return builder.setIsRunning(shouldRun)
+ return builder.setIsRunning(this.operationalState == DiskBalancerOperationalState.RUNNING)
.setBalancedBytes(totalBalancedBytes.get())
.setDiskBalancerConf(
HddsProtos.DiskBalancerConfigurationProto.newBuilder()
@@ -324,7 +366,8 @@ public DiskBalancerReportProto getDiskBalancerReportProto() {
public BackgroundTaskQueue getTasks() {
BackgroundTaskQueue queue = new BackgroundTaskQueue();
- if (!shouldRun) {
+ if (this.operationalState == DiskBalancerOperationalState.STOPPED ||
+ this.operationalState == DiskBalancerOperationalState.PAUSED_BY_NODE_STATE) {
return queue;
}
metrics.incrRunningLoopCount();
@@ -365,7 +408,7 @@ public BackgroundTaskQueue getTasks() {
if (stopAfterDiskEven) {
LOG.info("Disk balancer is stopped due to disk even as" +
" the property StopAfterDiskEven is set to true.");
- setShouldRun(false);
+ this.operationalState = DiskBalancerOperationalState.STOPPED;
try {
// Persist the updated shouldRun status into the YAML file
writeDiskBalancerInfoTo(getDiskBalancerInfo(), diskBalancerInfoFile);
@@ -535,7 +578,7 @@ private void postCall() {
}
public DiskBalancerInfo getDiskBalancerInfo() {
- return new DiskBalancerInfo(shouldRun, threshold, bandwidthInMB,
+ return new DiskBalancerInfo(operationalState, threshold, bandwidthInMB,
parallelThread, stopAfterDiskEven, version, metrics.getSuccessCount(),
metrics.getFailureCount(), bytesToMove, metrics.getSuccessBytes());
}
@@ -600,6 +643,41 @@ public VolumeChoosingPolicy getVolumeChoosingPolicy() {
return volumeChoosingPolicy;
}
+ /**
+ * Handle state changes for DiskBalancerService.
+ */
+ public synchronized void nodeStateUpdated(HddsProtos.NodeOperationalState state) {
+ DiskBalancerOperationalState originalServiceState = this.operationalState;
+ boolean stateChanged = false;
+
+ if ((state == HddsProtos.NodeOperationalState.DECOMMISSIONING ||
+ state == HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE) &&
+ this.operationalState == DiskBalancerOperationalState.RUNNING) {
+ LOG.info("Stopping DiskBalancerService as Node state changed to {}.", state);
+ this.operationalState = DiskBalancerOperationalState.PAUSED_BY_NODE_STATE;
+ stateChanged = true;
+ } else if (state == HddsProtos.NodeOperationalState.IN_SERVICE &&
+ this.operationalState == DiskBalancerOperationalState.PAUSED_BY_NODE_STATE) {
+ LOG.info("Resuming DiskBalancerService to running state as Node state changed to {}. ", state);
+ this.operationalState = DiskBalancerOperationalState.RUNNING;
+ stateChanged = true;
+ }
+
+ if (stateChanged) {
+ LOG.info("DiskBalancer operational state changed from {} to {} due to Datanode state update . Persisting.",
+ originalServiceState, this.operationalState);
+ try {
+ writeDiskBalancerInfoTo(getDiskBalancerInfo(), diskBalancerInfoFile);
+ } catch (IOException e) {
+ LOG.error("Failed to persist DiskBalancerInfo after state change in nodeStateUpdated. " +
+ "Reverting operational state to {} to maintain consistency.", originalServiceState, e);
+ // Revert state on persistence error to keep in-memory state consistent with last known persisted state.
+ this.operationalState = originalServiceState;
+ LOG.warn("DiskBalancer operational state reverted to {} due to persistence failure.", this.operationalState);
+ }
+ }
+ }
+
@Override
public void shutdown() {
super.shutdown();
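
A design point in nodeStateUpdated above: the transition is applied in memory first, then persisted, and rolled back if the write fails, so the in-memory state never disagrees with the last successfully persisted state. A stripped-down, hypothetical sketch of that persist-or-revert pattern (illustrative names only, not code from this patch; the real method persists via writeDiskBalancerInfoTo):

    import java.io.IOException;

    // Hypothetical illustration only -- not part of this patch. Mirrors the
    // persist-or-revert pattern used by DiskBalancerService#nodeStateUpdated.
    final class PersistOrRevertSketch {
      enum State { STOPPED, RUNNING, PAUSED_BY_NODE_STATE }

      private State state = State.RUNNING;

      synchronized void transitionTo(State newState) {
        State original = this.state;
        if (original == newState) {
          return; // no change, nothing to persist
        }
        this.state = newState;
        try {
          persist(); // stand-in for writeDiskBalancerInfoTo(getDiskBalancerInfo(), file)
        } catch (IOException e) {
          // Keep memory consistent with the last successfully persisted state.
          this.state = original;
        }
      }

      private void persist() throws IOException {
        // The real service writes the diskbalancer info YAML file here.
      }

      synchronized State current() {
        return state;
      }
    }
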
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerYaml.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerYaml.java
index d33e75cded7..c7c0cfb56a1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerYaml.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerYaml.java
@@ -22,6 +22,7 @@
import java.io.InputStream;
import java.nio.file.Files;
import org.apache.hadoop.hdds.server.YamlUtils;
+import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerService.DiskBalancerOperationalState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.DumperOptions;
@@ -75,7 +76,7 @@ public static DiskBalancerInfo readDiskBalancerInfoFile(File path)
}
diskBalancerInfo = new DiskBalancerInfo(
- diskBalancerInfoYaml.isShouldRun(),
+ diskBalancerInfoYaml.operationalState,
diskBalancerInfoYaml.getThreshold(),
diskBalancerInfoYaml.getBandwidthInMB(),
diskBalancerInfoYaml.getParallelThread(),
@@ -91,7 +92,7 @@ public static DiskBalancerInfo readDiskBalancerInfoFile(File path)
* Datanode DiskBalancer Info to be written to the yaml file.
*/
public static class DiskBalancerInfoYaml {
- private boolean shouldRun;
+ private DiskBalancerOperationalState operationalState;
private double threshold;
private long bandwidthInMB;
private int parallelThread;
@@ -103,9 +104,9 @@ public DiskBalancerInfoYaml() {
// Needed for snake-yaml introspection.
}
- private DiskBalancerInfoYaml(boolean shouldRun, double threshold,
+ private DiskBalancerInfoYaml(DiskBalancerOperationalState operationalState, double threshold,
long bandwidthInMB, int parallelThread, boolean stopAfterDiskEven, int version) {
- this.shouldRun = shouldRun;
+ this.operationalState = operationalState;
this.threshold = threshold;
this.bandwidthInMB = bandwidthInMB;
this.parallelThread = parallelThread;
@@ -113,12 +114,12 @@ private DiskBalancerInfoYaml(boolean shouldRun, double threshold,
this.version = version;
}
- public boolean isShouldRun() {
- return shouldRun;
+ public DiskBalancerOperationalState getOperationalState() {
+ return operationalState;
}
- public void setShouldRun(boolean shouldRun) {
- this.shouldRun = shouldRun;
+ public void setOperationalState(DiskBalancerOperationalState operationalState) {
+ this.operationalState = operationalState;
}
public void setThreshold(double threshold) {
@@ -166,7 +167,7 @@ private static DiskBalancerInfoYaml getDiskBalancerInfoYaml(
DiskBalancerInfo diskBalancerInfo) {
return new DiskBalancerInfoYaml(
- diskBalancerInfo.isShouldRun(),
+ diskBalancerInfo.getOperationalState(),
diskBalancerInfo.getThreshold(),
diskBalancerInfo.getBandwidthInMB(),
diskBalancerInfo.getParallelThread(),
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 47cd07a9506..4022914dd30 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -57,6 +57,7 @@
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerInfo;
+import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerService.DiskBalancerOperationalState;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
@@ -79,7 +80,8 @@ public class TestHeartbeatEndpointTask {
public void setup() {
datanodeStateMachine = mock(DatanodeStateMachine.class);
container = mock(OzoneContainer.class);
- when(container.getDiskBalancerInfo()).thenReturn(new DiskBalancerInfo(true, 10, 20, 30, true));
+ when(container.getDiskBalancerInfo()).thenReturn(new DiskBalancerInfo(
+ DiskBalancerOperationalState.RUNNING, 10, 20, 30, true));
when(datanodeStateMachine.getContainer()).thenReturn(container);
PipelineReportsProto pipelineReportsProto =
mock(PipelineReportsProto.class);
when(pipelineReportsProto.getPipelineReportList()).thenReturn(Collections.emptyList());
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
index eb591215d61..ce415114c9a 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerService.java
@@ -42,6 +42,7 @@
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerService.DiskBalancerOperationalState;
import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy;
import org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.keyvalue.ContainerTestVersionInfo;
@@ -116,12 +117,9 @@ public void testUpdateService(ContainerTestVersionInfo versionInfo) throws Excep
getDiskBalancerService(containerSet, conf, keyValueHandler, null, 1);
// Set a low bandwidth to delay job
- svc.setShouldRun(true);
- svc.setThreshold(10.0d);
- svc.setBandwidthInMB(1L);
- svc.setParallelThread(5);
- svc.setStopAfterDiskEven(true);
- svc.setVersion(DiskBalancerVersion.DEFAULT_VERSION);
+ DiskBalancerInfo initialInfo = new DiskBalancerInfo(DiskBalancerOperationalState.RUNNING, 10.0d, 1L, 5,
+ true, DiskBalancerVersion.DEFAULT_VERSION);
+ svc.refresh(initialInfo);
svc.start();
@@ -131,7 +129,7 @@ public void testUpdateService(ContainerTestVersionInfo versionInfo) throws Excep
assertEquals(5, svc.getDiskBalancerInfo().getParallelThread());
assertTrue(svc.getDiskBalancerInfo().isStopAfterDiskEven());
- DiskBalancerInfo newInfo = new DiskBalancerInfo(false, 20.0d, 5L, 10, false);
+ DiskBalancerInfo newInfo = new DiskBalancerInfo(DiskBalancerOperationalState.STOPPED, 20.0d, 5L, 10, false);
svc.refresh(newInfo);
assertFalse(svc.getDiskBalancerInfo().isShouldRun());
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerYaml.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerYaml.java
index 6cdd087097b..813dd5976e8 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerYaml.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerYaml.java
@@ -22,6 +22,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
+import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerService.DiskBalancerOperationalState;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -35,7 +36,6 @@ public class TestDiskBalancerYaml {
@Test
public void testCreateYaml() throws IOException {
- boolean shouldRun = true;
double threshold = 10;
long bandwidthInMB = 100;
int parallelThread = 5;
@@ -45,7 +45,7 @@ public void testCreateYaml() throws IOException {
File file = new File(tmpDir.toString(),
OZONE_SCM_DATANODE_DISK_BALANCER_INFO_FILE_DEFAULT);
- DiskBalancerInfo info = new DiskBalancerInfo(shouldRun, threshold,
+ DiskBalancerInfo info = new DiskBalancerInfo(DiskBalancerOperationalState.RUNNING, threshold,
bandwidthInMB, parallelThread, stopAfterDiskEven, version);
DiskBalancerYaml.createDiskBalancerInfoFile(info, file);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancerDuringDecommissionAndMaintenance.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancerDuringDecommissionAndMaintenance.java
new file mode 100644
index 00000000000..3f684cecbd7
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancerDuringDecommissionAndMaintenance.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.scm.node;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static org.apache.hadoop.hdds.scm.node.TestNodeUtil.getDNHostAndPort;
+import static org.apache.hadoop.hdds.scm.node.TestNodeUtil.waitForDnToReachOpState;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
+import org.apache.hadoop.hdds.scm.node.DiskBalancerManager;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerService;
+import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.GenericTestUtils.LogCapturer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * This class tests disk balancer operations during
+ * decommission and maintenance of DNs.
+ */
+@Timeout(300)
+public class TestDiskBalancerDuringDecommissionAndMaintenance {
+
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration conf;
+ private static DiskBalancerManager diskBalancerManager;
+ private static ScmClient scmClient;
+
+ @BeforeAll
+ public static void setup() throws Exception {
+ conf = new OzoneConfiguration();
+ conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+ SCMContainerPlacementCapacity.class, PlacementPolicy.class);
+ conf.setTimeDuration("hdds.datanode.disk.balancer.service.interval", 2, TimeUnit.SECONDS);
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(5)
+ .build();
+ cluster.waitForClusterToBeReady();
+
+ diskBalancerManager = cluster.getStorageContainerManager().getDiskBalancerManager();
+ scmClient = new ContainerOperationClient(conf);
+
+ for (DatanodeDetails dn : cluster.getStorageContainerManager()
+ .getScmNodeManager().getAllNodes()) {
+ ((DatanodeInfo) dn).updateStorageReports(
+ HddsTestUtils.getRandomNodeReport(20, 1).getStorageReportList());
+ }
+ }
+
+ @AfterAll
+ public static void cleanup() throws Exception {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @AfterEach
+ public void stopDiskBalancer() throws IOException, InterruptedException, TimeoutException {
+ // Stop disk balancer after each test
+ diskBalancerManager.stopDiskBalancer(Optional.empty());
+ // Verify that all DNs have stopped DiskBalancerService
+ for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+ GenericTestUtils.waitFor(() -> {
+ return !dn.getDatanodeStateMachine().getContainer().getDiskBalancerInfo().isShouldRun();
+ }, 100, 5000);
+ }
+ }
+
+ @Test
+ public void testDiskBalancerWithDecommissionAndMaintenanceNodes()
+ throws IOException, InterruptedException, TimeoutException {
+ LogCapturer dnStateChangeLog = LogCapturer.captureLogs(
+ DiskBalancerService.class);
+
+ List<HddsDatanodeService> dns = cluster.getHddsDatanodes();
+ DatanodeDetails dnToDecommission = dns.get(0).getDatanodeDetails();
+ DatanodeDetails dnToMaintenance = dns.get(1).getDatanodeDetails();
+
+ // Start disk balancer on all DNs
+ diskBalancerManager.startDiskBalancer(
+ Optional.of(10.0),
+ Optional.of(10L),
+ Optional.of(5),
+ Optional.of(false),
+ Optional.empty());
+
+ NodeManager nm = cluster.getStorageContainerManager().getScmNodeManager();
+
+ // Decommission DN1
+ scmClient.decommissionNodes(
+ Arrays.asList(getDNHostAndPort(dnToDecommission)), false);
+ waitForDnToReachOpState(nm, dnToDecommission, DECOMMISSIONING);
+
+ // Start maintenance on DN2
+ scmClient.startMaintenanceNodes(
+ Arrays.asList(getDNHostAndPort(dnToMaintenance)), 0, false);
+ waitForDnToReachOpState(nm, dnToMaintenance, ENTERING_MAINTENANCE);
+
+ //get diskBalancer report
+ List<HddsProtos.DatanodeDiskBalancerInfoProto> reportProtoList =
+ diskBalancerManager.getDiskBalancerReport(5,
+ ClientVersion.CURRENT_VERSION);
+
+ //get diskBalancer status
+ List<HddsProtos.DatanodeDiskBalancerInfoProto> statusProtoList =
+ diskBalancerManager.getDiskBalancerStatus(Optional.empty(),
+ Optional.empty(),
+ ClientVersion.CURRENT_VERSION);
+
+ // Verify that decommissioning and maintenance DN is not
+ // included in DiskBalancer report and status
+ boolean isDecommissionedDnInReport = reportProtoList.stream()
+ .anyMatch(proto -> proto.getNode().getUuid().
+ equals(dnToDecommission.getUuid().toString()));
+ boolean isMaintenanceDnInReport = reportProtoList.stream()
+ .anyMatch(proto -> proto.getNode().getUuid().
+ equals(dnToMaintenance.getUuid().toString()));
+ boolean isDecommissionedDnInStatus = statusProtoList.stream()
+ .anyMatch(proto -> proto.getNode().getUuid().
+ equals(dnToDecommission.getUuid().toString()));
+ boolean isMaintenanceDnInStatus = statusProtoList.stream()
+ .anyMatch(proto -> proto.getNode().getUuid().
+ equals(dnToMaintenance.getUuid().toString()));
+
+ // Assert that the decommissioned DN is not present in both report and status
+ assertFalse(isDecommissionedDnInReport);
+ assertFalse(isMaintenanceDnInReport);
+ assertFalse(isDecommissionedDnInStatus);
+ assertFalse(isMaintenanceDnInStatus);
+
+ // verify using logs that DiskBalancerService is stopped
+ // on DN with state is DECOMMISSIONING or ENTERING_MAINTENANCE
+ GenericTestUtils.waitFor(() -> {
+ String dnLogs = dnStateChangeLog.getOutput();
+ return
+ dnLogs.contains("Stopping DiskBalancerService as Node state changed to DECOMMISSIONING.")
+ && dnLogs.contains("Stopping DiskBalancerService as Node state changed to ENTERING_MAINTENANCE.");
+ }, 100, 5000);
+
+ // Recommission DN1
+ scmClient.recommissionNodes(
+ Arrays.asList(getDNHostAndPort(dnToDecommission)));
+ waitForDnToReachOpState(nm, dnToDecommission, IN_SERVICE);
+
+ DatanodeDetails recommissionedDn = dnToDecommission;
+
+ // Verify that recommissioned DN is included in DiskBalancer report and status
+ reportProtoList = diskBalancerManager.getDiskBalancerReport(5,
+ ClientVersion.CURRENT_VERSION);
+ statusProtoList = diskBalancerManager.getDiskBalancerStatus(Optional.empty(),
+ Optional.empty(),
+ ClientVersion.CURRENT_VERSION);
+
+ boolean isRecommissionedDnInReport = reportProtoList.stream()
+ .anyMatch(proto -> proto.getNode().getUuid().
+ equals(recommissionedDn.getUuid().toString()));
+ boolean isRecommissionedDnInStatus = statusProtoList.stream()
+ .anyMatch(proto -> proto.getNode().getUuid().
+ equals(recommissionedDn.getUuid().toString()));
+
+ // Verify that the recommissioned DN is included in both report and status
+ assertTrue(isRecommissionedDnInReport);
+ assertTrue(isRecommissionedDnInStatus);
+
+ //Verify using logs when DN is recommissioned
+ //if the DN was previously in stopped state it will not be resumed
+ //otherwise it will be resumed
+ GenericTestUtils.waitFor(() -> {
+ String dnLogs = dnStateChangeLog.getOutput();
+ return dnLogs.contains("Resuming DiskBalancerService to running state as Node state changed to IN_SERVICE.");
+ }, 100, 5000);
+ }
+
+ @Test
+ public void testStopDiskBalancerOnDecommissioningNode() throws Exception {
+ LogCapturer serviceLog = LogCapturer.captureLogs(DiskBalancerService.class);
+ List<HddsDatanodeService> dns = cluster.getHddsDatanodes();
+ DatanodeDetails dn = dns.get(3).getDatanodeDetails();
+ List<String> dnAddressList = Collections.singletonList(getDNHostAndPort(dn));
+ NodeManager nm = cluster.getStorageContainerManager().getScmNodeManager();
+
+
+ // Start disk balancer on this specific DN
+ diskBalancerManager.startDiskBalancer(
+ Optional.of(10.0),
+ Optional.of(10L),
+ Optional.of(1),
+ Optional.of(false),
+ Optional.of(dnAddressList));
+
+ // Verify diskBalancer is running
+ GenericTestUtils.waitFor(() -> {
+ try {
+ HddsProtos.DatanodeDiskBalancerInfoProto status =
+ diskBalancerManager.getDiskBalancerStatus(Optional.of(dnAddressList),
+ Optional.empty(),
+ ClientVersion.CURRENT_VERSION).stream().findFirst().orElse(null);
+ return status != null && status.getRunningStatus() == HddsProtos.DiskBalancerRunningStatus.RUNNING;
+ } catch (IOException e) {
+ return false;
+ }
+ }, 100, 5000);
+
+ // Decommission the DN
+ scmClient.decommissionNodes(dnAddressList, false);
+ waitForDnToReachOpState(nm, dn, DECOMMISSIONING);
+
+ // Verify DiskBalancerService on DN automatically paused
+ final String expectedLogForPause =
+ "Stopping DiskBalancerService as Node state changed to DECOMMISSIONING.";
+ GenericTestUtils.waitFor(() -> serviceLog.getOutput().contains(expectedLogForPause),
+ 100, 5000);
+
+ // Attempt to stop disk balancer on the decommissioning DN
+ diskBalancerManager.stopDiskBalancer(Optional.of(dnAddressList));
+
+ // Verify disk balancer is now explicitly stopped (operationalState becomes STOPPED)
+ final String expectedLogForStop =
+ "DiskBalancer operational state changing from PAUSED_BY_NODE_STATE to STOPPED";
+ GenericTestUtils.waitFor(() -> serviceLog.getOutput().contains(expectedLogForStop),
+ 100, 5000);
+
+ //Recommission the node
+ scmClient.recommissionNodes(dnAddressList);
+ waitForDnToReachOpState(nm, dn, IN_SERVICE);
+
+ // Verify it does not automatically restart (since it was explicitly stopped)
+ HddsProtos.DatanodeDiskBalancerInfoProto statusAfterRecommission =
+ diskBalancerManager.getDiskBalancerStatus(Optional.of(dnAddressList),
+ Optional.empty(),
+ ClientVersion.CURRENT_VERSION).stream().findFirst().orElse(null);
+ assertEquals(HddsProtos.DiskBalancerRunningStatus.STOPPED, statusAfterRecommission.getRunningStatus());
+ }
+
+ @Test
+ public void testStartDiskBalancerOnDecommissioningNode() throws Exception {
+ LogCapturer serviceLog = LogCapturer.captureLogs(DiskBalancerService.class);
+ LogCapturer supervisorLog = LogCapturer.captureLogs(ReplicationSupervisor.class);
+
+ List<HddsDatanodeService> dns = cluster.getHddsDatanodes();
+ DatanodeDetails dn = dns.get(4).getDatanodeDetails();
+ List<String> dnAddressList = Collections.singletonList(getDNHostAndPort(dn));
+ NodeManager nm = cluster.getStorageContainerManager().getScmNodeManager();
+
+ // Verify diskBalancer is stopped
+ GenericTestUtils.waitFor(() -> {
+ try {
+ HddsProtos.DatanodeDiskBalancerInfoProto status =
+ diskBalancerManager.getDiskBalancerStatus(Optional.of(dnAddressList),
+ Optional.empty(),
+ ClientVersion.CURRENT_VERSION).stream().findFirst().orElse(null);
+ return status != null && status.getRunningStatus() == HddsProtos.DiskBalancerRunningStatus.STOPPED;
+ } catch (IOException e) {
+ return false;
+ }
+ }, 100, 5000);
+
+ // Decommission the DN
+ scmClient.decommissionNodes(dnAddressList, false);
+ waitForDnToReachOpState(nm, dn, DECOMMISSIONING);
+
+ final String nodeStateChangeLogs =
+ "Node state updated to DECOMMISSIONING, scaling executor pool size to 20";
+ GenericTestUtils.waitFor(() -> supervisorLog.getOutput().contains(nodeStateChangeLogs),
+ 100, 5000);
+
+ // Attempt to start disk balancer on the decommissioning DN
+ diskBalancerManager.startDiskBalancer(
+ Optional.of(10.0),
+ Optional.of(10L),
+ Optional.of(1),
+ Optional.of(false),
+ Optional.of(dnAddressList));
+
+ // Verify disk balancer goes to PAUSED_BY_NODE_STATE
+ final String expectedLogForPause =
+ "DiskBalancer operational state changing from STOPPED to PAUSED_BY_NODE_STATE";
+ GenericTestUtils.waitFor(() -> serviceLog.getOutput().contains(expectedLogForPause),
+ 100, 5000);
+
+ //Recommission the node
+ scmClient.recommissionNodes(dnAddressList);
+ waitForDnToReachOpState(nm, dn, IN_SERVICE);
+
+ // Verify it automatically restart (since it was explicitly started)
+ GenericTestUtils.waitFor(() -> {
+ try {
+ HddsProtos.DatanodeDiskBalancerInfoProto status =
+ diskBalancerManager.getDiskBalancerStatus(Optional.of(dnAddressList),
+ Optional.empty(),
+ ClientVersion.CURRENT_VERSION).stream().findFirst().orElse(null);
+ return status != null && status.getRunningStatus() == HddsProtos.DiskBalancerRunningStatus.RUNNING;
+ } catch (IOException e) {
+ return false;
+ }
+ }, 100, 5000);
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]