This is an automated email from the ASF dual-hosted git repository.
ferhui pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-5713 by this push:
new 8e31b09c63 HDDS-7106. [DiskBalancer] Client-SCM interface (#3663)
8e31b09c63 is described below
commit 8e31b09c634d79a042126873fc2d7b3010900f4e
Author: Symious <[email protected]>
AuthorDate: Tue Aug 16 10:40:22 2022 +0800
HDDS-7106. [DiskBalancer] Client-SCM interface (#3663)
---
.../apache/hadoop/hdds/scm/client/ScmClient.java | 41 +++++
.../protocol/StorageContainerLocationProtocol.java | 30 ++++
.../scm/storage/DiskBalancerConfiguration.java | 165 +++++++++++++++++++
.../org/apache/hadoop/hdds/conf/ConfigTag.java | 3 +-
...inerLocationProtocolClientSideTranslatorPB.java | 56 +++++++
.../src/main/proto/ScmAdminProtocol.proto | 21 +++
.../interface-client/src/main/proto/hdds.proto | 13 ++
.../hadoop/hdds/scm/node/DiskBalancerManager.java | 176 +++++++++++++++++++++
.../hadoop/hdds/scm/node/DiskBalancerStatus.java | 50 ++++++
.../hdds/scm/node/NodeDecommissionManager.java | 123 +-------------
.../org/apache/hadoop/hdds/scm/node/NodeUtils.java | 150 ++++++++++++++++++
...inerLocationProtocolServerSideTranslatorPB.java | 31 ++++
.../hdds/scm/server/SCMClientProtocolServer.java | 51 ++++++
.../hdds/scm/server/StorageContainerManager.java | 8 +
.../hdds/scm/node/TestDiskBalancerManager.java | 99 ++++++++++++
.../hdds/scm/node/TestNodeDecommissionManager.java | 13 +-
.../hdds/scm/cli/ContainerOperationClient.java | 36 +++++
.../hadoop/ozone/scm/node/TestDiskBalancer.java | 96 +++++++++++
18 files changed, 1037 insertions(+), 125 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index dd5690359e..72330a48a4 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -391,4 +391,45 @@ public interface ScmClient extends Closeable {
StatusAndMessages queryUpgradeFinalizationProgress(
String upgradeClientID, boolean force, boolean readonly)
throws IOException;
+
+ /**
+ * Get DiskBalancer report.
+ * @param count the number of top datanodes most in need of balancing to return
+ * @return List of DatanodeDiskBalancerInfo.
+ * @throws IOException
+ */
+ List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+ int count) throws IOException;
+
+ /**
+ * Get DiskBalancer status.
+ * @param hosts If hosts is not null, return status of hosts; If hosts is
+ * null, return status of all datanodes in balancing.
+ * @return List of DatanodeDiskBalancerInfo.
+ * @throws IOException
+ */
+ List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
+ Optional<List<String>> hosts) throws IOException;
+
+ /**
+ * Start DiskBalancer.
+ */
+ void startDiskBalancer(
+ Optional<Double> threshold,
+ Optional<Double> bandwidth,
+ Optional<List<String>> hosts) throws IOException;
+
+ /**
+ * Stop DiskBalancer.
+ */
+ void stopDiskBalancer(Optional<List<String>> hosts) throws IOException;
+
+
+ /**
+ * Update DiskBalancer Configuration.
+ */
+ void updateDiskBalancerConfiguration(
+ Optional<Double> threshold,
+ Optional<Double> bandwidth,
+ Optional<List<String>> hosts) throws IOException;
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index d915a20c91..038ab3227f 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -417,4 +417,34 @@ public interface StorageContainerLocationProtocol extends
Closeable {
Token<?> getContainerToken(ContainerID containerID) throws IOException;
long getContainerCount() throws IOException;
+
+ List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+ int count, int clientVersion) throws IOException;
+
+ /**
+ * Get DiskBalancer status.
+ */
+ List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
+ Optional<List<String>> hosts, int clientVersion) throws IOException;
+
+ /**
+ * Start DiskBalancer.
+ */
+ void startDiskBalancer(
+ Optional<Double> threshold,
+ Optional<Double> bandwidth,
+ Optional<List<String>> hosts) throws IOException;
+
+ /**
+ * Stop DiskBalancer.
+ */
+ void stopDiskBalancer(Optional<List<String>> hosts) throws IOException;
+
+ /**
+ * Update DiskBalancer Configuration.
+ */
+ void updateDiskBalancerConfiguration(
+ Optional<Double> threshold,
+ Optional<Double> bandwidth,
+ Optional<List<String>> hosts) throws IOException;
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
new file mode 100644
index 0000000000..704b383679
--- /dev/null
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class contains configuration values for the DiskBalancer.
+ */
+@ConfigGroup(prefix = "hdds.datanode.disk.balancer")
+public final class DiskBalancerConfiguration {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DiskBalancerConfiguration.class);
+
+ @Config(key = "volume.density.threshold", type = ConfigType.AUTO,
+ defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
+ description = "Threshold is a percentage in the range of 0 to 100. A " +
+ "datanode is considered balanced if for each volume, the " +
+ "utilization of the volume(used space to capacity ratio) differs" +
+ " from the utilization of the datanode(used space to capacity ratio"
+
+ " of the entire datanode) no more than the threshold.")
+ private double threshold = 10d;
+
+ @Config(key = "max.disk.throughputInMBPerSec", type = ConfigType.AUTO,
+ defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
+ description = "The max balance speed.")
+ private double diskBandwidth = 10;
+
+ @Config(key = "parallel.thread", type = ConfigType.AUTO,
+ defaultValue = "5", tags = {ConfigTag.DISKBALANCER},
+ description = "The max parallel balance thread count.")
+ private int parallelThread = 5;
+
+ /**
+ * Gets the threshold value for DiskBalancer.
+ *
+ * @return percentage value in the range 0 to 100
+ */
+ public double getThreshold() {
+ return threshold;
+ }
+
+ public double getThresholdAsRatio() {
+ return threshold / 100;
+ }
+
+ /**
+ * Sets the threshold value for Disk Balancer.
+ *
+ * @param threshold a percentage value in the range [0, 100)
+ */
+ public void setThreshold(double threshold) {
+ if (threshold < 0d || threshold >= 100d) {
+ throw new IllegalArgumentException(
+ "Threshold must be a percentage(double) in the range 0 to 100.");
+ }
+ this.threshold = threshold;
+ }
+
+ /**
+ * Gets the disk bandwidth value for Disk Balancer.
+ *
+ * @return max disk bandwidth per second
+ */
+ public double getDiskBandwidth() {
+ return diskBandwidth;
+ }
+
+ /**
+ * Sets the disk bandwidth value for Disk Balancer.
+ *
+ * @param diskBandwidth the bandwidth to control balance speed
+ */
+ public void setDiskBandwidth(double diskBandwidth) {
+ if (diskBandwidth <= 0d) {
+ throw new IllegalArgumentException(
+ "diskBandwidth must be a value larger than 0.");
+ }
+ this.diskBandwidth = diskBandwidth;
+ }
+
+ /**
+ * Gets the parallel thread for Disk Balancer.
+ *
+ * @return parallel thread
+ */
+ public int getParallelThread() {
+ return parallelThread;
+ }
+
+ /**
+ * Sets the parallel thread for Disk Balancer.
+ *
+ * @param parallelThread the parallel thread count
+ */
+ public void setParallelThread(int parallelThread) {
+ if (parallelThread <= 0) {
+ throw new IllegalArgumentException(
+ "parallelThread must be a value larger than 0.");
+ }
+ this.parallelThread = parallelThread;
+ }
+ @Override
+ public String toString() {
+ return String.format("Disk Balancer Configuration values:%n" +
+ "%-50s %s%n" +
+ "%-50s %s%n" +
+ "%-50s %s%n" +
+ "%-50s %s%n",
+ "Key", "Value",
+ "Threshold", threshold, "Max disk bandwidth", diskBandwidth,
+ "Parallel Thread", parallelThread);
+ }
+
+ public HddsProtos.DiskBalancerConfigurationProto.Builder toProtobufBuilder()
{
+ HddsProtos.DiskBalancerConfigurationProto.Builder builder =
+ HddsProtos.DiskBalancerConfigurationProto.newBuilder();
+
+ builder.setThreshold(threshold)
+ .setDiskBandwidth(diskBandwidth)
+ .setParallelThread(parallelThread);
+ return builder;
+ }
+
+ static DiskBalancerConfiguration fromProtobuf(
+ @NotNull HddsProtos.DiskBalancerConfigurationProto proto,
+ @NotNull OzoneConfiguration ozoneConfiguration) {
+ DiskBalancerConfiguration config =
+ ozoneConfiguration.getObject(DiskBalancerConfiguration.class);
+ if (proto.hasThreshold()) {
+ config.setThreshold(proto.getThreshold());
+ }
+ if (proto.hasDiskBandwidth()) {
+ config.setDiskBandwidth(proto.getDiskBandwidth());
+ }
+ if (proto.hasParallelThread()) {
+ config.setParallelThread(proto.getParallelThread());
+ }
+ return config;
+ }
+}
diff --git
a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
index 8cf584d75f..9be8bdc679 100644
---
a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
+++
b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
@@ -46,5 +46,6 @@ public enum ConfigTag {
DELETION,
HA,
BALANCER,
- UPGRADE
+ UPGRADE,
+ DISKBALANCER
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 568985e137..9648eb13e3 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -42,6 +42,9 @@ import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeAdminErrorResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoResponseProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoType;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoRequestProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesResponseProto;
@@ -1005,6 +1008,59 @@ public final class
StorageContainerLocationProtocolClientSideTranslatorPB
return response.getContainerCount();
}
+ @Override
+ public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+ int count, int clientVersion) throws IOException {
+ DatanodeDiskBalancerInfoRequestProto request =
+ DatanodeDiskBalancerInfoRequestProto.newBuilder()
+ .setInfoType(DatanodeDiskBalancerInfoType.report)
+ .setCount(count)
+ .build();
+
+ DatanodeDiskBalancerInfoResponseProto response =
+ submitRequest(Type.DatanodeDiskBalancerInfo,
+ builder -> builder.setDatanodeDiskBalancerInfoRequest(request))
+ .getDatanodeDiskBalancerInfoResponse();
+
+ return response.getInfoList();
+ }
+
+ @Override
+ public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
+ Optional<List<String>> hosts, int clientVersion) throws IOException {
+ DatanodeDiskBalancerInfoRequestProto.Builder requestBuilder =
+ DatanodeDiskBalancerInfoRequestProto.newBuilder()
+ .setInfoType(DatanodeDiskBalancerInfoType.status);
+ hosts.ifPresent(requestBuilder::addAllHosts);
+ DatanodeDiskBalancerInfoRequestProto request = requestBuilder.build();
+
+ DatanodeDiskBalancerInfoResponseProto response =
+ submitRequest(Type.DatanodeDiskBalancerInfo,
+ builder -> builder.setDatanodeDiskBalancerInfoRequest(request))
+ .getDatanodeDiskBalancerInfoResponse();
+
+ return response.getInfoList();
+ }
+
+ @Override
+ public void startDiskBalancer(Optional<Double> threshold,
+ Optional<Double> bandwidth, Optional<List<String>> hosts)
+ throws IOException {
+
+ }
+
+ @Override
+ public void stopDiskBalancer(Optional<List<String>> hosts)
+ throws IOException {
+
+ }
+
+ @Override
+ public void updateDiskBalancerConfiguration(Optional<Double> threshold,
+ Optional<Double> bandwidth, Optional<List<String>> hosts)
+ throws IOException {
+ }
+
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index 882f6c66a9..130397e2ee 100644
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -77,6 +77,7 @@ message ScmContainerLocationRequest {
optional GetContainerCountRequestProto getContainerCountRequest = 38;
optional GetContainerReplicasRequestProto getContainerReplicasRequest = 39;
optional ReplicationManagerReportRequestProto
replicationManagerReportRequest = 40;
+ optional DatanodeDiskBalancerInfoRequestProto
DatanodeDiskBalancerInfoRequest = 41;
}
message ScmContainerLocationResponse {
@@ -125,6 +126,7 @@ message ScmContainerLocationResponse {
optional GetContainerCountResponseProto getContainerCountResponse = 38;
optional GetContainerReplicasResponseProto getContainerReplicasResponse = 39;
optional ReplicationManagerReportResponseProto
getReplicationManagerReportResponse = 40;
+ optional DatanodeDiskBalancerInfoResponseProto
DatanodeDiskBalancerInfoResponse = 41;
enum Status {
OK = 1;
@@ -171,6 +173,7 @@ enum Type {
GetContainerCount = 33;
GetContainerReplicas = 34;
GetReplicationManagerReport = 35;
+ DatanodeDiskBalancerInfo = 36;
}
/**
@@ -322,6 +325,24 @@ message DatanodeUsageInfoResponseProto {
repeated DatanodeUsageInfoProto info = 1;
}
+enum DatanodeDiskBalancerInfoType {
+ report = 1;
+ status = 2;
+}
+
+/*
+ Datanode disk balancer status request message.
+*/
+message DatanodeDiskBalancerInfoRequestProto {
+ required DatanodeDiskBalancerInfoType infoType = 1;
+ optional uint32 count = 2;
+ repeated string hosts = 3;
+}
+
+message DatanodeDiskBalancerInfoResponseProto {
+ repeated DatanodeDiskBalancerInfoProto info = 1;
+}
+
/*
Decommission a list of hosts
*/
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 555431b199..a641f88bbe 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -451,3 +451,16 @@ message ContainerBalancerConfigurationProto {
required bool shouldRun = 18;
optional int32 nextIterationIndex = 19;
}
+
+message DiskBalancerConfigurationProto {
+ optional double threshold = 1;
+ optional double diskBandwidth = 2;
+ optional int32 parallelThread = 3;
+}
+
+message DatanodeDiskBalancerInfoProto {
+ required DatanodeDetailsProto node = 1;
+ required double currentVolumeDensitySum = 2;
+ optional bool diskBalancerRunning = 3;
+ optional DiskBalancerConfigurationProto diskBalancerConf = 4;
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
new file mode 100644
index 0000000000..5ca66e9eef
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerManager.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+
+/**
+ * Maintains information about the DiskBalancer on SCM side.
+ */
+public class DiskBalancerManager {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(DiskBalancerManager.class);
+
+ private final EventPublisher scmNodeEventPublisher;
+ private final SCMContext scmContext;
+ private final NodeManager nodeManager;
+ private Map<DatanodeDetails, DiskBalancerStatus> statusMap;
+ private boolean useHostnames;
+
+ /**
+ * Constructs DiskBalancer Manager.
+ */
+ public DiskBalancerManager(OzoneConfiguration conf,
+ EventPublisher eventPublisher,
+ SCMContext scmContext,
+ NodeManager nodeManager) {
+ this.scmNodeEventPublisher = eventPublisher;
+ this.scmContext = scmContext;
+ this.nodeManager = nodeManager;
+ this.useHostnames = conf.getBoolean(
+ DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
+ DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+ this.statusMap = new ConcurrentHashMap<>();
+ }
+
+ public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
+ int count, int clientVersion) throws IOException {
+
+ List<HddsProtos.DatanodeDiskBalancerInfoProto> reportList =
+ new ArrayList<>();
+
+ for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE,
+ HddsProtos.NodeState.HEALTHY)) {
+ double volumeDensitySum =
+ getVolumeDataDensitySumForDatanodeDetails(datanodeDetails);
+ reportList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
+ .setCurrentVolumeDensitySum(volumeDensitySum)
+ .setNode(datanodeDetails.toProto(clientVersion))
+ .build());
+ }
+
+ reportList.sort((t1, t2) -> Double.compare(t2.getCurrentVolumeDensitySum(),
+ t1.getCurrentVolumeDensitySum()));
+ return reportList.stream().limit(count).collect(Collectors.toList());
+ }
+
+ /**
+ * If hosts is not null, return status of hosts;
+ * If hosts is null, return status of all datanodes in balancing.
+ */
+ public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
+ Optional<List<String>> hosts, int clientVersion) throws IOException {
+ List<HddsProtos.DatanodeDiskBalancerInfoProto> statusList =
+ new ArrayList<>();
+ List<DatanodeDetails> filterDns = null;
+ if (hosts.isPresent() && !hosts.get().isEmpty()) {
+ filterDns = NodeUtils.mapHostnamesToDatanodes(nodeManager, hosts.get(),
+ useHostnames);
+ }
+
+ for (DatanodeDetails datanodeDetails: nodeManager.getNodes(IN_SERVICE,
+ HddsProtos.NodeState.HEALTHY)) {
+ if (shouldReturnDatanode(filterDns, datanodeDetails)) {
+ double volumeDensitySum =
+ getVolumeDataDensitySumForDatanodeDetails(datanodeDetails);
+ statusList.add(HddsProtos.DatanodeDiskBalancerInfoProto.newBuilder()
+ .setCurrentVolumeDensitySum(volumeDensitySum)
+ .setDiskBalancerRunning(isRunning(datanodeDetails))
+ .setDiskBalancerConf(statusMap.getOrDefault(datanodeDetails,
+ DiskBalancerStatus.DUMMY_STATUS)
+ .getDiskBalancerConfiguration().toProtobufBuilder())
+ .setNode(datanodeDetails.toProto(clientVersion))
+ .build());
+ }
+ }
+ return statusList;
+ }
+
+ private boolean shouldReturnDatanode(List<DatanodeDetails> hosts,
+ DatanodeDetails datanodeDetails) {
+ if (hosts == null || hosts.isEmpty()) {
+ return isRunning(datanodeDetails);
+ } else {
+ return hosts.contains(datanodeDetails);
+ }
+ }
+
+ /**
+ * Get volume density for a specific DatanodeDetails node.
+ *
+ * @param datanodeDetails DatanodeDetails
+ * @return DiskBalancer report.
+ */
+ private double getVolumeDataDensitySumForDatanodeDetails(
+ DatanodeDetails datanodeDetails) {
+ Preconditions.checkArgument(datanodeDetails instanceof DatanodeInfo);
+
+ DatanodeInfo datanodeInfo = (DatanodeInfo) datanodeDetails;
+
+ double totalCapacity = 0d, totalUsed = 0d;
+ for (StorageContainerDatanodeProtocolProtos.StorageReportProto reportProto
:
+ datanodeInfo.getStorageReports()) {
+ totalCapacity += reportProto.getCapacity();
+ totalUsed += reportProto.getScmUsed();
+ }
+
+ Preconditions.checkArgument(totalCapacity != 0);
+ double idealUsage = totalUsed / totalCapacity;
+
+ double volumeDensitySum = datanodeInfo.getStorageReports().stream()
+ .map(report ->
+ Math.abs((double)report.getScmUsed() / report.getCapacity()
+ - idealUsage))
+ .mapToDouble(Double::valueOf).sum();
+
+ return volumeDensitySum;
+ }
+
+ private boolean isRunning(DatanodeDetails datanodeDetails) {
+ return statusMap
+ .getOrDefault(datanodeDetails, DiskBalancerStatus.DUMMY_STATUS)
+ .isRunning();
+ }
+
+ @VisibleForTesting
+ public void addRunningDatanode(DatanodeDetails datanodeDetails) {
+ statusMap.put(datanodeDetails, new DiskBalancerStatus(true,
+ new DiskBalancerConfiguration()));
+ }
+}
\ No newline at end of file
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
new file mode 100644
index 0000000000..ed22e80e3c
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DiskBalancerStatus.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Maintains DiskBalancerConfiguration and isRunning.
+ */
+public class DiskBalancerStatus {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(DiskBalancerStatus.class);
+
+ private boolean isRunning;
+ private DiskBalancerConfiguration diskBalancerConfiguration;
+
+ public static final DiskBalancerStatus DUMMY_STATUS =
+ new DiskBalancerStatus(false, new DiskBalancerConfiguration());
+
+ public DiskBalancerStatus(boolean isRunning, DiskBalancerConfiguration conf)
{
+ this.isRunning = isRunning;
+ this.diskBalancerConfiguration = conf;
+ }
+
+ public boolean isRunning() {
+ return isRunning;
+ }
+
+ public DiskBalancerConfiguration getDiskBalancerConfiguration() {
+ return diskBalancerConfiguration;
+ }
+}
\ No newline at end of file
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
index 1ea04cdfc3..ef7605a554 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
@@ -32,12 +32,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -63,115 +58,6 @@ public class NodeDecommissionManager {
private static final Logger LOG =
LoggerFactory.getLogger(NodeDecommissionManager.class);
- static class HostDefinition {
- private String rawHostname;
- private String hostname;
- private int port;
-
- HostDefinition(String hostname) throws InvalidHostStringException {
- this.rawHostname = hostname;
- parseHostname();
- }
-
- public String getRawHostname() {
- return rawHostname;
- }
-
- public String getHostname() {
- return hostname;
- }
-
- public int getPort() {
- return port;
- }
-
- private void parseHostname() throws InvalidHostStringException {
- try {
- // A URI *must* have a scheme, so just create a fake one
- URI uri = new URI("empty://" + rawHostname.trim());
- this.hostname = uri.getHost();
- this.port = uri.getPort();
-
- if (this.hostname == null) {
- throw new InvalidHostStringException("The string " + rawHostname +
- " does not contain a value hostname or hostname:port
definition");
- }
- } catch (URISyntaxException e) {
- throw new InvalidHostStringException(
- "Unable to parse the hoststring " + rawHostname, e);
- }
- }
- }
-
- private List<DatanodeDetails> mapHostnamesToDatanodes(List<String> hosts)
- throws InvalidHostStringException {
- List<DatanodeDetails> results = new LinkedList<>();
- for (String hostString : hosts) {
- HostDefinition host = new HostDefinition(hostString);
- InetAddress addr;
- try {
- addr = InetAddress.getByName(host.getHostname());
- } catch (UnknownHostException e) {
- throw new InvalidHostStringException("Unable to resolve host "
- + host.getRawHostname(), e);
- }
- String dnsName;
- if (useHostnames) {
- dnsName = addr.getHostName();
- } else {
- dnsName = addr.getHostAddress();
- }
- List<DatanodeDetails> found = nodeManager.getNodesByAddress(dnsName);
- if (found.size() == 0) {
- throw new InvalidHostStringException("Host " + host.getRawHostname()
- + " (" + dnsName + ") is not running any datanodes registered"
- + " with SCM."
- + " Please check the host name.");
- } else if (found.size() == 1) {
- if (host.getPort() != -1 &&
- !validateDNPortMatch(host.getPort(), found.get(0))) {
- throw new InvalidHostStringException("Host " + host.getRawHostname()
- + " is running a datanode registered with SCM,"
- + " but the port number doesn't match."
- + " Please check the port number.");
- }
- results.add(found.get(0));
- } else if (found.size() > 1) {
- DatanodeDetails match = null;
- for (DatanodeDetails dn : found) {
- if (validateDNPortMatch(host.getPort(), dn)) {
- match = dn;
- break;
- }
- }
- if (match == null) {
- throw new InvalidHostStringException("Host " + host.getRawHostname()
- + " is running multiple datanodes registered with SCM,"
- + " but no port numbers match."
- + " Please check the port number.");
- }
- results.add(match);
- }
- }
- return results;
- }
-
- /**
- * Check if the passed port is used by the given DatanodeDetails object. If
- * it is, return true, otherwise return false.
- * @param port Port number to check if it is used by the datanode
- * @param dn Datanode to check if it is using the given port
- * @return True if port is used by the datanode. False otherwise.
- */
- private boolean validateDNPortMatch(int port, DatanodeDetails dn) {
- for (DatanodeDetails.Port p : dn.getPorts()) {
- if (p.getValue() == port) {
- return true;
- }
- }
- return false;
- }
-
public NodeDecommissionManager(OzoneConfiguration config, NodeManager nm,
ContainerManager containerManager, SCMContext scmContext,
EventPublisher eventQueue, ReplicationManager rm) {
@@ -220,7 +106,8 @@ public class NodeDecommissionManager {
public synchronized List<DatanodeAdminError> decommissionNodes(
List<String> nodes) throws InvalidHostStringException {
- List<DatanodeDetails> dns = mapHostnamesToDatanodes(nodes);
+ List<DatanodeDetails> dns = NodeUtils.mapHostnamesToDatanodes(nodeManager,
+ nodes, useHostnames);
List<DatanodeAdminError> errors = new ArrayList<>();
for (DatanodeDetails dn : dns) {
try {
@@ -285,7 +172,8 @@ public class NodeDecommissionManager {
public synchronized List<DatanodeAdminError> recommissionNodes(
List<String> nodes) throws InvalidHostStringException {
- List<DatanodeDetails> dns = mapHostnamesToDatanodes(nodes);
+ List<DatanodeDetails> dns = NodeUtils.mapHostnamesToDatanodes(nodeManager,
+ nodes, useHostnames);
List<DatanodeAdminError> errors = new ArrayList<>();
for (DatanodeDetails dn : dns) {
try {
@@ -322,7 +210,8 @@ public class NodeDecommissionManager {
public synchronized List<DatanodeAdminError> startMaintenanceNodes(
List<String> nodes, int endInHours) throws InvalidHostStringException {
- List<DatanodeDetails> dns = mapHostnamesToDatanodes(nodes);
+ List<DatanodeDetails> dns = NodeUtils.mapHostnamesToDatanodes(nodeManager,
+ nodes, useHostnames);
List<DatanodeAdminError> errors = new ArrayList<>();
for (DatanodeDetails dn : dns) {
try {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java
new file mode 100644
index 0000000000..6df74d3746
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeUtils.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Util class for Node operations.
+ */
+public final class NodeUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NodeUtils.class);
+
+ private NodeUtils() {
+ }
+
+ public static List<DatanodeDetails> mapHostnamesToDatanodes(
+ NodeManager nodeManager, List<String> hosts, boolean useHostnames)
+ throws InvalidHostStringException {
+ List<DatanodeDetails> results = new LinkedList<>();
+ for (String hostString : hosts) {
+ HostDefinition host = new HostDefinition(hostString);
+ InetAddress addr;
+ try {
+ addr = InetAddress.getByName(host.getHostname());
+ } catch (UnknownHostException e) {
+ throw new InvalidHostStringException("Unable to resolve host "
+ + host.getRawHostname(), e);
+ }
+ String dnsName;
+ if (useHostnames) {
+ dnsName = addr.getHostName();
+ } else {
+ dnsName = addr.getHostAddress();
+ }
+ List<DatanodeDetails> found = nodeManager.getNodesByAddress(dnsName);
+ if (found.size() == 0) {
+ throw new InvalidHostStringException("Host " + host.getRawHostname()
+ + " (" + dnsName + ") is not running any datanodes registered"
+ + " with SCM."
+ + " Please check the host name.");
+ } else if (found.size() == 1) {
+ if (host.getPort() != -1 &&
+ !validateDNPortMatch(host.getPort(), found.get(0))) {
+ throw new InvalidHostStringException("Host " + host.getRawHostname()
+ + " is running a datanode registered with SCM,"
+ + " but the port number doesn't match."
+ + " Please check the port number.");
+ }
+ results.add(found.get(0));
+ } else if (found.size() > 1) {
+ DatanodeDetails match = null;
+ for (DatanodeDetails dn : found) {
+ if (validateDNPortMatch(host.getPort(), dn)) {
+ match = dn;
+ break;
+ }
+ }
+ if (match == null) {
+ throw new InvalidHostStringException("Host " + host.getRawHostname()
+ + " is running multiple datanodes registered with SCM,"
+ + " but no port numbers match."
+ + " Please check the port number.");
+ }
+ results.add(match);
+ }
+ }
+ return results;
+ }
+
+ /**
+ * Check if the passed port is used by the given DatanodeDetails object. If
+ * it is, return true, otherwise return false.
+ * @param port Port number to check if it is used by the datanode
+ * @param dn Datanode to check if it is using the given port
+ * @return True if port is used by the datanode. False otherwise.
+ */
+ private static boolean validateDNPortMatch(int port, DatanodeDetails dn) {
+ for (DatanodeDetails.Port p : dn.getPorts()) {
+ if (p.getValue() == port) {
+ return true;
+ }
+ }
+ return false;
+ }
+ static class HostDefinition {
+ private String rawHostname;
+ private String hostname;
+ private int port;
+
+ HostDefinition(String hostname) throws InvalidHostStringException {
+ this.rawHostname = hostname;
+ parseHostname();
+ }
+
+ public String getRawHostname() {
+ return rawHostname;
+ }
+
+ public String getHostname() {
+ return hostname;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ private void parseHostname() throws InvalidHostStringException {
+ try {
+ // A URI *must* have a scheme, so just create a fake one
+ URI uri = new URI("empty://" + rawHostname.trim());
+ this.hostname = uri.getHost();
+ this.port = uri.getPort();
+
+ if (this.hostname == null) {
+ throw new InvalidHostStringException("The string " + rawHostname +
+ " does not contain a value hostname or hostname:port
definition");
+ }
+ } catch (URISyntaxException e) {
+ throw new InvalidHostStringException(
+ "Unable to parse the hoststring " + rawHostname, e);
+ }
+ }
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index c2ba0ffffa..6a6bfc7531 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -37,6 +37,7 @@ import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeAdminErrorResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeUsageInfoResponseProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DatanodeDiskBalancerInfoResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionNodesRequestProto;
@@ -646,6 +647,14 @@ public final class
StorageContainerLocationProtocolServerSideTranslatorPB
request.getGetContainerReplicasRequest(),
request.getVersion()))
.build();
+ case DatanodeDiskBalancerInfo:
+ return ScmContainerLocationResponse.newBuilder()
+ .setCmdType(request.getCmdType())
+ .setStatus(Status.OK)
+ .setDatanodeDiskBalancerInfoResponse(getDatanodeDiskBalancerInfo(
+ request.getDatanodeDiskBalancerInfoRequest(),
+ request.getVersion()))
+ .build();
default:
throw new IllegalArgumentException(
"Unknown command type: " + request.getCmdType());
@@ -1138,4 +1147,26 @@ public final class
StorageContainerLocationProtocolServerSideTranslatorPB
.setContainerCount(impl.getContainerCount())
.build();
}
+
+ public DatanodeDiskBalancerInfoResponseProto getDatanodeDiskBalancerInfo(
+ StorageContainerLocationProtocolProtos.
+ DatanodeDiskBalancerInfoRequestProto request, int clientVersion)
+ throws IOException {
+ List<HddsProtos.DatanodeDiskBalancerInfoProto> infoProtoList;
+ switch (request.getInfoType()) {
+ case report:
+ infoProtoList = impl.getDiskBalancerReport(request.getCount(),
+ clientVersion);
+ break;
+ case status:
+ infoProtoList = impl.getDiskBalancerStatus(
+ Optional.of(request.getHostsList()), clientVersion);
+ break;
+ default:
+ infoProtoList = null;
+ }
+ return DatanodeDiskBalancerInfoResponseProto.newBuilder()
+ .addAllInfo(infoProtoList)
+ .build();
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 8ccd343e24..aee9127328 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -1105,6 +1105,57 @@ public class SCMClientProtocolServer implements
return scm.getContainerManager().getContainers().size();
}
  /**
   * Returns the disk-balancer usage report for the {@code count} most
   * imbalanced datanodes, as computed by the DiskBalancerManager.
   * Requires SCM admin privileges.
   *
   * @param count maximum number of datanode entries to return
   * @param clientVersion version of the requesting client
   * @return per-datanode disk-balancer report entries
   * @throws IOException if the caller is not an SCM admin, or the
   *     underlying lookup fails
   */
  @Override
  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
      int count, int clientVersion) throws IOException {
    // check admin authorisation
    try {
      getScm().checkAdminAccess(getRemoteUser());
    } catch (IOException e) {
      LOG.error("Authorization failed", e);
      throw e;
    }

    return scm.getDiskBalancerManager().getDiskBalancerReport(count,
        clientVersion);
  }
+
  /**
   * Returns the disk-balancer status for the given hosts (semantics of an
   * empty/absent host list are delegated to the DiskBalancerManager).
   * Requires SCM admin privileges.
   *
   * @param hosts optional list of host addresses to query
   * @param clientVersion version of the requesting client
   * @return per-datanode disk-balancer status entries
   * @throws IOException if the caller is not an SCM admin, or the
   *     underlying lookup fails
   */
  @Override
  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
      Optional<List<String>> hosts, int clientVersion) throws IOException {
    // check admin authorisation
    try {
      getScm().checkAdminAccess(getRemoteUser());
    } catch (IOException e) {
      LOG.error("Authorization failed", e);
      throw e;
    }

    return scm.getDiskBalancerManager().getDiskBalancerStatus(hosts,
        clientVersion);
  }
+
+ @Override
+ public void startDiskBalancer(Optional<Double> threshold,
+ Optional<Double> bandwidth, Optional<List<String>> hosts)
+ throws IOException {
+ // TODO: Send message to datanodes
+ }
+
+ @Override
+ public void stopDiskBalancer(Optional<List<String>> hosts)
+ throws IOException {
+ // TODO: Send message to datanodes
+ }
+
+
+ @Override
+ public void updateDiskBalancerConfiguration(Optional<Double> threshold,
+ Optional<Double> bandwidth, Optional<List<String>> hosts)
+ throws IOException {
+ // TODO: Send message to datanodes
+ }
+
/**
* Queries a list of Node that match a set of statuses.
*
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index ec454b6869..2724fdac31 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.node.DiskBalancerManager;
import org.apache.hadoop.hdds.scm.node.NodeAddressUpdateHandler;
import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManager;
import org.apache.hadoop.hdds.scm.server.upgrade.FinalizationManagerImpl;
@@ -231,6 +232,7 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
private WritableContainerFactory writableContainerFactory;
private FinalizationManager finalizationManager;
private HDDSLayoutVersionManager scmLayoutVersionManager;
+ private DiskBalancerManager diskBalancerManager;
private SCMMetadataStore scmMetadataStore;
private CertificateStore certificateStore;
@@ -780,6 +782,8 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
.setSCMDBTransactionBuffer(scmHAManager.getDBTransactionBuffer())
.setRatisServer(scmHAManager.getRatisServer())
.build();
+ diskBalancerManager = new DiskBalancerManager(conf, eventQueue, scmContext,
+ scmNodeManager);
}
/**
@@ -1918,6 +1922,10 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
return serviceManager;
}
+ public DiskBalancerManager getDiskBalancerManager() {
+ return diskBalancerManager;
+ }
+
/**
* Force SCM out of safe mode.
*/
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
new file mode 100644
index 0000000000..541c9764b7
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDiskBalancerManager.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Unit tests for the DiskBalancer manager.
+ */
+
+public class TestDiskBalancerManager {
+
+ private DiskBalancerManager diskBalancerManager;
+ private NodeManager nodeManager;
+ private OzoneConfiguration conf;
+ private String storageDir;
+
+ @BeforeEach
+ public void setup() throws Exception {
+ conf = new OzoneConfiguration();
+ storageDir = GenericTestUtils.getTempPath(
+ TestDiskBalancerManager.class.getSimpleName() + UUID.randomUUID());
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
+ nodeManager = new MockNodeManager(true, 3);
+ diskBalancerManager = new DiskBalancerManager(conf, new EventQueue(),
+ SCMContext.emptyContext(), nodeManager);
+ }
+
+ @Test
+ public void testDatanodeDiskBalancerReport() throws IOException {
+ List<HddsProtos.DatanodeDiskBalancerInfoProto> reportProtoList =
+ diskBalancerManager.getDiskBalancerReport(2,
+ ClientVersion.CURRENT_VERSION);
+
+ Assertions.assertEquals(2, reportProtoList.size());
+ Assertions.assertTrue(
+ reportProtoList.get(0).getCurrentVolumeDensitySum()
+ >= reportProtoList.get(1).getCurrentVolumeDensitySum());
+ }
+
+ @Test
+ public void testDatanodeDiskBalancerStatus() throws IOException {
+ diskBalancerManager.addRunningDatanode(nodeManager.getAllNodes().get(0));
+
+ // Simulate users asking all status of 3 datanodes
+ List<String> dns = nodeManager.getAllNodes().stream().map(
+ DatanodeDetails::getIpAddress).collect(
+ Collectors.toList());
+
+ List<HddsProtos.DatanodeDiskBalancerInfoProto> statusProtoList =
+ diskBalancerManager.getDiskBalancerStatus(Optional.of(dns),
+ ClientVersion.CURRENT_VERSION);
+
+ Assertions.assertEquals(3, statusProtoList.size());
+
+ // Simulate users asking status of 1 datanodes
+ dns = nodeManager.getAllNodes().stream().map(
+ DatanodeDetails::getIpAddress).limit(1).collect(
+ Collectors.toList());
+
+ statusProtoList =
+ diskBalancerManager.getDiskBalancerStatus(Optional.of(dns),
+ ClientVersion.CURRENT_VERSION);
+
+ Assertions.assertEquals(1, statusProtoList.size());
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
index 6851252a34..acc569392b 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.DatanodeAdminError;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.node.NodeUtils.HostDefinition;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -67,26 +68,24 @@ public class TestNodeDecommissionManager {
@Test
public void testHostStringsParseCorrectly()
throws InvalidHostStringException {
- NodeDecommissionManager.HostDefinition def =
- new NodeDecommissionManager.HostDefinition("foobar");
+ HostDefinition def = new HostDefinition("foobar");
assertEquals("foobar", def.getHostname());
assertEquals(-1, def.getPort());
- def = new NodeDecommissionManager.HostDefinition(" foobar ");
+ def = new HostDefinition(" foobar ");
assertEquals("foobar", def.getHostname());
assertEquals(-1, def.getPort());
- def = new NodeDecommissionManager.HostDefinition("foobar:1234");
+ def = new HostDefinition("foobar:1234");
assertEquals("foobar", def.getHostname());
assertEquals(1234, def.getPort());
- def = new NodeDecommissionManager.HostDefinition(
- "foobar.mycompany.com:1234");
+ def = new HostDefinition("foobar.mycompany.com:1234");
assertEquals("foobar.mycompany.com", def.getHostname());
assertEquals(1234, def.getPort());
try {
- new NodeDecommissionManager.HostDefinition("foobar:abcd");
+ new HostDefinition("foobar:abcd");
fail("InvalidHostStringException should have been thrown");
} catch (InvalidHostStringException e) {
}
diff --git
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index d72682acff..b2be400419 100644
---
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -656,4 +656,40 @@ public class ContainerOperationClient implements ScmClient
{
return storageContainerLocationClient.queryUpgradeFinalizationProgress(
upgradeClientID, force, readonly);
}
+
  /**
   * Fetches the disk-balancer report for the {@code count} most imbalanced
   * datanodes from SCM, using the current client version.
   *
   * @param count maximum number of datanode entries to return
   * @return per-datanode disk-balancer report entries
   * @throws IOException if the SCM call fails
   */
  @Override
  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerReport(
      int count) throws IOException {
    return storageContainerLocationClient.getDiskBalancerReport(count,
        ClientVersion.CURRENT_VERSION);
  }
+
  /**
   * Asks SCM to start the disk balancer on the given datanodes.
   * Thin delegation to the SCM location protocol client.
   *
   * @param threshold optional volume-density threshold
   * @param bandwidth optional bandwidth limit
   * @param hosts optional list of target host addresses
   * @throws IOException if the SCM call fails
   */
  @Override
  public void startDiskBalancer(Optional<Double> threshold,
      Optional<Double> bandwidth, Optional<List<String>> hosts)
      throws IOException {
    storageContainerLocationClient.startDiskBalancer(threshold, bandwidth,
        hosts);
  }
+
  /**
   * Asks SCM to stop the disk balancer on the given datanodes.
   * Thin delegation to the SCM location protocol client.
   *
   * @param hosts optional list of target host addresses
   * @throws IOException if the SCM call fails
   */
  @Override
  public void stopDiskBalancer(Optional<List<String>> hosts)
      throws IOException {
    storageContainerLocationClient.stopDiskBalancer(hosts);
  }
+
  /**
   * Fetches the disk-balancer status for the given hosts from SCM, using
   * the current client version.
   *
   * @param hosts optional list of host addresses to query
   * @return per-datanode disk-balancer status entries
   * @throws IOException if the SCM call fails
   */
  @Override
  public List<HddsProtos.DatanodeDiskBalancerInfoProto> getDiskBalancerStatus(
      Optional<List<String>> hosts) throws IOException {
    return storageContainerLocationClient.getDiskBalancerStatus(hosts,
        ClientVersion.CURRENT_VERSION);
  }
+
  /**
   * Asks SCM to update the disk-balancer configuration on the given
   * datanodes. Thin delegation to the SCM location protocol client.
   *
   * @param threshold optional volume-density threshold
   * @param bandwidth optional bandwidth limit
   * @param hosts optional list of target host addresses
   * @throws IOException if the SCM call fails
   */
  @Override
  public void updateDiskBalancerConfiguration(Optional<Double> threshold,
      Optional<Double> bandwidth, Optional<List<String>> hosts)
      throws IOException {
    storageContainerLocationClient.updateDiskBalancerConfiguration(threshold,
        bandwidth, hosts);
  }
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
new file mode 100644
index 0000000000..267209cf03
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDiskBalancer.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm.node;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
+import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This class tests disk balancer operations.
+ */
+public class TestDiskBalancer {
+
+ /**
+ * Set a timeout for each test.
+ */
+ @Rule
+ public Timeout timeout = Timeout.seconds(300);
+
+ private static ScmClient storageClient;
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration ozoneConf;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ ozoneConf = new OzoneConfiguration();
+ ozoneConf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
+ SCMContainerPlacementCapacity.class, PlacementPolicy.class);
+ cluster =
MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(3).build();
+ storageClient = new ContainerOperationClient(ozoneConf);
+ cluster.waitForClusterToBeReady();
+
+ for (DatanodeDetails dn: cluster.getStorageContainerManager()
+ .getScmNodeManager().getAllNodes()) {
+ ((DatanodeInfo) dn).updateStorageReports(
+ HddsTestUtils.getRandomNodeReport(3, 1).getStorageReportList());
+ }
+ }
+
+ @AfterClass
+ public static void cleanup() throws Exception {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testDatanodeDiskBalancerReport() throws IOException {
+ List<HddsProtos.DatanodeDiskBalancerInfoProto> reportProtoList =
+ storageClient.getDiskBalancerReport(2);
+
+ assertEquals(2, reportProtoList.size());
+ Assert.assertTrue(
+ reportProtoList.get(0).getCurrentVolumeDensitySum()
+ >= reportProtoList.get(1).getCurrentVolumeDensitySum());
+ }
+
+ @Test
+ public void testDatanodeDiskBalancerStatus() throws IOException {
+ // TODO: Test status command with datanodes in balancing
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]